diff --git a/pipecat-poc/pipeline.py b/pipecat-poc/pipeline.py deleted file mode 100644 index 1fa9b5d..0000000 --- a/pipecat-poc/pipeline.py +++ /dev/null @@ -1,133 +0,0 @@ -"""Configurable voice pipeline using Pipecat. - -Builds a STT → LLM → TTS pipeline with context aggregators. -Providers are selected via config (STT_PROVIDER, TTS_PROVIDER env vars). -LLM uses OpenAI-compatible API via LiteLLM. -""" - -import asyncio -import logging - -import aiohttp - -from pipecat.frames.frames import LLMMessagesAppendFrame -from pipecat.pipeline.pipeline import Pipeline -from pipecat.pipeline.runner import PipelineRunner -from pipecat.pipeline.task import PipelineParams, PipelineTask -from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext -from pipecat.services.openai.llm import OpenAILLMService -from pipecat.transports.livekit.transport import LiveKitParams, LiveKitTransport - -from config import Config -from providers import create_stt, create_tts - -logger = logging.getLogger("pipecat-poc.pipeline") - - -def _create_llm(cfg: Config): - """Create LLM service using OpenAI-compatible API (LiteLLM).""" - return OpenAILLMService( - base_url=cfg.llm.base_url, - api_key=cfg.llm.api_key, - model=cfg.llm.model, - ) - - -async def run_voice_pipeline(token: str, lk_room_name: str, cfg: Config): - """Run the Pipecat voice pipeline. - - Args: - token: LiveKit JWT used to connect - lk_room_name: LiveKit room name - cfg: Application configuration - """ - http_session = aiohttp.ClientSession() - stt = await create_stt(cfg.stt, http_session) - tts = await create_tts(cfg.tts, http_session) - llm = _create_llm(cfg) - - transport = LiveKitTransport( - url=cfg.livekit.url, - token=token, - room_name=lk_room_name, - params=LiveKitParams( - audio_in_enabled=True, - audio_out_enabled=True, - ), - ) - - # Create context with system prompt and context aggregators - context = OpenAILLMContext( - messages=[{"role": "system", "content": cfg.voice.system_prompt}], - ) - context_aggregator = llm.create_context_aggregator(context) - - pipeline = Pipeline([ - transport.input(), - stt, - context_aggregator.user(), # TranscriptionFrame → user message in context - llm, - tts, - transport.output(), - context_aggregator.assistant(), # LLM output → assistant message in context - ]) - - task = PipelineTask( - pipeline, - params=PipelineParams( - allow_interruptions=True, - enable_metrics=True, - ), - ) - - @transport.event_handler("on_first_participant_joined") - async def on_first_participant(transport_obj, participant): - logger.info("First participant joined: %s", participant) - await task.queue_frames([ - LLMMessagesAppendFrame( - [{"role": "user", "content": cfg.voice.greeting}], - run_llm=True, - ), - ]) - - runner = PipelineRunner() - logger.info("Starting Pipecat pipeline (stt=%s, tts=%s, llm=%s)", - cfg.stt.provider, cfg.tts.provider, cfg.llm.model) - await runner.run(task) - - -class VoicePipelineSession: - """Manages a single voice pipeline session lifecycle. - - Wraps the Pipecat pipeline with start/stop semantics. - Used by bot_bridge.py to manage pipeline per Matrix call. - """ - - def __init__(self, token: str, lk_room_name: str, cfg: Config): - self.token = token - self.lk_room_name = lk_room_name - self.cfg = cfg - self._task: asyncio.Task | None = None - - async def start(self): - """Start the voice pipeline in a background task.""" - self._task = asyncio.create_task(self._run()) - logger.info("Voice pipeline session started") - - async def stop(self): - """Stop the voice pipeline.""" - if self._task and not self._task.done(): - self._task.cancel() - try: - await self._task - except asyncio.CancelledError: - pass - logger.info("Voice pipeline session stopped") - - async def _run(self): - try: - await run_voice_pipeline(self.token, self.lk_room_name, self.cfg) - except asyncio.CancelledError: - logger.info("Pipeline cancelled") - except Exception: - logger.exception("Pipeline failed") diff --git a/voice.py b/voice.py index 47e81ad..0785f94 100644 --- a/voice.py +++ b/voice.py @@ -15,9 +15,6 @@ from livekit.plugins import openai as lk_openai, elevenlabs, silero logger = logging.getLogger("matrix-ai-voice") -# Enable debug logging for agents pipeline to diagnose audio issues -logging.getLogger("livekit.agents").setLevel(logging.DEBUG) -logging.getLogger("livekit.plugins").setLevel(logging.DEBUG) LITELLM_URL = os.environ.get("LITELLM_BASE_URL", "") LITELLM_KEY = os.environ.get("LITELLM_API_KEY", "not-needed") @@ -371,23 +368,8 @@ class VoiceSession: except asyncio.TimeoutError: logger.error("Greeting timed out") - # Periodic E2EE state poll to diagnose incoming decryption - _poll_count = 0 while True: await asyncio.sleep(5) - _poll_count += 1 - if _poll_count % 2 == 0: # Every 10s - try: - cryptors = self.lk_room.e2ee_manager.frame_cryptors() - for c in cryptors: - logger.info("E2EE_CRYPTOR: identity=%s key_index=%d enabled=%s", - c.participant_identity, c.key_index, c.enabled) - except Exception as e: - logger.debug("frame_cryptors() failed: %s", e) - # Log E2EE state for all remote participants - for p in self.lk_room.remote_participants.values(): - logger.info("REMOTE_PARTICIPANT: identity=%s tracks=%d", - p.identity, len(p.track_publications)) except asyncio.CancelledError: logger.info("Session cancelled for %s", self.room_id)