"""Voice session: LiveKit + STT/LLM/TTS pipeline.

E2EE via HKDF key derivation (Element Call compatible).
Requires patched livekit-rtc FFI binary from onestacked/livekit-rust-sdks.
"""

import asyncio
import base64
import datetime
import hashlib
import logging
import os

import aiohttp
from livekit import rtc, api as lkapi
from livekit.agents import Agent, AgentSession, room_io
from livekit.plugins import openai as lk_openai, elevenlabs, silero

logger = logging.getLogger("matrix-ai-voice")

# Enable debug logging for agents pipeline to diagnose audio issues.
# NOTE: this is an import-time side effect on third-party loggers.
logging.getLogger("livekit.agents").setLevel(logging.DEBUG)
logging.getLogger("livekit.plugins").setLevel(logging.DEBUG)

LITELLM_URL = os.environ.get("LITELLM_BASE_URL", "")
LITELLM_KEY = os.environ.get("LITELLM_API_KEY", "not-needed")
LK_API_KEY = os.environ.get("LIVEKIT_API_KEY", "")
LK_API_SECRET = os.environ.get("LIVEKIT_API_SECRET", "")
ELEVENLABS_KEY = os.environ.get("ELEVENLABS_API_KEY", "")
DEFAULT_VOICE_ID = "onwK4e9ZLuTAKqWW03F9"  # Daniel - male, free tier

# System prompt for the LLM (German): answer in German, 1-2 short sentences,
# no filler, invent nothing, only answer what is asked, otherwise just say hello.
VOICE_PROMPT = """Du bist ein hilfreicher Sprachassistent in einem Matrix-Anruf.
STRIKTE Regeln:
- Antworte IMMER auf Deutsch
- Halte JEDE Antwort auf MAXIMAL 1-2 kurze Saetze
- Sei direkt und praezise, keine Fuellwoerter
- Erfinde NICHTS - keine Geschichten, keine Musik, keine Fantasie
- Beantworte nur was gefragt wird
- Wenn niemand etwas fragt, sage nur kurz Hallo"""

# Lazily-loaded Silero VAD model, shared by all sessions in this process.
_vad = None


def _get_vad():
    """Return the process-wide Silero VAD, loading it on first use."""
    global _vad
    if _vad is None:
        _vad = silero.VAD.load()
    return _vad


def _make_lk_identity(user_id, device_id):
    """Return the LiveKit participant identity ``"<user_id>:<device_id>"``."""
    return f"{user_id}:{device_id}"


def _compute_lk_room_name(room_id):
    """Derive the LiveKit room name for a Matrix room.

    SHA-256 of ``"<room_id>|m.call#ROOM"``, standard base64, with ``=``
    padding stripped. NOTE(review): presumably this must equal the name
    Element Call's JWT service derives for the same room; keep in sync.
    """
    raw = f"{room_id}|m.call#ROOM"
    return base64.b64encode(hashlib.sha256(raw.encode()).digest()).decode().rstrip("=")


def _generate_lk_jwt(room_id, user_id, device_id):
    """Mint a 24h LiveKit access token allowing join/publish/subscribe in the room."""
    identity = _make_lk_identity(user_id, device_id)
    lk_room = _compute_lk_room_name(room_id)
    token = (
        lkapi.AccessToken(LK_API_KEY, LK_API_SECRET)
        .with_identity(identity)
        .with_name(user_id)
        .with_grants(lkapi.VideoGrants(
            room_join=True, room=lk_room,
            can_publish=True, can_subscribe=True))
        .with_ttl(datetime.timedelta(hours=24))
    )
    logger.info("JWT: identity=%s room=%s", identity, lk_room)
    return token.to_jwt()


# Key-derivation selector passed to KeyProviderOptions; presumably 1 selects
# HKDF in the patched livekit-rtc FFI named in the module docstring.
KDF_HKDF = 1


def _build_e2ee_options(shared_key: bytes) -> rtc.E2EEOptions:
    """Build HKDF E2EE options matching Element Call's key derivation.

    The option values below are intended to mirror Element Call; do not
    change them without checking its frame-encryption settings.
    """
    key_opts = rtc.KeyProviderOptions(
        shared_key=shared_key,
        ratchet_window_size=0,
        ratchet_salt=b"LKFrameEncryptionKey",
        failure_tolerance=-1,
        key_ring_size=16,
        key_derivation_function=KDF_HKDF,
    )
    return rtc.E2EEOptions(key_provider_options=key_opts)


class VoiceSession:
    """A LiveKit voice-assistant session bound to one Matrix room.

    ``start()`` spawns a background task that joins the room's LiveKit call
    (E2EE when Element Call delivered a key in time) and runs an
    STT -> LLM -> TTS ``AgentSession``. ``stop()`` tears it all down.
    """

    def __init__(self, nio_client, room_id, device_id, lk_url, model="claude-sonnet"):
        self.nio_client = nio_client
        self.room_id = room_id
        self.device_id = device_id
        self.lk_url = lk_url
        self.model = model
        self.lk_room = None
        self.session = None
        self._task = None
        self._http_session = None
        self._e2ee_key: bytes | None = None

    def on_encryption_key(self, sender, device_id, key, index):
        """Receive E2EE key from Element Call participant.

        Only the first non-empty key is stored; later keys are ignored.
        """
        # NOTE(review): rotated keys (new ``index``) are dropped, and a key
        # arriving after connect is never applied to the room. Confirm
        # whether key rotation must be supported.
        if key and not self._e2ee_key:
            self._e2ee_key = key
            logger.info("E2EE key received from %s:%s (index=%d, %d bytes)",
                        sender, device_id, index, len(key))

    async def start(self):
        """Spawn the background session task."""
        self._task = asyncio.create_task(self._run())

    async def stop(self):
        """Cancel the session task, then close the agent session, room and HTTP session.

        Cleanup is best-effort: close failures are logged, not raised.
        Safe to call more than once.
        """
        # Cancel first: while we await the closes below, a still-running
        # _run could otherwise create a new ClientSession/AgentSession that
        # nobody would ever close.
        task, self._task = self._task, None
        if task and not task.done():
            task.cancel()
            try:
                await task
            except asyncio.CancelledError:
                pass

        resources = (self.session, self.lk_room, self._http_session)
        self.session = self.lk_room = self._http_session = None
        for obj in resources:
            if obj is None:
                continue
            try:
                if hasattr(obj, "aclose"):
                    await obj.aclose()
                elif hasattr(obj, "disconnect"):
                    await obj.disconnect()
                elif hasattr(obj, "close"):
                    await obj.close()
            except Exception:
                logger.warning("Failed to close %s for %s",
                               type(obj).__name__, self.room_id, exc_info=True)

    async def _run(self):
        """Background task: connect, start the voice pipeline, then idle until cancelled.

        Cancellation and any other exception end the task with a log line;
        nothing propagates to the awaiter.
        """
        try:
            user_id = self.nio_client.user_id
            jwt = _generate_lk_jwt(self.room_id, user_id, self.device_id)

            await self._wait_for_e2ee_key()
            await self._connect_room(jwt)

            remote_identity = self._first_remote_identity()
            voice_id = await self._start_pipeline(remote_identity)
            logger.info("Voice pipeline started (voice=%s, linked_to=%s)",
                        voice_id, remote_identity)

            await self._send_greeting()

            # Keep the task alive; stop() ends it via cancellation.
            while True:
                await asyncio.sleep(1)
        except asyncio.CancelledError:
            logger.info("Session cancelled for %s", self.room_id)
        except Exception:
            logger.exception("Session failed for %s", self.room_id)

    async def _wait_for_e2ee_key(self, timeout=10.0, poll_interval=0.1):
        """Poll until on_encryption_key stored a key, for at most *timeout* seconds."""
        # The key arrives through a plain synchronous callback, so poll.
        for _ in range(round(timeout / poll_interval)):
            if self._e2ee_key:
                break
            await asyncio.sleep(poll_interval)
        if not self._e2ee_key:
            logger.warning("No E2EE key received after %gs, connecting without encryption",
                           timeout)

    async def _connect_room(self, jwt):
        """Create ``self.lk_room``, attach debug handlers and connect with *jwt*."""
        # Snapshot the key: it may be set concurrently, but the connection
        # uses exactly what we build here.
        key = self._e2ee_key
        e2ee_opts = None
        if key:
            e2ee_opts = _build_e2ee_options(key)
            logger.info("E2EE enabled with HKDF (%d byte key)", len(key))
        room_opts = rtc.RoomOptions(e2ee=e2ee_opts)

        self.lk_room = rtc.Room()

        @self.lk_room.on("participant_connected")
        def on_p(p):
            logger.info("Participant connected: %s", p.identity)

        @self.lk_room.on("track_published")
        def on_tp(pub, p):
            logger.info("Track pub: %s %s kind=%s", p.identity, pub.sid, pub.kind)

        @self.lk_room.on("track_subscribed")
        def on_ts(t, pub, p):
            logger.info("Track sub: %s %s kind=%s", p.identity, pub.sid, t.kind)

        await self.lk_room.connect(self.lk_url, jwt, options=room_opts)
        # Report what was actually used to connect, not the current key state.
        logger.info("Connected (E2EE=%s), remote=%d",
                    "HKDF" if e2ee_opts is not None else "off",
                    len(self.lk_room.remote_participants))

    def _first_remote_identity(self):
        """Return the identity of the first remote participant already present, else None."""
        for p in self.lk_room.remote_participants.values():
            logger.info("Linking to remote participant: %s", p.identity)
            return p.identity
        return None

    async def _start_pipeline(self, remote_identity):
        """Build and start the STT/LLM/TTS AgentSession; return the TTS voice id used."""
        # Voice pipeline — German male voice (Daniel)
        self._http_session = aiohttp.ClientSession()
        voice_id = os.environ.get("ELEVENLABS_VOICE_ID", DEFAULT_VOICE_ID)
        self.session = AgentSession(
            stt=elevenlabs.STT(api_key=ELEVENLABS_KEY, http_session=self._http_session),
            llm=lk_openai.LLM(base_url=LITELLM_URL, api_key=LITELLM_KEY, model=self.model),
            tts=elevenlabs.TTS(voice_id=voice_id, model="eleven_multilingual_v2",
                               api_key=ELEVENLABS_KEY, http_session=self._http_session),
            vad=_get_vad(),
        )

        # Debug: log speech events.
        # NOTE(review): verify these event names are emitted by the installed
        # livekit-agents version; otherwise these handlers never fire.
        @self.session.on("user_speech_committed")
        def _on_user_speech(msg):
            logger.info("USER_SPEECH: %s", msg.text_content)

        @self.session.on("agent_speech_committed")
        def _on_agent_speech(msg):
            logger.info("AGENT_SPEECH: %s", msg.text_content)

        agent = Agent(instructions=VOICE_PROMPT)
        # Pin the pipeline to the participant found at connect time, if any.
        io_opts = room_io.RoomOptions(
            participant_identity=remote_identity,
        ) if remote_identity else None
        await self.session.start(
            agent=agent,
            room=self.lk_room,
            room_options=io_opts,
        )
        return voice_id

    async def _send_greeting(self, timeout=30.0):
        """Speak a short greeting; log (do not raise) if it exceeds *timeout* seconds."""
        try:
            await asyncio.wait_for(
                self.session.generate_reply(
                    instructions="Sage nur: Hallo, wie kann ich helfen?"),
                timeout=timeout)
            logger.info("Greeting sent")
        except asyncio.TimeoutError:
            logger.error("Greeting timed out")