fix(e2ee): consolidate key timing + noise filtering (MAT-40, MAT-41)

- set_key() is only called once the frame cryptor exists (on_track_subscribed / late key arrival)
- Remove the 10s blocking key-rotation wait; keys are applied asynchronously
- Add DEC_FAILED (state 3) to the e2ee_state recovery triggers
- VAD watchdog re-applies all E2EE keys after >30s stuck, as recovery
- Expand STT artifact patterns (English variants, double-asterisk)
- Add NOISE_LEAK diagnostic logging at STT level

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
130
voice.py
130
voice.py
@@ -68,6 +68,8 @@ _STT_ARTIFACT_PATTERNS = [
|
|||||||
re.compile(r'(?i)^musik\s*$'), # bare "Musik" annotation
|
re.compile(r'(?i)^musik\s*$'), # bare "Musik" annotation
|
||||||
re.compile(r'(?i)^\(.*\)\s*$'), # parenthetical annotations like (Applaus)
|
re.compile(r'(?i)^\(.*\)\s*$'), # parenthetical annotations like (Applaus)
|
||||||
re.compile(r'(?i)^\[.*\]\s*$'), # bracketed annotations like [Musik]
|
re.compile(r'(?i)^\[.*\]\s*$'), # bracketed annotations like [Musik]
|
||||||
|
re.compile(r'(?i)^(background noise|static|silence|applause|laughter|noise)\s*$'), # English variants
|
||||||
|
re.compile(r'^\s*\*\*[^*]+\*\*\s*$'), # double-asterisk: **Noise**
|
||||||
]
|
]
|
||||||
|
|
||||||
|
|
||||||
@@ -289,33 +291,36 @@ class VoiceSession:
|
|||||||
self._caller_user_id = caller_user_id # Matrix user ID for memory lookup
|
self._caller_user_id = caller_user_id # Matrix user ID for memory lookup
|
||||||
|
|
||||||
def on_encryption_key(self, sender, device_id, key, index):
|
def on_encryption_key(self, sender, device_id, key, index):
|
||||||
"""Receive E2EE key from Element Call participant."""
|
"""Receive E2EE key from Element Call participant.
|
||||||
|
|
||||||
|
Store-only: keys are applied in on_track_subscribed() / on_e2ee_state_changed()
|
||||||
|
where the frame cryptor is guaranteed to exist. If the track is already
|
||||||
|
subscribed (late key arrival / rotation), set the key immediately.
|
||||||
|
"""
|
||||||
if not key:
|
if not key:
|
||||||
return
|
return
|
||||||
if not self._caller_key:
|
if not self._caller_key:
|
||||||
self._caller_key = key
|
self._caller_key = key
|
||||||
self._caller_identity = f"{sender}:{device_id}"
|
self._caller_identity = f"{sender}:{device_id}"
|
||||||
# Store only the real key at its received index (no pre-ratcheting).
|
|
||||||
# EC distributes keys explicitly via Matrix — no HMAC ratcheting.
|
|
||||||
self._caller_all_keys[index] = key
|
self._caller_all_keys[index] = key
|
||||||
logger.info("E2EE key received from %s:%s (index=%d, %d bytes, raw=%s)",
|
logger.info("E2EE key received from %s:%s (index=%d, %d bytes, raw=%s)",
|
||||||
sender, device_id, index, len(key), key.hex())
|
sender, device_id, index, len(key), key.hex())
|
||||||
# Diagnostic: compute Python HKDF to compare with what Rust FFI should derive
|
# Late key arrival: if track already subscribed, frame cryptor exists — set key now.
|
||||||
derived = _hkdf_derive(key)
|
|
||||||
logger.info("E2EE_DIAG: Python HKDF(raw_key) = %s (expected AES-128-GCM key)", derived.hex())
|
|
||||||
# Live-update key on rotation — use set_key() which applies HKDF via Rust FFI.
|
|
||||||
# At this point the track is usually already subscribed so frame cryptor exists.
|
|
||||||
if self.lk_room and self._caller_identity:
|
if self.lk_room and self._caller_identity:
|
||||||
caller_lk_id = self._caller_identity # e.g. "@user:server.eu:DEVICEID"
|
caller_lk_id = self._caller_identity
|
||||||
for p in self.lk_room.remote_participants.values():
|
for p in self.lk_room.remote_participants.values():
|
||||||
if p.identity == caller_lk_id:
|
if p.identity == caller_lk_id:
|
||||||
try:
|
has_subscribed = any(
|
||||||
kp = self.lk_room.e2ee_manager.key_provider
|
pub.subscribed for pub in p.track_publications.values()
|
||||||
ok = kp.set_key(p.identity, key, index)
|
)
|
||||||
logger.info("Live-updated set_key[%d] for %s (ok=%s, %d bytes)",
|
if has_subscribed:
|
||||||
index, p.identity, ok, len(key))
|
try:
|
||||||
except Exception as e:
|
kp = self.lk_room.e2ee_manager.key_provider
|
||||||
logger.warning("Failed to live-update caller key: %s", e)
|
ok = kp.set_key(p.identity, key, index)
|
||||||
|
logger.info("Late key set_key[%d] for %s (ok=%s)",
|
||||||
|
index, p.identity, ok)
|
||||||
|
except Exception as e:
|
||||||
|
logger.warning("Late key set_key failed: %s", e)
|
||||||
break
|
break
|
||||||
|
|
||||||
async def _fetch_encryption_key_http(self) -> bytes | None:
|
async def _fetch_encryption_key_http(self) -> bytes | None:
|
||||||
@@ -471,21 +476,22 @@ class VoiceSession:
|
|||||||
logger.info("on_ts: set_key[%d] for %s (ok=%s, %d bytes, raw=%s)",
|
logger.info("on_ts: set_key[%d] for %s (ok=%s, %d bytes, raw=%s)",
|
||||||
idx, caller_id, ok, len(base_k), base_k.hex())
|
idx, caller_id, ok, len(base_k), base_k.hex())
|
||||||
else:
|
else:
|
||||||
logger.warning("on_ts: no caller keys available yet")
|
logger.warning("on_ts: no caller keys yet — scheduling 0.5s retry")
|
||||||
|
async def _brief_key_retry(pid=caller_id):
|
||||||
|
await asyncio.sleep(0.5)
|
||||||
|
if self._caller_all_keys:
|
||||||
|
try:
|
||||||
|
kp_r = self.lk_room.e2ee_manager.key_provider
|
||||||
|
for idx, base_k in sorted(self._caller_all_keys.items()):
|
||||||
|
ok = kp_r.set_key(pid, base_k, idx)
|
||||||
|
logger.info("on_ts_retry: set_key[%d] for %s (ok=%s)", idx, pid, ok)
|
||||||
|
except Exception as exc:
|
||||||
|
logger.warning("on_ts_retry: set_key failed: %s", exc)
|
||||||
|
else:
|
||||||
|
logger.warning("on_ts_retry: still no caller keys for %s", pid)
|
||||||
|
asyncio.ensure_future(_brief_key_retry())
|
||||||
except Exception as exc:
|
except Exception as exc:
|
||||||
logger.warning("on_ts: set_key failed: %s", exc)
|
logger.warning("on_ts: set_key failed: %s", exc)
|
||||||
# Delayed retry with additional diagnostics
|
|
||||||
async def _delayed_set_key(pid=caller_id):
|
|
||||||
await asyncio.sleep(1.5)
|
|
||||||
try:
|
|
||||||
kp_d = self.lk_room.e2ee_manager.key_provider
|
|
||||||
if self._caller_all_keys:
|
|
||||||
for idx, base_k in sorted(self._caller_all_keys.items()):
|
|
||||||
ok = kp_d.set_key(pid, base_k, idx)
|
|
||||||
logger.info("on_ts_delayed: set_key[%d] for %s (ok=%s)", idx, pid, ok)
|
|
||||||
except Exception as exc:
|
|
||||||
logger.warning("on_ts_delayed: set_key failed: %s", exc)
|
|
||||||
asyncio.ensure_future(_delayed_set_key())
|
|
||||||
|
|
||||||
_e2ee_state_names = {0:"NEW",1:"OK",2:"ENC_FAILED",3:"DEC_FAILED",4:"MISSING_KEY",5:"RATCHETED",6:"INTERNAL_ERR"}
|
_e2ee_state_names = {0:"NEW",1:"OK",2:"ENC_FAILED",3:"DEC_FAILED",4:"MISSING_KEY",5:"RATCHETED",6:"INTERNAL_ERR"}
|
||||||
@self.lk_room.on("e2ee_state_changed")
|
@self.lk_room.on("e2ee_state_changed")
|
||||||
@@ -494,7 +500,7 @@ class VoiceSession:
|
|||||||
p_id = participant.identity if participant else "local"
|
p_id = participant.identity if participant else "local"
|
||||||
logger.info("E2EE_STATE: participant=%s state=%s", p_id, state_name)
|
logger.info("E2EE_STATE: participant=%s state=%s", p_id, state_name)
|
||||||
# When remote participant frame cryptor is NEW or MISSING_KEY → set their key
|
# When remote participant frame cryptor is NEW, DEC_FAILED or MISSING_KEY → set their key
|
||||||
if participant and p_id != bot_identity and int(state) in (0, 4):
|
if participant and p_id != bot_identity and int(state) in (0, 3, 4):
|
||||||
if self._caller_all_keys:
|
if self._caller_all_keys:
|
||||||
try:
|
try:
|
||||||
kp_e = self.lk_room.e2ee_manager.key_provider
|
kp_e = self.lk_room.e2ee_manager.key_provider
|
||||||
@@ -515,43 +521,12 @@ class VoiceSession:
|
|||||||
ok = kp.set_key(bot_identity, self._bot_key, 0)
|
ok = kp.set_key(bot_identity, self._bot_key, 0)
|
||||||
logger.info("Set bot key for %s (ok=%s, %d bytes)", bot_identity, ok, len(self._bot_key))
|
logger.info("Set bot key for %s (ok=%s, %d bytes)", bot_identity, ok, len(self._bot_key))
|
||||||
|
|
||||||
# Element Call rotates its encryption key when bot joins the LiveKit room.
|
# Element Call may rotate keys when bot joins — handled asynchronously by
|
||||||
# EC sends the new key via Matrix (Megolm-encrypted); nio sync will decrypt it
|
# on_encryption_key() (stores keys) + on_track_subscribed() / on_e2ee_state_changed()
|
||||||
# and call on_encryption_key(), which updates self._caller_all_keys.
|
# (applies keys when frame cryptor exists). No blocking wait needed.
|
||||||
# NOTE: HTTP fetch is useless here — keys are Matrix-E2EE encrypted (m.room.encrypted).
|
logger.info("E2EE key state: %d keys stored (max_idx=%d)",
|
||||||
pre_max_idx = max(self._caller_all_keys.keys()) if self._caller_all_keys else -1
|
len(self._caller_all_keys),
|
||||||
pre_key_count = len(self._caller_all_keys)
|
max(self._caller_all_keys.keys()) if self._caller_all_keys else -1)
|
||||||
logger.info("Waiting for EC key rotation via nio sync (current max_idx=%d, keys=%d)...",
|
|
||||||
pre_max_idx, pre_key_count)
|
|
||||||
for _attempt in range(20): # up to 10s (20 × 0.5s)
|
|
||||||
await asyncio.sleep(0.5)
|
|
||||||
new_max = max(self._caller_all_keys.keys()) if self._caller_all_keys else -1
|
|
||||||
new_count = len(self._caller_all_keys)
|
|
||||||
# Detect rotation: new index OR new key bytes at existing index
|
|
||||||
if new_max > pre_max_idx or new_count > pre_key_count:
|
|
||||||
self._caller_key = self._caller_all_keys[new_max]
|
|
||||||
logger.info("Key rotated: index %d→%d, count %d→%d (%d bytes, raw=%s)",
|
|
||||||
pre_max_idx, new_max, pre_key_count, new_count,
|
|
||||||
len(self._caller_key), self._caller_key.hex())
|
|
||||||
# Re-set all keys on the frame cryptor
|
|
||||||
if self._caller_identity:
|
|
||||||
for p in self.lk_room.remote_participants.values():
|
|
||||||
if p.identity == self._caller_identity:
|
|
||||||
try:
|
|
||||||
kp_rot = self.lk_room.e2ee_manager.key_provider
|
|
||||||
for idx, k in sorted(self._caller_all_keys.items()):
|
|
||||||
kp_rot.set_key(p.identity, k, idx)
|
|
||||||
logger.info("Rotation set_key[%d] for %s (%d bytes, raw=%s)",
|
|
||||||
idx, p.identity, len(k), k.hex())
|
|
||||||
except Exception as e:
|
|
||||||
logger.warning("Rotation set_key failed: %s", e)
|
|
||||||
break
|
|
||||||
break
|
|
||||||
if _attempt % 4 == 3: # log every 2s
|
|
||||||
logger.info("Key rotation wait %0.1fs: max_idx still %d, keys=%d",
|
|
||||||
(_attempt + 1) * 0.5, new_max, new_count)
|
|
||||||
else:
|
|
||||||
logger.warning("No key rotation after 10s — using pre-join key[%d]", pre_max_idx)
|
|
||||||
|
|
||||||
# Find the remote participant, wait up to 10s if not yet connected
|
# Find the remote participant, wait up to 10s if not yet connected
|
||||||
remote_identity = None
|
remote_identity = None
|
||||||
@@ -644,7 +619,11 @@ class VoiceSession:
|
|||||||
|
|
||||||
@self.session.on("user_input_transcribed")
|
@self.session.on("user_input_transcribed")
|
||||||
def _on_user_speech(ev):
|
def _on_user_speech(ev):
|
||||||
logger.info("USER_SPEECH: %s", ev.transcript)
|
text = ev.transcript or ""
|
||||||
|
if text and _is_stt_artifact(text):
|
||||||
|
logger.warning("NOISE_LEAK: artifact reached STT: %s", text)
|
||||||
|
else:
|
||||||
|
logger.info("USER_SPEECH: %s", text)
|
||||||
if ev.transcript:
|
if ev.transcript:
|
||||||
_last_user_speech.append(ev.transcript)
|
_last_user_speech.append(ev.transcript)
|
||||||
|
|
||||||
@@ -698,7 +677,7 @@ class VoiceSession:
|
|||||||
except asyncio.TimeoutError:
|
except asyncio.TimeoutError:
|
||||||
logger.error("Greeting timed out")
|
logger.error("Greeting timed out")
|
||||||
|
|
||||||
# VAD watchdog: log diagnostic if user_state stays "away" for >30s
|
# VAD watchdog: log diagnostic and attempt E2EE key recovery if stuck
|
||||||
import time as _time
|
import time as _time
|
||||||
while True:
|
while True:
|
||||||
await asyncio.sleep(10)
|
await asyncio.sleep(10)
|
||||||
@@ -712,9 +691,20 @@ class VoiceSession:
|
|||||||
n_remote = len(self.lk_room.remote_participants) if self.lk_room else 0
|
n_remote = len(self.lk_room.remote_participants) if self.lk_room else 0
|
||||||
logger.warning(
|
logger.warning(
|
||||||
"VAD_WATCHDOG: user_state=away for >30s (speaking_count=%d, "
|
"VAD_WATCHDOG: user_state=away for >30s (speaking_count=%d, "
|
||||||
"remote_participants=%d, e2ee_ok=%s) — possible MAT-40 occurrence",
|
"remote_participants=%d, e2ee_ok=%s) — attempting E2EE key recovery",
|
||||||
sc, n_remote, e2ee_ok,
|
sc, n_remote, e2ee_ok,
|
||||||
)
|
)
|
||||||
|
# Recovery: re-apply all stored E2EE keys for all remote participants
|
||||||
|
if self.lk_room and self._caller_all_keys:
|
||||||
|
try:
|
||||||
|
kp_w = self.lk_room.e2ee_manager.key_provider
|
||||||
|
for p in self.lk_room.remote_participants.values():
|
||||||
|
for idx, base_k in sorted(self._caller_all_keys.items()):
|
||||||
|
ok = kp_w.set_key(p.identity, base_k, idx)
|
||||||
|
logger.info("VAD_WATCHDOG: recovery set_key[%d] for %s (ok=%s)",
|
||||||
|
idx, p.identity, ok)
|
||||||
|
except Exception as exc:
|
||||||
|
logger.warning("VAD_WATCHDOG: recovery set_key failed: %s", exc)
|
||||||
_vad_state_log["away_since"] = None # only warn once per stuck period
|
_vad_state_log["away_since"] = None # only warn once per stuck period
|
||||||
|
|
||||||
except asyncio.CancelledError:
|
except asyncio.CancelledError:
|
||||||
|
|||||||
Reference in New Issue
Block a user