diff --git a/docker-compose.yml b/docker-compose.yml index 8ffae8f..923e7f6 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -11,7 +11,7 @@ services: bot: build: context: . - dockerfile: Dockerfile.bot + dockerfile: Dockerfile command: python bot.py env_file: .env restart: unless-stopped diff --git a/voice.py b/voice.py new file mode 100644 index 0000000..e0c78e1 --- /dev/null +++ b/voice.py @@ -0,0 +1,187 @@ +"""Voice session: LiveKit + STT/LLM/TTS pipeline. +E2EE via HKDF key derivation (Element Call compatible). +Requires patched livekit-rtc FFI binary from onestacked/livekit-rust-sdks.""" +import asyncio +import base64 +import datetime +import hashlib +import logging +import os + +import aiohttp +from livekit import rtc, api as lkapi +from livekit.agents import Agent, AgentSession +from livekit.plugins import openai as lk_openai, elevenlabs, silero + +logger = logging.getLogger("matrix-ai-voice") + +LITELLM_URL = os.environ.get("LITELLM_BASE_URL", "") +LITELLM_KEY = os.environ.get("LITELLM_API_KEY", "not-needed") +LK_API_KEY = os.environ.get("LIVEKIT_API_KEY", "") +LK_API_SECRET = os.environ.get("LIVEKIT_API_SECRET", "") +ELEVENLABS_KEY = os.environ.get("ELEVENLABS_API_KEY", "") +DEFAULT_VOICE_ID = "onwK4e9ZLuTAKqWW03F9" # Daniel - male, free tier + +VOICE_PROMPT = """Du bist ein hilfreicher Sprachassistent in einem Matrix-Anruf. +Regeln: +- Halte Antworten KURZ - 1-3 Saetze maximal +- Sei direkt, keine Fuellwoerter +- Antworte immer auf Deutsch""" + +_vad = None +def _get_vad(): + global _vad + if _vad is None: + _vad = silero.VAD.load() + return _vad + +def _make_lk_identity(user_id, device_id): + return f"{user_id}:{device_id}" + +def _compute_lk_room_name(room_id): + raw = f"{room_id}|m.call#ROOM" + return base64.b64encode(hashlib.sha256(raw.encode()).digest()).decode().rstrip("=") + +def _generate_lk_jwt(room_id, user_id, device_id): + identity = _make_lk_identity(user_id, device_id) + lk_room = _compute_lk_room_name(room_id) + token = ( + lkapi.AccessToken(LK_API_KEY, LK_API_SECRET) + .with_identity(identity) + .with_name(user_id) + .with_grants(lkapi.VideoGrants( + room_join=True, room=lk_room, + can_publish=True, can_subscribe=True)) + .with_ttl(datetime.timedelta(hours=24)) + ) + logger.info("JWT: identity=%s room=%s", identity, lk_room) + return token.to_jwt() + + +KDF_HKDF = 1 + + +def _build_e2ee_options(shared_key: bytes) -> rtc.E2EEOptions: + """Build HKDF E2EE options matching Element Call's key derivation.""" + key_opts = rtc.KeyProviderOptions( + shared_key=shared_key, + ratchet_window_size=0, + ratchet_salt=b"LKFrameEncryptionKey", + failure_tolerance=-1, + key_ring_size=16, + key_derivation_function=KDF_HKDF, + ) + return rtc.E2EEOptions(key_provider_options=key_opts) + + +class VoiceSession: + def __init__(self, nio_client, room_id, device_id, lk_url, model="claude-sonnet"): + self.nio_client = nio_client + self.room_id = room_id + self.device_id = device_id + self.lk_url = lk_url + self.model = model + self.lk_room = None + self.session = None + self._task = None + self._http_session = None + self._e2ee_key: bytes | None = None + + def on_encryption_key(self, sender, device_id, key, index): + """Receive E2EE key from Element Call participant.""" + if key and not self._e2ee_key: + self._e2ee_key = key + logger.info("E2EE key received from %s:%s (index=%d, %d bytes)", + sender, device_id, index, len(key)) + + async def start(self): + self._task = asyncio.create_task(self._run()) + + async def stop(self): + for obj in [self.session, self.lk_room, self._http_session]: + if obj: + try: + if hasattr(obj, "aclose"): + await obj.aclose() + elif hasattr(obj, "disconnect"): + await obj.disconnect() + elif hasattr(obj, "close"): + await obj.close() + except Exception: + pass + if self._task and not self._task.done(): + self._task.cancel() + try: + await self._task + except asyncio.CancelledError: + pass + + async def _run(self): + try: + user_id = self.nio_client.user_id + jwt = _generate_lk_jwt(self.room_id, user_id, self.device_id) + + # Wait up to 3s for E2EE encryption key from Element Call + for _ in range(30): + if self._e2ee_key: + break + await asyncio.sleep(0.1) + if not self._e2ee_key: + logger.warning("No E2EE key received after 3s, connecting without encryption") + + # Connect with E2EE if key available + e2ee_opts = None + if self._e2ee_key: + e2ee_opts = _build_e2ee_options(self._e2ee_key) + logger.info("E2EE enabled with HKDF (%d byte key)", len(self._e2ee_key)) + + room_opts = rtc.RoomOptions(e2ee=e2ee_opts) + self.lk_room = rtc.Room() + + @self.lk_room.on("participant_connected") + def on_p(p): + logger.info("Participant connected: %s", p.identity) + + @self.lk_room.on("track_published") + def on_tp(pub, p): + logger.info("Track pub: %s %s kind=%s", p.identity, pub.sid, pub.kind) + + @self.lk_room.on("track_subscribed") + def on_ts(t, pub, p): + logger.info("Track sub: %s %s kind=%s", p.identity, pub.sid, t.kind) + + await self.lk_room.connect(self.lk_url, jwt, options=room_opts) + logger.info("Connected (E2EE=%s), remote=%d", + "HKDF" if self._e2ee_key else "off", + len(self.lk_room.remote_participants)) + + # Voice pipeline — German male voice (Daniel) + self._http_session = aiohttp.ClientSession() + voice_id = os.environ.get("ELEVENLABS_VOICE_ID", DEFAULT_VOICE_ID) + self.session = AgentSession( + stt=elevenlabs.STT(api_key=ELEVENLABS_KEY, http_session=self._http_session), + llm=lk_openai.LLM(base_url=LITELLM_URL, api_key=LITELLM_KEY, model=self.model), + tts=elevenlabs.TTS(voice_id=voice_id, model="eleven_multilingual_v2", + api_key=ELEVENLABS_KEY, http_session=self._http_session), + vad=_get_vad(), + ) + agent = Agent(instructions=VOICE_PROMPT) + await self.session.start(agent=agent, room=self.lk_room) + logger.info("Voice pipeline started (voice=%s)", voice_id) + + try: + await asyncio.wait_for( + self.session.generate_reply( + instructions="Begruesse den Nutzer kurz auf Deutsch."), + timeout=30.0) + logger.info("Greeting sent") + except asyncio.TimeoutError: + logger.error("Greeting timed out") + + while True: + await asyncio.sleep(1) + + except asyncio.CancelledError: + logger.info("Session cancelled for %s", self.room_id) + except Exception: + logger.exception("Session failed for %s", self.room_id)