From 7fd3aae1760fcc05f1bab6c31fd1bd9289a02693 Mon Sep 17 00:00:00 2001 From: Christian Gick Date: Mon, 23 Mar 2026 19:05:48 +0200 Subject: [PATCH] feat(CF-2502): proper E2E encryption with cross-signing and device lifecycle Replace insecure auto-trust-all-devices with cross-signed-only trust policy. Extract cross-signing manager into reusable module with vault backup/recovery. Add device cleanup script and automatic old device pruning on startup. - device_trust.py: CrossSignedOnlyPolicy (only trust cross-signed devices) - cross_signing.py: Extracted from bot.py, adds vault seed backup + recovery - scripts/matrix_device_cleanup.py: Synapse Admin API bulk device cleanup CLI - bot.py: Use new modules, add _cleanup_own_devices() on startup Co-Authored-By: Claude Opus 4.6 (1M context) --- bot.py | 170 ++++++------------ cross_signing.py | 292 +++++++++++++++++++++++++++++++ device_trust.py | 34 ++++ scripts/matrix_device_cleanup.py | 227 ++++++++++++++++++++++++ 4 files changed, 609 insertions(+), 114 deletions(-) create mode 100644 cross_signing.py create mode 100644 device_trust.py create mode 100755 scripts/matrix_device_cleanup.py diff --git a/bot.py b/bot.py index fc0efe8..823d75a 100644 --- a/bot.py +++ b/bot.py @@ -45,6 +45,8 @@ from livekit import api, rtc from voice import VoiceSession from article_summary import ArticleSummaryHandler from cron import CronScheduler +from device_trust import CrossSignedOnlyPolicy +from cross_signing import CrossSigningManager BOT_DEVICE_ID = "AIBOT" CALL_MEMBER_TYPE = "org.matrix.msc3401.call.member" @@ -1203,6 +1205,11 @@ class Bot: self._loaded_rooms: set[str] = set() # rooms where we've loaded state self._sync_token_received = False self._verifications: dict[str, dict] = {} # txn_id -> verification state + self.trust_policy = CrossSignedOnlyPolicy() + self.cross_signing = CrossSigningManager( + HOMESERVER, STORE_PATH, BOT_PASS, + vault_key=f"matrix.{BOT_USER.split(':')[0].lstrip('@')}.cross_signing_seeds", + ) self._room_document_context: dict[str, list[dict]] = {} # room_id -> [{type, filename, text, timestamp}, ...] # Article summary handler (Blinkist-style audio summaries) if self.llm and ELEVENLABS_API_KEY: @@ -1298,7 +1305,14 @@ class Bot: await self.client.keys_upload() # Bootstrap cross-signing if not already done - await self._ensure_cross_signing() + with open(CREDS_FILE) as f: + creds = json.load(f) + await self.cross_signing.ensure_cross_signing( + creds["user_id"], creds["device_id"], creds["access_token"], + ) + + # Clean up own old devices (keep max 3, older than 30 days) + await self._cleanup_own_devices(creds["device_id"], creds["access_token"]) self.lkapi = api.LiveKitAPI(LK_URL, LK_KEY, LK_SECRET) self.client.add_event_callback(self.on_invite, InviteMemberEvent) @@ -1335,132 +1349,57 @@ class Bot: await self.client.sync_forever(timeout=30000, full_state=True) - async def _ensure_cross_signing(self): - """Ensure bot device is cross-signed so Element clients don't show authenticity warnings.""" - xsign_file = os.path.join(STORE_PATH, "cross_signing_keys.json") - - with open(CREDS_FILE) as f: - creds = json.load(f) - - user_id = creds["user_id"] - device_id = creds["device_id"] - token = creds["access_token"] - headers = {"Authorization": f"Bearer {token}"} - - # Check if device already has a cross-signing signature on the server + async def _cleanup_own_devices(self, current_device_id: str, access_token: str, keep: int = 3, max_age_days: int = 30): + """Remove own old devices via client API (no admin required).""" + headers = {"Authorization": f"Bearer {access_token}"} try: async with httpx.AsyncClient(timeout=15.0) as hc: - resp = await hc.post( - f"{HOMESERVER}/_matrix/client/v3/keys/query", - json={"device_keys": {user_id: [device_id]}}, - headers=headers, - ) - if resp.status_code == 200: - device_keys = resp.json().get("device_keys", {}).get(user_id, {}).get(device_id, {}) - sigs = device_keys.get("signatures", {}).get(user_id, {}) - device_key_id = f"ed25519:{device_id}" - has_cross_sig = any(k != device_key_id for k in sigs) - if has_cross_sig: - logger.info("Device %s already cross-signed, skipping bootstrap", device_id) - return - except Exception as e: - logger.warning("Cross-signing check failed: %s", e) + resp = await hc.get(f"{HOMESERVER}/_matrix/client/v3/devices", headers=headers) + if resp.status_code != 200: + logger.warning("Device list failed: %d", resp.status_code) + return - # Load existing seeds or generate new ones - if os.path.exists(xsign_file): - with open(xsign_file) as f: - seeds = json.load(f) - master_seed = base64.b64decode(seeds["master_seed"]) - ss_seed = base64.b64decode(seeds["self_signing_seed"]) - us_seed = base64.b64decode(seeds["user_signing_seed"]) - logger.info("Loaded existing cross-signing seeds, re-signing device") - else: - master_seed = os.urandom(32) - ss_seed = os.urandom(32) - us_seed = os.urandom(32) - logger.info("Generating new cross-signing keys") + devices = resp.json().get("devices", []) + if len(devices) <= keep: + return - master_key = olm.pk.PkSigning(master_seed) - self_signing_key = olm.pk.PkSigning(ss_seed) - user_signing_key = olm.pk.PkSigning(us_seed) + # Sort by last_seen_ts descending + devices.sort(key=lambda d: d.get("last_seen_ts") or 0, reverse=True) - def make_key(usage, pubkey): - return { - "user_id": user_id, - "usage": [usage], - "keys": {"ed25519:" + pubkey: pubkey}, - } + now_ms = time.time() * 1000 + to_delete = [] + for i, dev in enumerate(devices): + if dev["device_id"] == current_device_id: + continue + if i < keep: + continue + last_seen = dev.get("last_seen_ts") or 0 + if last_seen > 0 and (now_ms - last_seen) < max_age_days * 86400 * 1000: + continue + to_delete.append(dev["device_id"]) - master_obj = make_key("master", master_key.public_key) - ss_obj = make_key("self_signing", self_signing_key.public_key) - us_obj = make_key("user_signing", user_signing_key.public_key) + if not to_delete: + return - # Sign sub-keys with master key - ss_canonical = canonicaljson.encode_canonical_json(ss_obj) - ss_obj["signatures"] = {user_id: {"ed25519:" + master_key.public_key: master_key.sign(ss_canonical)}} - - us_canonical = canonicaljson.encode_canonical_json(us_obj) - us_obj["signatures"] = {user_id: {"ed25519:" + master_key.public_key: master_key.sign(us_canonical)}} - - try: - async with httpx.AsyncClient(timeout=15.0) as hc: - # Upload cross-signing keys with password auth - resp = await hc.post( - f"{HOMESERVER}/_matrix/client/v3/keys/device_signing/upload", + # Bulk delete own devices (requires password re-auth) + del_resp = await hc.post( + f"{HOMESERVER}/_matrix/client/v3/delete_devices", json={ - "master_key": master_obj, - "self_signing_key": ss_obj, - "user_signing_key": us_obj, + "devices": to_delete, "auth": { "type": "m.login.password", - "identifier": {"type": "m.id.user", "user": user_id}, + "identifier": {"type": "m.id.user", "user": BOT_USER}, "password": BOT_PASS, }, }, headers=headers, ) - if resp.status_code != 200: - logger.error("Cross-signing upload failed: %d %s", resp.status_code, resp.text) - return - logger.info("Cross-signing keys uploaded") - - # Fetch device keys to sign - qresp = await hc.post( - f"{HOMESERVER}/_matrix/client/v3/keys/query", - json={"device_keys": {user_id: [device_id]}}, - headers=headers, - ) - device_obj = qresp.json()["device_keys"][user_id][device_id] - device_obj.pop("signatures", None) - device_obj.pop("unsigned", None) - - # Sign device with self-signing key - dk_canonical = canonicaljson.encode_canonical_json(device_obj) - dk_sig = self_signing_key.sign(dk_canonical) - device_obj["signatures"] = {user_id: {"ed25519:" + self_signing_key.public_key: dk_sig}} - - resp2 = await hc.post( - f"{HOMESERVER}/_matrix/client/v3/keys/signatures/upload", - json={user_id: {device_id: device_obj}}, - headers=headers, - ) - if resp2.status_code != 200: - logger.error("Device signature upload failed: %d %s", resp2.status_code, resp2.text) - return - logger.info("Device %s cross-signed successfully", device_id) + if del_resp.status_code == 200: + logger.info("Cleaned up %d old devices (kept %d)", len(to_delete), keep) + else: + logger.warning("Device cleanup failed: %d %s", del_resp.status_code, del_resp.text) except Exception as e: - logger.error("Cross-signing bootstrap failed: %s", e, exc_info=True) - return - - # Save seeds if new - if not os.path.exists(xsign_file): - with open(xsign_file, "w") as f: - json.dump({ - "master_seed": base64.b64encode(master_seed).decode(), - "self_signing_seed": base64.b64encode(ss_seed).decode(), - "user_signing_seed": base64.b64encode(us_seed).decode(), - }, f) - logger.info("Cross-signing seeds saved to %s", xsign_file) + logger.warning("Device cleanup error (non-fatal): %s", e) async def _inject_rag_key(self): """Load document encryption key from Matrix and inject into RAG service.""" @@ -1502,7 +1441,7 @@ class Bot: await self.client.join(room.room_id) async def on_sync(self, response: SyncResponse): - """After each sync, trust all devices in our rooms.""" + """After each sync, trust cross-signed devices only.""" if not self._sync_token_received: self._sync_token_received = True logger.info("Initial sync complete, text handler active") @@ -1511,8 +1450,11 @@ class Bot: for user_id in list(self.client.device_store.users): for device in self.client.device_store.active_user_devices(user_id): if not device.verified: - self.client.verify_device(device) - logger.info("Auto-trusted device %s of %s", device.device_id, user_id) + if self.trust_policy.should_trust(user_id, device): + self.client.verify_device(device) + logger.info("Cross-sign-verified device %s of %s", device.device_id, user_id) + else: + logger.debug("Skipped unverified device %s of %s (no cross-signing sig)", device.device_id, user_id) async def on_reaction(self, room, event: ReactionEvent): """Handle reaction events for pipeline approval flow.""" diff --git a/cross_signing.py b/cross_signing.py new file mode 100644 index 0000000..cc384d7 --- /dev/null +++ b/cross_signing.py @@ -0,0 +1,292 @@ +"""Cross-signing manager for Matrix bot accounts. + +Handles bootstrapping, verification, and recovery of cross-signing keys. +Reusable by any bot (matrix-ai-agent, claude-matrix-bridge, etc.). +""" +import base64 +import json +import logging +import os +import subprocess + +import canonicaljson +import httpx +import olm.pk + +logger = logging.getLogger(__name__) + + +class CrossSigningManager: + """Manages cross-signing keys for a Matrix bot device.""" + + def __init__(self, homeserver: str, store_path: str, bot_password: str, vault_key: str = ""): + """ + Args: + vault_key: Vault key for seed backup/recovery (e.g. "matrix.ai.cross_signing_seeds"). + If empty, vault backup is disabled. + """ + self.homeserver = homeserver + self.store_path = store_path + self.bot_password = bot_password + self.vault_key = vault_key + self._xsign_file = os.path.join(store_path, "cross_signing_keys.json") + + async def ensure_cross_signing(self, user_id: str, device_id: str, access_token: str) -> bool: + """Ensure device is cross-signed. Returns True if already signed or newly signed.""" + headers = {"Authorization": f"Bearer {access_token}"} + + # Check if device already has a cross-signing signature + if await self._is_device_cross_signed(user_id, device_id, headers): + logger.info("Device %s already cross-signed, skipping bootstrap", device_id) + return True + + # Load existing seeds or generate new ones + master_seed, ss_seed, us_seed = self._load_or_generate_seeds() + + master_key = olm.pk.PkSigning(master_seed) + self_signing_key = olm.pk.PkSigning(ss_seed) + user_signing_key = olm.pk.PkSigning(us_seed) + + # Upload cross-signing keys and sign device + success = await self._upload_and_sign( + user_id, device_id, access_token, + master_key, self_signing_key, user_signing_key, + ) + + if success: + self._save_seeds(master_seed, ss_seed, us_seed) + # Verify the signature was applied + if await self._is_device_cross_signed(user_id, device_id, headers): + logger.info("Device %s cross-signed and verified successfully", device_id) + return True + else: + logger.error("Device %s: signature upload succeeded but verification failed — retrying once", device_id) + # Retry signing the device (keys already uploaded) + retry = await self._sign_device(user_id, device_id, access_token, self_signing_key) + if retry and await self._is_device_cross_signed(user_id, device_id, headers): + logger.info("Device %s cross-signed on retry", device_id) + return True + logger.error("Device %s cross-signing failed after retry", device_id) + + return False + + async def verify_cross_signing_status(self, user_id: str, device_id: str, access_token: str) -> dict: + """Check cross-signing status. Returns dict with status details.""" + headers = {"Authorization": f"Bearer {access_token}"} + is_signed = await self._is_device_cross_signed(user_id, device_id, headers) + has_seeds = os.path.exists(self._xsign_file) + return { + "device_id": device_id, + "cross_signed": is_signed, + "seeds_stored": has_seeds, + } + + def get_public_keys(self) -> dict | None: + """Return public key fingerprints (safe to log). Returns None if no seeds.""" + if not os.path.exists(self._xsign_file): + return None + master_seed, ss_seed, us_seed = self._load_or_generate_seeds() + return { + "master": olm.pk.PkSigning(master_seed).public_key, + "self_signing": olm.pk.PkSigning(ss_seed).public_key, + "user_signing": olm.pk.PkSigning(us_seed).public_key, + } + + # -- Private methods -- + + async def _is_device_cross_signed(self, user_id: str, device_id: str, headers: dict) -> bool: + """Query server to check if device has a cross-signing signature.""" + try: + async with httpx.AsyncClient(timeout=15.0) as hc: + resp = await hc.post( + f"{self.homeserver}/_matrix/client/v3/keys/query", + json={"device_keys": {user_id: [device_id]}}, + headers=headers, + ) + if resp.status_code != 200: + return False + device_keys = resp.json().get("device_keys", {}).get(user_id, {}).get(device_id, {}) + sigs = device_keys.get("signatures", {}).get(user_id, {}) + device_key_id = f"ed25519:{device_id}" + return any(k != device_key_id for k in sigs) + except Exception as e: + logger.warning("Cross-signing check failed: %s", e) + return False + + def _load_or_generate_seeds(self) -> tuple[bytes, bytes, bytes]: + """Load seeds from file, vault backup, or generate new ones.""" + # 1. Try local file first + if os.path.exists(self._xsign_file): + with open(self._xsign_file) as f: + seeds = json.load(f) + logger.info("Loaded cross-signing seeds from local store") + return ( + base64.b64decode(seeds["master_seed"]), + base64.b64decode(seeds["self_signing_seed"]), + base64.b64decode(seeds["user_signing_seed"]), + ) + + # 2. Try vault recovery + if self.vault_key: + recovered = self._vault_recover_seeds() + if recovered: + logger.info("Recovered cross-signing seeds from vault") + return recovered + + # 3. Generate new + logger.info("Generating new cross-signing keys") + return os.urandom(32), os.urandom(32), os.urandom(32) + + def _save_seeds(self, master: bytes, ss: bytes, us: bytes) -> None: + """Persist seeds to local file and vault.""" + if os.path.exists(self._xsign_file): + # File exists; still try vault backup if not yet stored + self._vault_backup_seeds(master, ss, us) + return + os.makedirs(os.path.dirname(self._xsign_file), exist_ok=True) + with open(self._xsign_file, "w") as f: + json.dump({ + "master_seed": base64.b64encode(master).decode(), + "self_signing_seed": base64.b64encode(ss).decode(), + "user_signing_seed": base64.b64encode(us).decode(), + }, f) + logger.info("Cross-signing seeds saved to %s", self._xsign_file) + self._vault_backup_seeds(master, ss, us) + + def _vault_backup_seeds(self, master: bytes, ss: bytes, us: bytes) -> None: + """Backup seeds to vault for disaster recovery.""" + if not self.vault_key: + return + payload = json.dumps({ + "master_seed": base64.b64encode(master).decode(), + "self_signing_seed": base64.b64encode(ss).decode(), + "user_signing_seed": base64.b64encode(us).decode(), + }) + try: + result = subprocess.run( + ["vault", "set", self.vault_key], + input=payload, capture_output=True, text=True, timeout=10, + ) + if result.returncode == 0: + logger.info("Cross-signing seeds backed up to vault key %s", self.vault_key) + else: + logger.warning("Vault backup failed: %s", result.stderr.strip()) + except (FileNotFoundError, subprocess.TimeoutExpired) as e: + logger.warning("Vault backup skipped: %s", e) + + def _vault_recover_seeds(self) -> tuple[bytes, bytes, bytes] | None: + """Attempt to recover seeds from vault.""" + if not self.vault_key: + return None + try: + result = subprocess.run( + ["vault", "get", self.vault_key], + capture_output=True, text=True, timeout=10, + ) + if result.returncode != 0 or not result.stdout.strip(): + return None + seeds = json.loads(result.stdout.strip()) + return ( + base64.b64decode(seeds["master_seed"]), + base64.b64decode(seeds["self_signing_seed"]), + base64.b64decode(seeds["user_signing_seed"]), + ) + except (FileNotFoundError, subprocess.TimeoutExpired, json.JSONDecodeError, KeyError) as e: + logger.warning("Vault recovery failed: %s", e) + return None + + async def _upload_and_sign( + self, + user_id: str, + device_id: str, + access_token: str, + master_key: olm.pk.PkSigning, + self_signing_key: olm.pk.PkSigning, + user_signing_key: olm.pk.PkSigning, + ) -> bool: + """Upload cross-signing keys and sign device.""" + headers = {"Authorization": f"Bearer {access_token}"} + + def make_key(usage: str, pubkey: str) -> dict: + return { + "user_id": user_id, + "usage": [usage], + "keys": {"ed25519:" + pubkey: pubkey}, + } + + master_obj = make_key("master", master_key.public_key) + ss_obj = make_key("self_signing", self_signing_key.public_key) + us_obj = make_key("user_signing", user_signing_key.public_key) + + # Sign sub-keys with master key + ss_canonical = canonicaljson.encode_canonical_json(ss_obj) + ss_obj["signatures"] = {user_id: {"ed25519:" + master_key.public_key: master_key.sign(ss_canonical)}} + + us_canonical = canonicaljson.encode_canonical_json(us_obj) + us_obj["signatures"] = {user_id: {"ed25519:" + master_key.public_key: master_key.sign(us_canonical)}} + + try: + async with httpx.AsyncClient(timeout=15.0) as hc: + # Upload cross-signing keys with password auth + resp = await hc.post( + f"{self.homeserver}/_matrix/client/v3/keys/device_signing/upload", + json={ + "master_key": master_obj, + "self_signing_key": ss_obj, + "user_signing_key": us_obj, + "auth": { + "type": "m.login.password", + "identifier": {"type": "m.id.user", "user": user_id}, + "password": self.bot_password, + }, + }, + headers=headers, + ) + if resp.status_code != 200: + logger.error("Cross-signing upload failed: %d %s", resp.status_code, resp.text) + return False + logger.info("Cross-signing keys uploaded (master: %s)", master_key.public_key) + + # Sign the device + return await self._sign_device(user_id, device_id, access_token, self_signing_key) + + except Exception as e: + logger.error("Cross-signing bootstrap failed: %s", e, exc_info=True) + return False + + async def _sign_device( + self, user_id: str, device_id: str, access_token: str, self_signing_key: olm.pk.PkSigning, + ) -> bool: + """Sign a device with the self-signing key.""" + headers = {"Authorization": f"Bearer {access_token}"} + + try: + async with httpx.AsyncClient(timeout=15.0) as hc: + qresp = await hc.post( + f"{self.homeserver}/_matrix/client/v3/keys/query", + json={"device_keys": {user_id: [device_id]}}, + headers=headers, + ) + device_obj = qresp.json()["device_keys"][user_id][device_id] + device_obj.pop("signatures", None) + device_obj.pop("unsigned", None) + + dk_canonical = canonicaljson.encode_canonical_json(device_obj) + dk_sig = self_signing_key.sign(dk_canonical) + device_obj["signatures"] = { + user_id: {"ed25519:" + self_signing_key.public_key: dk_sig}, + } + + resp = await hc.post( + f"{self.homeserver}/_matrix/client/v3/keys/signatures/upload", + json={user_id: {device_id: device_obj}}, + headers=headers, + ) + if resp.status_code != 200: + logger.error("Device signature upload failed: %d %s", resp.status_code, resp.text) + return False + logger.info("Device %s signed with self-signing key", device_id) + return True + except Exception as e: + logger.error("Device signing failed: %s", e, exc_info=True) + return False diff --git a/device_trust.py b/device_trust.py new file mode 100644 index 0000000..8d4cc23 --- /dev/null +++ b/device_trust.py @@ -0,0 +1,34 @@ +"""Device trust policy: only trust cross-signed devices. + +Replaces the insecure auto-trust-all pattern with selective verification +based on cross-signing signatures. +""" +import logging + +logger = logging.getLogger(__name__) + + +class CrossSignedOnlyPolicy: + """Trust only devices that carry a cross-signing signature. + + A device's signatures dict typically contains its own ed25519:DEVICE_ID + self-signature. A cross-signed device additionally has a signature from + the user's self-signing key (ed25519:SELF_SIGNING_PUB). This policy + checks for that extra signature. + """ + + def should_trust(self, user_id: str, device) -> bool: + """Return True if device has a cross-signing signature beyond its own.""" + sigs = getattr(device, "signatures", None) + if not sigs: + return False + + user_sigs = sigs.get(user_id, {}) + device_self_key = f"ed25519:{device.device_id}" + + # Trust if any signature key is NOT the device's own key + for key_id in user_sigs: + if key_id != device_self_key: + return True + + return False diff --git a/scripts/matrix_device_cleanup.py b/scripts/matrix_device_cleanup.py new file mode 100755 index 0000000..a21d828 --- /dev/null +++ b/scripts/matrix_device_cleanup.py @@ -0,0 +1,227 @@ +#!/usr/bin/env python3 +"""Clean up stale Matrix devices via Synapse Admin API. + +Usage: + python matrix_device_cleanup.py --user @admin:agiliton.eu --keep 1 --dry-run + python matrix_device_cleanup.py --user @admin:agiliton.eu --keep 1 + python matrix_device_cleanup.py --auto --max-age-days 30 --keep 3 +""" +import argparse +import asyncio +import json +import logging +import os +import subprocess +import sys +import time + +import httpx + +logger = logging.getLogger(__name__) + +BATCH_SIZE = 100 +BATCH_DELAY = 1.0 # seconds between batch deletions + + +async def get_admin_token(homeserver: str) -> str: + """Get Synapse admin token from env or vault.""" + token = os.environ.get("SYNAPSE_ADMIN_TOKEN") + if token: + return token + + try: + result = subprocess.run( + ["vault", "get", "matrix.agiliton.admin_token"], + capture_output=True, text=True, timeout=10, + ) + if result.returncode == 0 and result.stdout.strip(): + return result.stdout.strip() + except (FileNotFoundError, subprocess.TimeoutExpired): + pass + + raise RuntimeError( + "No admin token found. Set SYNAPSE_ADMIN_TOKEN or store in vault " + "as matrix.agiliton.admin_token" + ) + + +async def list_devices( + client: httpx.AsyncClient, homeserver: str, headers: dict, user_id: str, +) -> list[dict]: + """List all devices for a user via Synapse Admin API.""" + resp = await client.get( + f"{homeserver}/_synapse/admin/v2/users/{user_id}/devices", + headers=headers, + ) + resp.raise_for_status() + return resp.json().get("devices", []) + + +async def delete_devices_batch( + client: httpx.AsyncClient, + homeserver: str, + headers: dict, + user_id: str, + device_ids: list[str], +) -> int: + """Bulk-delete devices. Returns count deleted.""" + resp = await client.post( + f"{homeserver}/_synapse/admin/v2/users/{user_id}/delete_devices", + headers=headers, + json={"devices": device_ids}, + ) + resp.raise_for_status() + return len(device_ids) + + +async def cleanup_devices( + homeserver: str, + user_id: str, + keep: int = 1, + max_age_days: int | None = None, + dry_run: bool = False, + skip_device_ids: list[str] | None = None, +) -> dict: + """Remove stale devices, keeping the N most recently active. + + Returns summary dict with counts. + """ + token = await get_admin_token(homeserver) + headers = {"Authorization": f"Bearer {token}"} + skip = set(skip_device_ids or []) + + async with httpx.AsyncClient(timeout=30.0) as client: + devices = await list_devices(client, homeserver, headers, user_id) + + if not devices: + logger.info("No devices found for %s", user_id) + return {"total": 0, "kept": 0, "deleted": 0} + + # Sort by last_seen_ts descending (most recent first), treat None as 0 + devices.sort(key=lambda d: d.get("last_seen_ts") or 0, reverse=True) + + # Determine which to keep + to_keep = [] + to_delete = [] + + for i, dev in enumerate(devices): + dev_id = dev["device_id"] + last_seen = dev.get("last_seen_ts") or 0 + + # Always skip explicitly protected devices + if dev_id in skip: + to_keep.append(dev) + continue + + # Keep the top N most recent + if i < keep: + to_keep.append(dev) + continue + + # If max_age_days set, only delete devices older than threshold + if max_age_days is not None and last_seen > 0: + age_days = (time.time() * 1000 - last_seen) / (86400 * 1000) + if age_days < max_age_days: + to_keep.append(dev) + continue + + to_delete.append(dev) + + logger.info( + "User %s: %d total devices, keeping %d, deleting %d%s", + user_id, len(devices), len(to_keep), len(to_delete), + " (DRY RUN)" if dry_run else "", + ) + + if dry_run: + for dev in to_delete[:10]: + last = dev.get("last_seen_ts") or 0 + age = f"{(time.time() * 1000 - last) / (86400 * 1000):.1f}d" if last else "never" + logger.info( + " Would delete: %s (display: %s, last seen: %s ago)", + dev["device_id"], + dev.get("display_name", ""), + age, + ) + if len(to_delete) > 10: + logger.info(" ... and %d more", len(to_delete) - 10) + return { + "total": len(devices), + "kept": len(to_keep), + "deleted": 0, + "would_delete": len(to_delete), + } + + # Delete in batches + deleted = 0 + delete_ids = [d["device_id"] for d in to_delete] + + for i in range(0, len(delete_ids), BATCH_SIZE): + batch = delete_ids[i : i + BATCH_SIZE] + try: + count = await delete_devices_batch( + client, homeserver, headers, user_id, batch, + ) + deleted += count + logger.info( + " Deleted batch %d-%d (%d devices)", + i, i + len(batch), count, + ) + except httpx.HTTPStatusError as e: + logger.error( + " Batch %d-%d failed: %d %s", + i, i + len(batch), e.response.status_code, e.response.text, + ) + + if i + BATCH_SIZE < len(delete_ids): + await asyncio.sleep(BATCH_DELAY) + + logger.info("Cleanup complete: deleted %d of %d devices", deleted, len(devices)) + return {"total": len(devices), "kept": len(to_keep), "deleted": deleted} + + +def main(): + parser = argparse.ArgumentParser(description="Clean up stale Matrix devices") + parser.add_argument("--user", required=True, help="Matrix user ID (e.g. @admin:agiliton.eu)") + parser.add_argument( + "--homeserver", + default=os.environ.get("MATRIX_HOMESERVER", "https://matrix.agiliton.eu"), + help="Homeserver URL", + ) + parser.add_argument("--keep", type=int, default=1, help="Number of most recent devices to keep") + parser.add_argument("--max-age-days", type=int, default=None, help="Only delete devices older than N days") + parser.add_argument("--dry-run", action="store_true", help="Show what would be deleted without deleting") + parser.add_argument("--skip", nargs="*", default=[], help="Device IDs to never delete") + parser.add_argument("--auto", action="store_true", help="Auto mode: --max-age-days 30 --keep 3") + parser.add_argument("-v", "--verbose", action="store_true") + + args = parser.parse_args() + + logging.basicConfig( + level=logging.DEBUG if args.verbose else logging.INFO, + format="%(levelname)s %(message)s", + ) + + if args.auto: + if args.max_age_days is None: + args.max_age_days = 30 + if args.keep == 1: + args.keep = 3 + + result = asyncio.run( + cleanup_devices( + homeserver=args.homeserver, + user_id=args.user, + keep=args.keep, + max_age_days=args.max_age_days, + dry_run=args.dry_run, + skip_device_ids=args.skip, + ) + ) + + print(json.dumps(result, indent=2)) + sys.exit(0 if result.get("deleted", 0) >= 0 else 1) + + +if __name__ == "__main__": + main()