"""Configurable voice pipeline using Pipecat.

Builds an STT → LLM → TTS pipeline with context aggregators.
Providers are selected via config (STT_PROVIDER, TTS_PROVIDER env vars).
The LLM is reached through an OpenAI-compatible API served by LiteLLM.
"""

import asyncio
import logging

import aiohttp
from pipecat.frames.frames import LLMMessagesAppendFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.services.openai.llm import OpenAILLMService
from pipecat.transports.livekit.transport import LiveKitParams, LiveKitTransport

from config import Config
from providers import create_stt, create_tts

logger = logging.getLogger("pipecat-poc.pipeline")


def _create_llm(cfg: Config) -> OpenAILLMService:
    """Create the LLM service using an OpenAI-compatible API (LiteLLM).

    Args:
        cfg: Application configuration; ``cfg.llm`` supplies the base URL,
            API key and model name.

    Returns:
        A configured ``OpenAILLMService``.
    """
    return OpenAILLMService(
        base_url=cfg.llm.base_url,
        api_key=cfg.llm.api_key,
        model=cfg.llm.model,
    )


async def run_voice_pipeline(token: str, lk_room_name: str, cfg: Config) -> None:
    """Run the Pipecat voice pipeline until the runner returns.

    Wires LiveKit audio input → STT → user context aggregator → LLM → TTS →
    LiveKit audio output → assistant context aggregator, then runs it with a
    ``PipelineRunner``. When the first participant joins, the configured
    greeting is appended as a user message and the LLM is triggered.

    Args:
        token: LiveKit JWT used to connect
        lk_room_name: LiveKit room name
        cfg: Application configuration
    """
    # The HTTP session is handed to the STT/TTS providers. ``async with``
    # ensures it is closed however this coroutine exits: normal completion,
    # an exception (including one from create_stt/create_tts), or
    # cancellation.
    async with aiohttp.ClientSession() as http_session:
        stt = await create_stt(cfg.stt, http_session)
        tts = await create_tts(cfg.tts, http_session)
        llm = _create_llm(cfg)

        transport = LiveKitTransport(
            url=cfg.livekit.url,
            token=token,
            room_name=lk_room_name,
            params=LiveKitParams(
                audio_in_enabled=True,
                audio_out_enabled=True,
            ),
        )

        # Create context with system prompt and context aggregators
        context = OpenAILLMContext(
            messages=[{"role": "system", "content": cfg.voice.system_prompt}],
        )
        context_aggregator = llm.create_context_aggregator(context)

        pipeline = Pipeline([
            transport.input(),
            stt,
            context_aggregator.user(),  # TranscriptionFrame → user message in context
            llm,
            tts,
            transport.output(),
            context_aggregator.assistant(),  # LLM output → assistant message in context
        ])

        task = PipelineTask(
            pipeline,
            params=PipelineParams(
                allow_interruptions=True,
                enable_metrics=True,
            ),
        )

        @transport.event_handler("on_first_participant_joined")
        async def on_first_participant(transport_obj, participant):
            # transport_obj is unused but required by the handler signature.
            logger.info("First participant joined: %s", participant)
            # Seed the conversation with the greeting and ask the LLM to
            # respond immediately (run_llm=True).
            await task.queue_frames([
                LLMMessagesAppendFrame(
                    [{"role": "user", "content": cfg.voice.greeting}],
                    run_llm=True,
                ),
            ])

        # NOTE(review): PipelineRunner is created with its default arguments.
        # If pipecat's runner installs process signal handlers by default,
        # confirm that is desired when several sessions run inside one host
        # process (e.g. the bot bridge).
        runner = PipelineRunner()

        logger.info("Starting Pipecat pipeline (stt=%s, tts=%s, llm=%s)",
                    cfg.stt.provider, cfg.tts.provider, cfg.llm.model)
        await runner.run(task)


class VoicePipelineSession:
    """Manages a single voice pipeline session lifecycle.

    Wraps the Pipecat pipeline with start/stop semantics. The pipeline runs
    in a background ``asyncio`` task created by :meth:`start` and cancelled
    by :meth:`stop`. Used by bot_bridge.py to manage pipeline per Matrix call.
    """

    def __init__(self, token: str, lk_room_name: str, cfg: Config):
        self.token = token
        self.lk_room_name = lk_room_name
        self.cfg = cfg
        # Background task running the pipeline; None when not started/stopped.
        self._task: asyncio.Task | None = None

    async def start(self) -> None:
        """Start the voice pipeline in a background task.

        Does nothing (besides a warning) if a pipeline task is already
        running, so a repeated call cannot orphan the first task.
        """
        if self._task is not None and not self._task.done():
            logger.warning("Voice pipeline session already running")
            return
        self._task = asyncio.create_task(self._run())
        logger.info("Voice pipeline session started")

    async def stop(self) -> None:
        """Stop the voice pipeline, waiting for its task to finish cancelling."""
        task, self._task = self._task, None
        if task is not None and not task.done():
            task.cancel()
            try:
                await task
            except asyncio.CancelledError:
                # Expected: _run re-raises the cancellation we just requested.
                pass
        logger.info("Voice pipeline session stopped")

    async def _run(self) -> None:
        """Run the pipeline and log how it ended.

        Ordinary failures are logged and not propagated, since this runs as a
        fire-and-forget background task. Cancellation is logged and then
        re-raised so the task is correctly reported as cancelled.
        """
        try:
            await run_voice_pipeline(self.token, self.lk_room_name, self.cfg)
        except asyncio.CancelledError:
            logger.info("Pipeline cancelled")
            raise
        except Exception:
            logger.exception("Pipeline failed")