fix(voice): full E2EE bidirectional audio pipeline working
- bot.py: track active callers per room; only stop the voice session when the last caller leaves (fixes premature cancellation when the Playwright browser hangs up while the real app is still in the call).
- voice.py: pre-compute 8 HMAC-ratcheted keys from Element Call's (EC's) base key so decryption works immediately, without waiting ~30s for Matrix to deliver EC's key-rotation event (root cause of the user→bot silence).
- voice.py: fix the set_key() argument order to (identity, key, index) at all call sites — it was (identity, index, key), which caused a TypeError.
- voice.py: add an audio frame monitor (AUDIO_FLOW) and mute/unmute event handlers for diagnostics.
- voice.py: update to the livekit-agents 1.4.2 event names: user_state_changed, user_input_transcribed, conversation_item_added.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
16
bot.py
16
bot.py
@@ -266,6 +266,7 @@ class Bot:
|
||||
self.lkapi = None
|
||||
self.voice_sessions: dict[str, VoiceSession] = {}
|
||||
self.active_calls = set() # rooms where we've sent call member event
|
||||
self.active_callers: dict[str, set[str]] = {} # room_id → set of caller user IDs
|
||||
self.rag = DocumentRAG(WILDFILES_BASE_URL, WILDFILES_ORG)
|
||||
self.memory = MemoryClient(MEMORY_SERVICE_URL)
|
||||
self.llm = AsyncOpenAI(base_url=LITELLM_URL, api_key=LITELLM_KEY) if LITELLM_URL else None
|
||||
@@ -401,6 +402,7 @@ class Bot:
|
||||
|
||||
logger.info("Call detected in %s from %s, joining...", room_id, event.sender)
|
||||
self.active_calls.add(room_id)
|
||||
self.active_callers.setdefault(room_id, set()).add(event.sender)
|
||||
|
||||
# Get the foci_preferred from the caller's event
|
||||
content = event.source["content"]
|
||||
@@ -478,10 +480,20 @@ class Bot:
|
||||
self.voice_sessions.pop(room_id, None)
|
||||
|
||||
else:
|
||||
# Empty content = someone left the call, check if anyone is still calling
|
||||
# Empty content = someone left the call
|
||||
room_id = room.room_id
|
||||
if room_id in self.active_calls:
|
||||
# Stop voice session
|
||||
# Remove this caller from active set
|
||||
callers = self.active_callers.get(room_id, set())
|
||||
callers.discard(event.sender)
|
||||
if callers:
|
||||
logger.info("Participant %s left %s but %d other(s) still in call — keeping session",
|
||||
event.sender, room_id, len(callers))
|
||||
return
|
||||
|
||||
# No callers left — stop voice session
|
||||
logger.info("Last caller %s left %s — stopping session", event.sender, room_id)
|
||||
self.active_callers.pop(room_id, None)
|
||||
vs = self.voice_sessions.pop(room_id, None)
|
||||
if vs:
|
||||
try:
|
||||
|
||||
119
voice.py
119
voice.py
@@ -96,6 +96,22 @@ def _hkdf_derive(ikm: bytes) -> bytes:
|
||||
return t1[:16]
|
||||
|
||||
|
||||
def _ratchet_keys(base_raw: bytes, count: int = 6) -> dict[int, bytes]:
    """Pre-compute ratcheted keys 0..count-1 matching livekit-client-sdk-js ratchet().

    EC JS ratchet: new_raw = HMAC(key=current_raw, data="LKFrameEncryptionKey")[:16]
    (HMAC-SHA256 is used here, truncated to the first 16 bytes.)

    Set the returned keys via set_key(identity, raw, index) with KDF_HKDF=1 so
    Rust applies HKDF.

    Args:
        base_raw: Raw key material that becomes ratchet step 0 unchanged.
        count: Number of consecutive ratchet steps to produce. A value of 0
            (or a negative value) yields an empty dict.

    Returns:
        {index: raw_key} for all indices 0..count-1, where entry 0 is
        ``base_raw`` itself and every later entry is derived from the
        previous one.
    """
    # Function-scope import kept from the original so the block does not
    # depend on a file-level `import hmac`; `hashlib` comes from module scope.
    import hmac as _hmac

    keys: dict[int, bytes] = {}
    raw = base_raw
    for i in range(count):
        # Store the current step first: index 0 must be the un-ratcheted base key.
        keys[i] = raw
        # Advance the ratchet: the current key is the HMAC key, the label is the data.
        raw = _hmac.new(raw, b"LKFrameEncryptionKey", hashlib.sha256).digest()[:16]
    return keys
|
||||
|
||||
|
||||
def _build_e2ee_options() -> rtc.E2EEOptions:
|
||||
"""Build E2EE options — let Rust FFI apply HKDF internally (KDF_HKDF=1).
|
||||
|
||||
@@ -141,9 +157,18 @@ class VoiceSession:
|
||||
if not self._caller_key:
|
||||
self._caller_key = key
|
||||
self._caller_identity = f"{sender}:{device_id}"
|
||||
self._caller_all_keys[index] = key
|
||||
logger.info("E2EE key received from %s:%s (index=%d, %d bytes)",
|
||||
sender, device_id, index, len(key))
|
||||
# Pre-compute ratcheted keys from this base key to cover EC's key rotation.
|
||||
# EC rotates (via HMAC ratchet) when new participants join — the rotated key
|
||||
# arrives via Matrix sync with ~30s delay. Setting ratcheted indices 0..N
|
||||
# proactively means decryption works immediately without waiting for Matrix.
|
||||
ratcheted = _ratchet_keys(key, count=8)
|
||||
for ridx, rkey in ratcheted.items():
|
||||
actual_idx = index + ridx
|
||||
if actual_idx not in self._caller_all_keys: # don't overwrite real received keys
|
||||
self._caller_all_keys[actual_idx] = rkey
|
||||
self._caller_all_keys[index] = key # always store the real key at its index
|
||||
logger.info("E2EE key received from %s:%s (index=%d, %d bytes) — pre-computed ratchets [%d..%d]",
|
||||
sender, device_id, index, len(key), index, index + 7)
|
||||
# Live-update key on rotation — use set_key() which applies HKDF via Rust FFI.
|
||||
# At this point the track is usually already subscribed so frame cryptor exists.
|
||||
if self.lk_room and self._caller_identity:
|
||||
@@ -152,7 +177,7 @@ class VoiceSession:
|
||||
if p.identity == caller_lk_id:
|
||||
try:
|
||||
kp = self.lk_room.e2ee_manager.key_provider
|
||||
ok = kp.set_key(p.identity, index, key)
|
||||
ok = kp.set_key(p.identity, key, index)
|
||||
logger.info("Live-updated set_key[%d] for %s (ok=%s, %d bytes)",
|
||||
index, p.identity, ok, len(key))
|
||||
except Exception as e:
|
||||
@@ -286,26 +311,63 @@ class VoiceSession:
|
||||
def on_tp(pub, p):
|
||||
logger.info("Track pub: %s %s kind=%s", p.identity, pub.sid, pub.kind)
|
||||
|
||||
@self.lk_room.on("track_muted")
|
||||
def on_mute(pub, p):
|
||||
logger.info("Track MUTED: %s %s kind=%s", p.identity, pub.sid, pub.kind)
|
||||
|
||||
@self.lk_room.on("track_unmuted")
|
||||
def on_unmute(pub, p):
|
||||
logger.info("Track UNMUTED: %s %s kind=%s", p.identity, pub.sid, pub.kind)
|
||||
|
||||
@self.lk_room.on("track_subscribed")
|
||||
def on_ts(t, pub, p):
|
||||
logger.info("Track sub: %s %s kind=%s", p.identity, pub.sid, t.kind)
|
||||
logger.info("Track sub: %s %s kind=%s muted=%s", p.identity, pub.sid, t.kind, pub.muted)
|
||||
# Start audio frame counter for this track to confirm data flows
|
||||
if int(t.kind) == 1:
|
||||
async def _count_frames(track=t, pid=p.identity):
|
||||
try:
|
||||
audio_stream = rtc.AudioStream(track)
|
||||
count = 0
|
||||
async for _ in audio_stream:
|
||||
count += 1
|
||||
if count == 1:
|
||||
logger.info("AUDIO_FLOW: first frame from %s (E2EE OK)", pid)
|
||||
elif count % 1000 == 0:
|
||||
logger.info("AUDIO_FLOW: %d frames from %s", count, pid)
|
||||
except Exception as exc:
|
||||
logger.warning("AUDIO_FLOW monitor error: %s", exc)
|
||||
asyncio.ensure_future(_count_frames())
|
||||
# *** KEY FIX: set_key() with KDF_HKDF only applies HKDF when the frame cryptor
|
||||
# for this participant already exists. The frame cryptor is created at track
|
||||
# subscription time. Calling set_key() BEFORE track subscription (at connect)
|
||||
# skips HKDF derivation → raw key stored → DEC_FAILED.
|
||||
# Solution: set caller key HERE, after frame cryptor is initialized.
|
||||
if int(t.kind) == 1 and e2ee_opts is not None: # audio track only
|
||||
caller_id = p.identity
|
||||
try:
|
||||
kp_local = self.lk_room.e2ee_manager.key_provider
|
||||
if self._caller_all_keys:
|
||||
for idx, base_k in sorted(self._caller_all_keys.items()):
|
||||
ok = kp_local.set_key(p.identity, idx, base_k)
|
||||
ok = kp_local.set_key(caller_id, base_k, idx)
|
||||
logger.info("on_ts: set_key[%d] for %s (ok=%s, %d bytes)",
|
||||
idx, p.identity, ok, len(base_k))
|
||||
idx, caller_id, ok, len(base_k))
|
||||
else:
|
||||
logger.warning("on_ts: no caller keys available yet — will set on key receipt")
|
||||
logger.warning("on_ts: no caller keys available yet")
|
||||
except Exception as exc:
|
||||
logger.warning("on_ts: set_key failed: %s", exc)
|
||||
# Delayed retry — frame cryptor may not be ready at track_subscribed time
|
||||
async def _delayed_set_key(pid=caller_id):
|
||||
await asyncio.sleep(1.0)
|
||||
try:
|
||||
kp_d = self.lk_room.e2ee_manager.key_provider
|
||||
if self._caller_all_keys:
|
||||
for idx, base_k in sorted(self._caller_all_keys.items()):
|
||||
ok = kp_d.set_key(pid, base_k, idx)
|
||||
logger.info("on_ts_delayed: set_key[%d] for %s (ok=%s, %d bytes)",
|
||||
idx, pid, ok, len(base_k))
|
||||
except Exception as exc:
|
||||
logger.warning("on_ts_delayed: set_key failed: %s", exc)
|
||||
asyncio.ensure_future(_delayed_set_key())
|
||||
|
||||
_e2ee_state_names = {0:"NEW",1:"OK",2:"ENC_FAILED",3:"DEC_FAILED",4:"MISSING_KEY",5:"RATCHETED",6:"INTERNAL_ERR"}
|
||||
@self.lk_room.on("e2ee_state_changed")
|
||||
@@ -313,6 +375,16 @@ class VoiceSession:
|
||||
state_name = _e2ee_state_names.get(int(state), f"UNKNOWN_{state}")
|
||||
p_id = participant.identity if participant else "local"
|
||||
logger.info("E2EE_STATE: participant=%s state=%s", p_id, state_name)
|
||||
# When remote participant frame cryptor is NEW or MISSING_KEY → set their key
|
||||
if participant and p_id != bot_identity and int(state) in (0, 4):
|
||||
if self._caller_all_keys:
|
||||
try:
|
||||
kp_e = self.lk_room.e2ee_manager.key_provider
|
||||
for idx, base_k in sorted(self._caller_all_keys.items()):
|
||||
ok = kp_e.set_key(p_id, base_k, idx)
|
||||
logger.info("e2ee_state set_key[%d] for %s (ok=%s)", idx, p_id, ok)
|
||||
except Exception as exc:
|
||||
logger.warning("e2ee_state set_key failed: %s", exc)
|
||||
|
||||
await self.lk_room.connect(self.lk_url, jwt, options=room_opts)
|
||||
logger.info("Connected (E2EE=HKDF), remote=%d",
|
||||
@@ -322,7 +394,7 @@ class VoiceSession:
|
||||
# CALLER keys are set in on_track_subscribed (NOT here) because the caller's frame cryptor
|
||||
# is only created when their track arrives. Calling set_key() before that skips HKDF.
|
||||
kp = self.lk_room.e2ee_manager.key_provider
|
||||
ok = kp.set_key(bot_identity, 0, self._bot_key)
|
||||
ok = kp.set_key(bot_identity, self._bot_key, 0)
|
||||
logger.info("Set bot key for %s (ok=%s, %d bytes)", bot_identity, ok, len(self._bot_key))
|
||||
|
||||
# Element Call rotates its encryption key when bot joins the LiveKit room.
|
||||
@@ -368,7 +440,7 @@ class VoiceSession:
|
||||
if self._caller_all_keys and remote_identity:
|
||||
try:
|
||||
for idx, base_k in sorted(self._caller_all_keys.items()):
|
||||
ok = kp.set_key(remote_identity, idx, base_k)
|
||||
ok = kp.set_key(remote_identity, base_k, idx)
|
||||
logger.info("Post-rotation set_key[%d] for %s (ok=%s)",
|
||||
idx, remote_identity, ok)
|
||||
except Exception as e:
|
||||
@@ -391,22 +463,21 @@ class VoiceSession:
|
||||
vad=_get_vad(),
|
||||
)
|
||||
|
||||
# Debug: log speech pipeline events to pinpoint where audio is lost
|
||||
@self.session.on("user_started_speaking")
|
||||
def _on_speaking_start():
|
||||
logger.info("VAD: user_started_speaking")
|
||||
# Pipeline event logging (livekit-agents 1.4.2 event names)
|
||||
@self.session.on("user_state_changed")
|
||||
def _on_user_state(ev):
|
||||
logger.info("VAD: user_state=%s", ev.new_state)
|
||||
|
||||
@self.session.on("user_stopped_speaking")
|
||||
def _on_speaking_stop():
|
||||
logger.info("VAD: user_stopped_speaking")
|
||||
@self.session.on("user_input_transcribed")
|
||||
def _on_user_speech(ev):
|
||||
logger.info("USER_SPEECH: %s", ev.transcript)
|
||||
|
||||
@self.session.on("user_speech_committed")
|
||||
def _on_user_speech(msg):
|
||||
logger.info("USER_SPEECH: %s", msg.text_content)
|
||||
|
||||
@self.session.on("agent_speech_committed")
|
||||
def _on_agent_speech(msg):
|
||||
logger.info("AGENT_SPEECH: %s", msg.text_content)
|
||||
@self.session.on("conversation_item_added")
|
||||
def _on_conversation_item(ev):
|
||||
role = getattr(ev.item, "role", "?")
|
||||
text = getattr(ev.item, "text_content", "") or ""
|
||||
if role == "assistant" and text:
|
||||
logger.info("AGENT_SPEECH: %s", text)
|
||||
|
||||
agent = Agent(instructions=_build_voice_prompt())
|
||||
io_opts = room_io.RoomOptions(
|
||||
|
||||
Reference in New Issue
Block a user