import os import asyncio import logging from nio import ( AsyncClient, AsyncClientConfig, LoginResponse, InviteMemberEvent, MegolmEvent, SyncResponse, KeyVerificationStart, KeyVerificationCancel, KeyVerificationKey, KeyVerificationMac, ToDeviceError, ) from livekit import api logger = logging.getLogger("matrix-ai-bot") HOMESERVER = os.environ["MATRIX_HOMESERVER"] BOT_USER = os.environ["MATRIX_BOT_USER"] BOT_PASS = os.environ["MATRIX_BOT_PASSWORD"] LK_URL = os.environ["LIVEKIT_URL"] LK_KEY = os.environ["LIVEKIT_API_KEY"] LK_SECRET = os.environ["LIVEKIT_API_SECRET"] AGENT_NAME = os.environ.get("AGENT_NAME", "matrix-ai") STORE_PATH = os.environ.get("CRYPTO_STORE_PATH", "/data/crypto_store") class Bot: def __init__(self): config = AsyncClientConfig( max_limit_exceeded=0, max_timeouts=0, store_sync_tokens=True, encryption_enabled=True, ) self.client = AsyncClient( HOMESERVER, BOT_USER, store_path=STORE_PATH, config=config, ) self.lkapi = None self.dispatched_rooms = set() async def start(self): resp = await self.client.login(BOT_PASS, device_name="ai-voice-bot") if not isinstance(resp, LoginResponse): logger.error("Login failed: %s", resp) return logger.info("Logged in as %s (device %s)", resp.user_id, resp.device_id) # Trust our own device keys if self.client.should_upload_keys: await self.client.keys_upload() self.lkapi = api.LiveKitAPI(LK_URL, LK_KEY, LK_SECRET) self.client.add_event_callback(self.on_invite, InviteMemberEvent) self.client.add_event_callback(self.on_megolm, MegolmEvent) self.client.add_response_callback(self.on_sync, SyncResponse) self.client.add_to_device_callback(self.on_key_verification, KeyVerificationStart) self.client.add_to_device_callback(self.on_key_verification, KeyVerificationKey) self.client.add_to_device_callback(self.on_key_verification, KeyVerificationMac) self.client.add_to_device_callback(self.on_key_verification, KeyVerificationCancel) await self.client.sync_forever(timeout=30000, full_state=True) async def on_invite(self, room, event: InviteMemberEvent): if event.state_key != BOT_USER: return logger.info("Invited to %s", room.room_id) await self.client.join(room.room_id) # LiveKit room name = Matrix room ID (Element Call convention) lk_room_name = room.room_id try: await self.lkapi.agent_dispatch.create_dispatch( api.CreateAgentDispatchRequest( agent_name=AGENT_NAME, room=lk_room_name, ) ) self.dispatched_rooms.add(room.room_id) logger.info("Agent dispatched to %s", lk_room_name) except Exception: logger.exception("Dispatch failed for %s", room.room_id) async def on_sync(self, response: SyncResponse): """After each sync, trust all devices in our rooms.""" for user_id in self.client.device_store.users: for device in self.client.device_store.active_user_devices(user_id): if not device.verified: self.client.verify_device(device) logger.info("Auto-trusted device %s of %s", device.device_id, user_id) async def on_megolm(self, room, event: MegolmEvent): """Handle undecryptable messages by requesting keys.""" logger.warning( "Can't decrypt event %s in %s from %s (session %s)", event.event_id, room.room_id, event.sender, event.session_id, ) async def on_key_verification(self, event): """Auto-accept key verification requests.""" if isinstance(event, KeyVerificationStart): sas = self.client.key_verifications.get(event.transaction_id) if sas: await self.client.accept_key_verification(event.transaction_id) await self.client.to_device(sas.share_key()) elif isinstance(event, KeyVerificationKey): sas = self.client.key_verifications.get(event.transaction_id) if sas: await self.client.confirm_short_auth_string(event.transaction_id) elif isinstance(event, KeyVerificationMac): sas = self.client.key_verifications.get(event.transaction_id) if sas: mac = sas.get_mac() if not isinstance(mac, ToDeviceError): await self.client.to_device(mac) async def cleanup(self): await self.client.close() if self.lkapi: await self.lkapi.aclose() async def main(): os.makedirs(STORE_PATH, exist_ok=True) bot = Bot() try: await bot.start() finally: await bot.cleanup() if __name__ == "__main__": logging.basicConfig(level=logging.INFO) asyncio.run(main())