Files
matrix-ai-agent/voice.py
Christian Gick 2fa13c4958 fix: Use caller key as shared_key at connect time for immediate decryption
Per-participant set_key alone with empty shared_key caused silent incoming audio.
Now connects with caller key as shared_key, then overlays per-participant keys.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-21 20:35:55 +02:00

338 lines
14 KiB
Python

"""Voice session: LiveKit + STT/LLM/TTS pipeline.
E2EE via HKDF key derivation (Element Call compatible).
Requires patched livekit-rtc FFI binary from onestacked/livekit-rust-sdks."""
import asyncio
import base64
import datetime
import hashlib
import logging
import os
import aiohttp
from livekit import rtc, api as lkapi
from livekit.agents import Agent, AgentSession, room_io
from livekit.plugins import openai as lk_openai, elevenlabs, silero
logger = logging.getLogger("matrix-ai-voice")
# Enable debug logging for agents pipeline to diagnose audio issues
logging.getLogger("livekit.agents").setLevel(logging.DEBUG)
logging.getLogger("livekit.plugins").setLevel(logging.DEBUG)
LITELLM_URL = os.environ.get("LITELLM_BASE_URL", "")
LITELLM_KEY = os.environ.get("LITELLM_API_KEY", "not-needed")
LK_API_KEY = os.environ.get("LIVEKIT_API_KEY", "")
LK_API_SECRET = os.environ.get("LIVEKIT_API_SECRET", "")
ELEVENLABS_KEY = os.environ.get("ELEVENLABS_API_KEY", "")
DEFAULT_VOICE_ID = "JBFqnCBsd6RMkjVDRZzb" # George - warm, captivating, British male
VOICE_PROMPT_TEMPLATE = """Du bist ein hilfreicher Sprachassistent von Agiliton in einem Matrix-Anruf.
Du heisst "Agiliton Assistant". Du basierst auf dem Modell {model}.
Wenn jemand fragt welches Modell du bist, sei transparent und sage es.
Aktuelle Zeit: {datetime}
STRIKTE Regeln:
- Antworte in der Sprache in der der Nutzer spricht
- Halte JEDE Antwort auf MAXIMAL 1-2 kurze Saetze
- Sei direkt und praezise, keine Fuellwoerter
- Erfinde NICHTS - keine Geschichten, keine Musik, keine Fantasie
- Beantworte nur was gefragt wird
- Wenn niemand etwas fragt, sage nur kurz Hallo"""
_vad = None
def _get_vad():
global _vad
if _vad is None:
_vad = silero.VAD.load()
return _vad
def _make_lk_identity(user_id, device_id):
return f"{user_id}:{device_id}"
def _compute_lk_room_name(room_id):
raw = f"{room_id}|m.call#ROOM"
return base64.b64encode(hashlib.sha256(raw.encode()).digest()).decode().rstrip("=")
def _generate_lk_jwt(room_id, user_id, device_id):
identity = _make_lk_identity(user_id, device_id)
lk_room = _compute_lk_room_name(room_id)
token = (
lkapi.AccessToken(LK_API_KEY, LK_API_SECRET)
.with_identity(identity)
.with_name(user_id)
.with_grants(lkapi.VideoGrants(
room_join=True, room=lk_room,
can_publish=True, can_subscribe=True))
.with_ttl(datetime.timedelta(hours=24))
)
logger.info("JWT: identity=%s room=%s", identity, lk_room)
return token.to_jwt()
KDF_HKDF = 1
def _build_e2ee_options(shared_key: bytes) -> rtc.E2EEOptions:
"""Build HKDF E2EE options matching Element Call's key derivation.
Element Call uses: ratchetWindowSize=10, keyringSize=256, salt="LKFrameEncryptionKey"
"""
key_opts = rtc.KeyProviderOptions(
shared_key=shared_key,
ratchet_window_size=10,
ratchet_salt=b"LKFrameEncryptionKey",
failure_tolerance=10,
key_ring_size=256,
key_derivation_function=KDF_HKDF,
)
return rtc.E2EEOptions(key_provider_options=key_opts)
class VoiceSession:
def __init__(self, nio_client, room_id, device_id, lk_url, model="claude-sonnet",
publish_key_cb=None, bot_key: bytes | None = None):
self.nio_client = nio_client
self.room_id = room_id
self.device_id = device_id
self.lk_url = lk_url
self.model = model
self.lk_room = None
self.session = None
self._task = None
self._http_session = None
self._caller_key: bytes | None = None
self._caller_identity: str | None = None # "sender:device_id" format
self._bot_key: bytes = bot_key or os.urandom(16)
self._publish_key_cb = publish_key_cb
def on_encryption_key(self, sender, device_id, key, index):
"""Receive E2EE key from Element Call participant.
If the room is already connected, immediately set the key on the
key provider so we can decrypt the caller's audio.
"""
if not key:
return
identity = _make_lk_identity(sender, device_id)
self._caller_key = key
self._caller_identity = identity
logger.info("E2EE key received from %s:%s (identity=%s, index=%d, %d bytes)",
sender, device_id, identity, index, len(key))
# If already connected, set key on the key provider immediately
if self.lk_room:
try:
kp = self.lk_room.e2ee_manager.key_provider
kp.set_key(identity, key, key_index=index)
logger.info("Live-updated caller E2EE key for %s", identity)
except Exception:
logger.warning("Could not live-update caller E2EE key", exc_info=True)
async def _fetch_encryption_key_http(self) -> bytes | None:
"""Fetch encryption key from room timeline (NOT state) via Matrix HTTP API.
Element Call distributes encryption keys as timeline events, not state.
Also sets self._caller_identity from the event sender + device_id.
"""
import httpx
homeserver = str(self.nio_client.homeserver)
token = self.nio_client.access_token
url = f"{homeserver}/_matrix/client/v3/rooms/{self.room_id}/messages"
try:
async with httpx.AsyncClient(timeout=10.0) as http:
resp = await http.get(
url,
headers={"Authorization": f"Bearer {token}"},
params={"dir": "b", "limit": "50"},
)
resp.raise_for_status()
data = resp.json()
events = data.get("chunk", [])
user_id = self.nio_client.user_id
for evt in events:
evt_type = evt.get("type", "")
if evt_type == "io.element.call.encryption_keys":
sender = evt.get("sender", "")
if sender == user_id:
continue # skip our own key
content = evt.get("content", {})
caller_device = content.get("device_id", "")
logger.info("Found encryption_keys timeline event: sender=%s device=%s",
sender, caller_device)
if caller_device:
self._caller_identity = _make_lk_identity(sender, caller_device)
for k in content.get("keys", []):
key_b64 = k.get("key", "")
if key_b64:
key_b64 += "=" * (-len(key_b64) % 4)
return base64.urlsafe_b64decode(key_b64)
logger.info("No encryption_keys events in last %d timeline events", len(events))
except Exception as e:
logger.warning("HTTP encryption key fetch failed: %s", e)
return None
async def start(self):
self._task = asyncio.create_task(self._run())
async def stop(self):
for obj in [self.session, self.lk_room, self._http_session]:
if obj:
try:
if hasattr(obj, "aclose"):
await obj.aclose()
elif hasattr(obj, "disconnect"):
await obj.disconnect()
elif hasattr(obj, "close"):
await obj.close()
except Exception:
pass
if self._task and not self._task.done():
self._task.cancel()
try:
await self._task
except asyncio.CancelledError:
pass
async def _run(self):
try:
user_id = self.nio_client.user_id
jwt = _generate_lk_jwt(self.room_id, user_id, self.device_id)
# Check timeline for caller's encryption key
caller_key = await self._fetch_encryption_key_http()
if caller_key:
self._caller_key = caller_key
logger.info("Got caller E2EE key via timeline (%d bytes)", len(caller_key))
if not self._caller_key:
# Wait up to 15s for key via sync handler (bot.py forwards
# encryption_keys timeline events to on_encryption_key)
logger.info("No key in timeline yet, waiting for sync...")
for _ in range(150):
if self._caller_key:
break
await asyncio.sleep(0.1)
# Publish bot's own key so caller can decrypt our audio
if self._publish_key_cb:
self._publish_key_cb(self._bot_key)
# Connect with caller's key as shared_key for immediate decryption,
# then set per-participant keys after connect for proper separation
connect_key = self._caller_key or self._bot_key
e2ee_opts = _build_e2ee_options(connect_key)
logger.info("E2EE connect key: %d bytes (from %s)",
len(connect_key), "caller" if self._caller_key else "bot")
room_opts = rtc.RoomOptions(e2ee=e2ee_opts)
self.lk_room = rtc.Room()
@self.lk_room.on("participant_connected")
def on_p(p):
logger.info("Participant connected: %s", p.identity)
@self.lk_room.on("track_published")
def on_tp(pub, p):
logger.info("Track pub: %s %s kind=%s", p.identity, pub.sid, pub.kind)
@self.lk_room.on("track_subscribed")
def on_ts(t, pub, p):
logger.info("Track sub: %s %s kind=%s", p.identity, pub.sid, t.kind)
await self.lk_room.connect(self.lk_url, jwt, options=room_opts)
logger.info("Connected (E2EE=shared+per-participant), remote=%d",
len(self.lk_room.remote_participants))
# Set per-participant E2EE keys after connect
bot_identity = _make_lk_identity(user_id, self.device_id)
try:
kp = self.lk_room.e2ee_manager.key_provider
# Set bot's own key (encrypts outgoing audio)
kp.set_key(bot_identity, self._bot_key, key_index=0)
logger.info("Set bot E2EE key for identity=%s (%d bytes)",
bot_identity, len(self._bot_key))
# Set caller's key (decrypts incoming audio)
if self._caller_key and self._caller_identity:
kp.set_key(self._caller_identity, self._caller_key, key_index=0)
logger.info("Set caller E2EE key for identity=%s (%d bytes)",
self._caller_identity, len(self._caller_key))
elif self._caller_key:
for p in self.lk_room.remote_participants.values():
kp.set_key(p.identity, self._caller_key, key_index=0)
logger.info("Set caller E2EE key for identity=%s (%d bytes)",
p.identity, len(self._caller_key))
break
except Exception:
logger.warning("Per-participant key setup failed, shared key used as fallback",
exc_info=True)
# Find the remote participant, wait up to 10s if not yet connected
remote_identity = None
for p in self.lk_room.remote_participants.values():
remote_identity = p.identity
break
if not remote_identity:
logger.info("No remote participant yet, waiting...")
for _ in range(100):
await asyncio.sleep(0.1)
for p in self.lk_room.remote_participants.values():
remote_identity = p.identity
break
if remote_identity:
break
if remote_identity:
logger.info("Linking to remote participant: %s", remote_identity)
# Voice pipeline — German male voice (Daniel)
self._http_session = aiohttp.ClientSession()
voice_id = os.environ.get("ELEVENLABS_VOICE_ID", DEFAULT_VOICE_ID)
self.session = AgentSession(
stt=elevenlabs.STT(api_key=ELEVENLABS_KEY, http_session=self._http_session),
llm=lk_openai.LLM(base_url=LITELLM_URL, api_key=LITELLM_KEY, model=self.model),
tts=elevenlabs.TTS(voice_id=voice_id, model="eleven_multilingual_v2",
api_key=ELEVENLABS_KEY, http_session=self._http_session),
vad=_get_vad(),
)
# Debug: log speech events
@self.session.on("user_speech_committed")
def _on_user_speech(msg):
logger.info("USER_SPEECH: %s", msg.text_content)
@self.session.on("agent_speech_committed")
def _on_agent_speech(msg):
logger.info("AGENT_SPEECH: %s", msg.text_content)
now = datetime.datetime.now(datetime.timezone.utc).strftime("%A, %B %d, %Y %H:%M UTC")
prompt = VOICE_PROMPT_TEMPLATE.format(model=self.model, datetime=now)
agent = Agent(instructions=prompt)
io_opts = room_io.RoomOptions(
participant_identity=remote_identity,
) if remote_identity else room_io.RoomOptions()
await self.session.start(
agent=agent,
room=self.lk_room,
room_options=io_opts,
)
logger.info("Voice pipeline started (voice=%s, linked_to=%s)", voice_id, remote_identity)
try:
await asyncio.wait_for(
self.session.generate_reply(
instructions="Sage nur: Hallo, wie kann ich helfen?"),
timeout=30.0)
logger.info("Greeting sent")
except asyncio.TimeoutError:
logger.error("Greeting timed out")
while True:
await asyncio.sleep(1)
except asyncio.CancelledError:
logger.info("Session cancelled for %s", self.room_id)
except Exception:
logger.exception("Session failed for %s", self.room_id)