fix: skip stale E2EE keys and re-fetch on persistent DEC_FAILED

- Timeline key fetch now filters by sent_ts (max 60s age) to avoid
  using keys from a previous call session
- After 3+ consecutive DEC_FAILED events, automatically re-fetches
  key from timeline in case rotation happened
- Tracks DEC_FAILED count per participant, resets on OK

This should fix the issue where the bot picks up stale encryption keys
from previous calls and can't decrypt the current caller's audio.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Christian Gick
2026-03-10 13:38:10 +02:00
parent a155f39ede
commit 3706f568b6

View File

@@ -541,6 +541,7 @@ class VoiceSession:
data = resp.json()
events = data.get("chunk", [])
user_id = self.nio_client.user_id
now_ms = int(time.time() * 1000)
for evt in events:
evt_type = evt.get("type", "")
if evt_type == "io.element.call.encryption_keys":
@@ -549,8 +550,14 @@ class VoiceSession:
continue # skip our own key
content = evt.get("content", {})
device = content.get("device_id", "")
logger.info("Found encryption_keys timeline event: sender=%s device=%s",
sender, device)
# Only accept keys from this call session (sent within last 60s)
sent_ts = content.get("sent_ts", evt.get("origin_server_ts", 0))
age_s = (now_ms - sent_ts) / 1000 if sent_ts else 999
logger.info("Found encryption_keys timeline event: sender=%s device=%s age=%.0fs",
sender, device, age_s)
if age_s > 60:
logger.info("Skipping stale encryption_keys event (%.0fs old)", age_s)
continue
all_keys = {}
import base64 as b64
for k in content.get("keys", []):
@@ -565,8 +572,9 @@ class VoiceSession:
self._caller_identity = f"{sender}:{device}"
self._caller_all_keys.update(all_keys)
max_idx = max(all_keys.keys())
logger.info("Loaded caller keys at indices %s (using %d)",
sorted(all_keys.keys()), max_idx)
logger.info("Loaded caller keys at indices %s (using %d, key=%s)",
sorted(all_keys.keys()), max_idx,
all_keys[max_idx].hex()[:8])
return all_keys[max_idx]
logger.info("No encryption_keys events in last %d timeline events", len(events))
except Exception as e:
@@ -711,16 +719,22 @@ class VoiceSession:
_e2ee_state_names = {0:"NEW",1:"OK",2:"ENC_FAILED",3:"DEC_FAILED",4:"MISSING_KEY",5:"RATCHETED",6:"INTERNAL_ERR"}
_last_rekey_time = {} # per-participant cooldown for DEC_FAILED re-keying
_dec_failed_count = {} # consecutive DEC_FAILED per participant
_refetch_in_progress = False
@self.lk_room.on("e2ee_state_changed")
def on_e2ee_state(participant, state):
nonlocal _refetch_in_progress
state_name = _e2ee_state_names.get(int(state), f"UNKNOWN_{state}")
p_id = participant.identity if participant else "local"
logger.info("E2EE_STATE: participant=%s state=%s", p_id, state_name)
if int(state) == 1: # OK — reset failure counter
_dec_failed_count.pop(p_id, None)
# When remote participant needs key: NEW, MISSING_KEY, or DEC_FAILED (with cooldown)
if participant and p_id != bot_identity and int(state) in (0, 3, 4):
now = time.monotonic()
# DEC_FAILED: only re-key every 5s to avoid tight loops
if int(state) == 3:
_dec_failed_count[p_id] = _dec_failed_count.get(p_id, 0) + 1
last = _last_rekey_time.get(p_id, 0)
if (now - last) < 5.0:
return
@@ -734,6 +748,32 @@ class VoiceSession:
idx, p_id, state_name)
except Exception as exc:
logger.warning("e2ee_state set_key failed: %s", exc)
# After 3+ DEC_FAILED: re-fetch key from timeline (might have rotated)
if _dec_failed_count.get(p_id, 0) >= 3 and not _refetch_in_progress:
_refetch_in_progress = True
async def _refetch_key():
nonlocal _refetch_in_progress
try:
logger.info("DEC_FAILED x%d — re-fetching key from timeline",
_dec_failed_count.get(p_id, 0))
new_key = await self._fetch_encryption_key_http()
if new_key and new_key != self._caller_key:
logger.info("Got NEW key from timeline re-fetch (%s)",
new_key.hex()[:8])
self._caller_key = new_key
kp_r = self.lk_room.e2ee_manager.key_provider
for idx, base_k in sorted(self._caller_all_keys.items()):
_derive_and_set_key(kp_r, p_id, base_k, idx)
_dec_failed_count[p_id] = 0
elif new_key:
logger.info("Re-fetch returned same key — no rotation")
else:
logger.info("Re-fetch returned no fresh key")
except Exception as exc:
logger.warning("Key re-fetch failed: %s", exc)
finally:
_refetch_in_progress = False
asyncio.ensure_future(_refetch_key())
await self.lk_room.connect(self.lk_url, jwt, options=room_opts)
logger.info("Connected (E2EE=HKDF), remote=%d",