Files
matrix-ai-agent/voice.py
Christian Gick c379064f80 fix(voice): set caller key in on_track_subscribed — frame cryptor must exist for HKDF to apply
Root cause: C++ set_key() only applies HKDF when impl_->GetKey(pid) returns a valid
handler, which requires the frame cryptor for that participant to be initialized.
Frame cryptors are created at track subscription time, not at connect time.

Calling set_key(caller_identity, key) immediately after connect() skips HKDF
derivation (impl_->GetKey returns null) → raw key stored → DEC_FAILED.

Fix: move caller key setting to on_track_subscribed where frame cryptor definitely exists.
Also update on_encryption_key to use set_key() for key rotation updates.
2026-02-22 14:05:54 +02:00

439 lines
20 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""Voice session: LiveKit + STT/LLM/TTS pipeline.
E2EE via HKDF key derivation (Element Call compatible).
Requires patched livekit-rtc FFI binary from onestacked/livekit-rust-sdks."""
import asyncio
import base64
import datetime
import hashlib
import logging
import os
import zoneinfo
import aiohttp
from livekit import rtc, api as lkapi
from livekit.agents import Agent, AgentSession, room_io
from livekit.plugins import openai as lk_openai, elevenlabs, silero
# Module-wide logger for the voice pipeline.
logger = logging.getLogger("matrix-ai-voice")
# Service endpoints and credentials; each is read once from the environment
# when this module is imported.
# Base URL and API key handed to the LLM plugin when a session is built.
LITELLM_URL = os.environ.get("LITELLM_BASE_URL", "")
LITELLM_KEY = os.environ.get("LITELLM_API_KEY", "not-needed")
# LiveKit API key/secret used to sign the room access token.
LK_API_KEY = os.environ.get("LIVEKIT_API_KEY", "")
LK_API_SECRET = os.environ.get("LIVEKIT_API_SECRET", "")
# ElevenLabs API key, used for both STT and TTS.
ELEVENLABS_KEY = os.environ.get("ELEVENLABS_API_KEY", "")
# TTS voice used when ELEVENLABS_VOICE_ID is not set in the environment.
DEFAULT_VOICE_ID = "JBFqnCBsd6RMkjVDRZzb" # George - warm, British male, multilingual
# German system prompt for the voice agent. It tells the assistant to answer in
# the user's language (German or English), to keep every reply to one or two
# short sentences, to invent nothing, to answer only what was asked, and to just
# say a brief hello when nobody asks anything. The `{datetime}` placeholder is
# filled in by _build_voice_prompt().
_VOICE_PROMPT_TEMPLATE = """Du bist ein hilfreicher Sprachassistent in einem Matrix-Anruf.
Aktuelle Zeit: {datetime}
STRIKTE Regeln:
- Antworte in der Sprache des Nutzers (Deutsch oder Englisch)
- Halte JEDE Antwort auf MAXIMAL 1-2 kurze Saetze
- Sei direkt und praezise, keine Fuellwoerter
- Erfinde NICHTS - keine Geschichten, keine Musik, keine Fantasie
- Beantworte nur was gefragt wird
- Wenn niemand etwas fragt, sage nur kurz Hallo"""
def _build_voice_prompt() -> str:
    """Render the voice system prompt with the current date and time.

    The timezone name comes from the VOICE_TIMEZONE environment variable
    (default "Europe/Berlin"). If that zone cannot be loaded, UTC is used.

    Returns:
        The prompt template with its ``{datetime}`` placeholder filled in.
    """
    zone_name = os.environ.get("VOICE_TIMEZONE", "Europe/Berlin")
    try:
        zone = zoneinfo.ZoneInfo(zone_name)
    except Exception:
        # Best effort: whatever goes wrong resolving the zone, fall back to UTC.
        zone = datetime.timezone.utc
    stamp = datetime.datetime.now(zone).strftime("%A, %d. %B %Y %H:%M %Z")
    return _VOICE_PROMPT_TEMPLATE.format(datetime=stamp)
# Cached VAD instance; populated by the first _get_vad() call.
_vad = None


def _get_vad():
    """Return the shared Silero VAD, loading it on the first call only."""
    global _vad
    if _vad is not None:
        return _vad
    _vad = silero.VAD.load()
    return _vad
def _make_lk_identity(user_id, device_id):
    """Build the LiveKit participant identity, formatted as ``<user_id>:<device_id>``."""
    return "{}:{}".format(user_id, device_id)
def _compute_lk_room_name(room_id):
    """Derive the LiveKit room name for a Matrix room.

    The name is the SHA-256 digest of ``"<room_id>|m.call#ROOM"``, encoded as
    standard base64 with the trailing ``=`` padding removed.
    """
    digest = hashlib.sha256(f"{room_id}|m.call#ROOM".encode()).digest()
    encoded = base64.b64encode(digest).decode()
    return encoded.rstrip("=")
def _generate_lk_jwt(room_id, user_id, device_id):
    """Create a signed LiveKit access token for this bot.

    The token carries the identity ``<user_id>:<device_id>``, uses ``user_id``
    as display name, allows joining, publishing and subscribing in the room
    derived from ``room_id``, and is valid for 24 hours.

    Returns:
        The encoded JWT as returned by ``AccessToken.to_jwt()``.
    """
    identity = _make_lk_identity(user_id, device_id)
    lk_room = _compute_lk_room_name(room_id)

    token = lkapi.AccessToken(LK_API_KEY, LK_API_SECRET)
    token = token.with_identity(identity)
    token = token.with_name(user_id)
    grants = lkapi.VideoGrants(
        room_join=True,
        room=lk_room,
        can_publish=True,
        can_subscribe=True,
    )
    token = token.with_grants(grants)
    token = token.with_ttl(datetime.timedelta(hours=24))

    logger.info("JWT: identity=%s room=%s", identity, lk_room)
    return token.to_jwt()
# Key-derivation selector passed as `key_derivation_function` to
# rtc.KeyProviderOptions. Per the author's notes, proto enum value 1 makes the
# Rust FFI apply HKDF internally.
KDF_HKDF = 1 # Rust FFI applies HKDF internally (proto enum value 1)
# NOTE: value 0 in the proto is PBKDF2, NOT raw/none mode — see e2ee_patch.py
def _hkdf_derive(ikm: bytes) -> bytes:
    """Derive a 16-byte AES key from *ikm* using single-block HKDF-SHA256.

    Extract step: HMAC-SHA256 keyed with the salt b"LKFrameEncryptionKey".
    Expand step: HMAC over 128 zero bytes of info plus the block counter 0x01;
    the first 16 bytes (128 bit) of that block are returned.

    Per the original author's notes these parameters mirror
    deriveEncryptionKey() in livekit-client-sdk-js, and the result is intended
    for set_shared_key(), which is said to bypass the Rust FFI KDF.
    """
    import hmac

    pseudo_random_key = hmac.digest(b"LKFrameEncryptionKey", ikm, hashlib.sha256)
    first_block = hmac.digest(pseudo_random_key, bytes(128) + b"\x01", hashlib.sha256)
    return first_block[:16]
def _build_e2ee_options() -> rtc.E2EEOptions:
    """Build E2EE options that leave key derivation to the Rust FFI (KDF_HKDF).

    With these options raw base keys from the Matrix key exchange are meant to
    be passed straight to set_key(). The values follow the original author's
    notes on Element Call: ratchetWindowSize=10, keyringSize=256,
    ratchetSalt="LKFrameEncryptionKey".
    NOTE: proto value 0 is PBKDF2 (not raw/none), hence KDF_HKDF=1.
    """
    provider_settings = {
        "shared_key": b"",  # empty = per-participant mode
        "ratchet_window_size": 10,
        "ratchet_salt": b"LKFrameEncryptionKey",
        "failure_tolerance": 10,
        "key_ring_size": 256,
        "key_derivation_function": KDF_HKDF,  # FFI derives; we pass raw base keys
    }
    provider_options = rtc.KeyProviderOptions(**provider_settings)
    return rtc.E2EEOptions(key_provider_options=provider_options)
class VoiceSession:
def __init__(self, nio_client, room_id, device_id, lk_url, model="claude-sonnet",
publish_key_cb=None, bot_key: bytes | None = None):
self.nio_client = nio_client
self.room_id = room_id
self.device_id = device_id
self.lk_url = lk_url
self.model = model
self.lk_room = None
self.session = None
self._task = None
self._http_session = None
self._stt_session = None
self._caller_key: bytes | None = None
self._caller_identity: str | None = None
self._caller_all_keys: dict = {} # {index: bytes} — all caller keys by index
self._bot_key: bytes = bot_key or os.urandom(16)
self._publish_key_cb = publish_key_cb
def on_encryption_key(self, sender, device_id, key, index):
"""Receive E2EE key from Element Call participant."""
if not key:
return
if not self._caller_key:
self._caller_key = key
self._caller_identity = f"{sender}:{device_id}"
self._caller_all_keys[index] = key
logger.info("E2EE key received from %s:%s (index=%d, %d bytes)",
sender, device_id, index, len(key))
# Live-update key on rotation — use set_key() which applies HKDF via Rust FFI.
# At this point the track is usually already subscribed so frame cryptor exists.
if self.lk_room and self._caller_identity:
caller_lk_id = self._caller_identity # e.g. "@user:server.eu:DEVICEID"
for p in self.lk_room.remote_participants.values():
if p.identity == caller_lk_id:
try:
kp = self.lk_room.e2ee_manager.key_provider
ok = kp.set_key(p.identity, index, key)
logger.info("Live-updated set_key[%d] for %s (ok=%s, %d bytes)",
index, p.identity, ok, len(key))
except Exception as e:
logger.warning("Failed to live-update caller key: %s", e)
break
async def _fetch_encryption_key_http(self) -> bytes | None:
"""Fetch encryption key from room timeline (NOT state) via Matrix HTTP API.
Element Call distributes encryption keys as timeline events, not state.
"""
import httpx
homeserver = str(self.nio_client.homeserver)
token = self.nio_client.access_token
url = f"{homeserver}/_matrix/client/v3/rooms/{self.room_id}/messages"
try:
async with httpx.AsyncClient(timeout=10.0) as http:
resp = await http.get(
url,
headers={"Authorization": f"Bearer {token}"},
params={"dir": "b", "limit": "50"},
)
resp.raise_for_status()
data = resp.json()
events = data.get("chunk", [])
user_id = self.nio_client.user_id
for evt in events:
evt_type = evt.get("type", "")
if evt_type == "io.element.call.encryption_keys":
sender = evt.get("sender", "")
if sender == user_id:
continue # skip our own key
content = evt.get("content", {})
device = content.get("device_id", "")
logger.info("Found encryption_keys timeline event: sender=%s device=%s",
sender, device)
all_keys = {}
import base64 as b64
for k in content.get("keys", []):
key_b64 = k.get("key", "")
key_index = k.get("index", 0)
if key_b64:
key_b64 += "=" * (-len(key_b64) % 4)
key_bytes = b64.urlsafe_b64decode(key_b64)
all_keys[key_index] = key_bytes
if all_keys:
if device:
self._caller_identity = f"{sender}:{device}"
self._caller_all_keys.update(all_keys)
max_idx = max(all_keys.keys())
logger.info("Loaded caller keys at indices %s (using %d)",
sorted(all_keys.keys()), max_idx)
return all_keys[max_idx]
logger.info("No encryption_keys events in last %d timeline events", len(events))
except Exception as e:
logger.warning("HTTP encryption key fetch failed: %s", e)
return None
async def start(self):
self._task = asyncio.create_task(self._run())
async def stop(self):
for obj in [self.session, self.lk_room, self._http_session, self._stt_session]:
if obj:
try:
if hasattr(obj, "aclose"):
await obj.aclose()
elif hasattr(obj, "disconnect"):
await obj.disconnect()
elif hasattr(obj, "close"):
await obj.close()
except Exception:
pass
if self._task and not self._task.done():
self._task.cancel()
try:
await self._task
except asyncio.CancelledError:
pass
async def _run(self):
try:
user_id = self.nio_client.user_id
bot_identity = _make_lk_identity(user_id, self.device_id)
jwt = _generate_lk_jwt(self.room_id, user_id, self.device_id)
# Publish bot's own key immediately so Element Call can decrypt us
if self._publish_key_cb:
self._publish_key_cb(self._bot_key)
logger.info("Published bot E2EE key (%d bytes)", len(self._bot_key))
# Check timeline for caller's encryption key
caller_key = await self._fetch_encryption_key_http()
if caller_key:
self._caller_key = caller_key
logger.info("Got caller E2EE key via timeline (%d bytes)", len(caller_key))
if not self._caller_key:
# Wait up to 15s for key via sync handler
logger.info("No key in timeline yet, waiting for sync...")
for _ in range(150):
if self._caller_key:
break
await asyncio.sleep(0.1)
# E2EE: re-enabled after diagnostic confirmed EC encrypts audio.
# Root cause found: set_key() only applies HKDF if the frame cryptor for that
# participant already exists. Must call set_key() in on_track_subscribed, not at connect time.
key_opts = rtc.KeyProviderOptions(
shared_key=b"", # per-participant mode
ratchet_window_size=16,
ratchet_salt=b"LKFrameEncryptionKey",
failure_tolerance=-1,
key_ring_size=16,
key_derivation_function=KDF_HKDF, # Rust applies HKDF matching EC JS SDK
)
e2ee_opts = rtc.E2EEOptions(
encryption_type=rtc.EncryptionType.GCM,
key_provider_options=key_opts,
)
room_opts = rtc.RoomOptions(e2ee=e2ee_opts)
self.lk_room = rtc.Room()
@self.lk_room.on("participant_connected")
def on_p(p):
logger.info("Participant connected: %s", p.identity)
# Note: do NOT set keys here — frame cryptor not initialized yet at participant_connected.
# Keys are set in on_track_subscribed where the frame cryptor definitely exists.
@self.lk_room.on("track_published")
def on_tp(pub, p):
logger.info("Track pub: %s %s kind=%s", p.identity, pub.sid, pub.kind)
@self.lk_room.on("track_subscribed")
def on_ts(t, pub, p):
logger.info("Track sub: %s %s kind=%s", p.identity, pub.sid, t.kind)
# *** KEY FIX: set_key() with KDF_HKDF only applies HKDF when the frame cryptor
# for this participant already exists. The frame cryptor is created at track
# subscription time. Calling set_key() BEFORE track subscription (at connect)
# skips HKDF derivation → raw key stored → DEC_FAILED.
# Solution: set caller key HERE, after frame cryptor is initialized.
if int(t.kind) == 1 and e2ee_opts is not None: # audio track only
try:
kp_local = self.lk_room.e2ee_manager.key_provider
if self._caller_all_keys:
for idx, base_k in sorted(self._caller_all_keys.items()):
ok = kp_local.set_key(p.identity, idx, base_k)
logger.info("on_ts: set_key[%d] for %s (ok=%s, %d bytes)",
idx, p.identity, ok, len(base_k))
else:
logger.warning("on_ts: no caller keys available yet — will set on key receipt")
except Exception as exc:
logger.warning("on_ts: set_key failed: %s", exc)
_e2ee_state_names = {0:"NEW",1:"OK",2:"ENC_FAILED",3:"DEC_FAILED",4:"MISSING_KEY",5:"RATCHETED",6:"INTERNAL_ERR"}
@self.lk_room.on("e2ee_state_changed")
def on_e2ee_state(participant, state):
state_name = _e2ee_state_names.get(int(state), f"UNKNOWN_{state}")
p_id = participant.identity if participant else "local"
logger.info("E2EE_STATE: participant=%s state=%s", p_id, state_name)
await self.lk_room.connect(self.lk_url, jwt, options=room_opts)
logger.info("Connected (E2EE=HKDF), remote=%d",
len(self.lk_room.remote_participants))
# Set bot's own key immediately after connect — local frame cryptor exists at connect time.
# CALLER keys are set in on_track_subscribed (NOT here) because the caller's frame cryptor
# is only created when their track arrives. Calling set_key() before that skips HKDF.
kp = self.lk_room.e2ee_manager.key_provider
ok = kp.set_key(bot_identity, 0, self._bot_key)
logger.info("Set bot key for %s (ok=%s, %d bytes)", bot_identity, ok, len(self._bot_key))
# Element Call rotates its encryption key when bot joins the LiveKit room.
# EC sends the new key via Matrix (Megolm-encrypted); nio sync will decrypt it
# and call on_encryption_key(), which updates self._caller_all_keys.
# NOTE: HTTP fetch is useless here — keys are Matrix-E2EE encrypted (m.room.encrypted).
pre_max_idx = max(self._caller_all_keys.keys()) if self._caller_all_keys else -1
logger.info("Waiting for EC key rotation via nio sync (current max_idx=%d)...", pre_max_idx)
for _attempt in range(4): # up to 2s (4 × 0.5s)
await asyncio.sleep(0.5)
new_max = max(self._caller_all_keys.keys()) if self._caller_all_keys else -1
if new_max > pre_max_idx:
self._caller_key = self._caller_all_keys[new_max]
logger.info("Key rotated: index %d%d (%d bytes)",
pre_max_idx, new_max, len(self._caller_key))
break
if _attempt % 2 == 1: # log every 1s
logger.info("Key rotation wait %0.1fs: max_idx still %d", (_attempt + 1) * 0.5, new_max)
else:
logger.warning("No key rotation after 2s — using pre-join key[%d]", pre_max_idx)
# Find the remote participant, wait up to 10s if not yet connected
remote_identity = None
for p in self.lk_room.remote_participants.values():
remote_identity = p.identity
break
if not remote_identity:
logger.info("No remote participant yet, waiting...")
for _ in range(100):
await asyncio.sleep(0.1)
for p in self.lk_room.remote_participants.values():
remote_identity = p.identity
break
if remote_identity:
break
# Set shared_key with pre-derived AES key for caller decryption.
# NOT using set_key() for caller — Rust HKDF may produce different result than EC's JS HKDF.
# set_shared_key() stores key raw (no KDF applied) — we pre-derive in Python.
# After key rotation wait: if track already subscribed, set rotated key.
# (Usually on_track_subscribed handles this, but if track arrived before rotation,
# the rotated key needs to be set here for the already-subscribed participant.)
if self._caller_all_keys and remote_identity:
try:
for idx, base_k in sorted(self._caller_all_keys.items()):
ok = kp.set_key(remote_identity, idx, base_k)
logger.info("Post-rotation set_key[%d] for %s (ok=%s)",
idx, remote_identity, ok)
except Exception as e:
logger.warning("Post-rotation set_key failed: %s", e)
elif not self._caller_all_keys:
logger.warning("No caller E2EE keys received — incoming audio will be silence")
if remote_identity:
logger.info("Linking to remote participant: %s", remote_identity)
# Voice pipeline — George (British male, multilingual DE/EN)
self._http_session = aiohttp.ClientSession()
self._stt_session = aiohttp.ClientSession() # separate session avoids WS/HTTP conflicts
voice_id = os.environ.get("ELEVENLABS_VOICE_ID", DEFAULT_VOICE_ID)
self.session = AgentSession(
stt=elevenlabs.STT(api_key=ELEVENLABS_KEY, http_session=self._stt_session),
llm=lk_openai.LLM(base_url=LITELLM_URL, api_key=LITELLM_KEY, model=self.model),
tts=elevenlabs.TTS(voice_id=voice_id, model="eleven_multilingual_v2",
api_key=ELEVENLABS_KEY, http_session=self._http_session),
vad=_get_vad(),
)
# Debug: log speech pipeline events to pinpoint where audio is lost
@self.session.on("user_started_speaking")
def _on_speaking_start():
logger.info("VAD: user_started_speaking")
@self.session.on("user_stopped_speaking")
def _on_speaking_stop():
logger.info("VAD: user_stopped_speaking")
@self.session.on("user_speech_committed")
def _on_user_speech(msg):
logger.info("USER_SPEECH: %s", msg.text_content)
@self.session.on("agent_speech_committed")
def _on_agent_speech(msg):
logger.info("AGENT_SPEECH: %s", msg.text_content)
agent = Agent(instructions=_build_voice_prompt())
io_opts = room_io.RoomOptions(
participant_identity=remote_identity,
close_on_disconnect=False,
) if remote_identity else room_io.RoomOptions(close_on_disconnect=False)
await self.session.start(
agent=agent,
room=self.lk_room,
room_options=io_opts,
)
logger.info("Voice pipeline started (voice=%s, linked_to=%s)", voice_id, remote_identity)
try:
await asyncio.wait_for(
self.session.generate_reply(
instructions="Sage nur: Hallo, wie kann ich helfen?"),
timeout=30.0)
logger.info("Greeting sent")
except asyncio.TimeoutError:
logger.error("Greeting timed out")
while True:
await asyncio.sleep(5)
except asyncio.CancelledError:
logger.info("Session cancelled for %s", self.room_id)
except Exception:
logger.exception("Session failed for %s", self.room_id)