feat: Bootstrap cross-signing keys at startup to fix Element authenticity warnings

Integrates _ensure_cross_signing() into Bot.start() flow. On first run, generates
and uploads cross-signing keys, then signs the bot device. On subsequent restarts,
detects existing cross-signatures and skips. Seeds persisted for device recovery.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Christian Gick
2026-03-09 08:13:33 +02:00
parent 06b876bdea
commit 19abea01ca

132
bot.py
View File

@@ -15,6 +15,8 @@ import fitz # pymupdf
import httpx import httpx
from openai import AsyncOpenAI from openai import AsyncOpenAI
from olm import sas as olm_sas from olm import sas as olm_sas
import olm.pk
import canonicaljson
from rag_key_manager import RAGKeyManager from rag_key_manager import RAGKeyManager
from nio import ( from nio import (
@@ -1093,6 +1095,9 @@ class Bot:
if self.client.should_upload_keys: if self.client.should_upload_keys:
await self.client.keys_upload() await self.client.keys_upload()
# Bootstrap cross-signing if not already done
await self._ensure_cross_signing()
self.lkapi = api.LiveKitAPI(LK_URL, LK_KEY, LK_SECRET) self.lkapi = api.LiveKitAPI(LK_URL, LK_KEY, LK_SECRET)
self.client.add_event_callback(self.on_invite, InviteMemberEvent) self.client.add_event_callback(self.on_invite, InviteMemberEvent)
self.client.add_event_callback(self.on_megolm, MegolmEvent) self.client.add_event_callback(self.on_megolm, MegolmEvent)
@@ -1111,6 +1116,133 @@ class Bot:
await self.client.sync_forever(timeout=30000, full_state=True) await self.client.sync_forever(timeout=30000, full_state=True)
async def _ensure_cross_signing(self):
"""Ensure bot device is cross-signed so Element clients don't show authenticity warnings."""
xsign_file = os.path.join(STORE_PATH, "cross_signing_keys.json")
with open(CREDS_FILE) as f:
creds = json.load(f)
user_id = creds["user_id"]
device_id = creds["device_id"]
token = creds["access_token"]
headers = {"Authorization": f"Bearer {token}"}
# Check if device already has a cross-signing signature on the server
try:
async with httpx.AsyncClient(timeout=15.0) as hc:
resp = await hc.post(
f"{HOMESERVER}/_matrix/client/v3/keys/query",
json={"device_keys": {user_id: [device_id]}},
headers=headers,
)
if resp.status_code == 200:
device_keys = resp.json().get("device_keys", {}).get(user_id, {}).get(device_id, {})
sigs = device_keys.get("signatures", {}).get(user_id, {})
device_key_id = f"ed25519:{device_id}"
has_cross_sig = any(k != device_key_id for k in sigs)
if has_cross_sig:
logger.info("Device %s already cross-signed, skipping bootstrap", device_id)
return
except Exception as e:
logger.warning("Cross-signing check failed: %s", e)
# Load existing seeds or generate new ones
if os.path.exists(xsign_file):
with open(xsign_file) as f:
seeds = json.load(f)
master_seed = base64.b64decode(seeds["master_seed"])
ss_seed = base64.b64decode(seeds["self_signing_seed"])
us_seed = base64.b64decode(seeds["user_signing_seed"])
logger.info("Loaded existing cross-signing seeds, re-signing device")
else:
master_seed = os.urandom(32)
ss_seed = os.urandom(32)
us_seed = os.urandom(32)
logger.info("Generating new cross-signing keys")
master_key = olm.pk.PkSigning(master_seed)
self_signing_key = olm.pk.PkSigning(ss_seed)
user_signing_key = olm.pk.PkSigning(us_seed)
def make_key(usage, pubkey):
return {
"user_id": user_id,
"usage": [usage],
"keys": {"ed25519:" + pubkey: pubkey},
}
master_obj = make_key("master", master_key.public_key)
ss_obj = make_key("self_signing", self_signing_key.public_key)
us_obj = make_key("user_signing", user_signing_key.public_key)
# Sign sub-keys with master key
ss_canonical = canonicaljson.encode_canonical_json(ss_obj)
ss_obj["signatures"] = {user_id: {"ed25519:" + master_key.public_key: master_key.sign(ss_canonical)}}
us_canonical = canonicaljson.encode_canonical_json(us_obj)
us_obj["signatures"] = {user_id: {"ed25519:" + master_key.public_key: master_key.sign(us_canonical)}}
try:
async with httpx.AsyncClient(timeout=15.0) as hc:
# Upload cross-signing keys with password auth
resp = await hc.post(
f"{HOMESERVER}/_matrix/client/v3/keys/device_signing/upload",
json={
"master_key": master_obj,
"self_signing_key": ss_obj,
"user_signing_key": us_obj,
"auth": {
"type": "m.login.password",
"identifier": {"type": "m.id.user", "user": user_id},
"password": BOT_PASS,
},
},
headers=headers,
)
if resp.status_code != 200:
logger.error("Cross-signing upload failed: %d %s", resp.status_code, resp.text)
return
logger.info("Cross-signing keys uploaded")
# Fetch device keys to sign
qresp = await hc.post(
f"{HOMESERVER}/_matrix/client/v3/keys/query",
json={"device_keys": {user_id: [device_id]}},
headers=headers,
)
device_obj = qresp.json()["device_keys"][user_id][device_id]
device_obj.pop("signatures", None)
device_obj.pop("unsigned", None)
# Sign device with self-signing key
dk_canonical = canonicaljson.encode_canonical_json(device_obj)
dk_sig = self_signing_key.sign(dk_canonical)
device_obj["signatures"] = {user_id: {"ed25519:" + self_signing_key.public_key: dk_sig}}
resp2 = await hc.post(
f"{HOMESERVER}/_matrix/client/v3/keys/signatures/upload",
json={user_id: {device_id: device_obj}},
headers=headers,
)
if resp2.status_code != 200:
logger.error("Device signature upload failed: %d %s", resp2.status_code, resp2.text)
return
logger.info("Device %s cross-signed successfully", device_id)
except Exception as e:
logger.error("Cross-signing bootstrap failed: %s", e, exc_info=True)
return
# Save seeds if new
if not os.path.exists(xsign_file):
with open(xsign_file, "w") as f:
json.dump({
"master_seed": base64.b64encode(master_seed).decode(),
"self_signing_seed": base64.b64encode(ss_seed).decode(),
"user_signing_seed": base64.b64encode(us_seed).decode(),
}, f)
logger.info("Cross-signing seeds saved to %s", xsign_file)
async def _inject_rag_key(self): async def _inject_rag_key(self):
"""Load document encryption key from Matrix and inject into RAG service.""" """Load document encryption key from Matrix and inject into RAG service."""
try: try: