"""Voice session: LiveKit + STT/LLM/TTS pipeline. E2EE via HKDF key derivation (Element Call compatible). Requires patched livekit-rtc FFI binary from onestacked/livekit-rust-sdks.""" import asyncio import base64 import datetime import hashlib import logging import os import aiohttp from livekit import rtc, api as lkapi from livekit.agents import Agent, AgentSession from livekit.plugins import openai as lk_openai, elevenlabs, silero logger = logging.getLogger("matrix-ai-voice") LITELLM_URL = os.environ.get("LITELLM_BASE_URL", "") LITELLM_KEY = os.environ.get("LITELLM_API_KEY", "not-needed") LK_API_KEY = os.environ.get("LIVEKIT_API_KEY", "") LK_API_SECRET = os.environ.get("LIVEKIT_API_SECRET", "") ELEVENLABS_KEY = os.environ.get("ELEVENLABS_API_KEY", "") DEFAULT_VOICE_ID = "onwK4e9ZLuTAKqWW03F9" # Daniel - male, free tier VOICE_PROMPT = """Du bist ein hilfreicher Sprachassistent in einem Matrix-Anruf. STRIKTE Regeln: - Antworte IMMER auf Deutsch - Halte JEDE Antwort auf MAXIMAL 1-2 kurze Saetze - Sei direkt und praezise, keine Fuellwoerter - Erfinde NICHTS - keine Geschichten, keine Musik, keine Fantasie - Beantworte nur was gefragt wird - Wenn niemand etwas fragt, sage nur kurz Hallo""" _vad = None def _get_vad(): global _vad if _vad is None: _vad = silero.VAD.load() return _vad def _make_lk_identity(user_id, device_id): return f"{user_id}:{device_id}" def _compute_lk_room_name(room_id): raw = f"{room_id}|m.call#ROOM" return base64.b64encode(hashlib.sha256(raw.encode()).digest()).decode().rstrip("=") def _generate_lk_jwt(room_id, user_id, device_id): identity = _make_lk_identity(user_id, device_id) lk_room = _compute_lk_room_name(room_id) token = ( lkapi.AccessToken(LK_API_KEY, LK_API_SECRET) .with_identity(identity) .with_name(user_id) .with_grants(lkapi.VideoGrants( room_join=True, room=lk_room, can_publish=True, can_subscribe=True)) .with_ttl(datetime.timedelta(hours=24)) ) logger.info("JWT: identity=%s room=%s", identity, lk_room) return token.to_jwt() KDF_HKDF = 1 def _build_e2ee_options(shared_key: bytes) -> rtc.E2EEOptions: """Build HKDF E2EE options matching Element Call's key derivation.""" key_opts = rtc.KeyProviderOptions( shared_key=shared_key, ratchet_window_size=0, ratchet_salt=b"LKFrameEncryptionKey", failure_tolerance=-1, key_ring_size=16, key_derivation_function=KDF_HKDF, ) return rtc.E2EEOptions(key_provider_options=key_opts) class VoiceSession: def __init__(self, nio_client, room_id, device_id, lk_url, model="claude-sonnet"): self.nio_client = nio_client self.room_id = room_id self.device_id = device_id self.lk_url = lk_url self.model = model self.lk_room = None self.session = None self._task = None self._http_session = None self._e2ee_key: bytes | None = None def on_encryption_key(self, sender, device_id, key, index): """Receive E2EE key from Element Call participant.""" if key and not self._e2ee_key: self._e2ee_key = key logger.info("E2EE key received from %s:%s (index=%d, %d bytes)", sender, device_id, index, len(key)) async def start(self): self._task = asyncio.create_task(self._run()) async def stop(self): for obj in [self.session, self.lk_room, self._http_session]: if obj: try: if hasattr(obj, "aclose"): await obj.aclose() elif hasattr(obj, "disconnect"): await obj.disconnect() elif hasattr(obj, "close"): await obj.close() except Exception: pass if self._task and not self._task.done(): self._task.cancel() try: await self._task except asyncio.CancelledError: pass async def _run(self): try: user_id = self.nio_client.user_id jwt = _generate_lk_jwt(self.room_id, user_id, self.device_id) # Wait up to 10s for E2EE encryption key from Element Call for _ in range(100): if self._e2ee_key: break await asyncio.sleep(0.1) if not self._e2ee_key: logger.warning("No E2EE key received after 10s, connecting without encryption") # Connect with E2EE if key available e2ee_opts = None if self._e2ee_key: e2ee_opts = _build_e2ee_options(self._e2ee_key) logger.info("E2EE enabled with HKDF (%d byte key)", len(self._e2ee_key)) room_opts = rtc.RoomOptions(e2ee=e2ee_opts) self.lk_room = rtc.Room() @self.lk_room.on("participant_connected") def on_p(p): logger.info("Participant connected: %s", p.identity) @self.lk_room.on("track_published") def on_tp(pub, p): logger.info("Track pub: %s %s kind=%s", p.identity, pub.sid, pub.kind) @self.lk_room.on("track_subscribed") def on_ts(t, pub, p): logger.info("Track sub: %s %s kind=%s", p.identity, pub.sid, t.kind) await self.lk_room.connect(self.lk_url, jwt, options=room_opts) logger.info("Connected (E2EE=%s), remote=%d", "HKDF" if self._e2ee_key else "off", len(self.lk_room.remote_participants)) # Voice pipeline — German male voice (Daniel) self._http_session = aiohttp.ClientSession() voice_id = os.environ.get("ELEVENLABS_VOICE_ID", DEFAULT_VOICE_ID) self.session = AgentSession( stt=elevenlabs.STT(api_key=ELEVENLABS_KEY, http_session=self._http_session), llm=lk_openai.LLM(base_url=LITELLM_URL, api_key=LITELLM_KEY, model=self.model), tts=elevenlabs.TTS(voice_id=voice_id, model="eleven_multilingual_v2", api_key=ELEVENLABS_KEY, http_session=self._http_session), vad=_get_vad(), ) agent = Agent(instructions=VOICE_PROMPT) await self.session.start(agent=agent, room=self.lk_room) logger.info("Voice pipeline started (voice=%s)", voice_id) try: await asyncio.wait_for( self.session.generate_reply( instructions="Begruesse den Nutzer kurz auf Deutsch."), timeout=30.0) logger.info("Greeting sent") except asyncio.TimeoutError: logger.error("Greeting timed out") while True: await asyncio.sleep(1) except asyncio.CancelledError: logger.info("Session cancelled for %s", self.room_id) except Exception: logger.exception("Session failed for %s", self.room_id)