Files
matrix-ai-agent/voice.py
Christian Gick c188a2daf6 test(voice): disable E2EE entirely — check if EC sends plaintext vs encrypted
If VAD triggers → EC audio reaches pipeline without decryption (plaintext or format issue).
If VAD silent → E2EE encryption on EC side but key/format mismatch on our side.
Note: bot greeting will be unencrypted so EC may not hear it.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-02-22 13:34:26 +02:00

427 lines
20 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""Voice session: LiveKit + STT/LLM/TTS pipeline.
E2EE via HKDF key derivation (Element Call compatible).
Requires patched livekit-rtc FFI binary from onestacked/livekit-rust-sdks."""
import asyncio
import base64
import datetime
import hashlib
import logging
import os
import zoneinfo
import aiohttp
from livekit import rtc, api as lkapi
from livekit.agents import Agent, AgentSession, room_io
from livekit.plugins import openai as lk_openai, elevenlabs, silero
# Module-wide logger for the voice pipeline.
logger = logging.getLogger("matrix-ai-voice")

# Runtime configuration, read from the environment once at import time.
LITELLM_URL = os.getenv("LITELLM_BASE_URL", "")
LITELLM_KEY = os.getenv("LITELLM_API_KEY", "not-needed")
LK_API_KEY = os.getenv("LIVEKIT_API_KEY", "")
LK_API_SECRET = os.getenv("LIVEKIT_API_SECRET", "")
ELEVENLABS_KEY = os.getenv("ELEVENLABS_API_KEY", "")

# Fallback TTS voice, used when ELEVENLABS_VOICE_ID is not set.
DEFAULT_VOICE_ID = "JBFqnCBsd6RMkjVDRZzb"  # George - warm, British male, multilingual
# System prompt for the voice agent; ``{datetime}`` is filled in per call.
_VOICE_PROMPT_TEMPLATE = """Du bist ein hilfreicher Sprachassistent in einem Matrix-Anruf.
Aktuelle Zeit: {datetime}
STRIKTE Regeln:
- Antworte in der Sprache des Nutzers (Deutsch oder Englisch)
- Halte JEDE Antwort auf MAXIMAL 1-2 kurze Saetze
- Sei direkt und praezise, keine Fuellwoerter
- Erfinde NICHTS - keine Geschichten, keine Musik, keine Fantasie
- Beantworte nur was gefragt wird
- Wenn niemand etwas fragt, sage nur kurz Hallo"""


def _build_voice_prompt() -> str:
    """Render the voice system prompt with the current date and time.

    The timezone name is read from the ``VOICE_TIMEZONE`` environment
    variable (default ``Europe/Berlin``). If that zone cannot be loaded,
    UTC is used instead so prompt construction never fails on a bad setting.

    Returns:
        ``_VOICE_PROMPT_TEMPLATE`` with ``{datetime}`` replaced by the
        formatted current time.
    """
    tz_name = os.environ.get("VOICE_TIMEZONE", "Europe/Berlin")
    try:
        tz = zoneinfo.ZoneInfo(tz_name)
    except (zoneinfo.ZoneInfoNotFoundError, ValueError, OSError):
        # Unknown zone, malformed key, or unreadable tz database: use UTC.
        tz = datetime.timezone.utc
    now = datetime.datetime.now(tz)
    return _VOICE_PROMPT_TEMPLATE.format(
        datetime=now.strftime("%A, %d. %B %Y %H:%M %Z")
    )
# Lazily created, process-wide VAD instance (see _get_vad).
_vad = None


def _get_vad():
    """Return the shared Silero VAD, loading it on first use."""
    global _vad
    if _vad is not None:
        return _vad
    _vad = silero.VAD.load()
    return _vad
def _make_lk_identity(user_id, device_id):
    """Return the LiveKit participant identity ``<user_id>:<device_id>``."""
    return "{}:{}".format(user_id, device_id)
def _compute_lk_room_name(room_id):
    """Derive the LiveKit room name for a Matrix room ID.

    The name is the SHA-256 digest of ``"<room_id>|m.call#ROOM"``, encoded as
    standard base64 with the trailing ``=`` padding removed.
    """
    digest = hashlib.sha256(f"{room_id}|m.call#ROOM".encode()).digest()
    encoded = base64.b64encode(digest).decode()
    return encoded.rstrip("=")
def _generate_lk_jwt(room_id, user_id, device_id):
    """Create a signed LiveKit access token for joining the call's room.

    The token carries the identity from ``_make_lk_identity``, uses
    ``user_id`` as display name, grants join/publish/subscribe on the room
    named by ``_compute_lk_room_name``, and is valid for 24 hours.

    Returns:
        The encoded JWT as produced by ``AccessToken.to_jwt()``.
    """
    lk_room = _compute_lk_room_name(room_id)
    identity = _make_lk_identity(user_id, device_id)
    grants = lkapi.VideoGrants(
        room_join=True,
        room=lk_room,
        can_publish=True,
        can_subscribe=True,
    )
    builder = lkapi.AccessToken(LK_API_KEY, LK_API_SECRET)
    builder = builder.with_identity(identity)
    builder = builder.with_name(user_id)
    builder = builder.with_grants(grants)
    token = builder.with_ttl(datetime.timedelta(hours=24))
    logger.info("JWT: identity=%s room=%s", identity, lk_room)
    return token.to_jwt()
# Key-derivation selector passed as key_derivation_function to rtc.KeyProviderOptions.
KDF_HKDF = 1 # Rust FFI applies HKDF internally (proto enum value 1)
# NOTE: value 0 in the proto is PBKDF2, NOT raw/none mode — see e2ee_patch.py
def _hkdf_derive(ikm: bytes) -> bytes:
    """Derive a 16-byte (128-bit) key from ``ikm`` with HKDF-SHA256.

    Parameters: salt ``b"LKFrameEncryptionKey"``, info of 128 zero bytes,
    output length 16 bytes. Per the original author's notes these mirror
    ``deriveEncryptionKey()`` in livekit-client-sdk-js, and the result is
    meant to be handed to ``set_shared_key()``.

    Args:
        ikm: Input keying material (the base key).

    Returns:
        The first 16 bytes of the first HKDF-Expand block.
    """
    import hmac

    salt = b"LKFrameEncryptionKey"
    info = bytes(128)
    # HKDF-Extract: PRK = HMAC(salt, IKM).
    pseudo_random_key = hmac.digest(salt, ikm, "sha256")
    # HKDF-Expand, first block only: T(1) = HMAC(PRK, info || 0x01).
    first_block = hmac.digest(pseudo_random_key, info + b"\x01", "sha256")
    return first_block[:16]
def _build_e2ee_options() -> rtc.E2EEOptions:
    """Build the LiveKit E2EE options used for Element Call interop.

    The key provider is configured with an empty shared key, ratchet window
    10, ratchet salt ``b"LKFrameEncryptionKey"``, failure tolerance 10, key
    ring size 256 and ``KDF_HKDF`` as key derivation function. Per the
    original author's notes these match Element Call's settings, and proto
    value 0 would select PBKDF2 rather than "no KDF".

    NOTE(review): the original docstring said raw base keys are passed to
    ``set_key()`` so the FFI derives the frame key itself, while caller keys
    elsewhere in this file are pre-derived and set via ``set_shared_key()``.
    Confirm which path is intended.
    """
    return rtc.E2EEOptions(
        key_provider_options=rtc.KeyProviderOptions(
            shared_key=b"",  # empty = per-participant mode
            ratchet_window_size=10,
            ratchet_salt=b"LKFrameEncryptionKey",
            failure_tolerance=10,
            key_ring_size=256,
            key_derivation_function=KDF_HKDF,  # Rust FFI applies HKDF; we pass raw base keys
        )
    )
class VoiceSession:
def __init__(self, nio_client, room_id, device_id, lk_url, model="claude-sonnet",
publish_key_cb=None, bot_key: bytes | None = None):
self.nio_client = nio_client
self.room_id = room_id
self.device_id = device_id
self.lk_url = lk_url
self.model = model
self.lk_room = None
self.session = None
self._task = None
self._http_session = None
self._stt_session = None
self._caller_key: bytes | None = None
self._caller_identity: str | None = None
self._caller_all_keys: dict = {} # {index: bytes} — all caller keys by index
self._bot_key: bytes = bot_key or os.urandom(16)
self._publish_key_cb = publish_key_cb
def on_encryption_key(self, sender, device_id, key, index):
"""Receive E2EE key from Element Call participant."""
if not key:
return
if not self._caller_key:
self._caller_key = key
self._caller_identity = f"{sender}:{device_id}"
self._caller_all_keys[index] = key
logger.info("E2EE key received from %s:%s (index=%d, %d bytes)",
sender, device_id, index, len(key))
# Live-update shared_key with pre-derived AES key on rotation.
# set_shared_key() bypasses Rust FFI KDF — we pre-derive with Python HKDF.
# Per-participant set_key() is NOT used for caller (Rust HKDF may differ from JS).
if self.lk_room and hasattr(self.lk_room, 'e2ee_manager'):
try:
kp = self.lk_room.e2ee_manager.key_provider
derived = _hkdf_derive(key)
kp.set_shared_key(derived, index)
logger.info("Live-updated shared_key (pre-derived)[%d] for caller (%d bytes)",
index, len(derived))
except Exception as e:
logger.warning("Failed to live-update caller shared_key: %s", e)
async def _fetch_encryption_key_http(self) -> bytes | None:
"""Fetch encryption key from room timeline (NOT state) via Matrix HTTP API.
Element Call distributes encryption keys as timeline events, not state.
"""
import httpx
homeserver = str(self.nio_client.homeserver)
token = self.nio_client.access_token
url = f"{homeserver}/_matrix/client/v3/rooms/{self.room_id}/messages"
try:
async with httpx.AsyncClient(timeout=10.0) as http:
resp = await http.get(
url,
headers={"Authorization": f"Bearer {token}"},
params={"dir": "b", "limit": "50"},
)
resp.raise_for_status()
data = resp.json()
events = data.get("chunk", [])
user_id = self.nio_client.user_id
for evt in events:
evt_type = evt.get("type", "")
if evt_type == "io.element.call.encryption_keys":
sender = evt.get("sender", "")
if sender == user_id:
continue # skip our own key
content = evt.get("content", {})
device = content.get("device_id", "")
logger.info("Found encryption_keys timeline event: sender=%s device=%s",
sender, device)
all_keys = {}
import base64 as b64
for k in content.get("keys", []):
key_b64 = k.get("key", "")
key_index = k.get("index", 0)
if key_b64:
key_b64 += "=" * (-len(key_b64) % 4)
key_bytes = b64.urlsafe_b64decode(key_b64)
all_keys[key_index] = key_bytes
if all_keys:
if device:
self._caller_identity = f"{sender}:{device}"
self._caller_all_keys.update(all_keys)
max_idx = max(all_keys.keys())
logger.info("Loaded caller keys at indices %s (using %d)",
sorted(all_keys.keys()), max_idx)
return all_keys[max_idx]
logger.info("No encryption_keys events in last %d timeline events", len(events))
except Exception as e:
logger.warning("HTTP encryption key fetch failed: %s", e)
return None
async def start(self):
self._task = asyncio.create_task(self._run())
async def stop(self):
for obj in [self.session, self.lk_room, self._http_session, self._stt_session]:
if obj:
try:
if hasattr(obj, "aclose"):
await obj.aclose()
elif hasattr(obj, "disconnect"):
await obj.disconnect()
elif hasattr(obj, "close"):
await obj.close()
except Exception:
pass
if self._task and not self._task.done():
self._task.cancel()
try:
await self._task
except asyncio.CancelledError:
pass
async def _run(self):
try:
user_id = self.nio_client.user_id
bot_identity = _make_lk_identity(user_id, self.device_id)
jwt = _generate_lk_jwt(self.room_id, user_id, self.device_id)
# Publish bot's own key immediately so Element Call can decrypt us
if self._publish_key_cb:
self._publish_key_cb(self._bot_key)
logger.info("Published bot E2EE key (%d bytes)", len(self._bot_key))
# Check timeline for caller's encryption key
caller_key = await self._fetch_encryption_key_http()
if caller_key:
self._caller_key = caller_key
logger.info("Got caller E2EE key via timeline (%d bytes)", len(caller_key))
if not self._caller_key:
# Wait up to 15s for key via sync handler
logger.info("No key in timeline yet, waiting for sync...")
for _ in range(150):
if self._caller_key:
break
await asyncio.sleep(0.1)
# DIAGNOSTIC: disable E2EE entirely to check if EC sends encrypted or plaintext.
# If VAD triggers → EC sends plaintext (E2EE disabled on EC side).
# If VAD silent → EC sends encrypted (frame format or key issue).
e2ee_opts = None # TODO: re-enable after diagnosis
room_opts = rtc.RoomOptions(e2ee=e2ee_opts)
self.lk_room = rtc.Room()
@self.lk_room.on("participant_connected")
def on_p(p):
logger.info("Participant connected: %s", p.identity)
# Apply any already-received caller keys to the new participant's LK identity.
# This handles the case where key arrives before the participant joins LiveKit.
if self._caller_all_keys:
try:
kp_local = self.lk_room.e2ee_manager.key_provider
# Use shared_key with pre-derived AES key (not set_key which applies Rust HKDF)
for idx, base_k in sorted(self._caller_all_keys.items()):
kp_local.set_shared_key(_hkdf_derive(base_k), idx)
logger.info("on_p: set shared_key (pre-derived) for %d caller key(s)",
len(self._caller_all_keys))
except Exception as exc:
logger.warning("on_p: failed to set caller shared_key: %s", exc)
@self.lk_room.on("track_published")
def on_tp(pub, p):
logger.info("Track pub: %s %s kind=%s", p.identity, pub.sid, pub.kind)
@self.lk_room.on("track_subscribed")
def on_ts(t, pub, p):
logger.info("Track sub: %s %s kind=%s", p.identity, pub.sid, t.kind)
_e2ee_state_names = {0:"NEW",1:"OK",2:"ENC_FAILED",3:"DEC_FAILED",4:"MISSING_KEY",5:"RATCHETED",6:"INTERNAL_ERR"}
@self.lk_room.on("e2ee_state_changed")
def on_e2ee_state(participant, state):
state_name = _e2ee_state_names.get(int(state), f"UNKNOWN_{state}")
p_id = participant.identity if participant else "local"
logger.info("E2EE_STATE: participant=%s state=%s", p_id, state_name)
await self.lk_room.connect(self.lk_url, jwt, options=room_opts)
logger.info("Connected (E2EE=HKDF), remote=%d",
len(self.lk_room.remote_participants))
# *** FIX: Set keys immediately after connect — BEFORE the rotation wait.
# The caller's track is subscribed during the wait; if no key is set when
# the frame cryptor is first created it enters DEC_FAILED and drops all frames
# even after the key is set later.
kp = self.lk_room.e2ee_manager.key_provider
kp.set_key(bot_identity, self._bot_key, 0)
logger.info("Set bot raw key for %s (%d bytes)", bot_identity, len(self._bot_key))
# Caller keys: use set_shared_key with pre-derived AES (bypasses Rust HKDF).
# Per-participant set_key is NOT called for caller — Rust HKDF may not match EC's JS HKDF.
if self._caller_all_keys:
for idx, base_k in sorted(self._caller_all_keys.items()):
kp.set_shared_key(_hkdf_derive(base_k), idx)
logger.info("Early-set shared_key (pre-derived) for caller indices %s",
list(self._caller_all_keys.keys()))
elif self._caller_key:
kp.set_shared_key(_hkdf_derive(self._caller_key), 0)
logger.info("Early-set shared_key (pre-derived) caller key[0] (%d bytes)", 16)
# Element Call rotates its encryption key when bot joins the LiveKit room.
# EC sends the new key via Matrix (Megolm-encrypted); nio sync will decrypt it
# and call on_encryption_key(), which updates self._caller_all_keys.
# NOTE: HTTP fetch is useless here — keys are Matrix-E2EE encrypted (m.room.encrypted).
pre_max_idx = max(self._caller_all_keys.keys()) if self._caller_all_keys else -1
logger.info("Waiting for EC key rotation via nio sync (current max_idx=%d)...", pre_max_idx)
for _attempt in range(4): # up to 2s (4 × 0.5s)
await asyncio.sleep(0.5)
new_max = max(self._caller_all_keys.keys()) if self._caller_all_keys else -1
if new_max > pre_max_idx:
self._caller_key = self._caller_all_keys[new_max]
logger.info("Key rotated: index %d%d (%d bytes)",
pre_max_idx, new_max, len(self._caller_key))
break
if _attempt % 2 == 1: # log every 1s
logger.info("Key rotation wait %0.1fs: max_idx still %d", (_attempt + 1) * 0.5, new_max)
else:
logger.warning("No key rotation after 2s — using pre-join key[%d]", pre_max_idx)
# Find the remote participant, wait up to 10s if not yet connected
remote_identity = None
for p in self.lk_room.remote_participants.values():
remote_identity = p.identity
break
if not remote_identity:
logger.info("No remote participant yet, waiting...")
for _ in range(100):
await asyncio.sleep(0.1)
for p in self.lk_room.remote_participants.values():
remote_identity = p.identity
break
if remote_identity:
break
# Set shared_key with pre-derived AES key for caller decryption.
# NOT using set_key() for caller — Rust HKDF may produce different result than EC's JS HKDF.
# set_shared_key() stores key raw (no KDF applied) — we pre-derive in Python.
if self._caller_all_keys:
try:
for idx, base_k in sorted(self._caller_all_keys.items()):
derived = _hkdf_derive(base_k)
kp.set_shared_key(derived, idx)
logger.info("Set shared_key (pre-derived)[%d] (%d bytes)", idx, len(derived))
except Exception as e:
logger.warning("Failed to set caller shared_key: %s", e)
elif not self._caller_all_keys:
logger.warning("No caller E2EE keys — incoming audio will be silence")
if remote_identity:
logger.info("Linking to remote participant: %s", remote_identity)
# Voice pipeline — George (British male, multilingual DE/EN)
self._http_session = aiohttp.ClientSession()
self._stt_session = aiohttp.ClientSession() # separate session avoids WS/HTTP conflicts
voice_id = os.environ.get("ELEVENLABS_VOICE_ID", DEFAULT_VOICE_ID)
self.session = AgentSession(
stt=elevenlabs.STT(api_key=ELEVENLABS_KEY, http_session=self._stt_session),
llm=lk_openai.LLM(base_url=LITELLM_URL, api_key=LITELLM_KEY, model=self.model),
tts=elevenlabs.TTS(voice_id=voice_id, model="eleven_multilingual_v2",
api_key=ELEVENLABS_KEY, http_session=self._http_session),
vad=_get_vad(),
)
# Debug: log speech pipeline events to pinpoint where audio is lost
@self.session.on("user_started_speaking")
def _on_speaking_start():
logger.info("VAD: user_started_speaking")
@self.session.on("user_stopped_speaking")
def _on_speaking_stop():
logger.info("VAD: user_stopped_speaking")
@self.session.on("user_speech_committed")
def _on_user_speech(msg):
logger.info("USER_SPEECH: %s", msg.text_content)
@self.session.on("agent_speech_committed")
def _on_agent_speech(msg):
logger.info("AGENT_SPEECH: %s", msg.text_content)
agent = Agent(instructions=_build_voice_prompt())
io_opts = room_io.RoomOptions(
participant_identity=remote_identity,
close_on_disconnect=False,
) if remote_identity else room_io.RoomOptions(close_on_disconnect=False)
await self.session.start(
agent=agent,
room=self.lk_room,
room_options=io_opts,
)
logger.info("Voice pipeline started (voice=%s, linked_to=%s)", voice_id, remote_identity)
try:
await asyncio.wait_for(
self.session.generate_reply(
instructions="Sage nur: Hallo, wie kann ich helfen?"),
timeout=30.0)
logger.info("Greeting sent")
except asyncio.TimeoutError:
logger.error("Greeting timed out")
while True:
await asyncio.sleep(5)
except asyncio.CancelledError:
logger.info("Session cancelled for %s", self.room_id)
except Exception:
logger.exception("Session failed for %s", self.room_id)