"""Matrix bot that joins rooms it is invited to and dispatches a LiveKit agent.

Configuration is read from environment variables at import time. All of
MATRIX_HOMESERVER, MATRIX_BOT_USER, MATRIX_BOT_PASSWORD, LIVEKIT_URL,
LIVEKIT_API_KEY and LIVEKIT_API_SECRET are required (a missing one raises
KeyError); AGENT_NAME is optional and defaults to "matrix-ai".
"""

import asyncio
import logging
import os

from livekit import api
from nio import AsyncClient, InviteMemberEvent, JoinResponse, LoginResponse

logger = logging.getLogger("matrix-ai-bot")

HOMESERVER = os.environ["MATRIX_HOMESERVER"]
BOT_USER = os.environ["MATRIX_BOT_USER"]
BOT_PASS = os.environ["MATRIX_BOT_PASSWORD"]
LK_URL = os.environ["LIVEKIT_URL"]
LK_KEY = os.environ["LIVEKIT_API_KEY"]
LK_SECRET = os.environ["LIVEKIT_API_SECRET"]
AGENT_NAME = os.environ.get("AGENT_NAME", "matrix-ai")


class Bot:
    """Matrix client that auto-joins invites and requests an agent dispatch."""

    def __init__(self):
        self.client = AsyncClient(HOMESERVER, BOT_USER)
        # Created in start() after a successful login; None until then.
        self.lkapi = None
        # Room IDs for which create_dispatch succeeded.
        # NOTE(review): this set is only written, never read, in this file --
        # confirm whether it is meant to de-duplicate dispatches.
        self.dispatched_rooms = set()

    async def start(self):
        """Log in, register the invite handler and sync until interrupted.

        Returns early (after logging an error) when the login response is not
        a ``LoginResponse``.
        """
        resp = await self.client.login(BOT_PASS, device_name="ai-voice-bot")
        if not isinstance(resp, LoginResponse):
            logger.error("Login failed: %s", resp)
            return
        logger.info("Logged in as %s", resp.user_id)

        self.lkapi = api.LiveKitAPI(LK_URL, LK_KEY, LK_SECRET)
        self.client.add_event_callback(self.on_invite, InviteMemberEvent)
        await self.client.sync_forever(timeout=30000)

    async def on_invite(self, room, event: InviteMemberEvent):
        """Join the room the bot was invited to and dispatch the agent to it.

        Events whose ``state_key`` is not the bot's own user ID are ignored.
        Dispatch failures are logged and swallowed so that one bad room does
        not stop the sync loop.
        """
        if event.state_key != BOT_USER:
            return

        logger.info("Invited to %s", room.room_id)
        join_resp = await self.client.join(room.room_id)
        # nio reports a failed join by returning an error response instead of
        # raising; do not dispatch an agent for a room we are not in.
        if not isinstance(join_resp, JoinResponse):
            logger.error("Join failed for %s: %s", room.room_id, join_resp)
            return

        # LiveKit room name = Matrix room ID (Element Call convention)
        lk_room_name = room.room_id
        try:
            await self.lkapi.agent_dispatch.create_dispatch(
                api.CreateAgentDispatchRequest(
                    agent_name=AGENT_NAME,
                    room=lk_room_name,
                )
            )
            self.dispatched_rooms.add(room.room_id)
            logger.info("Agent dispatched to %s", lk_room_name)
        except Exception:
            logger.exception("Dispatch failed for %s", room.room_id)

    async def cleanup(self):
        """Close the Matrix client and, if it was created, the LiveKit API."""
        try:
            await self.client.close()
        finally:
            # Runs even when closing the Matrix client raises, so the LiveKit
            # session is not leaked.
            if self.lkapi is not None:
                await self.lkapi.aclose()


async def main():
    """Run the bot and always release its network resources on exit."""
    bot = Bot()
    try:
        await bot.start()
    finally:
        await bot.cleanup()


if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    asyncio.run(main())