"""Voice session: LiveKit + STT/LLM/TTS pipeline. E2EE via HKDF key derivation (Element Call compatible). Requires patched livekit-rtc FFI binary from onestacked/livekit-rust-sdks.""" import asyncio import base64 import datetime import hashlib import logging import os import aiohttp from livekit import rtc, api as lkapi from livekit.agents import Agent, AgentSession, room_io from livekit.plugins import openai as lk_openai, elevenlabs, silero logger = logging.getLogger("matrix-ai-voice") # Enable debug logging for agents pipeline to diagnose audio issues logging.getLogger("livekit.agents").setLevel(logging.DEBUG) logging.getLogger("livekit.plugins").setLevel(logging.DEBUG) LITELLM_URL = os.environ.get("LITELLM_BASE_URL", "") LITELLM_KEY = os.environ.get("LITELLM_API_KEY", "not-needed") LK_API_KEY = os.environ.get("LIVEKIT_API_KEY", "") LK_API_SECRET = os.environ.get("LIVEKIT_API_SECRET", "") ELEVENLABS_KEY = os.environ.get("ELEVENLABS_API_KEY", "") DEFAULT_VOICE_ID = "onwK4e9ZLuTAKqWW03F9" # Daniel - male, free tier VOICE_PROMPT = """Du bist ein hilfreicher Sprachassistent in einem Matrix-Anruf. STRIKTE Regeln: - Antworte IMMER auf Deutsch - Halte JEDE Antwort auf MAXIMAL 1-2 kurze Saetze - Sei direkt und praezise, keine Fuellwoerter - Erfinde NICHTS - keine Geschichten, keine Musik, keine Fantasie - Beantworte nur was gefragt wird - Wenn niemand etwas fragt, sage nur kurz Hallo""" _vad = None def _get_vad(): global _vad if _vad is None: _vad = silero.VAD.load() return _vad def _make_lk_identity(user_id, device_id): return f"{user_id}:{device_id}" def _compute_lk_room_name(room_id): raw = f"{room_id}|m.call#ROOM" return base64.b64encode(hashlib.sha256(raw.encode()).digest()).decode().rstrip("=") def _generate_lk_jwt(room_id, user_id, device_id): identity = _make_lk_identity(user_id, device_id) lk_room = _compute_lk_room_name(room_id) token = ( lkapi.AccessToken(LK_API_KEY, LK_API_SECRET) .with_identity(identity) .with_name(user_id) .with_grants(lkapi.VideoGrants( room_join=True, room=lk_room, can_publish=True, can_subscribe=True)) .with_ttl(datetime.timedelta(hours=24)) ) logger.info("JWT: identity=%s room=%s", identity, lk_room) return token.to_jwt() KDF_HKDF = 1 def _build_e2ee_options(shared_key: bytes) -> rtc.E2EEOptions: """Build HKDF E2EE options matching Element Call's key derivation. Element Call uses: ratchetWindowSize=10, keyringSize=256, salt="LKFrameEncryptionKey" """ key_opts = rtc.KeyProviderOptions( shared_key=shared_key, ratchet_window_size=10, ratchet_salt=b"LKFrameEncryptionKey", failure_tolerance=10, key_ring_size=256, key_derivation_function=KDF_HKDF, ) return rtc.E2EEOptions(key_provider_options=key_opts) class VoiceSession: def __init__(self, nio_client, room_id, device_id, lk_url, model="claude-sonnet", publish_key_cb=None, bot_key: bytes | None = None): self.nio_client = nio_client self.room_id = room_id self.device_id = device_id self.lk_url = lk_url self.model = model self.lk_room = None self.session = None self._task = None self._http_session = None self._caller_key: bytes | None = None self._caller_identity: str | None = None # "sender:device_id" format self._bot_key: bytes = bot_key or os.urandom(16) self._publish_key_cb = publish_key_cb def on_encryption_key(self, sender, device_id, key, index): """Receive E2EE key from Element Call participant. If the room is already connected, immediately set the key on the key provider so we can decrypt the caller's audio. """ if not key: return identity = _make_lk_identity(sender, device_id) self._caller_key = key self._caller_identity = identity logger.info("E2EE key received from %s:%s (identity=%s, index=%d, %d bytes)", sender, device_id, identity, index, len(key)) # If already connected, set key on the key provider immediately if self.lk_room: try: kp = self.lk_room.e2ee_manager.key_provider kp.set_key(identity, key, key_index=index) logger.info("Live-updated caller E2EE key for %s", identity) except Exception: logger.warning("Could not live-update caller E2EE key", exc_info=True) async def _fetch_encryption_key_http(self) -> bytes | None: """Fetch encryption key from room timeline (NOT state) via Matrix HTTP API. Element Call distributes encryption keys as timeline events, not state. Also sets self._caller_identity from the event sender + device_id. """ import httpx homeserver = str(self.nio_client.homeserver) token = self.nio_client.access_token url = f"{homeserver}/_matrix/client/v3/rooms/{self.room_id}/messages" try: async with httpx.AsyncClient(timeout=10.0) as http: resp = await http.get( url, headers={"Authorization": f"Bearer {token}"}, params={"dir": "b", "limit": "50"}, ) resp.raise_for_status() data = resp.json() events = data.get("chunk", []) user_id = self.nio_client.user_id for evt in events: evt_type = evt.get("type", "") if evt_type == "io.element.call.encryption_keys": sender = evt.get("sender", "") if sender == user_id: continue # skip our own key content = evt.get("content", {}) caller_device = content.get("device_id", "") logger.info("Found encryption_keys timeline event: sender=%s device=%s", sender, caller_device) if caller_device: self._caller_identity = _make_lk_identity(sender, caller_device) for k in content.get("keys", []): key_b64 = k.get("key", "") if key_b64: key_b64 += "=" * (-len(key_b64) % 4) return base64.urlsafe_b64decode(key_b64) logger.info("No encryption_keys events in last %d timeline events", len(events)) except Exception as e: logger.warning("HTTP encryption key fetch failed: %s", e) return None async def start(self): self._task = asyncio.create_task(self._run()) async def stop(self): for obj in [self.session, self.lk_room, self._http_session]: if obj: try: if hasattr(obj, "aclose"): await obj.aclose() elif hasattr(obj, "disconnect"): await obj.disconnect() elif hasattr(obj, "close"): await obj.close() except Exception: pass if self._task and not self._task.done(): self._task.cancel() try: await self._task except asyncio.CancelledError: pass async def _run(self): try: user_id = self.nio_client.user_id jwt = _generate_lk_jwt(self.room_id, user_id, self.device_id) # Check timeline for caller's encryption key caller_key = await self._fetch_encryption_key_http() if caller_key: self._caller_key = caller_key logger.info("Got caller E2EE key via timeline (%d bytes)", len(caller_key)) if not self._caller_key: # Wait up to 15s for key via sync handler (bot.py forwards # encryption_keys timeline events to on_encryption_key) logger.info("No key in timeline yet, waiting for sync...") for _ in range(150): if self._caller_key: break await asyncio.sleep(0.1) # Publish bot's own key so caller can decrypt our audio if self._publish_key_cb: self._publish_key_cb(self._bot_key) # Build E2EE options with empty shared key — we set per-participant # keys after connect via e2ee_manager.key_provider.set_key() e2ee_opts = _build_e2ee_options(b"") room_opts = rtc.RoomOptions(e2ee=e2ee_opts) self.lk_room = rtc.Room() @self.lk_room.on("participant_connected") def on_p(p): logger.info("Participant connected: %s", p.identity) @self.lk_room.on("track_published") def on_tp(pub, p): logger.info("Track pub: %s %s kind=%s", p.identity, pub.sid, pub.kind) @self.lk_room.on("track_subscribed") def on_ts(t, pub, p): logger.info("Track sub: %s %s kind=%s", p.identity, pub.sid, t.kind) await self.lk_room.connect(self.lk_url, jwt, options=room_opts) logger.info("Connected (E2EE=per-participant), remote=%d", len(self.lk_room.remote_participants)) # Set per-participant E2EE keys via key provider bot_identity = _make_lk_identity(user_id, self.device_id) try: kp = self.lk_room.e2ee_manager.key_provider # Set bot's own key (encrypts outgoing audio) kp.set_key(bot_identity, self._bot_key, key_index=0) logger.info("Set bot E2EE key for identity=%s (%d bytes)", bot_identity, len(self._bot_key)) # Set caller's key (decrypts incoming audio) if self._caller_key and self._caller_identity: kp.set_key(self._caller_identity, self._caller_key, key_index=0) logger.info("Set caller E2EE key for identity=%s (%d bytes)", self._caller_identity, len(self._caller_key)) elif self._caller_key: # Caller identity not yet known — try to get from remote participants for p in self.lk_room.remote_participants.values(): kp.set_key(p.identity, self._caller_key, key_index=0) logger.info("Set caller E2EE key for identity=%s (%d bytes)", p.identity, len(self._caller_key)) break else: logger.warning("No caller E2EE key available — caller audio will be silent") except AttributeError: logger.warning("e2ee_manager.key_provider not available — " "falling back to shared key mode") # Fallback: set shared key after connect if per-participant isn't supported if self._caller_key: try: kp = self.lk_room.e2ee_manager.key_provider kp.set_shared_key(self._caller_key, key_index=0) logger.info("Fallback: set shared E2EE key (%d bytes)", len(self._caller_key)) except Exception: logger.exception("Fallback shared key also failed") # Find the remote participant, wait up to 10s if not yet connected remote_identity = None for p in self.lk_room.remote_participants.values(): remote_identity = p.identity break if not remote_identity: logger.info("No remote participant yet, waiting...") for _ in range(100): await asyncio.sleep(0.1) for p in self.lk_room.remote_participants.values(): remote_identity = p.identity break if remote_identity: break if remote_identity: logger.info("Linking to remote participant: %s", remote_identity) # Voice pipeline — German male voice (Daniel) self._http_session = aiohttp.ClientSession() voice_id = os.environ.get("ELEVENLABS_VOICE_ID", DEFAULT_VOICE_ID) self.session = AgentSession( stt=elevenlabs.STT(api_key=ELEVENLABS_KEY, http_session=self._http_session), llm=lk_openai.LLM(base_url=LITELLM_URL, api_key=LITELLM_KEY, model=self.model), tts=elevenlabs.TTS(voice_id=voice_id, model="eleven_multilingual_v2", api_key=ELEVENLABS_KEY, http_session=self._http_session), vad=_get_vad(), ) # Debug: log speech events @self.session.on("user_speech_committed") def _on_user_speech(msg): logger.info("USER_SPEECH: %s", msg.text_content) @self.session.on("agent_speech_committed") def _on_agent_speech(msg): logger.info("AGENT_SPEECH: %s", msg.text_content) agent = Agent(instructions=VOICE_PROMPT) io_opts = room_io.RoomOptions( participant_identity=remote_identity, ) if remote_identity else room_io.RoomOptions() await self.session.start( agent=agent, room=self.lk_room, room_options=io_opts, ) logger.info("Voice pipeline started (voice=%s, linked_to=%s)", voice_id, remote_identity) try: await asyncio.wait_for( self.session.generate_reply( instructions="Sage nur: Hallo, wie kann ich helfen?"), timeout=30.0) logger.info("Greeting sent") except asyncio.TimeoutError: logger.error("Greeting timed out") while True: await asyncio.sleep(1) except asyncio.CancelledError: logger.info("Session cancelled for %s", self.room_id) except Exception: logger.exception("Session failed for %s", self.room_id)