diff --git a/voice.py b/voice.py index 2592da5..a31ddcd 100644 --- a/voice.py +++ b/voice.py @@ -541,6 +541,7 @@ class VoiceSession: data = resp.json() events = data.get("chunk", []) user_id = self.nio_client.user_id + now_ms = int(time.time() * 1000) for evt in events: evt_type = evt.get("type", "") if evt_type == "io.element.call.encryption_keys": @@ -549,8 +550,14 @@ class VoiceSession: continue # skip our own key content = evt.get("content", {}) device = content.get("device_id", "") - logger.info("Found encryption_keys timeline event: sender=%s device=%s", - sender, device) + # Only accept keys from this call session (sent within last 60s) + sent_ts = content.get("sent_ts", evt.get("origin_server_ts", 0)) + age_s = (now_ms - sent_ts) / 1000 if sent_ts else 999 + logger.info("Found encryption_keys timeline event: sender=%s device=%s age=%.0fs", + sender, device, age_s) + if age_s > 60: + logger.info("Skipping stale encryption_keys event (%.0fs old)", age_s) + continue all_keys = {} import base64 as b64 for k in content.get("keys", []): @@ -565,8 +572,9 @@ class VoiceSession: self._caller_identity = f"{sender}:{device}" self._caller_all_keys.update(all_keys) max_idx = max(all_keys.keys()) - logger.info("Loaded caller keys at indices %s (using %d)", - sorted(all_keys.keys()), max_idx) + logger.info("Loaded caller keys at indices %s (using %d, key=%s)", + sorted(all_keys.keys()), max_idx, + all_keys[max_idx].hex()[:8]) return all_keys[max_idx] logger.info("No encryption_keys events in last %d timeline events", len(events)) except Exception as e: @@ -711,16 +719,22 @@ class VoiceSession: _e2ee_state_names = {0:"NEW",1:"OK",2:"ENC_FAILED",3:"DEC_FAILED",4:"MISSING_KEY",5:"RATCHETED",6:"INTERNAL_ERR"} _last_rekey_time = {} # per-participant cooldown for DEC_FAILED re-keying + _dec_failed_count = {} # consecutive DEC_FAILED per participant + _refetch_in_progress = False @self.lk_room.on("e2ee_state_changed") def on_e2ee_state(participant, state): + nonlocal _refetch_in_progress state_name = _e2ee_state_names.get(int(state), f"UNKNOWN_{state}") p_id = participant.identity if participant else "local" logger.info("E2EE_STATE: participant=%s state=%s", p_id, state_name) + if int(state) == 1: # OK — reset failure counter + _dec_failed_count.pop(p_id, None) # When remote participant needs key: NEW, MISSING_KEY, or DEC_FAILED (with cooldown) if participant and p_id != bot_identity and int(state) in (0, 3, 4): now = time.monotonic() # DEC_FAILED: only re-key every 5s to avoid tight loops if int(state) == 3: + _dec_failed_count[p_id] = _dec_failed_count.get(p_id, 0) + 1 last = _last_rekey_time.get(p_id, 0) if (now - last) < 5.0: return @@ -734,6 +748,32 @@ class VoiceSession: idx, p_id, state_name) except Exception as exc: logger.warning("e2ee_state set_key failed: %s", exc) + # After 3+ DEC_FAILED: re-fetch key from timeline (might have rotated) + if _dec_failed_count.get(p_id, 0) >= 3 and not _refetch_in_progress: + _refetch_in_progress = True + async def _refetch_key(): + nonlocal _refetch_in_progress + try: + logger.info("DEC_FAILED x%d — re-fetching key from timeline", + _dec_failed_count.get(p_id, 0)) + new_key = await self._fetch_encryption_key_http() + if new_key and new_key != self._caller_key: + logger.info("Got NEW key from timeline re-fetch (%s)", + new_key.hex()[:8]) + self._caller_key = new_key + kp_r = self.lk_room.e2ee_manager.key_provider + for idx, base_k in sorted(self._caller_all_keys.items()): + _derive_and_set_key(kp_r, p_id, base_k, idx) + _dec_failed_count[p_id] = 0 + elif new_key: + logger.info("Re-fetch returned same key — no rotation") + else: + logger.info("Re-fetch returned no fresh key") + except Exception as exc: + logger.warning("Key re-fetch failed: %s", exc) + finally: + _refetch_in_progress = False + asyncio.ensure_future(_refetch_key()) await self.lk_room.connect(self.lk_url, jwt, options=room_opts) logger.info("Connected (E2EE=HKDF), remote=%d",