diff --git a/bot.py b/bot.py index ab0de19..932a585 100644 --- a/bot.py +++ b/bot.py @@ -444,6 +444,16 @@ class Bot: try: model = self.room_models.get(room_id, DEFAULT_MODEL) caller_device_id = content.get("device_id", "") + + # Generate our E2EE key and publish it as a timeline event FIRST. + # Element Call only shares its key after seeing ours. + import secrets + our_key = secrets.token_bytes(32) + await self._publish_encryption_key(room_id, our_key) + + # Now check timeline for caller's key (they may have published before us) + caller_key = await self._get_call_encryption_key(room_id, event.sender, caller_device_id) + vs = VoiceSession( nio_client=self.client, room_id=room_id, @@ -451,20 +461,14 @@ class Bot: lk_url=LK_URL, model=model, ) - # Try reading encryption key from room state - caller_key = await self._get_call_encryption_key(room_id, event.sender, caller_device_id) if caller_key: vs.on_encryption_key(event.sender, caller_device_id, caller_key, 0) - # Store BEFORE start so on_unknown handler can forward keys + # Store BEFORE start so on_unknown handler can forward keys via sync self.voice_sessions[room_id] = vs await vs.start() logger.info("Voice session started for room %s (e2ee_key=%s)", - room_id, "yes" if caller_key else "no") - - # Publish our E2EE key so Element Call sees us as encrypted - if caller_key: - await self._publish_encryption_key(room_id, caller_key) + room_id, "yes" if caller_key else "waiting for sync") except Exception: logger.exception("Voice session start failed for %s", room_id) self.voice_sessions.pop(room_id, None) @@ -1441,90 +1445,68 @@ class Bot: ) async def _get_call_encryption_key(self, room_id: str, sender: str, caller_device_id: str = "") -> bytes | None: - """Read E2EE encryption key from io.element.call.encryption_keys state events. + """Read E2EE encryption key from room timeline messages. - Element Call uses state_key format: @user:domain:DEVICE_ID - Falls back to trying just @user:domain and scanning all room state. + Element Call sends encryption keys as timeline events (NOT state events). + We scan recent room messages for io.element.call.encryption_keys events. """ - # Try state_key formats: @user:domain:device_id, then @user:domain - state_keys_to_try = [] - if caller_device_id: - state_keys_to_try.append(f"{sender}:{caller_device_id}") - state_keys_to_try.append(sender) - - for state_key in state_keys_to_try: - try: - resp = await self.client.room_get_state_event( - room_id, ENCRYPTION_KEYS_TYPE, state_key, - ) - logger.info("E2EE key lookup state_key=%s → resp type=%s", state_key, type(resp).__name__) - key = self._extract_e2ee_key(resp, sender, state_key) - if key: - return key - except Exception as e: - logger.info("E2EE key lookup state_key=%s failed: %s", state_key, e) - - # Fallback: scan all room state via HTTP API for any encryption_keys event try: import httpx token = self.client.access_token - url = f"{HOMESERVER}/_matrix/client/v3/rooms/{room_id}/state" + # Fetch recent messages from timeline (where Element Call puts encryption keys) + url = f"{HOMESERVER}/_matrix/client/v3/rooms/{room_id}/messages" + params = {"dir": "b", "limit": "50"} async with httpx.AsyncClient(timeout=10.0) as http: - resp = await http.get(url, headers={"Authorization": f"Bearer {token}"}) + resp = await http.get(url, headers={"Authorization": f"Bearer {token}"}, params=params) resp.raise_for_status() - events = resp.json() - logger.info("Room state scan: %d events total", len(events)) + data = resp.json() + events = data.get("chunk", []) + logger.info("Timeline scan: %d events", len(events)) for evt in events: evt_type = evt.get("type", "") - if "call" in evt_type or "encryption" in evt_type: - logger.info(" state event: type=%s state_key=%s content_keys=%s", - evt_type, evt.get("state_key", ""), list(evt.get("content", {}).keys())[:5]) - if evt_type == ENCRYPTION_KEYS_TYPE and evt.get("sender") != BOT_USER: + if evt_type == ENCRYPTION_KEYS_TYPE: + evt_sender = evt.get("sender", "") + if evt_sender == BOT_USER: + continue # skip our own keys content = evt.get("content", {}) - keys = content.get("keys", []) - for k in keys: + logger.info("Found encryption_keys timeline event from %s: %s", + evt_sender, list(content.keys())) + for k in content.get("keys", []): key_b64 = k.get("key", "") if key_b64: key_b64 += "=" * (-len(key_b64) % 4) key = base64.urlsafe_b64decode(key_b64) - logger.info("Got E2EE key from room state scan (%s, %d bytes)", - evt.get("state_key", "?"), len(key)) + logger.info("Got E2EE key from timeline (%s, %d bytes)", + evt_sender, len(key)) return key + # Log event types for debugging + types = [e.get("type", "") for e in events] + logger.info("Timeline event types: %s", types) except Exception as e: - logger.warning("Room state scan for encryption keys failed: %s", e) + logger.warning("Timeline scan for encryption keys failed: %s", e) - logger.warning("No E2EE encryption key found for %s in %s", sender, room_id) - return None - - @staticmethod - def _extract_e2ee_key(resp, sender: str, state_key: str) -> bytes | None: - """Extract E2EE key bytes from a state event response.""" - if not hasattr(resp, "content") or not resp.content: - return None - keys = resp.content.get("keys", []) - for k in keys: - key_b64 = k.get("key", "") - if key_b64: - key_b64 += "=" * (-len(key_b64) % 4) - key = base64.urlsafe_b64decode(key_b64) - logger.info("Got E2EE key from %s (state_key=%s, %d bytes)", sender, state_key, len(key)) - return key + logger.info("No E2EE encryption key found in timeline for %s in %s", sender, room_id) return None async def _publish_encryption_key(self, room_id: str, key: bytes): - """Publish bot's E2EE encryption key as io.element.call.encryption_keys state event.""" + """Publish bot's E2EE encryption key as a timeline event (NOT state). + + Element Call distributes encryption keys as timeline events via + io.element.call.encryption_keys, not as state events. + """ key_b64 = base64.urlsafe_b64encode(key).decode().rstrip("=") content = { "call_id": "", "device_id": BOT_DEVICE_ID, "keys": [{"index": 0, "key": key_b64}], } - state_key = f"{BOT_USER}:{BOT_DEVICE_ID}" try: - await self.client.room_put_state( - room_id, ENCRYPTION_KEYS_TYPE, content, state_key=state_key, + await self.client.room_send( + room_id, + message_type=ENCRYPTION_KEYS_TYPE, + content=content, ) - logger.info("Published E2EE key in %s", room_id) + logger.info("Published E2EE key as timeline event in %s", room_id) except Exception: logger.exception("Failed to publish E2EE key in %s", room_id) diff --git a/voice.py b/voice.py index 88f4021..ed264ed 100644 --- a/voice.py +++ b/voice.py @@ -103,60 +103,45 @@ class VoiceSession: sender, device_id, index, len(key)) async def _fetch_encryption_key_http(self) -> bytes | None: - """Fetch encryption key from room state via Matrix HTTP API.""" + """Fetch encryption key from room timeline (NOT state) via Matrix HTTP API. + + Element Call distributes encryption keys as timeline events, not state. + """ import httpx homeserver = str(self.nio_client.homeserver) token = self.nio_client.access_token - url = f"{homeserver}/_matrix/client/v3/rooms/{self.room_id}/state" + url = f"{homeserver}/_matrix/client/v3/rooms/{self.room_id}/messages" try: async with httpx.AsyncClient(timeout=10.0) as http: - resp = await http.get(url, headers={"Authorization": f"Bearer {token}"}) + resp = await http.get( + url, + headers={"Authorization": f"Bearer {token}"}, + params={"dir": "b", "limit": "50"}, + ) resp.raise_for_status() - events = resp.json() + data = resp.json() + events = data.get("chunk", []) + user_id = self.nio_client.user_id for evt in events: evt_type = evt.get("type", "") if evt_type == "io.element.call.encryption_keys": sender = evt.get("sender", "") - user_id = self.nio_client.user_id if sender == user_id: continue # skip our own key content = evt.get("content", {}) - state_key = evt.get("state_key", "") - logger.info("Found encryption_keys event: sender=%s state_key=%s content=%s", - sender, state_key, content) + logger.info("Found encryption_keys timeline event: sender=%s content=%s", + sender, content) for k in content.get("keys", []): key_b64 = k.get("key", "") if key_b64: key_b64 += "=" * (-len(key_b64) % 4) import base64 as b64 return b64.urlsafe_b64decode(key_b64) - # Log all state event types for debugging - types = [e.get("type", "") for e in events] - logger.info("Room state event types: %s", types) + logger.info("No encryption_keys events in last %d timeline events", len(events)) except Exception as e: logger.warning("HTTP encryption key fetch failed: %s", e) return None - async def _publish_e2ee_key(self, key: bytes): - """Publish our E2EE key to room state so Element Call shares its key with us.""" - import base64 as b64 - key_b64 = b64.urlsafe_b64encode(key).decode().rstrip("=") - content = { - "call_id": "", - "device_id": self.device_id, - "keys": [{"index": 0, "key": key_b64}], - } - user_id = self.nio_client.user_id - state_key = f"{user_id}:{self.device_id}" - try: - ENCRYPTION_KEYS_TYPE = "io.element.call.encryption_keys" - await self.nio_client.room_put_state( - self.room_id, ENCRYPTION_KEYS_TYPE, content, state_key=state_key, - ) - logger.info("Published E2EE key (state_key=%s)", state_key) - except Exception: - logger.exception("Failed to publish E2EE key") - async def start(self): self._task = asyncio.create_task(self._run()) @@ -184,30 +169,31 @@ class VoiceSession: user_id = self.nio_client.user_id jwt = _generate_lk_jwt(self.room_id, user_id, self.device_id) - # Actively fetch encryption key from room state via HTTP API - # Element Call publishes keys as state events during active calls + # Check timeline for caller's encryption key caller_key = await self._fetch_encryption_key_http() - if caller_key: self._e2ee_key = caller_key - logger.info("Got caller E2EE key via HTTP (%d bytes)", len(caller_key)) - else: - # Wait up to 10s for key via sync handler - logger.info("No key in room state yet, waiting for sync...") - for _ in range(100): + logger.info("Got caller E2EE key via timeline (%d bytes)", len(caller_key)) + + if not self._e2ee_key: + # Wait up to 15s for key via sync handler (bot.py forwards + # encryption_keys timeline events to on_encryption_key) + logger.info("No key in timeline yet, waiting for sync...") + for _ in range(150): if self._e2ee_key: break await asyncio.sleep(0.1) if self._e2ee_key: shared_key = self._e2ee_key - logger.info("Using caller's E2EE key (%d bytes)", len(shared_key)) - # Publish the SAME key so Element Call sees us as encrypted - await self._publish_e2ee_key(shared_key) + logger.info("Using E2EE key (%d bytes)", len(shared_key)) e2ee_opts = _build_e2ee_options(shared_key) else: - logger.warning("No E2EE key available — connecting WITHOUT encryption") - e2ee_opts = None + # Generate our own key as fallback — bot.py already published one + import secrets + shared_key = secrets.token_bytes(32) + logger.warning("No caller E2EE key received — using generated key") + e2ee_opts = _build_e2ee_options(shared_key) room_opts = rtc.RoomOptions(e2ee=e2ee_opts) self.lk_room = rtc.Room()