"""Voice session: LiveKit + STT/LLM/TTS pipeline.

E2EE via HKDF key derivation (Element Call compatible).
Requires patched livekit-rtc FFI binary from onestacked/livekit-rust-sdks.
"""

import asyncio
import base64
import datetime
import hashlib
import logging
import os
import urllib.parse

import aiohttp
from livekit import rtc, api as lkapi
from livekit.agents import Agent, AgentSession, room_io
from livekit.plugins import openai as lk_openai, elevenlabs, silero

logger = logging.getLogger("matrix-ai-voice")

# Enable debug logging for agents pipeline to diagnose audio issues
logging.getLogger("livekit.agents").setLevel(logging.DEBUG)
logging.getLogger("livekit.plugins").setLevel(logging.DEBUG)

LITELLM_URL = os.environ.get("LITELLM_BASE_URL", "")
LITELLM_KEY = os.environ.get("LITELLM_API_KEY", "not-needed")
LK_API_KEY = os.environ.get("LIVEKIT_API_KEY", "")
LK_API_SECRET = os.environ.get("LIVEKIT_API_SECRET", "")
ELEVENLABS_KEY = os.environ.get("ELEVENLABS_API_KEY", "")
DEFAULT_VOICE_ID = "onwK4e9ZLuTAKqWW03F9"  # Daniel - male, free tier

# System prompt (German): answer only in German, at most 1-2 short sentences,
# be direct, invent nothing, answer only what is asked, otherwise just say hello.
VOICE_PROMPT = """Du bist ein hilfreicher Sprachassistent in einem Matrix-Anruf.

STRIKTE Regeln:
- Antworte IMMER auf Deutsch
- Halte JEDE Antwort auf MAXIMAL 1-2 kurze Saetze
- Sei direkt und praezise, keine Fuellwoerter
- Erfinde NICHTS - keine Geschichten, keine Musik, keine Fantasie
- Beantworte nur was gefragt wird
- Wenn niemand etwas fragt, sage nur kurz Hallo"""

# Lazily-loaded Silero VAD shared by every session in this process.
_vad = None


def _get_vad():
    """Return the module-wide Silero VAD, loading it on first use."""
    global _vad
    if _vad is None:
        _vad = silero.VAD.load()
    return _vad


def _make_lk_identity(user_id, device_id):
    """Return the LiveKit participant identity ``"<user_id>:<device_id>"``."""
    return f"{user_id}:{device_id}"


def _compute_lk_room_name(room_id):
    """Derive the LiveKit room name for a Matrix room.

    Returns unpadded standard base64 of SHA-256 over ``"<room_id>|m.call#ROOM"``.
    Presumably mirrors the naming used by Element Call's LiveKit JWT service —
    confirm if the upstream scheme changes.
    """
    raw = f"{room_id}|m.call#ROOM"
    return base64.b64encode(hashlib.sha256(raw.encode()).digest()).decode().rstrip("=")


def _key_fingerprint(key):
    """Return a short, non-reversible SHA-256 fingerprint of *key* for logging.

    Raw E2EE key bytes must never be written to logs; a fingerprint is enough
    to tell two keys apart when debugging a mismatch.
    """
    return hashlib.sha256(key).hexdigest()[:16]


def _generate_lk_jwt(room_id, user_id, device_id):
    """Mint a LiveKit access token for the bot.

    The token is valid for 24 hours and grants join/publish/subscribe in the
    LiveKit room derived from *room_id*, using identity
    ``"<user_id>:<device_id>"`` and display name *user_id*.
    """
    identity = _make_lk_identity(user_id, device_id)
    lk_room = _compute_lk_room_name(room_id)
    token = (
        lkapi.AccessToken(LK_API_KEY, LK_API_SECRET)
        .with_identity(identity)
        .with_name(user_id)
        .with_grants(lkapi.VideoGrants(
            room_join=True, room=lk_room, can_publish=True, can_subscribe=True))
        .with_ttl(datetime.timedelta(hours=24))
    )
    logger.info("JWT: identity=%s room=%s", identity, lk_room)
    return token.to_jwt()


# Value passed as KeyProviderOptions.key_derivation_function; presumably the
# FFI's enum value for HKDF — confirm against the patched livekit-rtc build.
KDF_HKDF = 1


def _build_e2ee_options(caller_key: bytes = b"") -> rtc.E2EEOptions:
    """Build HKDF E2EE options matching Element Call's key derivation.

    Pass *caller_key* as ``shared_key`` to initialize in true shared-key mode,
    so the Rust FFI decrypts incoming frames with the caller's key. Outgoing
    encryption is overridden via ``set_key(bot_identity, bot_key)`` after
    connect.

    Element Call uses: ratchetWindowSize=16, keyringSize=256,
    salt="LKFrameEncryptionKey".
    """
    key_opts = rtc.KeyProviderOptions(
        shared_key=caller_key,
        ratchet_window_size=16,
        ratchet_salt=b"LKFrameEncryptionKey",
        failure_tolerance=-1,
        key_ring_size=256,
        key_derivation_function=KDF_HKDF,
    )
    return rtc.E2EEOptions(key_provider_options=key_opts)


class VoiceSession:
    """The bot's participation in one Matrix voice call, backed by LiveKit.

    ``start()`` spawns ``_run()`` as a background task, which connects to
    LiveKit with E2EE and runs an STT -> LLM -> TTS agent pipeline.
    ``stop()`` tears everything down. Caller E2EE keys received via Matrix
    sync are expected to be passed to ``on_encryption_key``.
    """

    def __init__(self, nio_client, room_id, device_id, lk_url,
                 model="claude-sonnet", publish_key_cb=None,
                 bot_key: bytes | None = None):
        """Create a session; nothing connects until ``start()``.

        Args:
            nio_client: Matrix client; its ``user_id``, ``homeserver`` and
                ``access_token`` are read.
            room_id: Matrix room ID of the call.
            device_id: the bot's Matrix device ID (part of its LiveKit identity).
            lk_url: LiveKit server URL.
            model: LLM model name passed to the LiteLLM-backed OpenAI plugin.
            publish_key_cb: optional callable invoked with the bot's key bytes
                so the caller can distribute it to other call participants.
            bot_key: the bot's own E2EE key; a random 16-byte key if omitted.
        """
        self.nio_client = nio_client
        self.room_id = room_id
        self.device_id = device_id
        self.lk_url = lk_url
        self.model = model
        self.lk_room = None
        self.session = None
        self._task = None
        self._http_session = None
        # First caller key seen (timeline or sync); used as the shared key at connect.
        self._caller_key: bytes | None = None
        # LiveKit identity ("<user_id>:<device_id>") of the participant that sent it.
        self._caller_identity: str | None = None
        self._caller_all_keys: dict = {}  # {index: bytes} — all caller keys by index
        self._bot_key: bytes = bot_key or os.urandom(16)
        self._publish_key_cb = publish_key_cb

    def on_encryption_key(self, sender, device_id, key, index):
        """Receive an E2EE key from an Element Call participant.

        The first non-empty key becomes ``_caller_key`` (and its sender the
        caller identity); every key is recorded by index. If a room exists,
        the key is also pushed live into the key provider as a shared key.
        """
        if not key:
            return
        if not self._caller_key:
            self._caller_key = key
            self._caller_identity = _make_lk_identity(sender, device_id)
        self._caller_all_keys[index] = key
        logger.info("E2EE key received from %s:%s (index=%d, %d bytes)",
                    sender, device_id, index, len(key))
        # Live-update shared key when caller rotates (e.g. on bot join) — use
        # set_shared_key so the shared-key decryption path stays in sync.
        if self.lk_room and hasattr(self.lk_room, 'e2ee_manager'):
            try:
                kp = self.lk_room.e2ee_manager.key_provider
                kp.set_shared_key(key, index)
                logger.info("Live-updated shared key[%d] (%d bytes)", index, len(key))
            except Exception as e:
                # Best effort: e.g. before connect the room may have no key provider yet.
                logger.warning("Failed to live-update shared key: %s", e)

    async def _fetch_encryption_key_http(self) -> bytes | None:
        """Fetch the caller's encryption key from the room timeline via the Matrix HTTP API.

        Element Call distributes encryption keys as timeline events (NOT state).
        Scans the latest 50 timeline events, newest first, for an
        ``io.element.call.encryption_keys`` event not sent by the bot. For the
        first such event carrying keys, records the sender's identity and all
        of its keys in ``_caller_all_keys`` and returns the highest-index key.

        Returns:
            The key bytes, or None if no key was found or the request failed.
        """
        import httpx

        homeserver = str(self.nio_client.homeserver).rstrip("/")
        token = self.nio_client.access_token
        # Path parameters must be percent-encoded ("!id:server" -> "%21id%3Aserver").
        room_path = urllib.parse.quote(self.room_id, safe="")
        url = f"{homeserver}/_matrix/client/v3/rooms/{room_path}/messages"
        try:
            async with httpx.AsyncClient(timeout=10.0) as http:
                resp = await http.get(
                    url,
                    headers={"Authorization": f"Bearer {token}"},
                    params={"dir": "b", "limit": "50"},
                )
                resp.raise_for_status()
                data = resp.json()

            events = data.get("chunk", [])
            user_id = self.nio_client.user_id
            for evt in events:
                if evt.get("type", "") != "io.element.call.encryption_keys":
                    continue
                sender = evt.get("sender", "")
                if sender == user_id:
                    continue  # skip our own key
                content = evt.get("content", {})
                device = content.get("device_id", "")
                logger.info("Found encryption_keys timeline event: sender=%s device=%s",
                            sender, device)
                all_keys = {}
                for k in content.get("keys", []):
                    key_b64 = k.get("key", "")
                    key_index = k.get("index", 0)
                    if key_b64:
                        # Keys may be sent unpadded; restore padding before decoding.
                        key_b64 += "=" * (-len(key_b64) % 4)
                        all_keys[key_index] = base64.urlsafe_b64decode(key_b64)
                if all_keys:
                    if device:
                        self._caller_identity = _make_lk_identity(sender, device)
                    self._caller_all_keys.update(all_keys)
                    max_idx = max(all_keys)
                    logger.info("Loaded caller keys at indices %s (using %d)",
                                sorted(all_keys), max_idx)
                    return all_keys[max_idx]
            logger.info("No encryption_keys events in last %d timeline events", len(events))
        except Exception as e:
            logger.warning("HTTP encryption key fetch failed: %s", e)
        return None

    async def start(self):
        """Spawn ``_run()`` as a background task and return immediately."""
        self._task = asyncio.create_task(self._run())

    async def stop(self):
        """Close the agent session, LiveKit room and HTTP session, then cancel the run task.

        Cleanup is best-effort: a failure closing one resource is logged at
        debug level and does not prevent closing the others.
        """
        for obj in (self.session, self.lk_room, self._http_session):
            if not obj:
                continue
            try:
                if hasattr(obj, "aclose"):
                    await obj.aclose()
                elif hasattr(obj, "disconnect"):
                    await obj.disconnect()
                elif hasattr(obj, "close"):
                    await obj.close()
            except Exception:
                logger.debug("Error while closing %s", type(obj).__name__, exc_info=True)
        if self._task and not self._task.done():
            self._task.cancel()
            try:
                await self._task
            except asyncio.CancelledError:
                pass

    def _pick_remote_identity(self):
        """Return the identity of the remote participant to listen to, or None.

        Prefers the participant whose identity matches the caller whose E2EE
        key we hold; otherwise falls back to the first remote participant.
        """
        identities = [p.identity for p in self.lk_room.remote_participants.values()]
        if self._caller_identity and self._caller_identity in identities:
            return self._caller_identity
        return identities[0] if identities else None

    async def _run(self):
        """Connect to LiveKit with E2EE and run the voice pipeline until cancelled.

        Publishes the bot's key, obtains the caller's key (timeline first,
        then up to ~15 s waiting for ``on_encryption_key``), connects with
        the caller's key as shared key, installs the bot's key for outgoing
        frames, picks the remote participant, starts the agent session and
        sends a greeting. Non-cancellation failures are logged, not raised.
        """
        try:
            user_id = self.nio_client.user_id
            bot_identity = _make_lk_identity(user_id, self.device_id)
            jwt = _generate_lk_jwt(self.room_id, user_id, self.device_id)

            # Publish bot's own key immediately so Element Call can decrypt us
            if self._publish_key_cb:
                self._publish_key_cb(self._bot_key)
                logger.info("Published bot E2EE key (%d bytes)", len(self._bot_key))

            # Check timeline for caller's encryption key
            caller_key = await self._fetch_encryption_key_http()
            if caller_key:
                self._caller_key = caller_key
                logger.info("Got caller E2EE key via timeline (%d bytes)", len(caller_key))

            if not self._caller_key:
                # Wait up to 15s for key via sync handler
                logger.info("No key in timeline yet, waiting for sync...")
                for _ in range(150):
                    if self._caller_key:
                        break
                    await asyncio.sleep(0.1)

            # Connect with caller_key as shared_key so Rust FFI decrypts
            # incoming audio in true shared-key mode. Outgoing encryption
            # is overridden to bot_key via set_key(bot_identity) after connect.
            e2ee_opts = _build_e2ee_options(self._caller_key or b"")
            room_opts = rtc.RoomOptions(e2ee=e2ee_opts)
            self.lk_room = rtc.Room()

            @self.lk_room.on("participant_connected")
            def on_p(p):
                logger.info("Participant connected: %s", p.identity)

            @self.lk_room.on("track_published")
            def on_tp(pub, p):
                logger.info("Track pub: %s %s kind=%s", p.identity, pub.sid, pub.kind)

            @self.lk_room.on("track_subscribed")
            def on_ts(t, pub, p):
                logger.info("Track sub: %s %s kind=%s", p.identity, pub.sid, t.kind)

            @self.lk_room.on("e2ee_state_changed")
            def on_e2ee_state(participant, state):
                logger.info("E2EE_STATE: participant=%s state=%s(%d)",
                            participant.identity if participant else "local",
                            state, int(state))

            await self.lk_room.connect(self.lk_url, jwt, options=room_opts)
            logger.info("Connected (E2EE=HKDF), remote=%d",
                        len(self.lk_room.remote_participants))

            # Set per-participant keys via key provider
            kp = self.lk_room.e2ee_manager.key_provider
            # Bot's own key — encrypts outgoing audio
            kp.set_key(bot_identity, self._bot_key, 0)
            logger.info("Set bot key for %s (%d bytes)", bot_identity, len(self._bot_key))

            # Find the remote participant (preferring the key holder), wait up to 10s
            remote_identity = self._pick_remote_identity()
            if not remote_identity:
                logger.info("No remote participant yet, waiting...")
                for _ in range(100):
                    await asyncio.sleep(0.1)
                    remote_identity = self._pick_remote_identity()
                    if remote_identity:
                        break

            # Caller key was passed as shared_key at connect time — verify it's stored.
            if self._caller_key:
                logger.info("Caller key active as shared_key (%d bytes, index 0)",
                            len(self._caller_key))
                try:
                    stored = kp.export_shared_key(0)
                    if stored == self._caller_key:
                        logger.info("VERIFIED: shared key[0] matches caller key (%d bytes)",
                                    len(stored))
                    else:
                        logger.warning(
                            "MISMATCH: stored shared key[0] (%d bytes) != caller key (%d bytes)",
                            len(stored), len(self._caller_key))
                        # Fingerprints only: raw key bytes must never reach the logs.
                        logger.warning("stored fingerprint=%s", _key_fingerprint(stored))
                        logger.warning("caller fingerprint=%s",
                                       _key_fingerprint(self._caller_key))
                except Exception as e:
                    logger.warning("Could not export shared key: %s", e)
            else:
                logger.warning("No caller E2EE key — incoming audio will be silence")

            # Also set caller key as per-participant key (belt+suspenders: both modes)
            if self._caller_key and remote_identity:
                try:
                    kp.set_key(remote_identity, self._caller_key, 0)
                    logger.info("Also set per-participant key for %s (%d bytes)",
                                remote_identity, len(self._caller_key))
                except Exception as e:
                    logger.warning("Failed to set per-participant caller key: %s", e)

            if remote_identity:
                logger.info("Linking to remote participant: %s", remote_identity)

            # Voice pipeline — German male voice (Daniel)
            self._http_session = aiohttp.ClientSession()
            voice_id = os.environ.get("ELEVENLABS_VOICE_ID", DEFAULT_VOICE_ID)
            self.session = AgentSession(
                stt=elevenlabs.STT(api_key=ELEVENLABS_KEY, http_session=self._http_session),
                llm=lk_openai.LLM(base_url=LITELLM_URL, api_key=LITELLM_KEY, model=self.model),
                tts=elevenlabs.TTS(
                    voice_id=voice_id,
                    model="eleven_multilingual_v2",
                    api_key=ELEVENLABS_KEY,
                    http_session=self._http_session,
                ),
                vad=_get_vad(),
            )

            # Debug: log speech events.
            # NOTE(review): these event names look like the older livekit-agents
            # API; confirm the installed AgentSession still emits them.
            @self.session.on("user_speech_committed")
            def _on_user_speech(msg):
                logger.info("USER_SPEECH: %s", msg.text_content)

            @self.session.on("agent_speech_committed")
            def _on_agent_speech(msg):
                logger.info("AGENT_SPEECH: %s", msg.text_content)

            agent = Agent(instructions=VOICE_PROMPT)
            if remote_identity:
                io_opts = room_io.RoomOptions(
                    participant_identity=remote_identity,
                    close_on_disconnect=False,
                )
            else:
                io_opts = room_io.RoomOptions(close_on_disconnect=False)
            await self.session.start(
                agent=agent,
                room=self.lk_room,
                room_options=io_opts,
            )
            logger.info("Voice pipeline started (voice=%s, linked_to=%s)",
                        voice_id, remote_identity)

            try:
                await asyncio.wait_for(
                    self.session.generate_reply(
                        instructions="Sage nur: Hallo, wie kann ich helfen?"),
                    timeout=30.0)
                logger.info("Greeting sent")
            except asyncio.TimeoutError:
                logger.error("Greeting timed out")

            # Keep the task alive until stop() cancels it.
            while True:
                await asyncio.sleep(1)
        except asyncio.CancelledError:
            logger.info("Session cancelled for %s", self.room_id)
            # Propagate cancellation so the task ends as cancelled (asyncio convention).
            raise
        except Exception:
            logger.exception("Session failed for %s", self.room_id)