diff --git a/bot.py b/bot.py
index f49a1e8..135f3fc 100644
--- a/bot.py
+++ b/bot.py
@@ -266,6 +266,7 @@ class Bot:
         self.lkapi = None
         self.voice_sessions: dict[str, VoiceSession] = {}
         self.active_calls = set()  # rooms where we've sent call member event
+        self.active_callers: dict[str, set[str]] = {}  # room_id → set of caller user IDs
         self.rag = DocumentRAG(WILDFILES_BASE_URL, WILDFILES_ORG)
         self.memory = MemoryClient(MEMORY_SERVICE_URL)
         self.llm = AsyncOpenAI(base_url=LITELLM_URL, api_key=LITELLM_KEY) if LITELLM_URL else None
@@ -401,6 +402,8 @@ class Bot:
 
                 logger.info("Call detected in %s from %s, joining...", room_id, event.sender)
                 self.active_calls.add(room_id)
+                # NOTE(review): records only this event's sender; confirm later joiners are tracked too.
+                self.active_callers.setdefault(room_id, set()).add(event.sender)
 
                 # Get the foci_preferred from the caller's event
                 content = event.source["content"]
@@ -478,10 +481,20 @@ class Bot:
                     self.voice_sessions.pop(room_id, None)
 
         else:
-            # Empty content = someone left the call, check if anyone is still calling
+            # Empty content = someone left the call
             room_id = room.room_id
             if room_id in self.active_calls:
-                # Stop voice session
+                # Remove this caller from active set
+                callers = self.active_callers.get(room_id, set())
+                callers.discard(event.sender)
+                if callers:
+                    logger.info("Participant %s left %s but %d other(s) still in call — keeping session",
+                                event.sender, room_id, len(callers))
+                    return
+
+                # No callers left — stop voice session
+                logger.info("Last caller %s left %s — stopping session", event.sender, room_id)
+                self.active_callers.pop(room_id, None)
                 vs = self.voice_sessions.pop(room_id, None)
                 if vs:
                     try:
diff --git a/voice.py b/voice.py
index c68eed8..e5ea5a5 100644
--- a/voice.py
+++ b/voice.py
@@ -96,6 +96,22 @@ def _hkdf_derive(ikm: bytes) -> bytes:
     return t1[:16]
 
 
+def _ratchet_keys(base_raw: bytes, count: int = 6) -> dict[int, bytes]:
+    """Pre-compute raw keys 0..count-1 by repeatedly HMAC-ratcheting base_raw.
+
+    Step: new_raw = HMAC-SHA256(key=current_raw, data="LKFrameEncryptionKey")[:16]; meant to mirror livekit-client-sdk-js ratchet() (NOTE(review): verify).
+    Returns {index: raw_key} for all indices 0..count-1; index 0 is base_raw itself.
+    Set these via set_key(identity, raw, index) with KDF_HKDF=1 so Rust applies HKDF.
+    """
+    import hmac as _hmac
+    keys: dict[int, bytes] = {}
+    raw = base_raw
+    for i in range(count):
+        keys[i] = raw
+        raw = _hmac.new(raw, b"LKFrameEncryptionKey", hashlib.sha256).digest()[:16]
+    return keys
+
+
 def _build_e2ee_options() -> rtc.E2EEOptions:
     """Build E2EE options — let Rust FFI apply HKDF internally (KDF_HKDF=1).
 
@@ -141,9 +157,18 @@ class VoiceSession:
         if not self._caller_key:
             self._caller_key = key
             self._caller_identity = f"{sender}:{device_id}"
-        self._caller_all_keys[index] = key
-        logger.info("E2EE key received from %s:%s (index=%d, %d bytes)",
-                    sender, device_id, index, len(key))
+        # Pre-compute ratcheted keys from this base key to cover EC's key rotation.
+        # EC rotates (via HMAC ratchet) when new participants join — the rotated key
+        # arrives via Matrix sync with ~30s delay. Setting ratcheted indices 0..N
+        # proactively means decryption works immediately without waiting for Matrix.
+        ratcheted = _ratchet_keys(key, count=8)
+        for ridx, rkey in ratcheted.items():
+            actual_idx = index + ridx
+            if actual_idx not in self._caller_all_keys:  # keep existing entries (received or earlier pre-computed)
+                self._caller_all_keys[actual_idx] = rkey
+        self._caller_all_keys[index] = key  # always store the real key at its index
+        logger.info("E2EE key received from %s:%s (index=%d, %d bytes) — pre-computed ratchets [%d..%d]",
+                    sender, device_id, index, len(key), index, index + len(ratcheted) - 1)
         # Live-update key on rotation — use set_key() which applies HKDF via Rust FFI.
         # At this point the track is usually already subscribed so frame cryptor exists.
if self.lk_room and self._caller_identity: @@ -152,7 +177,7 @@ class VoiceSession: if p.identity == caller_lk_id: try: kp = self.lk_room.e2ee_manager.key_provider - ok = kp.set_key(p.identity, index, key) + ok = kp.set_key(p.identity, key, index) logger.info("Live-updated set_key[%d] for %s (ok=%s, %d bytes)", index, p.identity, ok, len(key)) except Exception as e: @@ -286,26 +311,63 @@ class VoiceSession: def on_tp(pub, p): logger.info("Track pub: %s %s kind=%s", p.identity, pub.sid, pub.kind) + @self.lk_room.on("track_muted") + def on_mute(pub, p): + logger.info("Track MUTED: %s %s kind=%s", p.identity, pub.sid, pub.kind) + + @self.lk_room.on("track_unmuted") + def on_unmute(pub, p): + logger.info("Track UNMUTED: %s %s kind=%s", p.identity, pub.sid, pub.kind) + @self.lk_room.on("track_subscribed") def on_ts(t, pub, p): - logger.info("Track sub: %s %s kind=%s", p.identity, pub.sid, t.kind) + logger.info("Track sub: %s %s kind=%s muted=%s", p.identity, pub.sid, t.kind, pub.muted) + # Start audio frame counter for this track to confirm data flows + if int(t.kind) == 1: + async def _count_frames(track=t, pid=p.identity): + try: + audio_stream = rtc.AudioStream(track) + count = 0 + async for _ in audio_stream: + count += 1 + if count == 1: + logger.info("AUDIO_FLOW: first frame from %s (E2EE OK)", pid) + elif count % 1000 == 0: + logger.info("AUDIO_FLOW: %d frames from %s", count, pid) + except Exception as exc: + logger.warning("AUDIO_FLOW monitor error: %s", exc) + asyncio.ensure_future(_count_frames()) # *** KEY FIX: set_key() with KDF_HKDF only applies HKDF when the frame cryptor # for this participant already exists. The frame cryptor is created at track # subscription time. Calling set_key() BEFORE track subscription (at connect) # skips HKDF derivation → raw key stored → DEC_FAILED. # Solution: set caller key HERE, after frame cryptor is initialized. 
if int(t.kind) == 1 and e2ee_opts is not None: # audio track only + caller_id = p.identity try: kp_local = self.lk_room.e2ee_manager.key_provider if self._caller_all_keys: for idx, base_k in sorted(self._caller_all_keys.items()): - ok = kp_local.set_key(p.identity, idx, base_k) + ok = kp_local.set_key(caller_id, base_k, idx) logger.info("on_ts: set_key[%d] for %s (ok=%s, %d bytes)", - idx, p.identity, ok, len(base_k)) + idx, caller_id, ok, len(base_k)) else: - logger.warning("on_ts: no caller keys available yet — will set on key receipt") + logger.warning("on_ts: no caller keys available yet") except Exception as exc: logger.warning("on_ts: set_key failed: %s", exc) + # Delayed retry — frame cryptor may not be ready at track_subscribed time + async def _delayed_set_key(pid=caller_id): + await asyncio.sleep(1.0) + try: + kp_d = self.lk_room.e2ee_manager.key_provider + if self._caller_all_keys: + for idx, base_k in sorted(self._caller_all_keys.items()): + ok = kp_d.set_key(pid, base_k, idx) + logger.info("on_ts_delayed: set_key[%d] for %s (ok=%s, %d bytes)", + idx, pid, ok, len(base_k)) + except Exception as exc: + logger.warning("on_ts_delayed: set_key failed: %s", exc) + asyncio.ensure_future(_delayed_set_key()) _e2ee_state_names = {0:"NEW",1:"OK",2:"ENC_FAILED",3:"DEC_FAILED",4:"MISSING_KEY",5:"RATCHETED",6:"INTERNAL_ERR"} @self.lk_room.on("e2ee_state_changed") @@ -313,6 +375,16 @@ class VoiceSession: state_name = _e2ee_state_names.get(int(state), f"UNKNOWN_{state}") p_id = participant.identity if participant else "local" logger.info("E2EE_STATE: participant=%s state=%s", p_id, state_name) + # When remote participant frame cryptor is NEW or MISSING_KEY → set their key + if participant and p_id != bot_identity and int(state) in (0, 4): + if self._caller_all_keys: + try: + kp_e = self.lk_room.e2ee_manager.key_provider + for idx, base_k in sorted(self._caller_all_keys.items()): + ok = kp_e.set_key(p_id, base_k, idx) + logger.info("e2ee_state set_key[%d] for %s 
(ok=%s)", idx, p_id, ok) + except Exception as exc: + logger.warning("e2ee_state set_key failed: %s", exc) await self.lk_room.connect(self.lk_url, jwt, options=room_opts) logger.info("Connected (E2EE=HKDF), remote=%d", @@ -322,7 +394,7 @@ class VoiceSession: # CALLER keys are set in on_track_subscribed (NOT here) because the caller's frame cryptor # is only created when their track arrives. Calling set_key() before that skips HKDF. kp = self.lk_room.e2ee_manager.key_provider - ok = kp.set_key(bot_identity, 0, self._bot_key) + ok = kp.set_key(bot_identity, self._bot_key, 0) logger.info("Set bot key for %s (ok=%s, %d bytes)", bot_identity, ok, len(self._bot_key)) # Element Call rotates its encryption key when bot joins the LiveKit room. @@ -368,7 +440,7 @@ class VoiceSession: if self._caller_all_keys and remote_identity: try: for idx, base_k in sorted(self._caller_all_keys.items()): - ok = kp.set_key(remote_identity, idx, base_k) + ok = kp.set_key(remote_identity, base_k, idx) logger.info("Post-rotation set_key[%d] for %s (ok=%s)", idx, remote_identity, ok) except Exception as e: @@ -391,22 +463,21 @@ class VoiceSession: vad=_get_vad(), ) - # Debug: log speech pipeline events to pinpoint where audio is lost - @self.session.on("user_started_speaking") - def _on_speaking_start(): - logger.info("VAD: user_started_speaking") + # Pipeline event logging (livekit-agents 1.4.2 event names) + @self.session.on("user_state_changed") + def _on_user_state(ev): + logger.info("VAD: user_state=%s", ev.new_state) - @self.session.on("user_stopped_speaking") - def _on_speaking_stop(): - logger.info("VAD: user_stopped_speaking") + @self.session.on("user_input_transcribed") + def _on_user_speech(ev): + logger.info("USER_SPEECH: %s", ev.transcript) - @self.session.on("user_speech_committed") - def _on_user_speech(msg): - logger.info("USER_SPEECH: %s", msg.text_content) - - @self.session.on("agent_speech_committed") - def _on_agent_speech(msg): - logger.info("AGENT_SPEECH: %s", 
msg.text_content) + @self.session.on("conversation_item_added") + def _on_conversation_item(ev): + role = getattr(ev.item, "role", "?") + text = getattr(ev.item, "text_content", "") or "" + if role == "assistant" and text: + logger.info("AGENT_SPEECH: %s", text) agent = Agent(instructions=_build_voice_prompt()) io_opts = room_io.RoomOptions(