"""One-time script to bootstrap cross-signing keys for the bot."""
import asyncio
import base64
import json
import os

import aiohttp
import canonicaljson
import olm.pk
from nio import AsyncClient, AsyncClientConfig

STORE = os.environ.get("CRYPTO_STORE_PATH", "/data/crypto_store")
CREDS_FILE = os.path.join(STORE, "credentials.json")
HS = os.environ["MATRIX_HOMESERVER"]
BOT_PASS = os.environ["MATRIX_BOT_PASSWORD"]


def _sign_json(signing_key, obj):
    """Return signing_key's signature over the canonical JSON encoding of obj."""
    return signing_key.sign(canonicaljson.encode_canonical_json(obj))


def _save_seeds(path, master_seed, ss_seed, us_seed):
    """Write the three base64-encoded seeds to path as JSON.

    The file holds private key material, so it is created with mode 0o600
    (owner read/write only) rather than the process default. The mode only
    applies when the file is newly created; an existing file keeps its mode.
    """
    fd = os.open(path, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
    with os.fdopen(fd, "w") as f:
        json.dump({
            "master_seed": base64.b64encode(master_seed).decode(),
            "self_signing_seed": base64.b64encode(ss_seed).decode(),
            "user_signing_seed": base64.b64encode(us_seed).decode(),
        }, f)


async def bootstrap():
    """Create, upload and persist cross-signing keys, then sign the bot device.

    Steps:
      1. Restore the bot's login from CREDS_FILE and run one sync.
      2. Generate master, self-signing and user-signing key pairs and sign
         the two sub-keys with the master key.
      3. Upload the keys to the homeserver using password auth.
      4. Save the seeds to ``cross_signing_keys.json`` in STORE.
      5. Fetch this device's keys, sign them with the self-signing key and
         upload the signature.

    Any failed step is reported on stdout and the function returns early.
    The nio client is always closed, even if an exception propagates.
    """
    with open(CREDS_FILE, encoding="utf-8") as f:
        creds = json.load(f)

    user_id = creds["user_id"]
    device_id = creds["device_id"]
    token = creds["access_token"]

    config = AsyncClientConfig(encryption_enabled=True, store_sync_tokens=True)
    client = AsyncClient(HS, user_id, store_path=STORE, config=config)

    try:
        client.restore_login(user_id, device_id, token)
        client.load_store()
        await client.sync(timeout=10000, full_state=True)

        # Generate cross-signing key pairs (PkSigning needs a 32-byte random seed)
        master_seed = os.urandom(32)
        ss_seed = os.urandom(32)
        us_seed = os.urandom(32)

        master_key = olm.pk.PkSigning(master_seed)
        self_signing_key = olm.pk.PkSigning(ss_seed)
        user_signing_key = olm.pk.PkSigning(us_seed)

        master_pub = master_key.public_key
        ss_pub = self_signing_key.public_key
        us_pub = user_signing_key.public_key

        print(f"Master key: {master_pub}")
        print(f"Self-signing key: {ss_pub}")
        print(f"User-signing key: {us_pub}")

        def make_key(usage, pubkey):
            return {
                "user_id": user_id,
                "usage": [usage],
                "keys": {"ed25519:" + pubkey: pubkey},
            }

        master_obj = make_key("master", master_pub)
        ss_obj = make_key("self_signing", ss_pub)
        us_obj = make_key("user_signing", us_pub)

        # Sign sub-keys with master key. The signature is computed before the
        # "signatures" field is attached, so it covers the unsigned object.
        ss_sig = _sign_json(master_key, ss_obj)
        ss_obj["signatures"] = {user_id: {"ed25519:" + master_pub: ss_sig}}

        us_sig = _sign_json(master_key, us_obj)
        us_obj["signatures"] = {user_id: {"ed25519:" + master_pub: us_sig}}

        # Upload cross-signing keys with password auth
        upload_body = {
            "master_key": master_obj,
            "self_signing_key": ss_obj,
            "user_signing_key": us_obj,
            "auth": {
                "type": "m.login.password",
                "identifier": {"type": "m.id.user", "user": user_id},
                "password": BOT_PASS,
            },
        }

        headers = {"Authorization": "Bearer " + token}
        async with aiohttp.ClientSession() as session:
            async with session.post(
                HS + "/_matrix/client/v3/keys/device_signing/upload",
                json=upload_body,
                headers=headers,
            ) as resp:
                body = await resp.text()
                print(f"Upload cross-signing keys: {resp.status} {body}")
                if resp.status != 200:
                    return

            # Save cross-signing seeds for persistence. This happens as soon
            # as the server has accepted the keys: if a later step fails, the
            # private halves of the now-active keys must not be lost.
            xsign_file = os.path.join(STORE, "cross_signing_keys.json")
            _save_seeds(xsign_file, master_seed, ss_seed, us_seed)
            print(f"Cross-signing seeds saved to {xsign_file}")

            # Sign our own device with self-signing key
            # Fetch the device keys from the server to sign the exact object
            async with session.post(
                HS + "/_matrix/client/v3/keys/query",
                json={"device_keys": {user_id: [device_id]}},
                headers=headers,
            ) as qresp:
                qbody = await qresp.json()
                print(f"Keys query: {qresp.status}")
                if qresp.status != 200:
                    print(f"Keys query failed: {qbody}")
                    return

            device_obj = (
                qbody.get("device_keys", {}).get(user_id, {}).get(device_id)
            )
            if device_obj is None:
                print(
                    f"Device {device_id} has no keys on the server; "
                    "cannot cross-sign it."
                )
                return

            # Remove existing signatures before signing
            device_obj.pop("signatures", None)
            device_obj.pop("unsigned", None)

            dk_sig = _sign_json(self_signing_key, device_obj)

            # Add the cross-signing signature to the device object
            device_obj["signatures"] = {
                user_id: {
                    "ed25519:" + ss_pub: dk_sig,
                }
            }

            sig_upload = {
                user_id: {
                    device_id: device_obj,
                }
            }

            async with session.post(
                HS + "/_matrix/client/v3/keys/signatures/upload",
                json=sig_upload,
                headers=headers,
            ) as resp2:
                body2 = await resp2.text()
                print(f"Upload device signature: {resp2.status} {body2}")
                if resp2.status != 200:
                    return

            # A 200 response may still report per-key rejections under
            # "failures" (see the Matrix client-server spec), so inspect the
            # body before claiming success.
            try:
                failures = json.loads(body2).get("failures")
            except (ValueError, AttributeError):
                failures = None
            if failures:
                print(f"Device signature rejected: {failures}")
                return

        print("Done! Bot device is now cross-signed.")
    finally:
        # Always release the nio client, including on early return or error.
        await client.close()


if __name__ == "__main__":
    asyncio.run(bootstrap())