feat(CF-1189): Add in-room SAS verification for E2E key sharing

Element withholds megolm keys from unverified devices. Implements
the full in-room m.key.verification.* protocol so Element Verify works.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Christian Gick
2026-02-15 19:22:32 +02:00
parent ac26c71709
commit 21635bb3ab

149
bot.py
View File

@@ -3,9 +3,11 @@ import json
import asyncio import asyncio
import logging import logging
import time import time
import uuid
import httpx import httpx
from openai import AsyncOpenAI from openai import AsyncOpenAI
from olm import sas as olm_sas
from nio import ( from nio import (
AsyncClient, AsyncClient,
@@ -14,6 +16,7 @@ from nio import (
InviteMemberEvent, InviteMemberEvent,
MegolmEvent, MegolmEvent,
RoomMessageText, RoomMessageText,
RoomMessageUnknown,
SyncResponse, SyncResponse,
UnknownEvent, UnknownEvent,
KeyVerificationStart, KeyVerificationStart,
@@ -113,6 +116,7 @@ class Bot:
self.llm = AsyncOpenAI(base_url=LITELLM_URL, api_key=LITELLM_KEY) if LITELLM_URL else None self.llm = AsyncOpenAI(base_url=LITELLM_URL, api_key=LITELLM_KEY) if LITELLM_URL else None
self.room_models: dict[str, str] = {} # room_id -> model name self.room_models: dict[str, str] = {} # room_id -> model name
self._sync_token_received = False self._sync_token_received = False
self._verifications: dict[str, dict] = {} # txn_id -> verification state
async def start(self): async def start(self):
# Restore existing session or create new one # Restore existing session or create new one
@@ -148,6 +152,7 @@ class Bot:
self.client.add_event_callback(self.on_megolm, MegolmEvent) self.client.add_event_callback(self.on_megolm, MegolmEvent)
self.client.add_event_callback(self.on_unknown, UnknownEvent) self.client.add_event_callback(self.on_unknown, UnknownEvent)
self.client.add_event_callback(self.on_text_message, RoomMessageText) self.client.add_event_callback(self.on_text_message, RoomMessageText)
self.client.add_event_callback(self.on_room_unknown, RoomMessageUnknown)
self.client.add_response_callback(self.on_sync, SyncResponse) self.client.add_response_callback(self.on_sync, SyncResponse)
self.client.add_to_device_callback(self.on_key_verification, KeyVerificationStart) self.client.add_to_device_callback(self.on_key_verification, KeyVerificationStart)
self.client.add_to_device_callback(self.on_key_verification, KeyVerificationKey) self.client.add_to_device_callback(self.on_key_verification, KeyVerificationKey)
@@ -406,6 +411,150 @@ class Bot:
}, },
) )
async def on_room_unknown(self, room, event: RoomMessageUnknown):
"""Handle in-room verification events."""
source = event.source or {}
content = source.get("content", {})
event_type = source.get("type", "")
if not event_type.startswith("m.key.verification."):
return
if event.sender == BOT_USER:
return
logger.info("Verification event: %s from %s", event_type, event.sender)
if event_type == "m.key.verification.request":
await self._handle_verification_request(room, source)
elif event_type == "m.key.verification.start":
await self._handle_verification_start(room, source)
elif event_type == "m.key.verification.key":
await self._handle_verification_key(room, source)
elif event_type == "m.key.verification.mac":
await self._handle_verification_mac(room, source)
elif event_type == "m.key.verification.cancel":
txn = content.get("m.relates_to", {}).get("event_id", "")
self._verifications.pop(txn, None)
logger.info("Verification cancelled: %s", txn)
async def _handle_verification_request(self, room, source):
content = source["content"]
txn_id = source["event_id"]
sender = source["sender"]
self._verifications[txn_id] = {"sender": sender, "room_id": room.room_id}
logger.info("Verification request from %s, txn=%s", sender, txn_id)
# Send m.key.verification.ready
await self.client.room_send(
room.room_id,
message_type="m.key.verification.ready",
content={
"m.relates_to": {"rel_type": "m.reference", "event_id": txn_id},
"from_device": self.client.device_id,
"methods": ["m.sas.v1"],
},
)
logger.info("Sent verification ready for %s", txn_id)
async def _handle_verification_start(self, room, source):
content = source["content"]
txn_id = content.get("m.relates_to", {}).get("event_id", "")
v = self._verifications.get(txn_id)
if not v:
logger.warning("Unknown verification start: %s", txn_id)
return
sas_obj = olm_sas.Sas()
v["sas"] = sas_obj
v["commitment"] = content.get("commitment", "")
# Send m.key.verification.accept is NOT needed when we sent "ready"
# and the other side sent "start". We go straight to sending our key.
# Send m.key.verification.key
await self.client.room_send(
room.room_id,
message_type="m.key.verification.key",
content={
"m.relates_to": {"rel_type": "m.reference", "event_id": txn_id},
"key": sas_obj.pubkey,
},
)
v["key_sent"] = True
logger.info("Sent SAS key for %s", txn_id)
async def _handle_verification_key(self, room, source):
content = source["content"]
txn_id = content.get("m.relates_to", {}).get("event_id", "")
v = self._verifications.get(txn_id)
if not v or "sas" not in v:
logger.warning("Unknown verification key: %s", txn_id)
return
sas_obj = v["sas"]
their_key = content["key"]
sas_obj.set_their_pubkey(their_key)
v["their_key"] = their_key
# Auto-confirm SAS (bot trusts the user)
# Generate MAC for our device key and master key
our_user = BOT_USER
our_device = self.client.device_id
their_user = v["sender"]
# Key IDs to MAC
key_id = f"ed25519:{our_device}"
device_key = self.client.olm.account.identity_keys["ed25519"]
# MAC info strings per spec
base_info = (
f"MATRIX_KEY_VERIFICATION_MAC"
f"{our_user}{our_device}"
f"{their_user}{content.get('from_device', '')}"
f"{txn_id}"
)
mac_dict = {}
keys_list = []
# MAC our ed25519 device key
mac_dict[key_id] = sas_obj.calculate_mac(device_key, base_info + key_id)
keys_list.append(key_id)
# MAC the key list
keys_str = ",".join(sorted(keys_list))
keys_mac = sas_obj.calculate_mac(keys_str, base_info + "KEY_IDS")
await self.client.room_send(
room.room_id,
message_type="m.key.verification.mac",
content={
"m.relates_to": {"rel_type": "m.reference", "event_id": txn_id},
"mac": mac_dict,
"keys": keys_mac,
},
)
logger.info("Sent SAS MAC for %s", txn_id)
async def _handle_verification_mac(self, room, source):
content = source["content"]
txn_id = content.get("m.relates_to", {}).get("event_id", "")
v = self._verifications.get(txn_id)
if not v:
return
# Verification complete — send done
await self.client.room_send(
room.room_id,
message_type="m.key.verification.done",
content={
"m.relates_to": {"rel_type": "m.reference", "event_id": txn_id},
},
)
logger.info("Verification complete for %s with %s", txn_id, v["sender"])
self._verifications.pop(txn_id, None)
async def on_megolm(self, room, event: MegolmEvent): async def on_megolm(self, room, event: MegolmEvent):
"""Request keys for undecryptable messages.""" """Request keys for undecryptable messages."""
logger.warning( logger.warning(