"""Matrix bot that joins Element Call sessions and answers chat messages.

What this module does:

* joins rooms it is invited to;
* when an MSC3401 call member event appears, dispatches a LiveKit agent into
  the call's LiveKit room and publishes its own call member state event,
  leaving again once the last tracked participant has left;
* answers ``!ai`` commands and mentions through an OpenAI-compatible
  (LiteLLM) endpoint, optionally enriched with WildFiles search results;
* auto-trusts devices and auto-accepts SAS key verification so encrypted
  rooms work without manual interaction.
"""

import asyncio
import html
import json
import logging
import os
import re
import time

import httpx
from openai import AsyncOpenAI
from nio import (
    AsyncClient,
    AsyncClientConfig,
    LoginResponse,
    InviteMemberEvent,
    MegolmEvent,
    RoomMessageText,
    SyncResponse,
    UnknownEvent,
    KeyVerificationStart,
    KeyVerificationCancel,
    KeyVerificationKey,
    KeyVerificationMac,
    ToDeviceError,
)
from nio.exceptions import LocalProtocolError
from livekit import api

BOT_DEVICE_ID = "AIBOT"
CALL_MEMBER_TYPE = "org.matrix.msc3401.call.member"
MODEL_STATE_TYPE = "ai.agiliton.model"

# Messages older than this many seconds are ignored so history is never replayed.
MAX_MESSAGE_AGE_S = 30
# Number of most recent timeline events forwarded to the LLM as context.
HISTORY_LIMIT = 10

logger = logging.getLogger("matrix-ai-bot")

HOMESERVER = os.environ["MATRIX_HOMESERVER"]
BOT_USER = os.environ["MATRIX_BOT_USER"]
BOT_PASS = os.environ["MATRIX_BOT_PASSWORD"]
LK_URL = os.environ["LIVEKIT_URL"]
LK_KEY = os.environ["LIVEKIT_API_KEY"]
LK_SECRET = os.environ["LIVEKIT_API_SECRET"]
AGENT_NAME = os.environ.get("AGENT_NAME", "matrix-ai")
STORE_PATH = os.environ.get("CRYPTO_STORE_PATH", "/data/crypto_store")
CREDS_FILE = os.path.join(STORE_PATH, "credentials.json")

LITELLM_URL = os.environ.get("LITELLM_BASE_URL", "")
LITELLM_KEY = os.environ.get("LITELLM_API_KEY", "not-needed")
DEFAULT_MODEL = os.environ.get("DEFAULT_MODEL", "claude-sonnet")

WILDFILES_BASE_URL = os.environ.get("WILDFILES_BASE_URL", "")
WILDFILES_ORG = os.environ.get("WILDFILES_ORG", "")

SYSTEM_PROMPT = """You are a helpful AI assistant in a Matrix chat room.
Keep answers concise but thorough. Use markdown formatting when helpful.
If document context is provided, use it to inform your answers."""

HELP_TEXT = """**AI Bot Commands**
- `!ai help` — Show this help
- `!ai models` — List available models
- `!ai set-model <model>` — Set model for this room
- `!ai search <query>` — Search documents (WildFiles)
- **@mention the bot** or start with `!ai` for a regular AI response"""

# Inline code span on a single line: `code`
_CODE_SPAN_RE = re.compile(r"`([^`\n]+)`")
# Bold text on a single line: **bold**
_BOLD_RE = re.compile(r"\*\*([^*\n]+)\*\*")


def _markdown_to_html(text: str) -> str:
    """Render the small Markdown subset the bot emits as safe Matrix HTML.

    The text is HTML-escaped first, so LLM output, document snippets and
    placeholders such as ``<model>`` are displayed literally instead of being
    interpreted as markup. Then inline code spans, ``**bold**`` and line
    breaks are converted. Everything else stays plain text.
    """
    rendered = html.escape(text, quote=False)
    rendered = _CODE_SPAN_RE.sub(r"<code>\1</code>", rendered)
    rendered = _BOLD_RE.sub(r"<strong>\1</strong>", rendered)
    # HTML collapses raw newlines, which would squash lists onto one line.
    return rendered.replace("\n", "<br/>")


class DocumentRAG:
    """Search WildFiles for relevant documents."""

    def __init__(self, base_url: str, org: str):
        self.base_url = base_url.rstrip("/")
        self.org = org
        # Search is only attempted when both settings are configured.
        self.enabled = bool(base_url and org)

    async def search(self, query: str, top_k: int = 3) -> list[dict]:
        """Ask WildFiles for ``top_k`` matches for ``query``.

        Returns the ``results`` list from the JSON response. Returns ``[]``
        when search is disabled or anything goes wrong. Errors are
        deliberately best-effort (logged at debug level) so a WildFiles
        outage never blocks a chat reply.
        """
        if not self.enabled:
            return []
        try:
            async with httpx.AsyncClient(timeout=5.0) as client:
                resp = await client.post(
                    f"{self.base_url}/api/v1/rag/search",
                    json={"query": query, "org": self.org, "top_k": top_k},
                )
                resp.raise_for_status()
                results = resp.json().get("results", [])
        except Exception:
            logger.debug("WildFiles search failed", exc_info=True)
            return []
        return results if isinstance(results, list) else []

    def format_context(self, results: list[dict]) -> str:
        """Format search hits as a Markdown bullet list; ``""`` when there are none.

        Each hit contributes its title (falling back to filename, then
        "Untitled") and at most 500 characters of its content/text.
        """
        if not results:
            return ""
        parts = ["**Relevant documents:**"]
        for result in results:
            title = result.get("title") or result.get("filename") or "Untitled"
            snippet = str(result.get("content") or result.get("text") or "")[:500]
            parts.append(f"- **{title}**: {snippet}")
        return "\n".join(parts)


class Bot:
    """Matrix client that bridges rooms to LiveKit calls and an LLM."""

    def __init__(self):
        config = AsyncClientConfig(
            max_limit_exceeded=0,
            max_timeouts=0,
            store_sync_tokens=True,
            encryption_enabled=True,
        )
        self.client = AsyncClient(
            HOMESERVER,
            BOT_USER,
            store_path=STORE_PATH,
            config=config,
        )
        self.lkapi = None
        self.dispatched_rooms: set[str] = set()  # rooms with a successful agent dispatch
        self.active_calls: set[str] = set()  # rooms where we've sent call member event
        # room_id -> state keys of other users' non-empty call member events
        self.call_members: dict[str, set[str]] = {}
        self.rag = DocumentRAG(WILDFILES_BASE_URL, WILDFILES_ORG)
        self.llm = AsyncOpenAI(base_url=LITELLM_URL, api_key=LITELLM_KEY) if LITELLM_URL else None
        self.room_models: dict[str, str] = {}  # room_id -> model name
        # Rooms whose persisted MODEL_STATE_TYPE state event was already looked up.
        self._model_state_checked: set[str] = set()
        self._sync_token_received = False

    # ------------------------------------------------------------------ session

    async def start(self):
        """Log in (or restore the saved session), register callbacks, sync forever."""
        if os.path.exists(CREDS_FILE):
            with open(CREDS_FILE) as f:
                creds = json.load(f)
            self.client.restore_login(
                user_id=creds["user_id"],
                device_id=creds["device_id"],
                access_token=creds["access_token"],
            )
            self.client.load_store()
            logger.info("Restored session as %s (device %s)", creds["user_id"], creds["device_id"])
        else:
            resp = await self.client.login(BOT_PASS, device_name="ai-voice-bot")
            if not isinstance(resp, LoginResponse):
                logger.error("Login failed: %s", resp)
                return
            self._save_credentials(resp)
            logger.info("Logged in as %s (device %s) — credentials saved", resp.user_id, resp.device_id)

        if self.client.should_upload_keys:
            await self.client.keys_upload()

        self.lkapi = api.LiveKitAPI(LK_URL, LK_KEY, LK_SECRET)

        self.client.add_event_callback(self.on_invite, InviteMemberEvent)
        self.client.add_event_callback(self.on_megolm, MegolmEvent)
        self.client.add_event_callback(self.on_unknown, UnknownEvent)
        self.client.add_event_callback(self.on_text_message, RoomMessageText)
        self.client.add_response_callback(self.on_sync, SyncResponse)
        self.client.add_to_device_callback(self.on_key_verification, KeyVerificationStart)
        self.client.add_to_device_callback(self.on_key_verification, KeyVerificationKey)
        self.client.add_to_device_callback(self.on_key_verification, KeyVerificationMac)
        self.client.add_to_device_callback(self.on_key_verification, KeyVerificationCancel)

        await self.client.sync_forever(timeout=30000, full_state=True)

    @staticmethod
    def _save_credentials(resp: LoginResponse) -> None:
        """Persist the session so restarts reuse the same device and crypto store.

        The file holds an access token, so it is created readable/writable by
        the owner only (0o600).
        """
        fd = os.open(CREDS_FILE, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
        with os.fdopen(fd, "w") as f:
            json.dump({
                "user_id": resp.user_id,
                "device_id": resp.device_id,
                "access_token": resp.access_token,
            }, f)

    async def on_invite(self, room, event: InviteMemberEvent):
        """Join any room the bot itself is invited to."""
        if event.state_key != self.client.user_id:
            return
        logger.info("Invited to %s, joining room", room.room_id)
        await self.client.join(room.room_id)

    async def on_sync(self, response: SyncResponse):
        """After each sync, trust all devices in our rooms."""
        if not self._sync_token_received:
            self._sync_token_received = True
            logger.info("Initial sync complete, text handler active")
        for user_id in list(self.client.device_store.users):
            for device in self.client.device_store.active_user_devices(user_id):
                if not device.verified:
                    self.client.verify_device(device)
                    logger.info("Auto-trusted device %s of %s", device.device_id, user_id)

    # -------------------------------------------------------------------- calls

    def _call_state_key(self) -> str:
        """State key of this bot's own call member event."""
        return f"_{self.client.user_id}_{BOT_DEVICE_ID}_m.call"

    async def on_unknown(self, room, event: UnknownEvent):
        """Handle call member state events to join and leave calls.

        Non-empty content means the sender started or is in a call. Empty
        content means they left. The bot joins on the first participant it
        sees and leaves only once no tracked participant remains.
        """
        if event.type != CALL_MEMBER_TYPE:
            return
        if event.sender == self.client.user_id:
            return  # ignore our own events

        room_id = room.room_id
        content = event.source.get("content") or {}
        # The state key identifies one membership; fall back to the sender.
        member_key = event.source.get("state_key") or event.sender
        participants = self.call_members.setdefault(room_id, set())

        if content:
            participants.add(member_key)
            if room_id not in self.active_calls:
                await self._join_call(room_id, event.sender, content)
        else:
            participants.discard(member_key)
            if not participants and room_id in self.active_calls:
                await self._leave_call(room_id)

    async def _join_call(self, room_id: str, sender: str, content: dict) -> None:
        """Dispatch the LiveKit agent and publish our own call member event."""
        logger.info("Call detected in %s from %s, joining...", room_id, sender)
        self.active_calls.add(room_id)

        # Reuse the caller's foci_preferred so we join the same LiveKit focus.
        foci = content.get("foci_preferred") or [{
            "type": "livekit",
            "livekit_service_url": f"{HOMESERVER}/livekit-jwt-service",
            "livekit_alias": room_id,
        }]

        lk_room_name = next(
            (
                focus["livekit_alias"]
                for focus in foci
                if isinstance(focus, dict)
                and focus.get("type") == "livekit"
                and focus.get("livekit_alias")
            ),
            room_id,  # fallback
        )
        logger.info("LiveKit room name: %s (from foci_preferred)", lk_room_name)

        if room_id not in self.dispatched_rooms:
            try:
                await self.lkapi.agent_dispatch.create_dispatch(
                    api.CreateAgentDispatchRequest(
                        agent_name=AGENT_NAME,
                        room=lk_room_name,
                    )
                )
                self.dispatched_rooms.add(room_id)
                logger.info("Agent dispatched to LiveKit room %s", lk_room_name)
            except Exception:
                logger.exception("Dispatch failed for %s", lk_room_name)

        call_content = {
            "application": "m.call",
            "call_id": "",
            "scope": "m.room",
            "device_id": BOT_DEVICE_ID,
            "expires": 7200000,
            "focus_active": {
                "type": "livekit",
                "focus_selection": "oldest_membership",
            },
            "foci_preferred": foci,
            "m.call.intent": "audio",
        }
        try:
            resp = await self.client.room_put_state(
                room_id, CALL_MEMBER_TYPE, call_content, state_key=self._call_state_key(),
            )
            logger.info("Sent call member event in %s: %s", room_id, resp)
        except Exception:
            logger.exception("Failed to send call member event in %s", room_id)

    async def _leave_call(self, room_id: str) -> None:
        """Clear our call member event and forget per-call state for the room."""
        self.active_calls.discard(room_id)
        # Allow a fresh agent dispatch the next time a call starts in this room.
        self.dispatched_rooms.discard(room_id)
        try:
            await self.client.room_put_state(
                room_id, CALL_MEMBER_TYPE, {}, state_key=self._call_state_key(),
            )
            logger.info("Left call in %s", room_id)
        except Exception:
            logger.exception("Failed to leave call in %s", room_id)

    # --------------------------------------------------------------------- text

    async def on_text_message(self, room, event: RoomMessageText):
        """Handle text messages: commands and AI responses."""
        if event.sender == self.client.user_id:
            return
        if not self._sync_token_received:
            return  # ignore messages from initial sync / backfill
        # Ignore old messages to avoid replaying history.
        if time.time() - event.server_timestamp / 1000 > MAX_MESSAGE_AGE_S:
            return

        body = event.body.strip()

        if body == "!ai":
            await self._send_text(room.room_id, HELP_TEXT)
            return
        if body.startswith("!ai "):
            await self._handle_command(room, body[4:].strip())
            return

        if not self._is_mentioned(event, body):
            return
        if not self.llm:
            await self._send_text(room.room_id, "LLM not configured (LITELLM_BASE_URL not set).")
            return
        await self._respond_with_typing(room, body)

    def _is_mentioned(self, event: RoomMessageText, body: str) -> bool:
        """Return True when the message addresses the bot.

        Checks, in order: an ``m.mentions.user_ids`` entry, the full user ID
        in the body, then the user ID's localpart as a whole word
        (case-insensitive). The whole-word check keeps a short localpart such
        as "ai" from matching inside words like "said".
        """
        own_id = self.client.user_id
        mentions = (event.source.get("content") or {}).get("m.mentions")
        if isinstance(mentions, dict):
            user_ids = mentions.get("user_ids")
            if isinstance(user_ids, list) and own_id in user_ids:
                return True
        if own_id in body:
            return True
        localpart = own_id.split(":", 1)[0].lstrip("@")
        if not localpart:
            return False
        pattern = rf"(?<!\w){re.escape(localpart)}(?!\w)"
        return re.search(pattern, body, re.IGNORECASE) is not None

    async def _respond_with_typing(self, room, prompt: str) -> None:
        """Answer ``prompt`` with the LLM while showing a typing indicator."""
        await self.client.room_typing(room.room_id, typing_state=True)
        try:
            await self._respond_with_ai(room, prompt)
        finally:
            await self.client.room_typing(room.room_id, typing_state=False)

    async def _handle_command(self, room, cmd: str):
        """Execute an ``!ai`` sub-command; unknown ones are treated as prompts."""
        name, _, arg = cmd.partition(" ")
        arg = arg.strip()

        if cmd == "help":
            await self._send_text(room.room_id, HELP_TEXT)
        elif cmd == "models":
            if not self.llm:
                await self._send_text(room.room_id, "LLM not configured.")
                return
            try:
                models = await self.llm.models.list()
                names = sorted(m.id for m in models.data)
                current = await self._get_room_model(room.room_id)
                text = "**Available models:**\n"
                text += "\n".join(f"- `{n}` {'← current' if n == current else ''}" for n in names)
                await self._send_text(room.room_id, text)
            except Exception:
                logger.exception("Failed to list models")
                await self._send_text(room.room_id, "Failed to fetch model list.")
        elif name == "set-model":
            if not arg:
                await self._send_text(room.room_id, "Usage: `!ai set-model <model>`")
                return
            self.room_models[room.room_id] = arg
            # Persist in room state so _get_room_model can restore it after a restart.
            try:
                await self.client.room_put_state(
                    room.room_id, MODEL_STATE_TYPE, {"model": arg}, state_key="",
                )
            except Exception:
                logger.debug("Could not persist model to room state", exc_info=True)
            await self._send_text(room.room_id, f"Model set to `{arg}` for this room.")
        elif name == "search":
            if not arg:
                await self._send_text(room.room_id, "Usage: `!ai search <query>`")
                return
            results = await self.rag.search(arg, top_k=5)
            if not results:
                await self._send_text(room.room_id, "No documents found.")
                return
            await self._send_text(room.room_id, self.rag.format_context(results))
        elif self.llm:
            # Treat unknown commands as AI prompts.
            await self._respond_with_typing(room, cmd)
        else:
            await self._send_text(room.room_id, f"Unknown command: `{cmd}`\n\n{HELP_TEXT}")

    async def _get_room_model(self, room_id: str) -> str:
        """Return the model for ``room_id``: cached, persisted room state, or default.

        The room state event written by ``set-model`` is fetched at most once
        per room per process. Lookup failures fall back to DEFAULT_MODEL.
        """
        cached = self.room_models.get(room_id)
        if cached:
            return cached
        if room_id not in self._model_state_checked:
            self._model_state_checked.add(room_id)
            try:
                resp = await self.client.room_get_state_event(room_id, MODEL_STATE_TYPE, "")
            except Exception:
                logger.debug("Could not read model from room state", exc_info=True)
            else:
                # Error responses carry no ``content``; treat them as "not set".
                content = getattr(resp, "content", None)
                model = content.get("model") if isinstance(content, dict) else None
                if isinstance(model, str) and model:
                    self.room_models[room_id] = model
                    return model
        return DEFAULT_MODEL

    async def _respond_with_ai(self, room, user_message: str):
        """Ask the room's model to answer ``user_message`` and post the reply."""
        model = await self._get_room_model(room.room_id)

        messages = [{"role": "system", "content": SYSTEM_PROMPT}]

        # WildFiles document context (best-effort; search never raises).
        doc_context = self.rag.format_context(await self.rag.search(user_message))
        if doc_context:
            messages.append({"role": "system", "content": doc_context})

        # Last N messages from the room timeline as context.
        # NOTE(review): nio's MatrixRoom may not expose `timeline`; when it is
        # absent no history is sent instead of raising AttributeError.
        timeline = getattr(room, "timeline", None)
        history = list(timeline)[-HISTORY_LIMIT:] if timeline else []
        own_id = self.client.user_id
        for evt in history:
            text = getattr(evt, "body", None)
            if not isinstance(text, str):
                continue
            role = "assistant" if evt.sender == own_id else "user"
            messages.append({"role": role, "content": text})

        # Ensure the last message is the current user message.
        if messages[-1].get("content") != user_message:
            messages.append({"role": "user", "content": user_message})

        try:
            resp = await self.llm.chat.completions.create(
                model=model,
                messages=messages,
                max_tokens=2048,
            )
            reply = resp.choices[0].message.content
            if not reply:
                raise ValueError("LLM returned an empty response")
            await self._send_text(room.room_id, reply)
        except Exception:
            logger.exception("LLM call failed")
            await self._send_text(room.room_id, "Sorry, I couldn't generate a response.")

    async def _send_text(self, room_id: str, text: str):
        """Send ``text`` as a message: plain body plus an escaped HTML rendering."""
        await self.client.room_send(
            room_id,
            message_type="m.room.message",
            content={
                "msgtype": "m.text",
                "body": text,
                "format": "org.matrix.custom.html",
                "formatted_body": _markdown_to_html(text),
            },
        )

    async def on_megolm(self, room, event: MegolmEvent):
        """Log undecryptable messages."""
        logger.warning(
            "Undecryptable event %s in %s from %s",
            event.event_id, room.room_id, event.sender,
        )

    # ------------------------------------------------------------ verification

    async def _send_to_device(self, message) -> None:
        """Send a to-device message, logging (not raising) a ToDeviceError response."""
        resp = await self.client.to_device(message)
        if isinstance(resp, ToDeviceError):
            logger.warning("To-device message failed: %s", resp)

    async def on_key_verification(self, event):
        """Auto-accept key verification requests."""
        if isinstance(event, KeyVerificationCancel):
            logger.info("Key verification %s cancelled", event.transaction_id)
            return
        sas = self.client.key_verifications.get(event.transaction_id)
        if sas is None:
            return
        if isinstance(event, KeyVerificationStart):
            await self.client.accept_key_verification(event.transaction_id)
            await self._send_to_device(sas.share_key())
        elif isinstance(event, KeyVerificationKey):
            await self.client.confirm_short_auth_string(event.transaction_id)
        elif isinstance(event, KeyVerificationMac):
            # NOTE(review): confirm_short_auth_string may already send our MAC;
            # confirm this second send is required by the peer.
            try:
                mac = sas.get_mac()
            except LocalProtocolError:
                logger.warning(
                    "Cannot send MAC for verification %s", event.transaction_id, exc_info=True,
                )
                return
            await self._send_to_device(mac)

    async def cleanup(self):
        """Close the Matrix client and, even if that fails, the LiveKit API client."""
        try:
            await self.client.close()
        finally:
            if self.lkapi:
                await self.lkapi.aclose()


async def main():
    os.makedirs(STORE_PATH, exist_ok=True)
    bot = Bot()
    try:
        await bot.start()
    finally:
        await bot.cleanup()


if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    asyncio.run(main())