chore: Remove debug logging and pipecat-poc after E2EE fix confirmed working
- Remove setLevel(DEBUG) for livekit.agents/plugins (added for diagnostics)
- Remove periodic E2EE cryptor/participant state poll loop (no longer needed)
- Remove pipecat-poc/pipeline.py (POC never deployed, LiveKit approach confirmed)

E2EE bidirectional voice confirmed working in MAT-36.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -1,133 +0,0 @@
|
|||||||
"""Configurable voice pipeline using Pipecat.
|
|
||||||
|
|
||||||
Builds a STT → LLM → TTS pipeline with context aggregators.
|
|
||||||
Providers are selected via config (STT_PROVIDER, TTS_PROVIDER env vars).
|
|
||||||
LLM uses OpenAI-compatible API via LiteLLM.
|
|
||||||
"""
|
|
||||||
|
|
||||||
import asyncio
|
|
||||||
import logging
|
|
||||||
|
|
||||||
import aiohttp
|
|
||||||
|
|
||||||
from pipecat.frames.frames import LLMMessagesAppendFrame
|
|
||||||
from pipecat.pipeline.pipeline import Pipeline
|
|
||||||
from pipecat.pipeline.runner import PipelineRunner
|
|
||||||
from pipecat.pipeline.task import PipelineParams, PipelineTask
|
|
||||||
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
|
|
||||||
from pipecat.services.openai.llm import OpenAILLMService
|
|
||||||
from pipecat.transports.livekit.transport import LiveKitParams, LiveKitTransport
|
|
||||||
|
|
||||||
from config import Config
|
|
||||||
from providers import create_stt, create_tts
|
|
||||||
|
|
||||||
logger = logging.getLogger("pipecat-poc.pipeline")
|
|
||||||
|
|
||||||
|
|
||||||
def _create_llm(cfg: Config):
    """Build the LLM service against the OpenAI-compatible LiteLLM endpoint."""
    llm_cfg = cfg.llm
    return OpenAILLMService(
        base_url=llm_cfg.base_url,
        api_key=llm_cfg.api_key,
        model=llm_cfg.model,
    )
|
|
||||||
|
|
||||||
|
|
||||||
async def run_voice_pipeline(token: str, lk_room_name: str, cfg: Config):
    """Run the Pipecat voice pipeline until the runner finishes.

    Builds a STT → LLM → TTS pipeline over a LiveKit transport, greets the
    first participant to join, then blocks in the pipeline runner.

    Args:
        token: LiveKit JWT used to connect
        lk_room_name: LiveKit room name
        cfg: Application configuration
    """
    # This function owns the HTTP session, so guarantee it is closed even if
    # pipeline setup or the runner raises / is cancelled (the previous version
    # leaked the session and its connector on any error path).
    http_session = aiohttp.ClientSession()
    try:
        stt = await create_stt(cfg.stt, http_session)
        tts = await create_tts(cfg.tts, http_session)
        llm = _create_llm(cfg)

        transport = LiveKitTransport(
            url=cfg.livekit.url,
            token=token,
            room_name=lk_room_name,
            params=LiveKitParams(
                audio_in_enabled=True,
                audio_out_enabled=True,
            ),
        )

        # Create context with system prompt and context aggregators
        context = OpenAILLMContext(
            messages=[{"role": "system", "content": cfg.voice.system_prompt}],
        )
        context_aggregator = llm.create_context_aggregator(context)

        pipeline = Pipeline([
            transport.input(),
            stt,
            context_aggregator.user(),  # TranscriptionFrame → user message in context
            llm,
            tts,
            transport.output(),
            context_aggregator.assistant(),  # LLM output → assistant message in context
        ])

        task = PipelineTask(
            pipeline,
            params=PipelineParams(
                allow_interruptions=True,
                enable_metrics=True,
            ),
        )

        @transport.event_handler("on_first_participant_joined")
        async def on_first_participant(transport_obj, participant):
            # Queue the configured greeting as a user message and trigger an
            # LLM turn so the bot speaks first.
            logger.info("First participant joined: %s", participant)
            await task.queue_frames([
                LLMMessagesAppendFrame(
                    [{"role": "user", "content": cfg.voice.greeting}],
                    run_llm=True,
                ),
            ])

        runner = PipelineRunner()
        logger.info("Starting Pipecat pipeline (stt=%s, tts=%s, llm=%s)",
                    cfg.stt.provider, cfg.tts.provider, cfg.llm.model)
        await runner.run(task)
    finally:
        # Always release the aiohttp session owned by this function.
        await http_session.close()
|
|
||||||
|
|
||||||
|
|
||||||
class VoicePipelineSession:
|
|
||||||
"""Manages a single voice pipeline session lifecycle.
|
|
||||||
|
|
||||||
Wraps the Pipecat pipeline with start/stop semantics.
|
|
||||||
Used by bot_bridge.py to manage pipeline per Matrix call.
|
|
||||||
"""
|
|
||||||
|
|
||||||
def __init__(self, token: str, lk_room_name: str, cfg: Config):
|
|
||||||
self.token = token
|
|
||||||
self.lk_room_name = lk_room_name
|
|
||||||
self.cfg = cfg
|
|
||||||
self._task: asyncio.Task | None = None
|
|
||||||
|
|
||||||
async def start(self):
|
|
||||||
"""Start the voice pipeline in a background task."""
|
|
||||||
self._task = asyncio.create_task(self._run())
|
|
||||||
logger.info("Voice pipeline session started")
|
|
||||||
|
|
||||||
async def stop(self):
|
|
||||||
"""Stop the voice pipeline."""
|
|
||||||
if self._task and not self._task.done():
|
|
||||||
self._task.cancel()
|
|
||||||
try:
|
|
||||||
await self._task
|
|
||||||
except asyncio.CancelledError:
|
|
||||||
pass
|
|
||||||
logger.info("Voice pipeline session stopped")
|
|
||||||
|
|
||||||
async def _run(self):
|
|
||||||
try:
|
|
||||||
await run_voice_pipeline(self.token, self.lk_room_name, self.cfg)
|
|
||||||
except asyncio.CancelledError:
|
|
||||||
logger.info("Pipeline cancelled")
|
|
||||||
except Exception:
|
|
||||||
logger.exception("Pipeline failed")
|
|
||||||
18
voice.py
18
voice.py
@@ -15,9 +15,6 @@ from livekit.plugins import openai as lk_openai, elevenlabs, silero
|
|||||||
|
|
||||||
logger = logging.getLogger("matrix-ai-voice")
|
logger = logging.getLogger("matrix-ai-voice")
|
||||||
|
|
||||||
# Enable debug logging for agents pipeline to diagnose audio issues
|
|
||||||
logging.getLogger("livekit.agents").setLevel(logging.DEBUG)
|
|
||||||
logging.getLogger("livekit.plugins").setLevel(logging.DEBUG)
|
|
||||||
|
|
||||||
LITELLM_URL = os.environ.get("LITELLM_BASE_URL", "")
|
LITELLM_URL = os.environ.get("LITELLM_BASE_URL", "")
|
||||||
LITELLM_KEY = os.environ.get("LITELLM_API_KEY", "not-needed")
|
LITELLM_KEY = os.environ.get("LITELLM_API_KEY", "not-needed")
|
||||||
@@ -371,23 +368,8 @@ class VoiceSession:
|
|||||||
except asyncio.TimeoutError:
|
except asyncio.TimeoutError:
|
||||||
logger.error("Greeting timed out")
|
logger.error("Greeting timed out")
|
||||||
|
|
||||||
# Periodic E2EE state poll to diagnose incoming decryption
|
|
||||||
_poll_count = 0
|
|
||||||
while True:
|
while True:
|
||||||
await asyncio.sleep(5)
|
await asyncio.sleep(5)
|
||||||
_poll_count += 1
|
|
||||||
if _poll_count % 2 == 0: # Every 10s
|
|
||||||
try:
|
|
||||||
cryptors = self.lk_room.e2ee_manager.frame_cryptors()
|
|
||||||
for c in cryptors:
|
|
||||||
logger.info("E2EE_CRYPTOR: identity=%s key_index=%d enabled=%s",
|
|
||||||
c.participant_identity, c.key_index, c.enabled)
|
|
||||||
except Exception as e:
|
|
||||||
logger.debug("frame_cryptors() failed: %s", e)
|
|
||||||
# Log E2EE state for all remote participants
|
|
||||||
for p in self.lk_room.remote_participants.values():
|
|
||||||
logger.info("REMOTE_PARTICIPANT: identity=%s tracks=%d",
|
|
||||||
p.identity, len(p.track_publications))
|
|
||||||
|
|
||||||
except asyncio.CancelledError:
|
except asyncio.CancelledError:
|
||||||
logger.info("Session cancelled for %s", self.room_id)
|
logger.info("Session cancelled for %s", self.room_id)
|
||||||
|
|||||||
Reference in New Issue
Block a user