# Changelog:
# - Fix state_key format: try @user:domain:DEVICE_ID (Element Call format),
#   then @user:domain, then scan all room state as fallback.
# - Publish bot E2EE key to room so Element shows encrypted status.
# - Extract caller device_id from call member event content.
# - Also fix pipecat-poc pipeline with context aggregators (CF-1579).
"""Configurable voice pipeline using Pipecat.
|
|
|
|
Builds a STT → LLM → TTS pipeline with context aggregators.
|
|
Providers are selected via config (STT_PROVIDER, TTS_PROVIDER env vars).
|
|
LLM uses OpenAI-compatible API via LiteLLM.
|
|
"""
|
|
|
|
import asyncio
|
|
import logging
|
|
|
|
import aiohttp
|
|
|
|
from pipecat.frames.frames import LLMMessagesAppendFrame
|
|
from pipecat.pipeline.pipeline import Pipeline
|
|
from pipecat.pipeline.runner import PipelineRunner
|
|
from pipecat.pipeline.task import PipelineParams, PipelineTask
|
|
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
|
|
from pipecat.services.openai.llm import OpenAILLMService
|
|
from pipecat.transports.livekit.transport import LiveKitParams, LiveKitTransport
|
|
|
|
from config import Config
|
|
from providers import create_stt, create_tts
|
|
|
|
logger = logging.getLogger("pipecat-poc.pipeline")
|
|
|
|
|
|
def _create_llm(cfg: Config):
    """Build the LLM service: OpenAI-compatible endpoint served via LiteLLM."""
    llm_cfg = cfg.llm
    return OpenAILLMService(
        base_url=llm_cfg.base_url,
        api_key=llm_cfg.api_key,
        model=llm_cfg.model,
    )
|
|
|
|
|
|
async def run_voice_pipeline(token: str, lk_room_name: str, cfg: Config):
    """Run the Pipecat voice pipeline until it completes or is cancelled.

    Builds transport.input() → STT → user-context → LLM → TTS →
    transport.output() → assistant-context, greets the first participant,
    then blocks in the runner.

    Args:
        token: LiveKit JWT used to connect
        lk_room_name: LiveKit room name
        cfg: Application configuration
    """
    # This coroutine owns the HTTP session; close it on every exit path.
    # (Previously the session was never closed, leaking the connector
    # whenever the pipeline finished or raised.)
    http_session = aiohttp.ClientSession()
    try:
        stt = await create_stt(cfg.stt, http_session)
        tts = await create_tts(cfg.tts, http_session)
        llm = _create_llm(cfg)

        transport = LiveKitTransport(
            url=cfg.livekit.url,
            token=token,
            room_name=lk_room_name,
            params=LiveKitParams(
                audio_in_enabled=True,
                audio_out_enabled=True,
            ),
        )

        # Shared conversation context, seeded with the system prompt. The
        # aggregator pair keeps it updated from both ends of the pipeline.
        context = OpenAILLMContext(
            messages=[{"role": "system", "content": cfg.voice.system_prompt}],
        )
        context_aggregator = llm.create_context_aggregator(context)

        pipeline = Pipeline([
            transport.input(),
            stt,
            context_aggregator.user(),       # TranscriptionFrame → user message in context
            llm,
            tts,
            transport.output(),
            context_aggregator.assistant(),  # LLM output → assistant message in context
        ])

        task = PipelineTask(
            pipeline,
            params=PipelineParams(
                allow_interruptions=True,
                enable_metrics=True,
            ),
        )

        @transport.event_handler("on_first_participant_joined")
        async def on_first_participant(transport_obj, participant):
            # Kick off the conversation: inject the greeting as a user turn
            # and immediately run the LLM on it.
            logger.info("First participant joined: %s", participant)
            await task.queue_frames([
                LLMMessagesAppendFrame(
                    [{"role": "user", "content": cfg.voice.greeting}],
                    run_llm=True,
                ),
            ])

        runner = PipelineRunner()
        logger.info("Starting Pipecat pipeline (stt=%s, tts=%s, llm=%s)",
                    cfg.stt.provider, cfg.tts.provider, cfg.llm.model)
        await runner.run(task)
    finally:
        # Runs on normal completion, error, and cancellation alike.
        await http_session.close()
|
|
|
|
|
|
class VoicePipelineSession:
|
|
"""Manages a single voice pipeline session lifecycle.
|
|
|
|
Wraps the Pipecat pipeline with start/stop semantics.
|
|
Used by bot_bridge.py to manage pipeline per Matrix call.
|
|
"""
|
|
|
|
def __init__(self, token: str, lk_room_name: str, cfg: Config):
|
|
self.token = token
|
|
self.lk_room_name = lk_room_name
|
|
self.cfg = cfg
|
|
self._task: asyncio.Task | None = None
|
|
|
|
async def start(self):
|
|
"""Start the voice pipeline in a background task."""
|
|
self._task = asyncio.create_task(self._run())
|
|
logger.info("Voice pipeline session started")
|
|
|
|
async def stop(self):
|
|
"""Stop the voice pipeline."""
|
|
if self._task and not self._task.done():
|
|
self._task.cancel()
|
|
try:
|
|
await self._task
|
|
except asyncio.CancelledError:
|
|
pass
|
|
logger.info("Voice pipeline session stopped")
|
|
|
|
async def _run(self):
|
|
try:
|
|
await run_voice_pipeline(self.token, self.lk_room_name, self.cfg)
|
|
except asyncio.CancelledError:
|
|
logger.info("Pipeline cancelled")
|
|
except Exception:
|
|
logger.exception("Pipeline failed")
|