"""Voice session: LiveKit + STT/LLM/TTS pipeline. E2EE via HKDF key derivation (Element Call compatible). Requires patched livekit-rtc FFI binary from onestacked/livekit-rust-sdks.""" import asyncio import base64 import datetime import hashlib import logging import os import aiohttp from livekit import rtc, api as lkapi from livekit.agents import Agent, AgentSession, room_io from livekit.plugins import openai as lk_openai, elevenlabs, silero logger = logging.getLogger("matrix-ai-voice") # Enable debug logging for agents pipeline to diagnose audio issues logging.getLogger("livekit.agents").setLevel(logging.DEBUG) logging.getLogger("livekit.plugins").setLevel(logging.DEBUG) LITELLM_URL = os.environ.get("LITELLM_BASE_URL", "") LITELLM_KEY = os.environ.get("LITELLM_API_KEY", "not-needed") LK_API_KEY = os.environ.get("LIVEKIT_API_KEY", "") LK_API_SECRET = os.environ.get("LIVEKIT_API_SECRET", "") ELEVENLABS_KEY = os.environ.get("ELEVENLABS_API_KEY", "") DEFAULT_VOICE_ID = "onwK4e9ZLuTAKqWW03F9" # Daniel - male, free tier VOICE_PROMPT = """Du bist ein hilfreicher Sprachassistent in einem Matrix-Anruf. STRIKTE Regeln: - Antworte IMMER auf Deutsch - Halte JEDE Antwort auf MAXIMAL 1-2 kurze Saetze - Sei direkt und praezise, keine Fuellwoerter - Erfinde NICHTS - keine Geschichten, keine Musik, keine Fantasie - Beantworte nur was gefragt wird - Wenn niemand etwas fragt, sage nur kurz Hallo""" _vad = None def _get_vad(): global _vad if _vad is None: _vad = silero.VAD.load() return _vad def _make_lk_identity(user_id, device_id): return f"{user_id}:{device_id}" def _compute_lk_room_name(room_id): raw = f"{room_id}|m.call#ROOM" return base64.b64encode(hashlib.sha256(raw.encode()).digest()).decode().rstrip("=") def _generate_lk_jwt(room_id, user_id, device_id): identity = _make_lk_identity(user_id, device_id) lk_room = _compute_lk_room_name(room_id) token = ( lkapi.AccessToken(LK_API_KEY, LK_API_SECRET) .with_identity(identity) .with_name(user_id) .with_grants(lkapi.VideoGrants( room_join=True, room=lk_room, can_publish=True, can_subscribe=True)) .with_ttl(datetime.timedelta(hours=24)) ) logger.info("JWT: identity=%s room=%s", identity, lk_room) return token.to_jwt() KDF_HKDF = 1 def _build_e2ee_options(shared_key: bytes) -> rtc.E2EEOptions: """Build HKDF E2EE options matching Element Call's key derivation.""" key_opts = rtc.KeyProviderOptions( shared_key=shared_key, ratchet_window_size=0, ratchet_salt=b"LKFrameEncryptionKey", failure_tolerance=-1, key_ring_size=16, key_derivation_function=KDF_HKDF, ) return rtc.E2EEOptions(key_provider_options=key_opts) class VoiceSession: def __init__(self, nio_client, room_id, device_id, lk_url, model="claude-sonnet"): self.nio_client = nio_client self.room_id = room_id self.device_id = device_id self.lk_url = lk_url self.model = model self.lk_room = None self.session = None self._task = None self._http_session = None self._e2ee_key: bytes | None = None def on_encryption_key(self, sender, device_id, key, index): """Receive E2EE key from Element Call participant.""" if key and not self._e2ee_key: self._e2ee_key = key logger.info("E2EE key received from %s:%s (index=%d, %d bytes)", sender, device_id, index, len(key)) async def _publish_e2ee_key(self, key: bytes): """Publish our E2EE key to room state so Element Call shares its key with us.""" import base64 as b64 key_b64 = b64.urlsafe_b64encode(key).decode().rstrip("=") content = { "call_id": "", "device_id": self.device_id, "keys": [{"index": 0, "key": key_b64}], } user_id = self.nio_client.user_id state_key = f"{user_id}:{self.device_id}" try: ENCRYPTION_KEYS_TYPE = "io.element.call.encryption_keys" await self.nio_client.room_put_state( self.room_id, ENCRYPTION_KEYS_TYPE, content, state_key=state_key, ) logger.info("Published E2EE key (state_key=%s)", state_key) except Exception: logger.exception("Failed to publish E2EE key") async def start(self): self._task = asyncio.create_task(self._run()) async def stop(self): for obj in [self.session, self.lk_room, self._http_session]: if obj: try: if hasattr(obj, "aclose"): await obj.aclose() elif hasattr(obj, "disconnect"): await obj.disconnect() elif hasattr(obj, "close"): await obj.close() except Exception: pass if self._task and not self._task.done(): self._task.cancel() try: await self._task except asyncio.CancelledError: pass async def _run(self): try: user_id = self.nio_client.user_id jwt = _generate_lk_jwt(self.room_id, user_id, self.device_id) # Generate our own E2EE key and publish it to the room # Element Call requires ALL participants to publish keys import secrets our_key = secrets.token_bytes(32) await self._publish_e2ee_key(our_key) logger.info("Published our E2EE key (%d bytes)", len(our_key)) # Wait up to 10s for caller's E2EE encryption key for _ in range(100): if self._e2ee_key: break await asyncio.sleep(0.1) # Use caller's key if available, otherwise use our own shared_key = self._e2ee_key or our_key if self._e2ee_key: logger.info("Using caller's E2EE key (%d bytes)", len(self._e2ee_key)) else: logger.warning("No caller key received after 10s, using our own key") e2ee_opts = _build_e2ee_options(shared_key) logger.info("E2EE enabled with HKDF") room_opts = rtc.RoomOptions(e2ee=e2ee_opts) self.lk_room = rtc.Room() @self.lk_room.on("participant_connected") def on_p(p): logger.info("Participant connected: %s", p.identity) @self.lk_room.on("track_published") def on_tp(pub, p): logger.info("Track pub: %s %s kind=%s", p.identity, pub.sid, pub.kind) @self.lk_room.on("track_subscribed") def on_ts(t, pub, p): logger.info("Track sub: %s %s kind=%s", p.identity, pub.sid, t.kind) await self.lk_room.connect(self.lk_url, jwt, options=room_opts) logger.info("Connected (E2EE=HKDF), remote=%d", len(self.lk_room.remote_participants)) # Find the remote participant to link to remote_identity = None for p in self.lk_room.remote_participants.values(): remote_identity = p.identity logger.info("Linking to remote participant: %s", remote_identity) break # Voice pipeline — German male voice (Daniel) self._http_session = aiohttp.ClientSession() voice_id = os.environ.get("ELEVENLABS_VOICE_ID", DEFAULT_VOICE_ID) self.session = AgentSession( stt=elevenlabs.STT(api_key=ELEVENLABS_KEY, http_session=self._http_session), llm=lk_openai.LLM(base_url=LITELLM_URL, api_key=LITELLM_KEY, model=self.model), tts=elevenlabs.TTS(voice_id=voice_id, model="eleven_multilingual_v2", api_key=ELEVENLABS_KEY, http_session=self._http_session), vad=_get_vad(), ) # Debug: log speech events @self.session.on("user_speech_committed") def _on_user_speech(msg): logger.info("USER_SPEECH: %s", msg.text_content) @self.session.on("agent_speech_committed") def _on_agent_speech(msg): logger.info("AGENT_SPEECH: %s", msg.text_content) agent = Agent(instructions=VOICE_PROMPT) room_opts = room_io.RoomOptions( participant_identity=remote_identity, ) if remote_identity else None await self.session.start( agent=agent, room=self.lk_room, room_options=room_opts, ) logger.info("Voice pipeline started (voice=%s, linked_to=%s)", voice_id, remote_identity) try: await asyncio.wait_for( self.session.generate_reply( instructions="Sage nur: Hallo, wie kann ich helfen?"), timeout=30.0) logger.info("Greeting sent") except asyncio.TimeoutError: logger.error("Greeting timed out") while True: await asyncio.sleep(1) except asyncio.CancelledError: logger.info("Session cancelled for %s", self.room_id) except Exception: logger.exception("Session failed for %s", self.room_id)