diff --git a/voice.py b/voice.py index 533d205..ee24095 100644 --- a/voice.py +++ b/voice.py @@ -68,6 +68,8 @@ _STT_ARTIFACT_PATTERNS = [ re.compile(r'(?i)^musik\s*$'), # bare "Musik" annotation re.compile(r'(?i)^\(.*\)\s*$'), # parenthetical annotations like (Applaus) re.compile(r'(?i)^\[.*\]\s*$'), # bracketed annotations like [Musik] + re.compile(r'(?i)^(background noise|static|silence|applause|laughter|noise)\s*$'), # English variants + re.compile(r'^\s*\*\*[^*]+\*\*\s*$'), # double-asterisk: **Noise** ] @@ -289,33 +291,36 @@ class VoiceSession: self._caller_user_id = caller_user_id # Matrix user ID for memory lookup def on_encryption_key(self, sender, device_id, key, index): - """Receive E2EE key from Element Call participant.""" + """Receive E2EE key from Element Call participant. + + Store-only: keys are applied in on_track_subscribed() / on_e2ee_state_changed() + where the frame cryptor is guaranteed to exist. If the track is already + subscribed (late key arrival / rotation), set the key immediately. + """ if not key: return if not self._caller_key: self._caller_key = key self._caller_identity = f"{sender}:{device_id}" - # Store only the real key at its received index (no pre-ratcheting). - # EC distributes keys explicitly via Matrix — no HMAC ratcheting. self._caller_all_keys[index] = key logger.info("E2EE key received from %s:%s (index=%d, %d bytes, raw=%s)", sender, device_id, index, len(key), key.hex()) - # Diagnostic: compute Python HKDF to compare with what Rust FFI should derive - derived = _hkdf_derive(key) - logger.info("E2EE_DIAG: Python HKDF(raw_key) = %s (expected AES-128-GCM key)", derived.hex()) - # Live-update key on rotation — use set_key() which applies HKDF via Rust FFI. - # At this point the track is usually already subscribed so frame cryptor exists. + # Late key arrival: if track already subscribed, frame cryptor exists — set key now. if self.lk_room and self._caller_identity: - caller_lk_id = self._caller_identity # e.g. "@user:server.eu:DEVICEID" + caller_lk_id = self._caller_identity for p in self.lk_room.remote_participants.values(): if p.identity == caller_lk_id: - try: - kp = self.lk_room.e2ee_manager.key_provider - ok = kp.set_key(p.identity, key, index) - logger.info("Live-updated set_key[%d] for %s (ok=%s, %d bytes)", - index, p.identity, ok, len(key)) - except Exception as e: - logger.warning("Failed to live-update caller key: %s", e) + has_subscribed = any( + pub.subscribed for pub in p.track_publications.values() + ) + if has_subscribed: + try: + kp = self.lk_room.e2ee_manager.key_provider + ok = kp.set_key(p.identity, key, index) + logger.info("Late key set_key[%d] for %s (ok=%s)", + index, p.identity, ok) + except Exception as e: + logger.warning("Late key set_key failed: %s", e) break async def _fetch_encryption_key_http(self) -> bytes | None: @@ -471,21 +476,22 @@ class VoiceSession: logger.info("on_ts: set_key[%d] for %s (ok=%s, %d bytes, raw=%s)", idx, caller_id, ok, len(base_k), base_k.hex()) else: - logger.warning("on_ts: no caller keys available yet") + logger.warning("on_ts: no caller keys yet — scheduling 0.5s retry") + async def _brief_key_retry(pid=caller_id): + await asyncio.sleep(0.5) + if self._caller_all_keys: + try: + kp_r = self.lk_room.e2ee_manager.key_provider + for idx, base_k in sorted(self._caller_all_keys.items()): + ok = kp_r.set_key(pid, base_k, idx) + logger.info("on_ts_retry: set_key[%d] for %s (ok=%s)", idx, pid, ok) + except Exception as exc: + logger.warning("on_ts_retry: set_key failed: %s", exc) + else: + logger.warning("on_ts_retry: still no caller keys for %s", pid) + asyncio.ensure_future(_brief_key_retry()) except Exception as exc: logger.warning("on_ts: set_key failed: %s", exc) - # Delayed retry with additional diagnostics - async def _delayed_set_key(pid=caller_id): - await asyncio.sleep(1.5) - try: - kp_d = self.lk_room.e2ee_manager.key_provider - if self._caller_all_keys: - for idx, base_k in sorted(self._caller_all_keys.items()): - ok = kp_d.set_key(pid, base_k, idx) - logger.info("on_ts_delayed: set_key[%d] for %s (ok=%s)", idx, pid, ok) - except Exception as exc: - logger.warning("on_ts_delayed: set_key failed: %s", exc) - asyncio.ensure_future(_delayed_set_key()) _e2ee_state_names = {0:"NEW",1:"OK",2:"ENC_FAILED",3:"DEC_FAILED",4:"MISSING_KEY",5:"RATCHETED",6:"INTERNAL_ERR"} @self.lk_room.on("e2ee_state_changed") @@ -494,7 +500,7 @@ class VoiceSession: p_id = participant.identity if participant else "local" logger.info("E2EE_STATE: participant=%s state=%s", p_id, state_name) # When remote participant frame cryptor is NEW or MISSING_KEY → set their key - if participant and p_id != bot_identity and int(state) in (0, 4): + if participant and p_id != bot_identity and int(state) in (0, 3, 4): if self._caller_all_keys: try: kp_e = self.lk_room.e2ee_manager.key_provider @@ -515,43 +521,12 @@ class VoiceSession: ok = kp.set_key(bot_identity, self._bot_key, 0) logger.info("Set bot key for %s (ok=%s, %d bytes)", bot_identity, ok, len(self._bot_key)) - # Element Call rotates its encryption key when bot joins the LiveKit room. - # EC sends the new key via Matrix (Megolm-encrypted); nio sync will decrypt it - # and call on_encryption_key(), which updates self._caller_all_keys. - # NOTE: HTTP fetch is useless here — keys are Matrix-E2EE encrypted (m.room.encrypted). - pre_max_idx = max(self._caller_all_keys.keys()) if self._caller_all_keys else -1 - pre_key_count = len(self._caller_all_keys) - logger.info("Waiting for EC key rotation via nio sync (current max_idx=%d, keys=%d)...", - pre_max_idx, pre_key_count) - for _attempt in range(20): # up to 10s (20 × 0.5s) - await asyncio.sleep(0.5) - new_max = max(self._caller_all_keys.keys()) if self._caller_all_keys else -1 - new_count = len(self._caller_all_keys) - # Detect rotation: new index OR new key bytes at existing index - if new_max > pre_max_idx or new_count > pre_key_count: - self._caller_key = self._caller_all_keys[new_max] - logger.info("Key rotated: index %d→%d, count %d→%d (%d bytes, raw=%s)", - pre_max_idx, new_max, pre_key_count, new_count, - len(self._caller_key), self._caller_key.hex()) - # Re-set all keys on the frame cryptor - if self._caller_identity: - for p in self.lk_room.remote_participants.values(): - if p.identity == self._caller_identity: - try: - kp_rot = self.lk_room.e2ee_manager.key_provider - for idx, k in sorted(self._caller_all_keys.items()): - kp_rot.set_key(p.identity, k, idx) - logger.info("Rotation set_key[%d] for %s (%d bytes, raw=%s)", - idx, p.identity, len(k), k.hex()) - except Exception as e: - logger.warning("Rotation set_key failed: %s", e) - break - break - if _attempt % 4 == 3: # log every 2s - logger.info("Key rotation wait %0.1fs: max_idx still %d, keys=%d", - (_attempt + 1) * 0.5, new_max, new_count) - else: - logger.warning("No key rotation after 10s — using pre-join key[%d]", pre_max_idx) + # Element Call may rotate keys when bot joins — handled asynchronously by + # on_encryption_key() (stores keys) + on_track_subscribed() / on_e2ee_state_changed() + # (applies keys when frame cryptor exists). No blocking wait needed. + logger.info("E2EE key state: %d keys stored (max_idx=%d)", + len(self._caller_all_keys), + max(self._caller_all_keys.keys()) if self._caller_all_keys else -1) # Find the remote participant, wait up to 10s if not yet connected remote_identity = None @@ -644,7 +619,11 @@ class VoiceSession: @self.session.on("user_input_transcribed") def _on_user_speech(ev): - logger.info("USER_SPEECH: %s", ev.transcript) + text = ev.transcript or "" + if text and _is_stt_artifact(text): + logger.warning("NOISE_LEAK: artifact reached STT: %s", text) + else: + logger.info("USER_SPEECH: %s", text) if ev.transcript: _last_user_speech.append(ev.transcript) @@ -698,7 +677,7 @@ class VoiceSession: except asyncio.TimeoutError: logger.error("Greeting timed out") - # VAD watchdog: log diagnostic if user_state stays "away" for >30s + # VAD watchdog: log diagnostic and attempt E2EE key recovery if stuck import time as _time while True: await asyncio.sleep(10) @@ -712,9 +691,20 @@ class VoiceSession: n_remote = len(self.lk_room.remote_participants) if self.lk_room else 0 logger.warning( "VAD_WATCHDOG: user_state=away for >30s (speaking_count=%d, " - "remote_participants=%d, e2ee_ok=%s) — possible MAT-40 occurrence", + "remote_participants=%d, e2ee_ok=%s) — attempting E2EE key recovery", sc, n_remote, e2ee_ok, ) + # Recovery: re-apply all stored E2EE keys for all remote participants + if self.lk_room and self._caller_all_keys: + try: + kp_w = self.lk_room.e2ee_manager.key_provider + for p in self.lk_room.remote_participants.values(): + for idx, base_k in sorted(self._caller_all_keys.items()): + ok = kp_w.set_key(p.identity, base_k, idx) + logger.info("VAD_WATCHDOG: recovery set_key[%d] for %s (ok=%s)", + idx, p.identity, ok) + except Exception as exc: + logger.warning("VAD_WATCHDOG: recovery set_key failed: %s", exc) _vad_state_log["away_since"] = None # only warn once per stuck period except asyncio.CancelledError: