feat(CF-1189): Add AI text bot + WildFiles RAG integration
Extends bot.py with text message handling:

- RoomMessageText callback with @mention detection
- LLM responses via LiteLLM (OpenAI-compatible)
- WildFiles document search (DocumentRAG class)
- Per-room model selection via room state events
- Commands: !ai help/models/set-model/search
- Typing indicators during AI response generation
- 30s staleness check to avoid replaying history

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
212
bot.py
212
bot.py
@@ -2,6 +2,10 @@ import os
|
|||||||
import json
|
import json
|
||||||
import asyncio
|
import asyncio
|
||||||
import logging
|
import logging
|
||||||
|
import time
|
||||||
|
|
||||||
|
import httpx
|
||||||
|
from openai import AsyncOpenAI
|
||||||
|
|
||||||
from nio import (
|
from nio import (
|
||||||
AsyncClient,
|
AsyncClient,
|
||||||
@@ -9,6 +13,7 @@ from nio import (
|
|||||||
LoginResponse,
|
LoginResponse,
|
||||||
InviteMemberEvent,
|
InviteMemberEvent,
|
||||||
MegolmEvent,
|
MegolmEvent,
|
||||||
|
RoomMessageText,
|
||||||
SyncResponse,
|
SyncResponse,
|
||||||
UnknownEvent,
|
UnknownEvent,
|
||||||
KeyVerificationStart,
|
KeyVerificationStart,
|
||||||
@@ -21,6 +26,7 @@ from livekit import api
|
|||||||
|
|
||||||
BOT_DEVICE_ID = "AIBOT"
|
BOT_DEVICE_ID = "AIBOT"
|
||||||
CALL_MEMBER_TYPE = "org.matrix.msc3401.call.member"
|
CALL_MEMBER_TYPE = "org.matrix.msc3401.call.member"
|
||||||
|
MODEL_STATE_TYPE = "ai.agiliton.model"
|
||||||
|
|
||||||
logger = logging.getLogger("matrix-ai-bot")
|
logger = logging.getLogger("matrix-ai-bot")
|
||||||
|
|
||||||
@@ -34,6 +40,57 @@ AGENT_NAME = os.environ.get("AGENT_NAME", "matrix-ai")
|
|||||||
STORE_PATH = os.environ.get("CRYPTO_STORE_PATH", "/data/crypto_store")
|
STORE_PATH = os.environ.get("CRYPTO_STORE_PATH", "/data/crypto_store")
|
||||||
CREDS_FILE = os.path.join(STORE_PATH, "credentials.json")
|
CREDS_FILE = os.path.join(STORE_PATH, "credentials.json")
|
||||||
|
|
||||||
|
LITELLM_URL = os.environ.get("LITELLM_BASE_URL", "")
|
||||||
|
LITELLM_KEY = os.environ.get("LITELLM_API_KEY", "not-needed")
|
||||||
|
DEFAULT_MODEL = os.environ.get("DEFAULT_MODEL", "claude-sonnet")
|
||||||
|
WILDFILES_BASE_URL = os.environ.get("WILDFILES_BASE_URL", "")
|
||||||
|
WILDFILES_ORG = os.environ.get("WILDFILES_ORG", "")
|
||||||
|
|
||||||
|
SYSTEM_PROMPT = """You are a helpful AI assistant in a Matrix chat room.
|
||||||
|
Keep answers concise but thorough. Use markdown formatting when helpful.
|
||||||
|
If document context is provided, use it to inform your answers."""
|
||||||
|
|
||||||
|
HELP_TEXT = """**AI Bot Commands**
|
||||||
|
- `!ai help` — Show this help
|
||||||
|
- `!ai models` — List available models
|
||||||
|
- `!ai set-model <model>` — Set model for this room
|
||||||
|
- `!ai search <query>` — Search documents (WildFiles)
|
||||||
|
- **@mention the bot** or start with `!ai` for a regular AI response"""
|
||||||
|
|
||||||
|
|
||||||
|
class DocumentRAG:
    """Search WildFiles for documents relevant to a query.

    Disabled (every search returns no results) unless both a base URL and
    an organisation are configured.
    """

    def __init__(self, base_url: str, org: str):
        # Normalise the base URL so the path join in search() never
        # produces a double slash.
        self.base_url = base_url.rstrip("/")
        self.org = org
        # Feature flag: only search when both settings are present.
        self.enabled = bool(base_url and org)

    async def search(self, query: str, top_k: int = 3) -> list[dict]:
        """Return up to *top_k* result dicts for *query*, or [] on any failure.

        Failures (timeouts, HTTP errors, bad JSON) are deliberately
        swallowed and logged at debug level: document context is
        best-effort and must never break chat handling.
        """
        if not self.enabled:
            return []
        try:
            async with httpx.AsyncClient(timeout=5.0) as client:
                resp = await client.post(
                    f"{self.base_url}/api/v1/rag/search",
                    json={"query": query, "org": self.org, "top_k": top_k},
                )
                resp.raise_for_status()
                return resp.json().get("results", [])
        except Exception:
            logger.debug("WildFiles search failed", exc_info=True)
            return []

    def format_context(self, results: list[dict]) -> str:
        """Render search results as a markdown bullet list ("" when empty)."""
        if not results:
            return ""
        parts = ["**Relevant documents:**"]
        for r in results:
            # Fall back through the known field names with `or` chains:
            # dict.get defaults only cover *missing* keys, so an explicit
            # None value would previously have raised (None[:500]).
            title = r.get("title") or r.get("filename") or "Untitled"
            snippet = (r.get("content") or r.get("text") or "")[:500]
            parts.append(f"- **{title}**: {snippet}")
        return "\n".join(parts)
|
||||||
|
|
||||||
|
|
||||||
class Bot:
|
class Bot:
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
@@ -52,6 +109,10 @@ class Bot:
|
|||||||
self.lkapi = None
|
self.lkapi = None
|
||||||
self.dispatched_rooms = set()
|
self.dispatched_rooms = set()
|
||||||
self.active_calls = set() # rooms where we've sent call member event
|
self.active_calls = set() # rooms where we've sent call member event
|
||||||
|
self.rag = DocumentRAG(WILDFILES_BASE_URL, WILDFILES_ORG)
|
||||||
|
self.llm = AsyncOpenAI(base_url=LITELLM_URL, api_key=LITELLM_KEY) if LITELLM_URL else None
|
||||||
|
self.room_models: dict[str, str] = {} # room_id -> model name
|
||||||
|
self._sync_token_received = False
|
||||||
|
|
||||||
async def start(self):
|
async def start(self):
|
||||||
# Restore existing session or create new one
|
# Restore existing session or create new one
|
||||||
@@ -86,6 +147,7 @@ class Bot:
|
|||||||
self.client.add_event_callback(self.on_invite, InviteMemberEvent)
|
self.client.add_event_callback(self.on_invite, InviteMemberEvent)
|
||||||
self.client.add_event_callback(self.on_megolm, MegolmEvent)
|
self.client.add_event_callback(self.on_megolm, MegolmEvent)
|
||||||
self.client.add_event_callback(self.on_unknown, UnknownEvent)
|
self.client.add_event_callback(self.on_unknown, UnknownEvent)
|
||||||
|
self.client.add_event_callback(self.on_text_message, RoomMessageText)
|
||||||
self.client.add_response_callback(self.on_sync, SyncResponse)
|
self.client.add_response_callback(self.on_sync, SyncResponse)
|
||||||
self.client.add_to_device_callback(self.on_key_verification, KeyVerificationStart)
|
self.client.add_to_device_callback(self.on_key_verification, KeyVerificationStart)
|
||||||
self.client.add_to_device_callback(self.on_key_verification, KeyVerificationKey)
|
self.client.add_to_device_callback(self.on_key_verification, KeyVerificationKey)
|
||||||
@@ -102,6 +164,9 @@ class Bot:
|
|||||||
|
|
||||||
async def on_sync(self, response: SyncResponse):
|
async def on_sync(self, response: SyncResponse):
|
||||||
"""After each sync, trust all devices in our rooms."""
|
"""After each sync, trust all devices in our rooms."""
|
||||||
|
if not self._sync_token_received:
|
||||||
|
self._sync_token_received = True
|
||||||
|
logger.info("Initial sync complete, text handler active")
|
||||||
for user_id in list(self.client.device_store.users):
|
for user_id in list(self.client.device_store.users):
|
||||||
for device in self.client.device_store.active_user_devices(user_id):
|
for device in self.client.device_store.active_user_devices(user_id):
|
||||||
if not device.verified:
|
if not device.verified:
|
||||||
@@ -192,6 +257,153 @@ class Bot:
|
|||||||
except Exception:
|
except Exception:
|
||||||
logger.exception("Failed to leave call in %s", room_id)
|
logger.exception("Failed to leave call in %s", room_id)
|
||||||
|
|
||||||
|
async def on_text_message(self, room, event: RoomMessageText):
    """Handle text messages: `!ai` commands plus @mention-triggered AI replies.

    Ignores the bot's own messages, anything received before the first
    sync completes, and events older than 30 s — so a restart does not
    replay and answer room history.
    """
    if event.sender == BOT_USER:
        return
    if not self._sync_token_received:
        return  # ignore messages from initial sync / backfill
    # server_timestamp is milliseconds since the epoch.
    if time.time() - event.server_timestamp / 1000 > 30:
        return

    body = event.body.strip()

    # Explicit commands take priority over mention detection.
    if body.startswith("!ai "):
        await self._handle_command(room, body[4:].strip())
        return
    if body == "!ai":
        await self._send_text(room.room_id, HELP_TEXT)
        return

    # Mention detection: full user ID, or the bot's localpart anywhere in
    # the message (which also covers "@localpart"). Compare
    # case-insensitively throughout — the previous "@name" check compared
    # an un-lowered name against the lowered body and could never match a
    # localpart containing uppercase.
    bot_display = self.client.user_id.split(":")[0].lstrip("@").lower()
    mentioned = BOT_USER in body or bot_display in body.lower()
    if not mentioned:
        return

    if not self.llm:
        await self._send_text(room.room_id, "LLM not configured (LITELLM_BASE_URL not set).")
        return

    # Show a typing indicator while the model works; always clear it.
    await self.client.room_typing(room.room_id, typing_state=True)
    try:
        await self._respond_with_ai(room, body)
    finally:
        await self.client.room_typing(room.room_id, typing_state=False)
|
||||||
|
|
||||||
|
async def _handle_command(self, room, cmd: str):
    """Dispatch a `!ai <cmd>` command (the `!ai ` prefix is already stripped).

    Unknown commands fall through to a regular AI response so users can
    write `!ai <question>` without @mentioning the bot.
    """
    # Split into verb + argument so that bare "set-model" / "search"
    # (no trailing space) show usage instead of being sent to the LLM,
    # which is what the old startswith("set-model ") checks did.
    verb, _, arg = cmd.partition(" ")
    arg = arg.strip()

    if cmd == "help":
        await self._send_text(room.room_id, HELP_TEXT)

    elif cmd == "models":
        if not self.llm:
            await self._send_text(room.room_id, "LLM not configured.")
            return
        try:
            models = await self.llm.models.list()
            names = sorted(m.id for m in models.data)
            current = self.room_models.get(room.room_id, DEFAULT_MODEL)
            text = "**Available models:**\n"
            text += "\n".join(f"- `{n}` {'← current' if n == current else ''}" for n in names)
            await self._send_text(room.room_id, text)
        except Exception:
            logger.exception("Failed to list models")
            await self._send_text(room.room_id, "Failed to fetch model list.")

    elif verb == "set-model":
        if not arg:
            await self._send_text(room.room_id, "Usage: `!ai set-model <model-name>`")
            return
        self.room_models[room.room_id] = arg
        # Persist in room state for cross-restart persistence (best effort:
        # the in-memory setting above still applies if this fails).
        try:
            await self.client.room_put_state(
                room.room_id, MODEL_STATE_TYPE, {"model": arg}, state_key="",
            )
        except Exception:
            logger.debug("Could not persist model to room state", exc_info=True)
        await self._send_text(room.room_id, f"Model set to `{arg}` for this room.")

    elif verb == "search":
        if not arg:
            await self._send_text(room.room_id, "Usage: `!ai search <query>`")
            return
        results = await self.rag.search(arg, top_k=5)
        if not results:
            await self._send_text(room.room_id, "No documents found.")
            return
        await self._send_text(room.room_id, self.rag.format_context(results))

    else:
        # Treat unknown commands as AI prompts.
        if self.llm:
            await self.client.room_typing(room.room_id, typing_state=True)
            try:
                await self._respond_with_ai(room, cmd)
            finally:
                await self.client.room_typing(room.room_id, typing_state=False)
        else:
            await self._send_text(room.room_id, f"Unknown command: `{cmd}`\n\n{HELP_TEXT}")
|
||||||
|
|
||||||
|
async def _respond_with_ai(self, room, user_message: str):
    """Generate and send an LLM reply to *user_message* in *room*.

    Context sent to the model: system prompt, optional WildFiles document
    context, the last 10 room timeline messages, then the user message.
    """
    model = self.room_models.get(room.room_id, DEFAULT_MODEL)

    messages = [{"role": "system", "content": SYSTEM_PROMPT}]

    # WildFiles document context (best effort; empty string when disabled
    # or when the search returned nothing).
    doc_context = self.rag.format_context(await self.rag.search(user_message))
    if doc_context:
        messages.append({"role": "system", "content": doc_context})

    # Last 10 timeline events that carry a text body, as conversation history.
    timeline = room.timeline
    for evt in (list(timeline)[-10:] if timeline else []):
        if not hasattr(evt, "body"):
            continue  # skip non-message events (state, undecrypted, ...)
        role = "assistant" if evt.sender == BOT_USER else "user"
        messages.append({"role": role, "content": evt.body})

    # The triggering message is usually already the last timeline entry;
    # only append it when it is not.
    if not messages or messages[-1].get("content") != user_message:
        messages.append({"role": "user", "content": user_message})

    try:
        resp = await self.llm.chat.completions.create(
            model=model,
            messages=messages,
            max_tokens=2048,
        )
        # message.content can legitimately be None (e.g. a refusal or a
        # tool-call-only response); never send a None/empty message body.
        reply = resp.choices[0].message.content
        if not reply:
            reply = "Sorry, I couldn't generate a response."
        await self._send_text(room.room_id, reply)
    except Exception:
        logger.exception("LLM call failed")
        await self._send_text(room.room_id, "Sorry, I couldn't generate a response.")
|
||||||
|
|
||||||
|
async def _send_text(self, room_id: str, text: str):
    """Send *text* to *room_id* as an m.text message with an HTML variant.

    The plain-text body carries the raw (markdown-ish) text. The previous
    version sent that same raw text as `formatted_body` under
    `org.matrix.custom.html`, so HTML rendering collapsed newlines and any
    "<" or "&" coming from LLM output / document snippets was interpreted
    as markup. Escape HTML metacharacters and convert newlines instead.
    (Full markdown→HTML conversion would need a markdown library; this is
    the safe minimum.)
    """
    escaped = text.replace("&", "&amp;").replace("<", "&lt;").replace(">", "&gt;")
    await self.client.room_send(
        room_id,
        message_type="m.room.message",
        content={
            "msgtype": "m.text",
            "body": text,
            "format": "org.matrix.custom.html",
            "formatted_body": escaped.replace("\n", "<br/>"),
        },
    )
|
||||||
|
|
||||||
async def on_megolm(self, room, event: MegolmEvent):
|
async def on_megolm(self, room, event: MegolmEvent):
|
||||||
"""Log undecryptable messages."""
|
"""Log undecryptable messages."""
|
||||||
logger.warning(
|
logger.warning(
|
||||||
|
|||||||
@@ -11,6 +11,12 @@ services:
|
|||||||
command: python bot.py
|
command: python bot.py
|
||||||
env_file: .env
|
env_file: .env
|
||||||
restart: unless-stopped
|
restart: unless-stopped
|
||||||
|
environment:
|
||||||
|
- LITELLM_BASE_URL
|
||||||
|
- LITELLM_API_KEY
|
||||||
|
- DEFAULT_MODEL
|
||||||
|
- WILDFILES_BASE_URL
|
||||||
|
- WILDFILES_ORG
|
||||||
volumes:
|
volumes:
|
||||||
- bot-crypto:/data/crypto_store
|
- bot-crypto:/data/crypto_store
|
||||||
|
|
||||||
|
|||||||
@@ -6,3 +6,5 @@ livekit>=1.0,<2.0
|
|||||||
livekit-api>=1.0,<2.0
|
livekit-api>=1.0,<2.0
|
||||||
matrix-nio[e2e]>=0.25,<1.0
|
matrix-nio[e2e]>=0.25,<1.0
|
||||||
canonicaljson>=2.0,<3.0
|
canonicaljson>=2.0,<3.0
|
||||||
|
httpx>=0.27,<1.0
|
||||||
|
openai>=1.0,<2.0
|
||||||
|
|||||||
Reference in New Issue
Block a user