From 80582860b9f5c6e9074f4609c94b9e088d8f548c Mon Sep 17 00:00:00 2001 From: Christian Gick Date: Sat, 21 Feb 2026 14:51:26 +0200 Subject: [PATCH] fix: E2EE key lookup for Element Call voice sessions - Fix state_key format: try @user:domain:DEVICE_ID (Element Call format), then @user:domain, then scan all room state as fallback - Publish bot E2EE key to room so Element shows encrypted status - Extract caller device_id from call member event content - Also fix pipecat-poc pipeline with context aggregators (CF-1579) Co-Authored-By: Claude Opus 4.6 --- bot.py | 81 ++++++++++++++++++------ pipecat-poc/pipeline.py | 133 ++++++++++++++++++++++++++++++++++++++++ 2 files changed, 196 insertions(+), 18 deletions(-) create mode 100644 pipecat-poc/pipeline.py diff --git a/bot.py b/bot.py index 876807d..fb1852b 100644 --- a/bot.py +++ b/bot.py @@ -424,14 +424,19 @@ class Bot: model=model, ) # Read existing encryption keys from room state before starting - caller_key = await self._get_call_encryption_key(room_id, event.sender) + caller_device_id = content.get("device_id", "") + caller_key = await self._get_call_encryption_key(room_id, event.sender, caller_device_id) if caller_key: - vs.on_encryption_key(event.sender, "", caller_key, 0) + vs.on_encryption_key(event.sender, caller_device_id, caller_key, 0) await vs.start() self.voice_sessions[room_id] = vs logger.info("Voice session started for room %s (e2ee_key=%s)", room_id, "yes" if caller_key else "no") + + # Publish our E2EE key so Element Call sees us as encrypted + if caller_key: + await self._publish_encryption_key(room_id, caller_key) except Exception: logger.exception("Voice session start failed for %s", room_id) @@ -1430,24 +1435,64 @@ class Bot: }, ) - async def _get_call_encryption_key(self, room_id: str, sender: str) -> bytes | None: - """Read E2EE encryption key from io.element.call.encryption_keys state events.""" + async def _get_call_encryption_key(self, room_id: str, sender: str, caller_device_id: str = "") -> bytes | None: + """Read E2EE encryption key from io.element.call.encryption_keys state events. + + Element Call uses state_key format: @user:domain:DEVICE_ID + Falls back to trying just @user:domain and scanning all room state. + """ + # Try state_key formats: @user:domain:device_id, then @user:domain + state_keys_to_try = [] + if caller_device_id: + state_keys_to_try.append(f"{sender}:{caller_device_id}") + state_keys_to_try.append(sender) + + for state_key in state_keys_to_try: + try: + resp = await self.client.room_get_state_event( + room_id, ENCRYPTION_KEYS_TYPE, state_key, + ) + key = self._extract_e2ee_key(resp, sender, state_key) + if key: + return key + except Exception as e: + logger.debug("No encryption key at state_key=%s: %s", state_key, e) + + # Fallback: scan all room state for any encryption_keys event try: - resp = await self.client.room_get_state_event( - room_id, ENCRYPTION_KEYS_TYPE, sender, - ) - if hasattr(resp, "content") and resp.content: - keys = resp.content.get("keys", []) - if keys: - key_b64 = keys[0].get("key", "") - if key_b64: - # Element Call uses base64url encoding - key_b64 += "=" * (-len(key_b64) % 4) # pad - key = base64.urlsafe_b64decode(key_b64) - logger.info("Got E2EE key from %s (%d bytes)", sender, len(key)) - return key + resp = await self.client.room_get_state(room_id) + if hasattr(resp, "events"): + for evt in resp.events: + if evt.get("type") == ENCRYPTION_KEYS_TYPE and evt.get("sender") != BOT_USER: + content = evt.get("content", {}) + keys = content.get("keys", []) + for k in keys: + key_b64 = k.get("key", "") + if key_b64: + key_b64 += "=" * (-len(key_b64) % 4) + key = base64.urlsafe_b64decode(key_b64) + logger.info("Got E2EE key from room state scan (%s, %d bytes)", + evt.get("state_key", "?"), len(key)) + return key except Exception as e: - logger.debug("No encryption key from %s in %s: %s", sender, room_id, e) + logger.debug("Room state scan for encryption keys failed: %s", e) + + logger.warning("No E2EE encryption key found for %s in %s", sender, room_id) + return None + + @staticmethod + def _extract_e2ee_key(resp, sender: str, state_key: str) -> bytes | None: + """Extract E2EE key bytes from a state event response.""" + if not hasattr(resp, "content") or not resp.content: + return None + keys = resp.content.get("keys", []) + for k in keys: + key_b64 = k.get("key", "") + if key_b64: + key_b64 += "=" * (-len(key_b64) % 4) + key = base64.urlsafe_b64decode(key_b64) + logger.info("Got E2EE key from %s (state_key=%s, %d bytes)", sender, state_key, len(key)) + return key return None async def _publish_encryption_key(self, room_id: str, key: bytes): diff --git a/pipecat-poc/pipeline.py b/pipecat-poc/pipeline.py new file mode 100644 index 0000000..1fa9b5d --- /dev/null +++ b/pipecat-poc/pipeline.py @@ -0,0 +1,133 @@ +"""Configurable voice pipeline using Pipecat. + +Builds a STT → LLM → TTS pipeline with context aggregators. +Providers are selected via config (STT_PROVIDER, TTS_PROVIDER env vars). +LLM uses OpenAI-compatible API via LiteLLM. +""" + +import asyncio +import logging + +import aiohttp + +from pipecat.frames.frames import LLMMessagesAppendFrame +from pipecat.pipeline.pipeline import Pipeline +from pipecat.pipeline.runner import PipelineRunner +from pipecat.pipeline.task import PipelineParams, PipelineTask +from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext +from pipecat.services.openai.llm import OpenAILLMService +from pipecat.transports.livekit.transport import LiveKitParams, LiveKitTransport + +from config import Config +from providers import create_stt, create_tts + +logger = logging.getLogger("pipecat-poc.pipeline") + + +def _create_llm(cfg: Config): + """Create LLM service using OpenAI-compatible API (LiteLLM).""" + return OpenAILLMService( + base_url=cfg.llm.base_url, + api_key=cfg.llm.api_key, + model=cfg.llm.model, + ) + + +async def run_voice_pipeline(token: str, lk_room_name: str, cfg: Config): + """Run the Pipecat voice pipeline. + + Args: + token: LiveKit JWT used to connect + lk_room_name: LiveKit room name + cfg: Application configuration + """ + http_session = aiohttp.ClientSession() + stt = await create_stt(cfg.stt, http_session) + tts = await create_tts(cfg.tts, http_session) + llm = _create_llm(cfg) + + transport = LiveKitTransport( + url=cfg.livekit.url, + token=token, + room_name=lk_room_name, + params=LiveKitParams( + audio_in_enabled=True, + audio_out_enabled=True, + ), + ) + + # Create context with system prompt and context aggregators + context = OpenAILLMContext( + messages=[{"role": "system", "content": cfg.voice.system_prompt}], + ) + context_aggregator = llm.create_context_aggregator(context) + + pipeline = Pipeline([ + transport.input(), + stt, + context_aggregator.user(), # TranscriptionFrame → user message in context + llm, + tts, + transport.output(), + context_aggregator.assistant(), # LLM output → assistant message in context + ]) + + task = PipelineTask( + pipeline, + params=PipelineParams( + allow_interruptions=True, + enable_metrics=True, + ), + ) + + @transport.event_handler("on_first_participant_joined") + async def on_first_participant(transport_obj, participant): + logger.info("First participant joined: %s", participant) + await task.queue_frames([ + LLMMessagesAppendFrame( + [{"role": "user", "content": cfg.voice.greeting}], + run_llm=True, + ), + ]) + + runner = PipelineRunner() + logger.info("Starting Pipecat pipeline (stt=%s, tts=%s, llm=%s)", + cfg.stt.provider, cfg.tts.provider, cfg.llm.model) + await runner.run(task) + + +class VoicePipelineSession: + """Manages a single voice pipeline session lifecycle. + + Wraps the Pipecat pipeline with start/stop semantics. + Used by bot_bridge.py to manage pipeline per Matrix call. + """ + + def __init__(self, token: str, lk_room_name: str, cfg: Config): + self.token = token + self.lk_room_name = lk_room_name + self.cfg = cfg + self._task: asyncio.Task | None = None + + async def start(self): + """Start the voice pipeline in a background task.""" + self._task = asyncio.create_task(self._run()) + logger.info("Voice pipeline session started") + + async def stop(self): + """Stop the voice pipeline.""" + if self._task and not self._task.done(): + self._task.cancel() + try: + await self._task + except asyncio.CancelledError: + pass + logger.info("Voice pipeline session stopped") + + async def _run(self): + try: + await run_voice_pipeline(self.token, self.lk_room_name, self.cfg) + except asyncio.CancelledError: + logger.info("Pipeline cancelled") + except Exception: + logger.exception("Pipeline failed")