Files
matrix-ai-agent/voice.py
Christian Gick e5e8b56482 fix(e2ee): Add E2EE HKDF to voice.py, bot uses patched Dockerfile
voice.py runs in bot container, not agent container.
- Wait 3s for encryption key before connecting
- Build E2EE options with HKDF when key received
- Bot container now uses patched Dockerfile (needs FFI)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-20 17:13:53 +02:00

188 lines
6.8 KiB
Python

"""Voice session: LiveKit + STT/LLM/TTS pipeline.
E2EE via HKDF key derivation (Element Call compatible).
Requires patched livekit-rtc FFI binary from onestacked/livekit-rust-sdks."""
import asyncio
import base64
import datetime
import hashlib
import logging
import os
import aiohttp
from livekit import rtc, api as lkapi
from livekit.agents import Agent, AgentSession
from livekit.plugins import openai as lk_openai, elevenlabs, silero
logger = logging.getLogger("matrix-ai-voice")
LITELLM_URL = os.environ.get("LITELLM_BASE_URL", "")
LITELLM_KEY = os.environ.get("LITELLM_API_KEY", "not-needed")
LK_API_KEY = os.environ.get("LIVEKIT_API_KEY", "")
LK_API_SECRET = os.environ.get("LIVEKIT_API_SECRET", "")
ELEVENLABS_KEY = os.environ.get("ELEVENLABS_API_KEY", "")
DEFAULT_VOICE_ID = "onwK4e9ZLuTAKqWW03F9" # Daniel - male, free tier
VOICE_PROMPT = """Du bist ein hilfreicher Sprachassistent in einem Matrix-Anruf.
Regeln:
- Halte Antworten KURZ - 1-3 Saetze maximal
- Sei direkt, keine Fuellwoerter
- Antworte immer auf Deutsch"""
_vad = None
def _get_vad():
global _vad
if _vad is None:
_vad = silero.VAD.load()
return _vad
def _make_lk_identity(user_id, device_id):
return f"{user_id}:{device_id}"
def _compute_lk_room_name(room_id):
raw = f"{room_id}|m.call#ROOM"
return base64.b64encode(hashlib.sha256(raw.encode()).digest()).decode().rstrip("=")
def _generate_lk_jwt(room_id, user_id, device_id):
identity = _make_lk_identity(user_id, device_id)
lk_room = _compute_lk_room_name(room_id)
token = (
lkapi.AccessToken(LK_API_KEY, LK_API_SECRET)
.with_identity(identity)
.with_name(user_id)
.with_grants(lkapi.VideoGrants(
room_join=True, room=lk_room,
can_publish=True, can_subscribe=True))
.with_ttl(datetime.timedelta(hours=24))
)
logger.info("JWT: identity=%s room=%s", identity, lk_room)
return token.to_jwt()
KDF_HKDF = 1
def _build_e2ee_options(shared_key: bytes) -> rtc.E2EEOptions:
"""Build HKDF E2EE options matching Element Call's key derivation."""
key_opts = rtc.KeyProviderOptions(
shared_key=shared_key,
ratchet_window_size=0,
ratchet_salt=b"LKFrameEncryptionKey",
failure_tolerance=-1,
key_ring_size=16,
key_derivation_function=KDF_HKDF,
)
return rtc.E2EEOptions(key_provider_options=key_opts)
class VoiceSession:
def __init__(self, nio_client, room_id, device_id, lk_url, model="claude-sonnet"):
self.nio_client = nio_client
self.room_id = room_id
self.device_id = device_id
self.lk_url = lk_url
self.model = model
self.lk_room = None
self.session = None
self._task = None
self._http_session = None
self._e2ee_key: bytes | None = None
def on_encryption_key(self, sender, device_id, key, index):
"""Receive E2EE key from Element Call participant."""
if key and not self._e2ee_key:
self._e2ee_key = key
logger.info("E2EE key received from %s:%s (index=%d, %d bytes)",
sender, device_id, index, len(key))
async def start(self):
self._task = asyncio.create_task(self._run())
async def stop(self):
for obj in [self.session, self.lk_room, self._http_session]:
if obj:
try:
if hasattr(obj, "aclose"):
await obj.aclose()
elif hasattr(obj, "disconnect"):
await obj.disconnect()
elif hasattr(obj, "close"):
await obj.close()
except Exception:
pass
if self._task and not self._task.done():
self._task.cancel()
try:
await self._task
except asyncio.CancelledError:
pass
async def _run(self):
try:
user_id = self.nio_client.user_id
jwt = _generate_lk_jwt(self.room_id, user_id, self.device_id)
# Wait up to 3s for E2EE encryption key from Element Call
for _ in range(30):
if self._e2ee_key:
break
await asyncio.sleep(0.1)
if not self._e2ee_key:
logger.warning("No E2EE key received after 3s, connecting without encryption")
# Connect with E2EE if key available
e2ee_opts = None
if self._e2ee_key:
e2ee_opts = _build_e2ee_options(self._e2ee_key)
logger.info("E2EE enabled with HKDF (%d byte key)", len(self._e2ee_key))
room_opts = rtc.RoomOptions(e2ee=e2ee_opts)
self.lk_room = rtc.Room()
@self.lk_room.on("participant_connected")
def on_p(p):
logger.info("Participant connected: %s", p.identity)
@self.lk_room.on("track_published")
def on_tp(pub, p):
logger.info("Track pub: %s %s kind=%s", p.identity, pub.sid, pub.kind)
@self.lk_room.on("track_subscribed")
def on_ts(t, pub, p):
logger.info("Track sub: %s %s kind=%s", p.identity, pub.sid, t.kind)
await self.lk_room.connect(self.lk_url, jwt, options=room_opts)
logger.info("Connected (E2EE=%s), remote=%d",
"HKDF" if self._e2ee_key else "off",
len(self.lk_room.remote_participants))
# Voice pipeline — German male voice (Daniel)
self._http_session = aiohttp.ClientSession()
voice_id = os.environ.get("ELEVENLABS_VOICE_ID", DEFAULT_VOICE_ID)
self.session = AgentSession(
stt=elevenlabs.STT(api_key=ELEVENLABS_KEY, http_session=self._http_session),
llm=lk_openai.LLM(base_url=LITELLM_URL, api_key=LITELLM_KEY, model=self.model),
tts=elevenlabs.TTS(voice_id=voice_id, model="eleven_multilingual_v2",
api_key=ELEVENLABS_KEY, http_session=self._http_session),
vad=_get_vad(),
)
agent = Agent(instructions=VOICE_PROMPT)
await self.session.start(agent=agent, room=self.lk_room)
logger.info("Voice pipeline started (voice=%s)", voice_id)
try:
await asyncio.wait_for(
self.session.generate_reply(
instructions="Begruesse den Nutzer kurz auf Deutsch."),
timeout=30.0)
logger.info("Greeting sent")
except asyncio.TimeoutError:
logger.error("Greeting timed out")
while True:
await asyncio.sleep(1)
except asyncio.CancelledError:
logger.info("Session cancelled for %s", self.room_id)
except Exception:
logger.exception("Session failed for %s", self.room_id)