fix(voice): add MSC4143 call.member encryption key support
Element Call v0.17+ embeds encryption_keys in call.member state events instead of separate timeline events. In E2EE rooms, timeline events are encrypted and the bot HTTP fetch cannot decrypt them, causing DEC_FAILED. - Extract caller keys from call.member state event on join - Embed bot key in call.member state event - Check call.member state in key fetch (before timeline fallback) - Handle key updates in call.member during active calls - Update voice.py key poller to check call.member state first - Add debug logging for UnknownEvent types in call rooms Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
137
bot.py
137
bot.py
@@ -1572,6 +1572,12 @@ class Bot:
|
|||||||
|
|
||||||
async def on_unknown(self, room, event: UnknownEvent):
|
async def on_unknown(self, room, event: UnknownEvent):
|
||||||
"""Handle call member state events and in-room verification."""
|
"""Handle call member state events and in-room verification."""
|
||||||
|
# Debug: log all UnknownEvent types in call rooms to diagnose E2EE key delivery
|
||||||
|
if room.room_id in self.active_calls or room.room_id in self.voice_sessions:
|
||||||
|
source = event.source or {}
|
||||||
|
logger.info("on_unknown in call room %s: type=%s sender=%s content_keys=%s",
|
||||||
|
room.room_id, event.type, event.sender, list(source.get("content", {}).keys()))
|
||||||
|
|
||||||
# Route verification events
|
# Route verification events
|
||||||
if event.type.startswith("m.key.verification."):
|
if event.type.startswith("m.key.verification."):
|
||||||
if event.sender != BOT_USER:
|
if event.sender != BOT_USER:
|
||||||
@@ -1612,6 +1618,26 @@ class Bot:
|
|||||||
# Non-empty content means someone started/is in a call
|
# Non-empty content means someone started/is in a call
|
||||||
if event.source.get("content", {}):
|
if event.source.get("content", {}):
|
||||||
room_id = room.room_id
|
room_id = room.room_id
|
||||||
|
content = event.source.get("content", {})
|
||||||
|
|
||||||
|
# MSC4143: encryption keys may be embedded in call.member state events
|
||||||
|
# Check for keys BEFORE the early return for active calls
|
||||||
|
enc_keys = content.get("encryption_keys", content.get("keys", []))
|
||||||
|
if enc_keys and room_id in self.voice_sessions:
|
||||||
|
device_id = content.get("device_id", "")
|
||||||
|
logger.info("Found encryption_keys in call.member event from %s (device=%s, keys=%d)",
|
||||||
|
event.sender, device_id, len(enc_keys))
|
||||||
|
vs = self.voice_sessions[room_id]
|
||||||
|
import base64 as b64
|
||||||
|
for k in enc_keys:
|
||||||
|
key_b64 = k.get("key", "")
|
||||||
|
key_index = k.get("index", 0)
|
||||||
|
if key_b64:
|
||||||
|
key_b64 += "=" * (-len(key_b64) % 4)
|
||||||
|
key_bytes = b64.urlsafe_b64decode(key_b64)
|
||||||
|
vs.on_encryption_key(event.sender, device_id, key_bytes, key_index)
|
||||||
|
logger.info("Delivered call.member embedded key (index=%d) to voice session", key_index)
|
||||||
|
|
||||||
if room_id in self.active_calls:
|
if room_id in self.active_calls:
|
||||||
return
|
return
|
||||||
|
|
||||||
@@ -1633,8 +1659,14 @@ class Bot:
|
|||||||
lk_room_name = base64.b64encode(lk_room_hash).decode().rstrip("=")
|
lk_room_name = base64.b64encode(lk_room_hash).decode().rstrip("=")
|
||||||
logger.info("LiveKit room name: %s (hashed from %s)", lk_room_name, room_id)
|
logger.info("LiveKit room name: %s (hashed from %s)", lk_room_name, room_id)
|
||||||
|
|
||||||
|
# Generate bot's own E2EE key early so we can embed it in call_member
|
||||||
|
import secrets
|
||||||
|
bot_key = secrets.token_bytes(16)
|
||||||
|
bot_key_b64 = base64.urlsafe_b64encode(bot_key).decode().rstrip("=")
|
||||||
|
|
||||||
# Send our own call member state event FIRST so Element Call
|
# Send our own call member state event FIRST so Element Call
|
||||||
# sends encryption_keys in response (before we start VoiceSession)
|
# sends encryption_keys in response (before we start VoiceSession)
|
||||||
|
# MSC4143: embed encryption_keys in call.member state event
|
||||||
call_content = {
|
call_content = {
|
||||||
"application": "m.call",
|
"application": "m.call",
|
||||||
"call_id": "",
|
"call_id": "",
|
||||||
@@ -1647,6 +1679,7 @@ class Bot:
|
|||||||
},
|
},
|
||||||
"foci_preferred": foci,
|
"foci_preferred": foci,
|
||||||
"m.call.intent": "audio",
|
"m.call.intent": "audio",
|
||||||
|
"encryption_keys": [{"index": 0, "key": bot_key_b64}],
|
||||||
}
|
}
|
||||||
|
|
||||||
state_key = f"_{BOT_USER}_{BOT_DEVICE_ID}_m.call"
|
state_key = f"_{BOT_USER}_{BOT_DEVICE_ID}_m.call"
|
||||||
@@ -1654,7 +1687,7 @@ class Bot:
|
|||||||
resp = await self.client.room_put_state(
|
resp = await self.client.room_put_state(
|
||||||
room_id, CALL_MEMBER_TYPE, call_content, state_key=state_key,
|
room_id, CALL_MEMBER_TYPE, call_content, state_key=state_key,
|
||||||
)
|
)
|
||||||
logger.info("Sent call member event in %s: %s", room_id, resp)
|
logger.info("Sent call member event in %s (with embedded E2EE key): %s", room_id, resp)
|
||||||
except Exception:
|
except Exception:
|
||||||
logger.exception("Failed to send call member event in %s", room_id)
|
logger.exception("Failed to send call member event in %s", room_id)
|
||||||
|
|
||||||
@@ -1665,10 +1698,6 @@ class Bot:
|
|||||||
model = self.room_models.get(room_id, DEFAULT_MODEL)
|
model = self.room_models.get(room_id, DEFAULT_MODEL)
|
||||||
caller_device_id = content.get("device_id", "")
|
caller_device_id = content.get("device_id", "")
|
||||||
|
|
||||||
# Generate bot's own E2EE key (16 bytes like Element Call)
|
|
||||||
import secrets
|
|
||||||
bot_key = secrets.token_bytes(16)
|
|
||||||
|
|
||||||
# Collect all recent document contexts (< 1 hour)
|
# Collect all recent document contexts (< 1 hour)
|
||||||
doc_entries = [e for e in self._room_document_context.get(room_id, [])
|
doc_entries = [e for e in self._room_document_context.get(room_id, [])
|
||||||
if time.time() - e["timestamp"] < 3600]
|
if time.time() - e["timestamp"] < 3600]
|
||||||
@@ -1697,10 +1726,25 @@ class Bot:
|
|||||||
document_context=document_context,
|
document_context=document_context,
|
||||||
)
|
)
|
||||||
|
|
||||||
# Check timeline for caller's key
|
# Check for caller's key: first in call_member event (MSC4143),
|
||||||
caller_key = await self._get_call_encryption_key(room_id, event.sender, caller_device_id)
|
# then fall back to timeline scan (legacy io.element.call.encryption_keys)
|
||||||
if caller_key:
|
caller_key = None
|
||||||
vs.on_encryption_key(event.sender, caller_device_id, caller_key, 0)
|
caller_enc_keys = content.get("encryption_keys", content.get("keys", []))
|
||||||
|
if caller_enc_keys:
|
||||||
|
import base64 as b64
|
||||||
|
for k in caller_enc_keys:
|
||||||
|
key_b64 = k.get("key", "")
|
||||||
|
key_index = k.get("index", 0)
|
||||||
|
if key_b64:
|
||||||
|
key_b64 += "=" * (-len(key_b64) % 4)
|
||||||
|
caller_key = b64.urlsafe_b64decode(key_b64)
|
||||||
|
vs.on_encryption_key(event.sender, caller_device_id, caller_key, key_index)
|
||||||
|
logger.info("Got caller E2EE key from call.member event (index=%d)", key_index)
|
||||||
|
|
||||||
|
if not caller_key:
|
||||||
|
caller_key = await self._get_call_encryption_key(room_id, event.sender, caller_device_id)
|
||||||
|
if caller_key:
|
||||||
|
vs.on_encryption_key(event.sender, caller_device_id, caller_key, 0)
|
||||||
|
|
||||||
# Store BEFORE start so on_unknown handler can forward keys via sync
|
# Store BEFORE start so on_unknown handler can forward keys via sync
|
||||||
self.voice_sessions[room_id] = vs
|
self.voice_sessions[room_id] = vs
|
||||||
@@ -3541,19 +3585,46 @@ class Bot:
|
|||||||
)
|
)
|
||||||
|
|
||||||
async def _get_call_encryption_key(self, room_id: str, sender: str, caller_device_id: str = "") -> bytes | None:
|
async def _get_call_encryption_key(self, room_id: str, sender: str, caller_device_id: str = "") -> bytes | None:
|
||||||
"""Read E2EE encryption key from room timeline messages.
|
"""Read E2EE encryption key from call.member state (MSC4143) or timeline (legacy).
|
||||||
|
|
||||||
Element Call sends encryption keys as timeline events (NOT state events).
|
MSC4143: encryption_keys embedded in org.matrix.msc3401.call.member state event
|
||||||
We scan recent room messages for io.element.call.encryption_keys events.
|
Legacy: io.element.call.encryption_keys timeline events
|
||||||
"""
|
"""
|
||||||
try:
|
try:
|
||||||
import httpx
|
import httpx
|
||||||
token = self.client.access_token
|
token = self.client.access_token
|
||||||
# Fetch recent messages from timeline (where Element Call puts encryption keys)
|
|
||||||
url = f"{HOMESERVER}/_matrix/client/v3/rooms/{room_id}/messages"
|
# MSC4143: Check call.member state events for embedded encryption_keys
|
||||||
params = {"dir": "b", "limit": "50"}
|
state_url = f"{HOMESERVER}/_matrix/client/v3/rooms/{room_id}/state"
|
||||||
async with httpx.AsyncClient(timeout=10.0) as http:
|
async with httpx.AsyncClient(timeout=10.0) as http:
|
||||||
resp = await http.get(url, headers={"Authorization": f"Bearer {token}"}, params=params)
|
resp = await http.get(state_url, headers={"Authorization": f"Bearer {token}"})
|
||||||
|
if resp.status_code == 200:
|
||||||
|
state_events = resp.json()
|
||||||
|
for evt in state_events:
|
||||||
|
if evt.get("type") != CALL_MEMBER_TYPE:
|
||||||
|
continue
|
||||||
|
evt_sender = evt.get("sender", "")
|
||||||
|
if evt_sender == BOT_USER:
|
||||||
|
continue
|
||||||
|
content = evt.get("content", {})
|
||||||
|
enc_keys = content.get("encryption_keys", [])
|
||||||
|
if enc_keys:
|
||||||
|
device = content.get("device_id", "")
|
||||||
|
logger.info("Found encryption_keys in call.member state from %s (device=%s, keys=%d)",
|
||||||
|
evt_sender, device, len(enc_keys))
|
||||||
|
for k in enc_keys:
|
||||||
|
key_b64 = k.get("key", "")
|
||||||
|
if key_b64:
|
||||||
|
key_b64 += "=" * (-len(key_b64) % 4)
|
||||||
|
key = base64.urlsafe_b64decode(key_b64)
|
||||||
|
logger.info("Got E2EE key from call.member state (%s, %d bytes)", evt_sender, len(key))
|
||||||
|
return key
|
||||||
|
|
||||||
|
# Legacy: scan timeline for io.element.call.encryption_keys events
|
||||||
|
async with httpx.AsyncClient(timeout=10.0) as http:
|
||||||
|
msg_url = f"{HOMESERVER}/_matrix/client/v3/rooms/{room_id}/messages"
|
||||||
|
params = {"dir": "b", "limit": "50"}
|
||||||
|
resp = await http.get(msg_url, headers={"Authorization": f"Bearer {token}"}, params=params)
|
||||||
resp.raise_for_status()
|
resp.raise_for_status()
|
||||||
data = resp.json()
|
data = resp.json()
|
||||||
events = data.get("chunk", [])
|
events = data.get("chunk", [])
|
||||||
@@ -3579,18 +3650,21 @@ class Bot:
|
|||||||
types = [e.get("type", "") for e in events]
|
types = [e.get("type", "") for e in events]
|
||||||
logger.info("Timeline event types: %s", types)
|
logger.info("Timeline event types: %s", types)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.warning("Timeline scan for encryption keys failed: %s", e)
|
logger.warning("Encryption key fetch failed: %s", e)
|
||||||
|
|
||||||
logger.info("No E2EE encryption key found in timeline for %s in %s", sender, room_id)
|
logger.info("No E2EE encryption key found for %s in %s", sender, room_id)
|
||||||
return None
|
return None
|
||||||
|
|
||||||
async def _publish_encryption_key(self, room_id: str, key: bytes):
|
async def _publish_encryption_key(self, room_id: str, key: bytes):
|
||||||
"""Publish bot's E2EE encryption key as a timeline event (NOT state).
|
"""Publish bot's E2EE encryption key via both legacy timeline event
|
||||||
|
and MSC4143 call.member state event update.
|
||||||
|
|
||||||
Element Call distributes encryption keys as timeline events via
|
Legacy: io.element.call.encryption_keys timeline event
|
||||||
io.element.call.encryption_keys, not as state events.
|
MSC4143: encryption_keys field in org.matrix.msc3401.call.member state
|
||||||
"""
|
"""
|
||||||
key_b64 = base64.urlsafe_b64encode(key).decode().rstrip("=")
|
key_b64 = base64.urlsafe_b64encode(key).decode().rstrip("=")
|
||||||
|
|
||||||
|
# Legacy: timeline event (for older Element Call versions)
|
||||||
content = {
|
content = {
|
||||||
"call_id": "",
|
"call_id": "",
|
||||||
"device_id": BOT_DEVICE_ID,
|
"device_id": BOT_DEVICE_ID,
|
||||||
@@ -3604,7 +3678,26 @@ class Bot:
|
|||||||
)
|
)
|
||||||
logger.info("Published E2EE key as timeline event in %s", room_id)
|
logger.info("Published E2EE key as timeline event in %s", room_id)
|
||||||
except Exception:
|
except Exception:
|
||||||
logger.exception("Failed to publish E2EE key in %s", room_id)
|
logger.exception("Failed to publish E2EE key in %s (legacy timeline)", room_id)
|
||||||
|
|
||||||
|
# MSC4143: update call.member state event with encryption_keys
|
||||||
|
state_key = f"_{BOT_USER}_{BOT_DEVICE_ID}_m.call"
|
||||||
|
try:
|
||||||
|
import httpx
|
||||||
|
token = self.client.access_token
|
||||||
|
url = f"{HOMESERVER}/_matrix/client/v3/rooms/{room_id}/state/{CALL_MEMBER_TYPE}/{state_key}"
|
||||||
|
async with httpx.AsyncClient(timeout=10.0) as http:
|
||||||
|
# GET current state
|
||||||
|
resp = await http.get(url, headers={"Authorization": f"Bearer {token}"})
|
||||||
|
if resp.status_code == 200:
|
||||||
|
current = resp.json()
|
||||||
|
current["encryption_keys"] = [{"index": 0, "key": key_b64}]
|
||||||
|
# PUT updated state
|
||||||
|
put_resp = await http.put(url, headers={"Authorization": f"Bearer {token}"}, json=current)
|
||||||
|
put_resp.raise_for_status()
|
||||||
|
logger.info("Updated call.member state with E2EE key in %s", room_id)
|
||||||
|
except Exception:
|
||||||
|
logger.exception("Failed to update call.member state with E2EE key in %s", room_id)
|
||||||
|
|
||||||
async def _route_verification(self, room, event: UnknownEvent):
|
async def _route_verification(self, room, event: UnknownEvent):
|
||||||
"""Route in-room verification events from UnknownEvent."""
|
"""Route in-room verification events from UnknownEvent."""
|
||||||
|
|||||||
41
voice.py
41
voice.py
@@ -543,13 +543,50 @@ class VoiceSession:
|
|||||||
break
|
break
|
||||||
|
|
||||||
async def _fetch_encryption_key_http(self) -> bytes | None:
|
async def _fetch_encryption_key_http(self) -> bytes | None:
|
||||||
"""Fetch encryption key from room timeline (NOT state) via Matrix HTTP API.
|
"""Fetch encryption key from call.member state (MSC4143) or timeline (legacy).
|
||||||
|
|
||||||
Element Call distributes encryption keys as timeline events, not state.
|
MSC4143: encryption_keys in org.matrix.msc3401.call.member state
|
||||||
|
Legacy: io.element.call.encryption_keys timeline events
|
||||||
"""
|
"""
|
||||||
import httpx
|
import httpx
|
||||||
homeserver = str(self.nio_client.homeserver)
|
homeserver = str(self.nio_client.homeserver)
|
||||||
token = self.nio_client.access_token
|
token = self.nio_client.access_token
|
||||||
|
user_id = self.nio_client.user_id
|
||||||
|
|
||||||
|
# MSC4143: check call.member state events first
|
||||||
|
try:
|
||||||
|
state_url = f"{homeserver}/_matrix/client/v3/rooms/{self.room_id}/state"
|
||||||
|
async with httpx.AsyncClient(timeout=10.0) as http:
|
||||||
|
resp = await http.get(state_url, headers={"Authorization": f"Bearer {token}"})
|
||||||
|
if resp.status_code == 200:
|
||||||
|
for evt in resp.json():
|
||||||
|
if evt.get("type") != "org.matrix.msc3401.call.member":
|
||||||
|
continue
|
||||||
|
sender = evt.get("sender", "")
|
||||||
|
if sender == user_id:
|
||||||
|
continue
|
||||||
|
content = evt.get("content", {})
|
||||||
|
enc_keys = content.get("encryption_keys", [])
|
||||||
|
if enc_keys:
|
||||||
|
device = content.get("device_id", "")
|
||||||
|
import base64 as b64
|
||||||
|
for k in enc_keys:
|
||||||
|
key_b64 = k.get("key", "")
|
||||||
|
key_index = k.get("index", 0)
|
||||||
|
if key_b64:
|
||||||
|
key_b64 += "=" * (-len(key_b64) % 4)
|
||||||
|
key_bytes = b64.urlsafe_b64decode(key_b64)
|
||||||
|
if device:
|
||||||
|
self._caller_identity = f"{sender}:{device}"
|
||||||
|
self.on_encryption_key(sender, device, key_bytes, key_index)
|
||||||
|
max_idx = max(self._caller_all_keys.keys()) if self._caller_all_keys else key_index
|
||||||
|
latest = self._caller_all_keys.get(max_idx, key_bytes)
|
||||||
|
logger.info("Got key from call.member state (sender=%s, index=%d)", sender, key_index)
|
||||||
|
return latest
|
||||||
|
except Exception as e:
|
||||||
|
logger.debug("call.member state key fetch failed: %s", e)
|
||||||
|
|
||||||
|
# Legacy: timeline scan
|
||||||
url = f"{homeserver}/_matrix/client/v3/rooms/{self.room_id}/messages"
|
url = f"{homeserver}/_matrix/client/v3/rooms/{self.room_id}/messages"
|
||||||
try:
|
try:
|
||||||
async with httpx.AsyncClient(timeout=10.0) as http:
|
async with httpx.AsyncClient(timeout=10.0) as http:
|
||||||
|
|||||||
Reference in New Issue
Block a user