fix: use timeline events for E2EE key exchange (not state events)

Element Call distributes encryption keys as timeline events, not room
state events. Changed bot to publish keys via room_send and fetch from
/messages endpoint instead of /state.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Christian Gick
2026-02-21 17:28:56 +02:00
parent 4e1e372ca2
commit 6e1e9839cc
2 changed files with 76 additions and 108 deletions

110
bot.py
View File

@@ -444,6 +444,16 @@ class Bot:
try:
model = self.room_models.get(room_id, DEFAULT_MODEL)
caller_device_id = content.get("device_id", "")
# Generate our E2EE key and publish it as a timeline event FIRST.
# Element Call only shares its key after seeing ours.
import secrets
our_key = secrets.token_bytes(32)
await self._publish_encryption_key(room_id, our_key)
# Now check timeline for caller's key (they may have published before us)
caller_key = await self._get_call_encryption_key(room_id, event.sender, caller_device_id)
vs = VoiceSession(
nio_client=self.client,
room_id=room_id,
@@ -451,20 +461,14 @@ class Bot:
lk_url=LK_URL,
model=model,
)
# Try reading encryption key from room state
caller_key = await self._get_call_encryption_key(room_id, event.sender, caller_device_id)
if caller_key:
vs.on_encryption_key(event.sender, caller_device_id, caller_key, 0)
# Store BEFORE start so on_unknown handler can forward keys
# Store BEFORE start so on_unknown handler can forward keys via sync
self.voice_sessions[room_id] = vs
await vs.start()
logger.info("Voice session started for room %s (e2ee_key=%s)",
room_id, "yes" if caller_key else "no")
# Publish our E2EE key so Element Call sees us as encrypted
if caller_key:
await self._publish_encryption_key(room_id, caller_key)
room_id, "yes" if caller_key else "waiting for sync")
except Exception:
logger.exception("Voice session start failed for %s", room_id)
self.voice_sessions.pop(room_id, None)
@@ -1441,90 +1445,68 @@ class Bot:
)
async def _get_call_encryption_key(self, room_id: str, sender: str, caller_device_id: str = "") -> bytes | None:
"""Read E2EE encryption key from io.element.call.encryption_keys state events.
"""Read E2EE encryption key from room timeline messages.
Element Call uses state_key format: @user:domain:DEVICE_ID
Falls back to trying just @user:domain and scanning all room state.
Element Call sends encryption keys as timeline events (NOT state events).
We scan recent room messages for io.element.call.encryption_keys events.
"""
# Try state_key formats: @user:domain:device_id, then @user:domain
state_keys_to_try = []
if caller_device_id:
state_keys_to_try.append(f"{sender}:{caller_device_id}")
state_keys_to_try.append(sender)
for state_key in state_keys_to_try:
try:
resp = await self.client.room_get_state_event(
room_id, ENCRYPTION_KEYS_TYPE, state_key,
)
logger.info("E2EE key lookup state_key=%s → resp type=%s", state_key, type(resp).__name__)
key = self._extract_e2ee_key(resp, sender, state_key)
if key:
return key
except Exception as e:
logger.info("E2EE key lookup state_key=%s failed: %s", state_key, e)
# Fallback: scan all room state via HTTP API for any encryption_keys event
try:
import httpx
token = self.client.access_token
url = f"{HOMESERVER}/_matrix/client/v3/rooms/{room_id}/state"
# Fetch recent messages from timeline (where Element Call puts encryption keys)
url = f"{HOMESERVER}/_matrix/client/v3/rooms/{room_id}/messages"
params = {"dir": "b", "limit": "50"}
async with httpx.AsyncClient(timeout=10.0) as http:
resp = await http.get(url, headers={"Authorization": f"Bearer {token}"})
resp = await http.get(url, headers={"Authorization": f"Bearer {token}"}, params=params)
resp.raise_for_status()
events = resp.json()
logger.info("Room state scan: %d events total", len(events))
data = resp.json()
events = data.get("chunk", [])
logger.info("Timeline scan: %d events", len(events))
for evt in events:
evt_type = evt.get("type", "")
if "call" in evt_type or "encryption" in evt_type:
logger.info(" state event: type=%s state_key=%s content_keys=%s",
evt_type, evt.get("state_key", ""), list(evt.get("content", {}).keys())[:5])
if evt_type == ENCRYPTION_KEYS_TYPE and evt.get("sender") != BOT_USER:
if evt_type == ENCRYPTION_KEYS_TYPE:
evt_sender = evt.get("sender", "")
if evt_sender == BOT_USER:
continue # skip our own keys
content = evt.get("content", {})
keys = content.get("keys", [])
for k in keys:
logger.info("Found encryption_keys timeline event from %s: %s",
evt_sender, list(content.keys()))
for k in content.get("keys", []):
key_b64 = k.get("key", "")
if key_b64:
key_b64 += "=" * (-len(key_b64) % 4)
key = base64.urlsafe_b64decode(key_b64)
logger.info("Got E2EE key from room state scan (%s, %d bytes)",
evt.get("state_key", "?"), len(key))
logger.info("Got E2EE key from timeline (%s, %d bytes)",
evt_sender, len(key))
return key
# Log event types for debugging
types = [e.get("type", "") for e in events]
logger.info("Timeline event types: %s", types)
except Exception as e:
logger.warning("Room state scan for encryption keys failed: %s", e)
logger.warning("Timeline scan for encryption keys failed: %s", e)
logger.warning("No E2EE encryption key found for %s in %s", sender, room_id)
return None
@staticmethod
def _extract_e2ee_key(resp, sender: str, state_key: str) -> bytes | None:
"""Extract E2EE key bytes from a state event response."""
if not hasattr(resp, "content") or not resp.content:
return None
keys = resp.content.get("keys", [])
for k in keys:
key_b64 = k.get("key", "")
if key_b64:
key_b64 += "=" * (-len(key_b64) % 4)
key = base64.urlsafe_b64decode(key_b64)
logger.info("Got E2EE key from %s (state_key=%s, %d bytes)", sender, state_key, len(key))
return key
logger.info("No E2EE encryption key found in timeline for %s in %s", sender, room_id)
return None
async def _publish_encryption_key(self, room_id: str, key: bytes):
"""Publish bot's E2EE encryption key as io.element.call.encryption_keys state event."""
"""Publish bot's E2EE encryption key as a timeline event (NOT state).
Element Call distributes encryption keys as timeline events via
io.element.call.encryption_keys, not as state events.
"""
key_b64 = base64.urlsafe_b64encode(key).decode().rstrip("=")
content = {
"call_id": "",
"device_id": BOT_DEVICE_ID,
"keys": [{"index": 0, "key": key_b64}],
}
state_key = f"{BOT_USER}:{BOT_DEVICE_ID}"
try:
await self.client.room_put_state(
room_id, ENCRYPTION_KEYS_TYPE, content, state_key=state_key,
await self.client.room_send(
room_id,
message_type=ENCRYPTION_KEYS_TYPE,
content=content,
)
logger.info("Published E2EE key in %s", room_id)
logger.info("Published E2EE key as timeline event in %s", room_id)
except Exception:
logger.exception("Failed to publish E2EE key in %s", room_id)

View File

@@ -103,60 +103,45 @@ class VoiceSession:
sender, device_id, index, len(key))
async def _fetch_encryption_key_http(self) -> bytes | None:
"""Fetch encryption key from room state via Matrix HTTP API."""
"""Fetch encryption key from room timeline (NOT state) via Matrix HTTP API.
Element Call distributes encryption keys as timeline events, not state.
"""
import httpx
homeserver = str(self.nio_client.homeserver)
token = self.nio_client.access_token
url = f"{homeserver}/_matrix/client/v3/rooms/{self.room_id}/state"
url = f"{homeserver}/_matrix/client/v3/rooms/{self.room_id}/messages"
try:
async with httpx.AsyncClient(timeout=10.0) as http:
resp = await http.get(url, headers={"Authorization": f"Bearer {token}"})
resp = await http.get(
url,
headers={"Authorization": f"Bearer {token}"},
params={"dir": "b", "limit": "50"},
)
resp.raise_for_status()
events = resp.json()
data = resp.json()
events = data.get("chunk", [])
user_id = self.nio_client.user_id
for evt in events:
evt_type = evt.get("type", "")
if evt_type == "io.element.call.encryption_keys":
sender = evt.get("sender", "")
user_id = self.nio_client.user_id
if sender == user_id:
continue # skip our own key
content = evt.get("content", {})
state_key = evt.get("state_key", "")
logger.info("Found encryption_keys event: sender=%s state_key=%s content=%s",
sender, state_key, content)
logger.info("Found encryption_keys timeline event: sender=%s content=%s",
sender, content)
for k in content.get("keys", []):
key_b64 = k.get("key", "")
if key_b64:
key_b64 += "=" * (-len(key_b64) % 4)
import base64 as b64
return b64.urlsafe_b64decode(key_b64)
# Log all state event types for debugging
types = [e.get("type", "") for e in events]
logger.info("Room state event types: %s", types)
logger.info("No encryption_keys events in last %d timeline events", len(events))
except Exception as e:
logger.warning("HTTP encryption key fetch failed: %s", e)
return None
async def _publish_e2ee_key(self, key: bytes):
"""Publish our E2EE key to room state so Element Call shares its key with us."""
import base64 as b64
key_b64 = b64.urlsafe_b64encode(key).decode().rstrip("=")
content = {
"call_id": "",
"device_id": self.device_id,
"keys": [{"index": 0, "key": key_b64}],
}
user_id = self.nio_client.user_id
state_key = f"{user_id}:{self.device_id}"
try:
ENCRYPTION_KEYS_TYPE = "io.element.call.encryption_keys"
await self.nio_client.room_put_state(
self.room_id, ENCRYPTION_KEYS_TYPE, content, state_key=state_key,
)
logger.info("Published E2EE key (state_key=%s)", state_key)
except Exception:
logger.exception("Failed to publish E2EE key")
async def start(self):
self._task = asyncio.create_task(self._run())
@@ -184,30 +169,31 @@ class VoiceSession:
user_id = self.nio_client.user_id
jwt = _generate_lk_jwt(self.room_id, user_id, self.device_id)
# Actively fetch encryption key from room state via HTTP API
# Element Call publishes keys as state events during active calls
# Check timeline for caller's encryption key
caller_key = await self._fetch_encryption_key_http()
if caller_key:
self._e2ee_key = caller_key
logger.info("Got caller E2EE key via HTTP (%d bytes)", len(caller_key))
else:
# Wait up to 10s for key via sync handler
logger.info("No key in room state yet, waiting for sync...")
for _ in range(100):
logger.info("Got caller E2EE key via timeline (%d bytes)", len(caller_key))
if not self._e2ee_key:
# Wait up to 15s for key via sync handler (bot.py forwards
# encryption_keys timeline events to on_encryption_key)
logger.info("No key in timeline yet, waiting for sync...")
for _ in range(150):
if self._e2ee_key:
break
await asyncio.sleep(0.1)
if self._e2ee_key:
shared_key = self._e2ee_key
logger.info("Using caller's E2EE key (%d bytes)", len(shared_key))
# Publish the SAME key so Element Call sees us as encrypted
await self._publish_e2ee_key(shared_key)
logger.info("Using E2EE key (%d bytes)", len(shared_key))
e2ee_opts = _build_e2ee_options(shared_key)
else:
logger.warning("No E2EE key available — connecting WITHOUT encryption")
e2ee_opts = None
# Generate our own key as fallback — bot.py already published one
import secrets
shared_key = secrets.token_bytes(32)
logger.warning("No caller E2EE key received — using generated key")
e2ee_opts = _build_e2ee_options(shared_key)
room_opts = rtc.RoomOptions(e2ee=e2ee_opts)
self.lk_room = rtc.Room()