import os import json import asyncio import base64 import hashlib import io import logging import re import time import uuid import sentry_sdk import docx import fitz # pymupdf import httpx from openai import AsyncOpenAI from olm import sas as olm_sas import olm.pk import canonicaljson from rag_key_manager import RAGKeyManager from nio import ( AsyncClient, AsyncClientConfig, LoginResponse, InviteMemberEvent, MegolmEvent, ReactionEvent, RoomEncryptedFile, RoomEncryptedImage, RoomMessageFile, RoomMessageImage, RoomMessageText, RoomMessageUnknown, SyncResponse, UnknownEvent, KeyVerificationStart, KeyVerificationCancel, KeyVerificationKey, KeyVerificationMac, ToDeviceError, ) from nio.crypto.attachments import decrypt_attachment from livekit import api, rtc from voice import VoiceSession from article_summary import ArticleSummaryHandler from cron import CronScheduler BOT_DEVICE_ID = "AIBOT" CALL_MEMBER_TYPE = "org.matrix.msc3401.call.member" ENCRYPTION_KEYS_TYPE = "io.element.call.encryption_keys" MODEL_STATE_TYPE = "ai.agiliton.model" RENAME_STATE_TYPE = "ai.agiliton.auto_rename" logger = logging.getLogger("matrix-ai-bot") # Sentry error tracking _sentry_dsn = os.environ.get("SENTRY_DSN", "") if _sentry_dsn: sentry_sdk.init(dsn=_sentry_dsn, traces_sample_rate=0.1, environment=os.environ.get("SENTRY_ENV", "production")) logger.info("Sentry initialized for bot") HOMESERVER = os.environ["MATRIX_HOMESERVER"] BOT_USER = os.environ["MATRIX_BOT_USER"] BOT_PASS = os.environ["MATRIX_BOT_PASSWORD"] LK_URL = os.environ["LIVEKIT_URL"] LK_KEY = os.environ["LIVEKIT_API_KEY"] LK_SECRET = os.environ["LIVEKIT_API_SECRET"] AGENT_NAME = os.environ.get("AGENT_NAME", "matrix-ai") STORE_PATH = os.environ.get("CRYPTO_STORE_PATH", "/data/crypto_store") CREDS_FILE = os.path.join(STORE_PATH, "credentials.json") LITELLM_URL = os.environ.get("LITELLM_BASE_URL", "") LITELLM_KEY = os.environ.get("LITELLM_API_KEY", "not-needed") DEFAULT_MODEL = os.environ.get("DEFAULT_MODEL", "claude-sonnet") BASE_MODEL = os.environ.get("BASE_MODEL", "claude-haiku") ESCALATION_MODEL = os.environ.get("ESCALATION_MODEL", "claude-sonnet") MEMORY_SERVICE_URL = os.environ.get("MEMORY_SERVICE_URL", "http://memory-service:8090") MEMORY_SERVICE_TOKEN = os.environ.get("MEMORY_SERVICE_TOKEN", "") CONFLUENCE_URL = os.environ.get("CONFLUENCE_BASE_URL", "") CONFLUENCE_USER = os.environ.get("CONFLUENCE_USER", "") CONFLUENCE_TOKEN = os.environ.get("CONFLUENCE_TOKEN", "") PORTAL_URL = os.environ.get("PORTAL_URL", "") BOT_API_KEY = os.environ.get("BOT_API_KEY", "") RAG_ENDPOINT = os.environ.get("RAG_ENDPOINT", "") # Customer-VM RAG service (e.g. http://127.0.0.1:8765) RAG_AUTH_TOKEN = os.environ.get("RAG_AUTH_TOKEN", "") # Bearer token for local RAG BRAVE_API_KEY = os.environ.get("BRAVE_API_KEY", "") ELEVENLABS_API_KEY = os.environ.get("ELEVENLABS_API_KEY", "") ELEVENLABS_VOICE_ID = os.environ.get("ELEVENLABS_VOICE_ID", "ML23UVoFL5mI6APbRAeR") FIRECRAWL_URL = os.environ.get("FIRECRAWL_URL", "") MAX_TOOL_ITERATIONS = 5 SYSTEM_PROMPT = """You are a helpful AI assistant in a Matrix chat room. Keep answers concise but thorough. Use markdown formatting when helpful. Always respond in the same language the user writes in. Never switch languages unless the user explicitly asks you to. If a user writes in German, reply in German. If they write in English, reply in English. Do not offer to translate or change language on your own. IMPORTANT RULES — FOLLOW THESE STRICTLY: - When document context is provided below, use it to answer. Always include any links. - NEVER tell the user to run commands or type anything special. No commands exist. - NEVER mention "!ai", "!ai search", "!ai read", or any slash/bang commands. - NEVER say you cannot access files, documents, or links. - NEVER ask the user where documents are stored, how they were uploaded, or under what filename. - NEVER suggest contacting an administrator, using a web interface, or checking another system. - NEVER ask follow-up questions about document storage or file locations. - If no relevant documents were found, simply say you don't have information on that topic and ask if you can help with something else. Do NOT speculate about why or suggest the user look elsewhere. - You can see and analyze images that users send. Describe what you see when asked about an image. - You can read and analyze PDF documents that users send. Summarize content and answer questions about them. - Always focus on the user's most recent message — whether it was text or voice. Do not automatically continue or summarize previous conversations. - When a user greets you or starts a new conversation after a pause, respond with a brief greeting and wait for their instructions. - You can generate images when asked — use the generate_image tool for any image creation, drawing, or illustration requests. - You can search the web using the web_search tool. Use it when users ask about current events, facts, or anything that needs up-to-date information. - You can open and read web pages using browse_url. Use it when a user shares a link, or when you need more detail from a search result. Summarize the key content concisely. - When you use web_search, embed source links INLINE in the text where the information appears, e.g. "Laut [Cyprus Mail](url) hat..." or "([Quelle](url))". Do NOT collect links in a separate section at the bottom. Every claim from a search result must have its source linked right there in the sentence. - ALWAYS use https:// for links, never http://. All links you generate must use HTTPS. - Keep formatting compact. STRICT rules: NEVER use headings (no #, ##, ###). Use **bold text** for section titles instead. Use --- sparingly to separate major sections. NEVER add blank lines between list items or between a section title and its content. Maximum one blank line between sections. - You can search Confluence and Jira using tools. When users ask about documentation, wiki pages, tickets, or tasks, use the appropriate tool. Use confluence_recent_pages FIRST to show recently edited pages before searching. - When creating Jira issues, always confirm the project key and summary with the user before creating. - If a user's Atlassian account is not connected, tell them to connect it at https://matrixhost.eu/settings and provide the link. - If user memories are provided, use them to personalize responses. Address users by name if known. - When asked to translate, provide ONLY the translation with no explanation. - You can set reminders and scheduled messages. When users ask to be reminded of something, use the schedule_message tool. Parse natural language times like "in 2 hours", "tomorrow at 9am", "every Monday" into ISO 8601 datetime with Europe/Berlin timezone (unless user specifies otherwise).""" IMAGE_GEN_TOOLS = [{ "type": "function", "function": { "name": "generate_image", "description": "Generate an image from a text description. Use when the user asks to create, draw, generate, design, or make an image/picture/photo/illustration.", "parameters": { "type": "object", "properties": { "prompt": {"type": "string", "description": "Detailed image generation prompt"} }, "required": ["prompt"] } } }] ATLASSIAN_TOOLS = [ { "type": "function", "function": { "name": "confluence_recent_pages", "description": "List recently modified Confluence pages. Use this FIRST when the user mentions Confluence, documents, or wiki pages — shows the last 5 recently edited pages so the user can pick one directly without searching.", "parameters": { "type": "object", "properties": { "limit": {"type": "integer", "description": "Max pages to return (default 5)", "default": 5}, }, }, }, }, { "type": "function", "function": { "name": "confluence_search", "description": "Search Confluence wiki pages using CQL (Confluence Query Language). Use when the user asks about documentation, wiki, or knowledge base content.", "parameters": { "type": "object", "properties": { "query": {"type": "string", "description": "Search query text"}, "limit": {"type": "integer", "description": "Max results to return (default 5)", "default": 5}, }, "required": ["query"], }, }, }, { "type": "function", "function": { "name": "confluence_read_page", "description": "Read the full content of a Confluence page by its ID. Use after searching to get page details.", "parameters": { "type": "object", "properties": { "page_id": {"type": "string", "description": "The Confluence page ID"}, }, "required": ["page_id"], }, }, }, { "type": "function", "function": { "name": "confluence_update_page", "description": "Update a section of a Confluence page by heading. Use when the user asks to change, edit, or update part of a wiki page.", "parameters": { "type": "object", "properties": { "page_id": {"type": "string", "description": "The Confluence page ID"}, "section_heading": {"type": "string", "description": "The heading text of the section to update"}, "new_content": {"type": "string", "description": "New content for the section (paragraphs separated by newlines)"}, }, "required": ["page_id", "section_heading", "new_content"], }, }, }, { "type": "function", "function": { "name": "confluence_create_page", "description": "Create a new Confluence page. Use when the user asks to create a new wiki page or document.", "parameters": { "type": "object", "properties": { "title": {"type": "string", "description": "Page title"}, "content": {"type": "string", "description": "Page body text (paragraphs separated by newlines)"}, "space_key": {"type": "string", "description": "Confluence space key (default: AI)", "default": "AI"}, }, "required": ["title", "content"], }, }, }, { "type": "function", "function": { "name": "jira_search", "description": "Search Jira issues using JQL (Jira Query Language). Use when the user asks about tickets, tasks, bugs, or project issues.", "parameters": { "type": "object", "properties": { "jql": {"type": "string", "description": "JQL query string"}, "limit": {"type": "integer", "description": "Max results (default 10)", "default": 10}, }, "required": ["jql"], }, }, }, { "type": "function", "function": { "name": "jira_get_issue", "description": "Get detailed information about a specific Jira issue by its key (e.g. CF-123).", "parameters": { "type": "object", "properties": { "issue_key": {"type": "string", "description": "Jira issue key like CF-123"}, }, "required": ["issue_key"], }, }, }, { "type": "function", "function": { "name": "jira_create_issue", "description": "Create a new Jira issue. Always confirm project and summary with user before creating.", "parameters": { "type": "object", "properties": { "project": {"type": "string", "description": "Project key (e.g. CF)"}, "summary": {"type": "string", "description": "Issue title/summary"}, "issue_type": {"type": "string", "description": "Issue type (default: Task)", "default": "Task"}, "description": {"type": "string", "description": "Issue description (optional)"}, }, "required": ["project", "summary"], }, }, }, { "type": "function", "function": { "name": "jira_add_comment", "description": "Add a comment to an existing Jira issue.", "parameters": { "type": "object", "properties": { "issue_key": {"type": "string", "description": "Jira issue key like CF-123"}, "comment": {"type": "string", "description": "Comment text"}, }, "required": ["issue_key", "comment"], }, }, }, { "type": "function", "function": { "name": "jira_transition", "description": "Change the status of a Jira issue (e.g. move to 'In Progress', 'Done').", "parameters": { "type": "object", "properties": { "issue_key": {"type": "string", "description": "Jira issue key like CF-123"}, "status": {"type": "string", "description": "Target status name (e.g. 'In Progress', 'Done')"}, }, "required": ["issue_key", "status"], }, }, }, ] WEB_SEARCH_TOOLS = [{ "type": "function", "function": { "name": "web_search", "description": "Search the web for current information. Use when the user asks about recent events, facts, or anything that needs up-to-date information from the internet.", "parameters": { "type": "object", "properties": { "query": {"type": "string", "description": "Search query"}, "count": {"type": "integer", "description": "Number of results (default 5)", "default": 5}, }, "required": ["query"], }, }, }, { "type": "function", "function": { "name": "browse_url", "description": "Open a web page and read its text content. Use when the user shares a URL, or when you need more detail from a search result link.", "parameters": { "type": "object", "properties": { "url": {"type": "string", "description": "Full URL to fetch (https://...)"}, }, "required": ["url"], }, }, }] ROOM_TOOLS = [{ "type": "function", "function": { "name": "search_room_history", "description": ( "Search the current chat room's message history for specific content. " "Use when the user asks about something said earlier in the conversation that is beyond " "the recent messages visible to you, e.g. 'what did I say about X last week', " "'find the message where I mentioned Y', 'what was the first thing I asked today'." ), "parameters": { "type": "object", "properties": { "query": {"type": "string", "description": "Search term or phrase to look for in room messages"}, "limit": {"type": "integer", "description": "Max messages to scan (default 200, max 500)", "default": 200}, }, "required": ["query"], }, }, }] SCHEDULER_TOOLS = [ { "type": "function", "function": { "name": "schedule_message", "description": ( "Schedule a reminder or message to be sent at a future time. " "Use when the user says things like 'remind me', 'erinnere mich', " "'send a message at', 'every Monday at 8am', etc. " "Parse the user's natural language time into an ISO 8601 datetime. " "Default timezone is Europe/Berlin unless the user specifies otherwise." ), "parameters": { "type": "object", "properties": { "message": {"type": "string", "description": "The reminder text to send"}, "datetime_iso": {"type": "string", "description": "When to send, ISO 8601 format (e.g. 2026-03-10T09:00:00+01:00)"}, "repeat": { "type": "string", "description": "Repeat pattern: 'once' (default), 'daily', 'weekly', 'weekdays', 'monthly'", "enum": ["once", "daily", "weekly", "weekdays", "monthly"], }, }, "required": ["message", "datetime_iso"], }, }, }, { "type": "function", "function": { "name": "list_reminders", "description": "List all active/pending reminders for the user. Use when they ask 'what reminders do I have?' or 'show my reminders'.", "parameters": {"type": "object", "properties": {}}, }, }, { "type": "function", "function": { "name": "cancel_reminder", "description": "Cancel a scheduled reminder by its ID number. Use when the user says 'cancel reminder #3' or 'lösche Erinnerung 3'.", "parameters": { "type": "object", "properties": { "reminder_id": {"type": "integer", "description": "The reminder ID to cancel"}, }, "required": ["reminder_id"], }, }, }, ] ALL_TOOLS = IMAGE_GEN_TOOLS + WEB_SEARCH_TOOLS + ATLASSIAN_TOOLS + ROOM_TOOLS + SCHEDULER_TOOLS ATLASSIAN_NOT_CONNECTED_MSG = ( "Your Atlassian account is not connected. " "Please connect it at [matrixhost.eu/settings](https://matrixhost.eu/settings?connect=atlassian) " "to use Jira and Confluence features." ) HELP_TEXT = """**AI Bot** Just write naturally — I'll respond to all messages in DMs. In group chats, @mention me to get my attention. I can search your documents, translate messages, and analyze images. Manage settings at [matrixhost.eu/settings](https://matrixhost.eu/settings).""" class DocumentRAG: """Search documents via customer-VM RAG service or central portal fallback.""" def __init__(self, portal_url: str, bot_api_key: str, rag_endpoint: str = "", rag_auth_token: str = ""): self.portal_url = portal_url.rstrip("/") self.bot_api_key = bot_api_key self.rag_endpoint = rag_endpoint.rstrip("/") if rag_endpoint else "" self.rag_auth_token = rag_auth_token self.use_local_rag = bool(self.rag_endpoint) self.enabled = bool(self.rag_endpoint) or bool(portal_url and bot_api_key) async def search(self, query: str, top_k: int = 10, api_key: str | None = None, org_slug: str | None = None, matrix_user_id: str | None = None) -> list[dict]: if not self.enabled or not matrix_user_id: return [] # Prefer customer-VM RAG service (encrypted, local) if self.use_local_rag: return await self._search_local(query, top_k, matrix_user_id) # Fallback: central portal API (legacy, unencrypted) return await self._search_portal(query, top_k, matrix_user_id) async def _search_local(self, query: str, top_k: int, matrix_user_id: str | None = None) -> list[dict]: """Search via customer-VM RAG service (localhost).""" try: body = {"query": query, "limit": top_k} if matrix_user_id: body["owner_id"] = matrix_user_id headers: dict[str, str] = {"Content-Type": "application/json"} if self.rag_auth_token: headers["Authorization"] = f"Bearer {self.rag_auth_token}" async with httpx.AsyncClient(timeout=15.0) as client: resp = await client.post( f"{self.rag_endpoint}/rag/search", json=body, headers=headers, ) resp.raise_for_status() return resp.json().get("results", []) except Exception: logger.debug("Local RAG search failed", exc_info=True) return [] async def _search_portal(self, query: str, top_k: int, matrix_user_id: str) -> list[dict]: """Search via central portal API (legacy fallback).""" try: body = {"query": query, "limit": top_k, "matrix_user_id": matrix_user_id} async with httpx.AsyncClient(timeout=15.0) as client: resp = await client.post( f"{self.portal_url}/api/bot/documents/search", json=body, headers={"Authorization": f"Bearer {self.bot_api_key}"}, ) resp.raise_for_status() return resp.json().get("results", []) except Exception: logger.debug("Portal document search failed", exc_info=True) return [] async def health(self) -> dict | None: """Check local RAG service health.""" if not self.use_local_rag: return None try: headers: dict[str, str] = {} if self.rag_auth_token: headers["Authorization"] = f"Bearer {self.rag_auth_token}" async with httpx.AsyncClient(timeout=5.0) as client: resp = await client.get(f"{self.rag_endpoint}/health", headers=headers) resp.raise_for_status() return resp.json() except Exception: return None def format_context(self, results: list[dict]) -> str: if not results: return "" parts = ["The following documents were found in our document archive:\n"] for i, r in enumerate(results, 1): title = r.get("title", r.get("filename", "Untitled")) link = r.get("source_url") or r.get("metadata", {}).get("source_url", "") category = r.get("category", "") date = r.get("detected_date", "") content = r.get("content", "") summary = r.get("metadata", {}).get("summary", "") parts.append(f"--- Document {i}: {title} ---") if category: parts.append(f"Category: {category}") if date: parts.append(f"Date: {date}") if link: parts.append(f"Link: {link}") if summary: parts.append(f"Summary: {summary}") if content: parts.append(f"Content:\n{content}") parts.append("") # blank line between docs parts.append("IMPORTANT INSTRUCTIONS FOR DOCUMENT RESPONSES:\n" "1. Answer the user's question using ALL the documents above.\n" "2. These are FRESH search results — they override anything from chat history.\n" " If previous messages said 'only one passport' but documents show more, trust the documents.\n" "3. TRUST the document TITLE and SUMMARY — they are AI-generated and accurate.\n" " The Content field may be garbled OCR from scanned PDFs (random characters, broken text).\n" " If the title says 'Christian's Passport' and summary says 'passport belonging to Christian',\n" " then it IS a passport — even if the content looks like gibberish.\n" "4. You MUST include a source link for EVERY document you reference.\n" "5. Format links as markdown: [Document Title](url)\n" "6. Place the link right after mentioning or quoting the document.\n" "7. If a document has no link, skip the link but still reference the title.\n" "8. Never show raw URLs without markdown formatting.\n" "9. List ALL matching documents, not just the first one.") return "\n".join(parts) class MemoryClient: """Async HTTP client for the memory-service.""" def __init__(self, base_url: str, token: str = ""): self.base_url = base_url.rstrip("/") self.token = token self.enabled = bool(base_url) def _headers(self) -> dict: h = {} if self.token: h["Authorization"] = f"Bearer {self.token}" return h async def store(self, user_id: str, fact: str, source_room: str = ""): if not self.enabled: return try: async with httpx.AsyncClient(timeout=10.0) as client: await client.post( f"{self.base_url}/memories/store", json={"user_id": user_id, "fact": fact, "source_room": source_room}, headers=self._headers(), ) except Exception: logger.warning("Memory store failed", exc_info=True) async def query(self, user_id: str, query: str, top_k: int = 10) -> list[dict]: if not self.enabled: return [] try: async with httpx.AsyncClient(timeout=10.0) as client: resp = await client.post( f"{self.base_url}/memories/query", json={"user_id": user_id, "query": query, "top_k": top_k}, headers=self._headers(), ) resp.raise_for_status() return resp.json().get("results", []) except Exception: logger.warning("Memory query failed", exc_info=True) return [] async def delete_user(self, user_id: str) -> int: if not self.enabled: return 0 try: async with httpx.AsyncClient(timeout=10.0) as client: resp = await client.delete( f"{self.base_url}/memories/{user_id}", headers=self._headers(), ) resp.raise_for_status() return resp.json().get("deleted", 0) except Exception: logger.warning("Memory delete failed", exc_info=True) return 0 async def list_all(self, user_id: str) -> list[dict]: if not self.enabled: return [] try: async with httpx.AsyncClient(timeout=10.0) as client: resp = await client.get( f"{self.base_url}/memories/{user_id}", headers=self._headers(), ) resp.raise_for_status() return resp.json().get("memories", []) except Exception: logger.warning("Memory list failed", exc_info=True) return [] async def store_chunk(self, user_id: str, room_id: str, chunk_text: str, summary: str, source_event_id: str = "", original_ts: float = 0.0): if not self.enabled: return try: async with httpx.AsyncClient(timeout=15.0) as client: await client.post( f"{self.base_url}/chunks/store", json={ "user_id": user_id, "room_id": room_id, "chunk_text": chunk_text, "summary": summary, "source_event_id": source_event_id, "original_ts": original_ts, }, headers=self._headers(), ) except Exception: logger.warning("Chunk store failed", exc_info=True) async def query_chunks(self, query: str, user_id: str, room_id: str = "", top_k: int = 5) -> list[dict]: if not self.enabled: return [] if not user_id: logger.error("query_chunks called with empty user_id — returning empty") return [] try: async with httpx.AsyncClient(timeout=10.0) as client: resp = await client.post( f"{self.base_url}/chunks/query", json={"user_id": user_id, "room_id": room_id, "query": query, "top_k": top_k}, headers=self._headers(), ) resp.raise_for_status() return resp.json().get("results", []) except Exception: logger.warning("Chunk query failed", exc_info=True) return [] async def create_scheduled(self, user_id: str, room_id: str, message_text: str, scheduled_at: float, repeat: str = "once") -> dict: if not self.enabled: return {"error": "Memory service not configured"} try: async with httpx.AsyncClient(timeout=10.0) as client: resp = await client.post( f"{self.base_url}/scheduled/create", json={ "user_id": user_id, "room_id": room_id, "message_text": message_text, "scheduled_at": scheduled_at, "repeat_pattern": repeat, }, headers=self._headers(), ) resp.raise_for_status() return resp.json() except httpx.HTTPStatusError as e: detail = e.response.json().get("detail", str(e)) if e.response else str(e) logger.warning("Schedule create failed: %s", detail) return {"error": detail} except Exception: logger.warning("Schedule create failed", exc_info=True) return {"error": "Failed to create reminder"} async def list_scheduled(self, user_id: str) -> list[dict]: if not self.enabled: return [] try: async with httpx.AsyncClient(timeout=10.0) as client: resp = await client.get( f"{self.base_url}/scheduled/{user_id}", headers=self._headers(), ) resp.raise_for_status() return resp.json().get("reminders", []) except Exception: logger.warning("Schedule list failed", exc_info=True) return [] async def cancel_scheduled(self, user_id: str, reminder_id: int) -> dict: if not self.enabled: return {"error": "Memory service not configured"} try: async with httpx.AsyncClient(timeout=10.0) as client: resp = await client.delete( f"{self.base_url}/scheduled/{user_id}/{reminder_id}", headers=self._headers(), ) resp.raise_for_status() return resp.json() except httpx.HTTPStatusError as e: detail = e.response.json().get("detail", str(e)) if e.response else str(e) return {"error": detail} except Exception: logger.warning("Schedule cancel failed", exc_info=True) return {"error": "Failed to cancel reminder"} async def get_due_messages(self) -> list[dict]: if not self.enabled: return [] try: async with httpx.AsyncClient(timeout=10.0) as client: resp = await client.post( f"{self.base_url}/scheduled/due", headers=self._headers(), ) resp.raise_for_status() return resp.json().get("due", []) except Exception: logger.warning("Get due messages failed", exc_info=True) return [] async def mark_sent(self, reminder_id: int) -> dict: if not self.enabled: return {} try: async with httpx.AsyncClient(timeout=10.0) as client: resp = await client.post( f"{self.base_url}/scheduled/{reminder_id}/mark-sent", headers=self._headers(), ) resp.raise_for_status() return resp.json() except Exception: logger.warning("Mark sent failed for #%d", reminder_id, exc_info=True) return {} class AtlassianClient: """Fetches per-user Atlassian tokens from the portal and calls Atlassian REST APIs.""" def __init__(self, portal_url: str, bot_api_key: str): self.portal_url = portal_url.rstrip("/") self.bot_api_key = bot_api_key self.enabled = bool(portal_url and bot_api_key) # Cache cloud IDs per user token to avoid repeated lookups self._cloud_id_cache: dict[str, str] = {} async def get_token(self, matrix_user_id: str) -> str | None: """Fetch user's Atlassian token from the portal.""" if not self.enabled: return None try: async with httpx.AsyncClient(timeout=10.0) as client: resp = await client.get( f"{self.portal_url}/api/bot/tokens", params={"matrix_user_id": matrix_user_id, "provider": "atlassian"}, headers={"Authorization": f"Bearer {self.bot_api_key}"}, ) resp.raise_for_status() data = resp.json() return data.get("access_token") if data.get("connected") else None except Exception: logger.warning("Failed to fetch Atlassian token for %s", matrix_user_id, exc_info=True) return None async def _get_cloud_id(self, token: str) -> str | None: """Get the Atlassian Cloud ID for the user's instance.""" if token in self._cloud_id_cache: return self._cloud_id_cache[token] try: async with httpx.AsyncClient(timeout=10.0) as client: resp = await client.get( "https://api.atlassian.com/oauth/token/accessible-resources", headers={"Authorization": f"Bearer {token}"}, ) resp.raise_for_status() resources = resp.json() if resources: cloud_id = resources[0]["id"] self._cloud_id_cache[token] = cloud_id return cloud_id except Exception: logger.warning("Failed to fetch Atlassian cloud ID", exc_info=True) return None async def confluence_recent_pages(self, token: str, limit: int = 5) -> str: cloud_id = await self._get_cloud_id(token) if not cloud_id: return "Error: Could not determine Atlassian Cloud instance." try: async with httpx.AsyncClient(timeout=15.0) as client: resp = await client.get( f"https://api.atlassian.com/ex/confluence/{cloud_id}/wiki/rest/api/content/search", params={"cql": "type=page ORDER BY lastmodified DESC", "limit": str(limit)}, headers={"Authorization": f"Bearer {token}"}, ) resp.raise_for_status() data = resp.json() results = data.get("results", []) if not results: return "No recent Confluence pages found." lines = ["Recently modified pages:"] for i, r in enumerate(results, 1): title = r.get("title", "Untitled") page_id = r.get("id", "") space = r.get("_expandable", {}).get("space", "").split("/")[-1] lines.append(f"{i}. **{title}** (ID: {page_id}, Space: {space})") return "\n".join(lines) except Exception as e: return f"Failed to fetch recent pages: {e}" async def confluence_search(self, token: str, query: str, limit: int = 5) -> str: cloud_id = await self._get_cloud_id(token) if not cloud_id: return "Error: Could not determine Atlassian Cloud instance." try: async with httpx.AsyncClient(timeout=15.0) as client: resp = await client.get( f"https://api.atlassian.com/ex/confluence/{cloud_id}/wiki/rest/api/content/search", params={"cql": f'text ~ "{query}"', "limit": str(limit), "expand": "metadata.labels"}, headers={"Authorization": f"Bearer {token}"}, ) resp.raise_for_status() data = resp.json() results = data.get("results", []) if not results: return f"No Confluence pages found for: {query}" lines = [] for r in results: title = r.get("title", "Untitled") page_id = r.get("id", "") space = r.get("_expandable", {}).get("space", "").split("/")[-1] url = f"https://api.atlassian.com/ex/confluence/{cloud_id}/wiki{r.get('_links', {}).get('webui', '')}" lines.append(f"- **{title}** (ID: {page_id}, Space: {space})\n {url}") return f"Found {len(results)} pages:\n" + "\n".join(lines) except Exception as e: return f"Confluence search error: {e}" async def confluence_read_page(self, token: str, page_id: str) -> str: cloud_id = await self._get_cloud_id(token) if not cloud_id: return "Error: Could not determine Atlassian Cloud instance." try: async with httpx.AsyncClient(timeout=15.0) as client: resp = await client.get( f"https://api.atlassian.com/ex/confluence/{cloud_id}/wiki/api/v2/pages/{page_id}", params={"body-format": "storage"}, headers={"Authorization": f"Bearer {token}"}, ) resp.raise_for_status() data = resp.json() title = data.get("title", "Untitled") html_body = data.get("body", {}).get("storage", {}).get("value", "") # Strip HTML tags for a readable summary from bs4 import BeautifulSoup text = BeautifulSoup(html_body, "lxml").get_text(separator="\n", strip=True) # Truncate for LLM context if len(text) > 8000: text = text[:8000] + "\n...(truncated)" return f"**{title}**\n\n{text}" except Exception as e: return f"Failed to read Confluence page {page_id}: {e}" async def confluence_update_page(self, token: str, page_id: str, section_heading: str, new_content: str) -> str: cloud_id = await self._get_cloud_id(token) if not cloud_id: return "Error: Could not determine Atlassian Cloud instance." try: base_url = f"https://api.atlassian.com/ex/confluence/{cloud_id}/wiki" headers = {"Authorization": f"Bearer {token}"} async with httpx.AsyncClient(timeout=15.0) as client: # Fetch current page resp = await client.get( f"{base_url}/rest/api/content/{page_id}", params={"expand": "body.storage,version"}, headers=headers, ) resp.raise_for_status() page = resp.json() title = page["title"] version = page["version"]["number"] body_html = page["body"]["storage"]["value"] # Find and replace section from confluence_collab.parser import parse_sections, find_section, replace_section_content sections = parse_sections(body_html) section = find_section(sections, section_heading) if section is None: return f"Section '{section_heading}' not found on page '{title}'" paragraphs = [p.strip() for p in new_content.split("\n") if p.strip()] new_html = "".join(f"
{p}
" for p in paragraphs) if paragraphs else f"{new_content}
" new_body = replace_section_content(body_html, section, new_html) # Update page resp = await client.put( f"{base_url}/rest/api/content/{page_id}", json={ "version": {"number": version + 1}, "title": title, "type": "page", "body": {"storage": {"value": new_body, "representation": "storage"}}, }, headers=headers, ) resp.raise_for_status() return f"Section '{section_heading}' updated successfully on '{title}'" except Exception as e: return f"Failed to update Confluence page: {e}" async def confluence_create_page(self, token: str, title: str, content: str, space_key: str = "AI") -> str: cloud_id = await self._get_cloud_id(token) if not cloud_id: return "Error: Could not determine Atlassian Cloud instance." try: headers = {"Authorization": f"Bearer {token}"} async with httpx.AsyncClient(timeout=15.0) as client: # Resolve space key to space ID (v2 API requires ID) space_resp = await client.get( f"https://api.atlassian.com/ex/confluence/{cloud_id}/wiki/api/v2/spaces", params={"keys": space_key}, headers=headers, ) space_resp.raise_for_status() spaces = space_resp.json().get("results", []) if not spaces: return f"Space '{space_key}' not found." space_id = spaces[0]["id"] paragraphs = [p.strip() for p in content.split("\n") if p.strip()] body_html = "".join(f"{p}
" for p in paragraphs) if paragraphs else f"{content}
" resp = await client.post( f"https://api.atlassian.com/ex/confluence/{cloud_id}/wiki/api/v2/pages", json={ "spaceId": space_id, "status": "current", "title": title, "body": {"representation": "storage", "value": body_html}, }, headers=headers, ) if resp.status_code == 401: logger.warning("Confluence v2 create returned 401, trying v1 API") # Fallback to v1 API which may accept classic scopes resp_v1 = await client.post( f"https://api.atlassian.com/ex/confluence/{cloud_id}/wiki/rest/api/content", json={ "type": "page", "title": title, "space": {"key": space_key}, "body": {"storage": {"value": body_html, "representation": "storage"}}, }, headers=headers, ) if resp_v1.status_code in (200, 201): data = resp_v1.json() page_id = data["id"] return f"Page created: **{title}** (ID: {page_id})" logger.error("Confluence v1 fallback also failed: %s %s", resp_v1.status_code, resp_v1.text[:500]) return f"Failed to create page: Unauthorized. Please re-connect Atlassian at [matrixhost.eu/settings](https://matrixhost.eu/settings?connect=atlassian) to grant write permissions." resp.raise_for_status() data = resp.json() page_id = data["id"] return f"Page created: **{title}** (ID: {page_id})" except Exception as e: return f"Failed to create Confluence page: {e}" async def jira_search(self, token: str, jql: str, limit: int = 10) -> str: cloud_id = await self._get_cloud_id(token) if not cloud_id: return "Error: Could not determine Atlassian Cloud instance." try: async with httpx.AsyncClient(timeout=15.0) as client: resp = await client.get( f"https://api.atlassian.com/ex/jira/{cloud_id}/rest/api/3/search/jql", params={"jql": jql, "maxResults": str(limit), "fields": "summary,status,assignee,priority,created,updated"}, headers={"Authorization": f"Bearer {token}"}, ) resp.raise_for_status() data = resp.json() issues = data.get("issues", []) if not issues: return f"No Jira issues found for: {jql}" lines = [] for issue in issues: key = issue["key"] fields = issue.get("fields", {}) summary = fields.get("summary", "") status = fields.get("status", {}).get("name", "") assignee = fields.get("assignee", {}) assignee_name = assignee.get("displayName", "Unassigned") if assignee else "Unassigned" priority = fields.get("priority", {}).get("name", "") if fields.get("priority") else "" lines.append(f"- **{key}**: {summary} [{status}] (Assignee: {assignee_name}, Priority: {priority})") return f"Found {data.get('total', len(issues))} issues:\n" + "\n".join(lines) except Exception as e: return f"Jira search error: {e}" async def jira_get_issue(self, token: str, issue_key: str) -> str: cloud_id = await self._get_cloud_id(token) if not cloud_id: return "Error: Could not determine Atlassian Cloud instance." try: async with httpx.AsyncClient(timeout=15.0) as client: resp = await client.get( f"https://api.atlassian.com/ex/jira/{cloud_id}/rest/api/3/issue/{issue_key}", params={"fields": "summary,status,assignee,priority,description,created,updated,comment"}, headers={"Authorization": f"Bearer {token}"}, ) resp.raise_for_status() data = resp.json() fields = data.get("fields", {}) summary = fields.get("summary", "") status = fields.get("status", {}).get("name", "") assignee = fields.get("assignee", {}) assignee_name = assignee.get("displayName", "Unassigned") if assignee else "Unassigned" priority = fields.get("priority", {}).get("name", "") if fields.get("priority") else "" desc = fields.get("description") # ADF description — extract text nodes desc_text = "" if desc and isinstance(desc, dict): desc_text = self._extract_adf_text(desc) elif isinstance(desc, str): desc_text = desc if len(desc_text) > 4000: desc_text = desc_text[:4000] + "...(truncated)" comments = fields.get("comment", {}).get("comments", []) comment_lines = [] for c in comments[-5:]: # Last 5 comments author = c.get("author", {}).get("displayName", "Unknown") body = self._extract_adf_text(c.get("body", {})) if isinstance(c.get("body"), dict) else str(c.get("body", "")) comment_lines.append(f" - **{author}**: {body[:500]}") result = f"**{issue_key}: {summary}**\nStatus: {status} | Assignee: {assignee_name} | Priority: {priority}" if desc_text: result += f"\n\nDescription:\n{desc_text}" if comment_lines: result += f"\n\nRecent comments:\n" + "\n".join(comment_lines) return result except Exception as e: return f"Failed to fetch {issue_key}: {e}" async def jira_create_issue(self, token: str, project: str, summary: str, issue_type: str = "Task", description: str = "") -> str: cloud_id = await self._get_cloud_id(token) if not cloud_id: return "Error: Could not determine Atlassian Cloud instance." try: body: dict = { "fields": { "project": {"key": project}, "summary": summary, "issuetype": {"name": issue_type}, } } if description: body["fields"]["description"] = { "type": "doc", "version": 1, "content": [{"type": "paragraph", "content": [{"type": "text", "text": description}]}], } async with httpx.AsyncClient(timeout=15.0) as client: resp = await client.post( f"https://api.atlassian.com/ex/jira/{cloud_id}/rest/api/3/issue", json=body, headers={"Authorization": f"Bearer {token}", "Content-Type": "application/json"}, ) resp.raise_for_status() data = resp.json() return f"Created **{data['key']}**: {summary}" except httpx.HTTPStatusError as e: return f"Failed to create issue: {e.response.text}" except Exception as e: return f"Failed to create issue: {e}" async def jira_add_comment(self, token: str, issue_key: str, comment: str) -> str: cloud_id = await self._get_cloud_id(token) if not cloud_id: return "Error: Could not determine Atlassian Cloud instance." try: body = { "body": { "type": "doc", "version": 1, "content": [{"type": "paragraph", "content": [{"type": "text", "text": comment}]}], } } async with httpx.AsyncClient(timeout=15.0) as client: resp = await client.post( f"https://api.atlassian.com/ex/jira/{cloud_id}/rest/api/3/issue/{issue_key}/comment", json=body, headers={"Authorization": f"Bearer {token}", "Content-Type": "application/json"}, ) resp.raise_for_status() return f"Comment added to {issue_key}." except Exception as e: return f"Failed to add comment to {issue_key}: {e}" async def jira_transition(self, token: str, issue_key: str, status: str) -> str: cloud_id = await self._get_cloud_id(token) if not cloud_id: return "Error: Could not determine Atlassian Cloud instance." try: async with httpx.AsyncClient(timeout=15.0) as client: # First, get available transitions resp = await client.get( f"https://api.atlassian.com/ex/jira/{cloud_id}/rest/api/3/issue/{issue_key}/transitions", headers={"Authorization": f"Bearer {token}"}, ) resp.raise_for_status() transitions = resp.json().get("transitions", []) # Find matching transition (case-insensitive) target = None for t in transitions: if t["name"].lower() == status.lower() or t["to"]["name"].lower() == status.lower(): target = t break if not target: available = ", ".join(t["name"] for t in transitions) return f"Cannot transition {issue_key} to '{status}'. Available transitions: {available}" # Execute transition resp = await client.post( f"https://api.atlassian.com/ex/jira/{cloud_id}/rest/api/3/issue/{issue_key}/transitions", json={"transition": {"id": target["id"]}}, headers={"Authorization": f"Bearer {token}", "Content-Type": "application/json"}, ) resp.raise_for_status() return f"Transitioned {issue_key} to **{target['to']['name']}**." except Exception as e: return f"Failed to transition {issue_key}: {e}" @staticmethod def _extract_adf_text(adf: dict) -> str: """Recursively extract text from Atlassian Document Format.""" if not isinstance(adf, dict): return str(adf) parts = [] if adf.get("type") == "text": parts.append(adf.get("text", "")) for child in adf.get("content", []): parts.append(AtlassianClient._extract_adf_text(child)) return " ".join(parts).strip() class Bot: def __init__(self): config = AsyncClientConfig( max_limit_exceeded=0, max_timeouts=0, store_sync_tokens=True, encryption_enabled=True, ) self.client = AsyncClient( HOMESERVER, BOT_USER, store_path=STORE_PATH, config=config, ) self.lkapi = None self.voice_sessions: dict[str, VoiceSession] = {} self.active_calls = set() # rooms where we've sent call member event self.active_callers: dict[str, set[str]] = {} # room_id → set of caller user IDs self.rag = DocumentRAG(PORTAL_URL, BOT_API_KEY, rag_endpoint=RAG_ENDPOINT, rag_auth_token=RAG_AUTH_TOKEN) self.key_manager = RAGKeyManager(self.client, PORTAL_URL, BOT_API_KEY) self.memory = MemoryClient(MEMORY_SERVICE_URL, token=MEMORY_SERVICE_TOKEN) self.atlassian = AtlassianClient(PORTAL_URL, BOT_API_KEY) self.llm = AsyncOpenAI(base_url=LITELLM_URL, api_key=LITELLM_KEY) if LITELLM_URL else None self._documents_cache: dict[str, str | None] = {} # matrix_user_id -> connected status self.room_models: dict[str, str] = {} # room_id -> model name self.auto_rename_rooms: set[str] = set() # rooms with auto-rename enabled self._recent_images: dict[str, tuple[str, str, float]] = {} # room_id -> (b64, mime, timestamp) self.renamed_rooms: dict[str, float] = {} # room_id -> timestamp of last rename self._loaded_rooms: set[str] = set() # rooms where we've loaded state self._sync_token_received = False self._verifications: dict[str, dict] = {} # txn_id -> verification state self._room_document_context: dict[str, list[dict]] = {} # room_id -> [{type, filename, text, timestamp}, ...] # Article summary handler (Blinkist-style audio summaries) if self.llm and ELEVENLABS_API_KEY: self.article_handler = ArticleSummaryHandler( llm_client=self.llm, model=DEFAULT_MODEL, elevenlabs_key=ELEVENLABS_API_KEY, voice_id=ELEVENLABS_VOICE_ID, firecrawl_url=FIRECRAWL_URL or None, ) else: self.article_handler = None # Cron job scheduler (syncs with matrixhost portal) if PORTAL_URL and BOT_API_KEY: self.cron_scheduler = CronScheduler( portal_url=PORTAL_URL, api_key=BOT_API_KEY, matrix_client=self.client, send_text_fn=self._send_text, llm_client=self.llm, default_model=DEFAULT_MODEL, escalation_model=ESCALATION_MODEL, ) else: self.cron_scheduler = None # Pipeline approval tracking: event_id -> execution_id self._pipeline_approval_events: dict[str, str] = {} if self.cron_scheduler: self.cron_scheduler.pipeline_engine.on_approval_registered = self._on_pipeline_approval_registered async def _has_documents(self, matrix_user_id: str) -> bool: """Check if user has documents via local RAG or MatrixHost portal API. Results are cached per session. """ if matrix_user_id in self._documents_cache: return self._documents_cache[matrix_user_id] is not None # Check local RAG service first (customer-VM encrypted RAG) if self.rag.use_local_rag: health = await self.rag.health() if health and health.get("document_count", 0) > 0: self._documents_cache[matrix_user_id] = "connected" return True # Fallback: check via central portal if self.atlassian.enabled: try: async with httpx.AsyncClient(timeout=10.0) as client: resp = await client.get( f"{self.atlassian.portal_url}/api/bot/tokens", params={"matrix_user_id": matrix_user_id, "provider": "documents"}, headers={"Authorization": f"Bearer {self.atlassian.bot_api_key}"}, ) resp.raise_for_status() data = resp.json() if data.get("connected"): self._documents_cache[matrix_user_id] = "connected" return True except Exception: logger.debug("Portal document check failed for %s", matrix_user_id, exc_info=True) self._documents_cache[matrix_user_id] = None return False async def start(self): # Restore existing session or create new one if os.path.exists(CREDS_FILE): with open(CREDS_FILE) as f: creds = json.load(f) self.client.restore_login( user_id=creds["user_id"], device_id=creds["device_id"], access_token=creds["access_token"], ) self.client.load_store() logger.info("Restored session as %s (device %s)", creds["user_id"], creds["device_id"]) else: resp = await self.client.login(BOT_PASS, device_name="ai-voice-bot") if not isinstance(resp, LoginResponse): logger.error("Login failed: %s", resp) return # Persist credentials for next restart with open(CREDS_FILE, "w") as f: json.dump({ "user_id": resp.user_id, "device_id": resp.device_id, "access_token": resp.access_token, }, f) logger.info("Logged in as %s (device %s) — credentials saved", resp.user_id, resp.device_id) if self.client.should_upload_keys: await self.client.keys_upload() # Bootstrap cross-signing if not already done await self._ensure_cross_signing() self.lkapi = api.LiveKitAPI(LK_URL, LK_KEY, LK_SECRET) self.client.add_event_callback(self.on_invite, InviteMemberEvent) self.client.add_event_callback(self.on_megolm, MegolmEvent) self.client.add_event_callback(self.on_unknown, UnknownEvent) self.client.add_event_callback(self.on_text_message, RoomMessageText) self.client.add_event_callback(self.on_image_message, RoomMessageImage) self.client.add_event_callback(self.on_encrypted_image_message, RoomEncryptedImage) self.client.add_event_callback(self.on_file_message, RoomMessageFile) self.client.add_event_callback(self.on_encrypted_file_message, RoomEncryptedFile) self.client.add_event_callback(self.on_room_unknown, RoomMessageUnknown) self.client.add_event_callback(self.on_reaction, ReactionEvent) self.client.add_response_callback(self.on_sync, SyncResponse) self.client.add_to_device_callback(self.on_key_verification, KeyVerificationStart) self.client.add_to_device_callback(self.on_key_verification, KeyVerificationKey) self.client.add_to_device_callback(self.on_key_verification, KeyVerificationMac) self.client.add_to_device_callback(self.on_key_verification, KeyVerificationCancel) # Start reminder scheduler asyncio.create_task(self._reminder_scheduler()) # Start cron job scheduler if self.cron_scheduler: asyncio.create_task(self.cron_scheduler.start()) logger.info("Cron scheduler task created") await self.client.sync_forever(timeout=30000, full_state=True) async def _ensure_cross_signing(self): """Ensure bot device is cross-signed so Element clients don't show authenticity warnings.""" xsign_file = os.path.join(STORE_PATH, "cross_signing_keys.json") with open(CREDS_FILE) as f: creds = json.load(f) user_id = creds["user_id"] device_id = creds["device_id"] token = creds["access_token"] headers = {"Authorization": f"Bearer {token}"} # Check if device already has a cross-signing signature on the server try: async with httpx.AsyncClient(timeout=15.0) as hc: resp = await hc.post( f"{HOMESERVER}/_matrix/client/v3/keys/query", json={"device_keys": {user_id: [device_id]}}, headers=headers, ) if resp.status_code == 200: device_keys = resp.json().get("device_keys", {}).get(user_id, {}).get(device_id, {}) sigs = device_keys.get("signatures", {}).get(user_id, {}) device_key_id = f"ed25519:{device_id}" has_cross_sig = any(k != device_key_id for k in sigs) if has_cross_sig: logger.info("Device %s already cross-signed, skipping bootstrap", device_id) return except Exception as e: logger.warning("Cross-signing check failed: %s", e) # Load existing seeds or generate new ones if os.path.exists(xsign_file): with open(xsign_file) as f: seeds = json.load(f) master_seed = base64.b64decode(seeds["master_seed"]) ss_seed = base64.b64decode(seeds["self_signing_seed"]) us_seed = base64.b64decode(seeds["user_signing_seed"]) logger.info("Loaded existing cross-signing seeds, re-signing device") else: master_seed = os.urandom(32) ss_seed = os.urandom(32) us_seed = os.urandom(32) logger.info("Generating new cross-signing keys") master_key = olm.pk.PkSigning(master_seed) self_signing_key = olm.pk.PkSigning(ss_seed) user_signing_key = olm.pk.PkSigning(us_seed) def make_key(usage, pubkey): return { "user_id": user_id, "usage": [usage], "keys": {"ed25519:" + pubkey: pubkey}, } master_obj = make_key("master", master_key.public_key) ss_obj = make_key("self_signing", self_signing_key.public_key) us_obj = make_key("user_signing", user_signing_key.public_key) # Sign sub-keys with master key ss_canonical = canonicaljson.encode_canonical_json(ss_obj) ss_obj["signatures"] = {user_id: {"ed25519:" + master_key.public_key: master_key.sign(ss_canonical)}} us_canonical = canonicaljson.encode_canonical_json(us_obj) us_obj["signatures"] = {user_id: {"ed25519:" + master_key.public_key: master_key.sign(us_canonical)}} try: async with httpx.AsyncClient(timeout=15.0) as hc: # Upload cross-signing keys with password auth resp = await hc.post( f"{HOMESERVER}/_matrix/client/v3/keys/device_signing/upload", json={ "master_key": master_obj, "self_signing_key": ss_obj, "user_signing_key": us_obj, "auth": { "type": "m.login.password", "identifier": {"type": "m.id.user", "user": user_id}, "password": BOT_PASS, }, }, headers=headers, ) if resp.status_code != 200: logger.error("Cross-signing upload failed: %d %s", resp.status_code, resp.text) return logger.info("Cross-signing keys uploaded") # Fetch device keys to sign qresp = await hc.post( f"{HOMESERVER}/_matrix/client/v3/keys/query", json={"device_keys": {user_id: [device_id]}}, headers=headers, ) device_obj = qresp.json()["device_keys"][user_id][device_id] device_obj.pop("signatures", None) device_obj.pop("unsigned", None) # Sign device with self-signing key dk_canonical = canonicaljson.encode_canonical_json(device_obj) dk_sig = self_signing_key.sign(dk_canonical) device_obj["signatures"] = {user_id: {"ed25519:" + self_signing_key.public_key: dk_sig}} resp2 = await hc.post( f"{HOMESERVER}/_matrix/client/v3/keys/signatures/upload", json={user_id: {device_id: device_obj}}, headers=headers, ) if resp2.status_code != 200: logger.error("Device signature upload failed: %d %s", resp2.status_code, resp2.text) return logger.info("Device %s cross-signed successfully", device_id) except Exception as e: logger.error("Cross-signing bootstrap failed: %s", e, exc_info=True) return # Save seeds if new if not os.path.exists(xsign_file): with open(xsign_file, "w") as f: json.dump({ "master_seed": base64.b64encode(master_seed).decode(), "self_signing_seed": base64.b64encode(ss_seed).decode(), "user_signing_seed": base64.b64encode(us_seed).decode(), }, f) logger.info("Cross-signing seeds saved to %s", xsign_file) async def _inject_rag_key(self): """Load document encryption key from Matrix and inject into RAG service.""" try: seed_key = os.environ.get("RAG_ENCRYPTION_KEY_SEED") success = await self.key_manager.ensure_rag_key(seed_key_hex=seed_key) if success: logger.info("RAG encryption key loaded from Matrix E2EE") if seed_key: logger.info("Migration complete - RAG_ENCRYPTION_KEY_SEED can now be removed from env") else: logger.warning("Failed to load RAG encryption key - documents will be inaccessible") except Exception as e: logger.error("RAG key injection failed: %s", e, exc_info=True) async def _reminder_scheduler(self): """Background loop: check for due reminders every 30 seconds, send them.""" await asyncio.sleep(10) # Wait for bot to fully initialize logger.info("Reminder scheduler started") while True: try: due = await self.memory.get_due_messages() for msg in due: try: reminder_text = f"\u23f0 **Erinnerung:** {msg['message_text']}" await self._send_text(msg["room_id"], reminder_text) await self.memory.mark_sent(msg["id"]) logger.info("Sent reminder #%d to %s", msg["id"], msg["room_id"]) except Exception: logger.warning("Failed to send reminder #%d", msg["id"], exc_info=True) except Exception: logger.warning("Reminder scheduler check failed", exc_info=True) await asyncio.sleep(30) async def on_invite(self, room, event: InviteMemberEvent): if event.state_key != BOT_USER: return logger.info("Invited to %s, joining room", room.room_id) await self.client.join(room.room_id) async def on_sync(self, response: SyncResponse): """After each sync, trust all devices in our rooms.""" if not self._sync_token_received: self._sync_token_received = True logger.info("Initial sync complete, text handler active") # Inject RAG encryption key from Matrix E2EE room asyncio.create_task(self._inject_rag_key()) for user_id in list(self.client.device_store.users): for device in self.client.device_store.active_user_devices(user_id): if not device.verified: self.client.verify_device(device) logger.info("Auto-trusted device %s of %s", device.device_id, user_id) async def on_reaction(self, room, event: ReactionEvent): """Handle reaction events for pipeline approval flow.""" if event.sender == BOT_USER: return if not self._sync_token_received: return event_id = event.reacts_to reaction_key = event.key logger.info("Reaction received: key=%s reacts_to=%s sender=%s", reaction_key, event_id, event.sender) # Check if this reaction is for a pipeline approval if not self.cron_scheduler: return from pipelines.approval import reaction_to_response response = reaction_to_response(reaction_key) if not response: logger.debug("Reaction key %s not an approval response", reaction_key) return logger.info("Approval reaction: %s -> %s (tracked events: %s, active futures: %s)", event_id, response, list(self._pipeline_approval_events.keys()), list(self.cron_scheduler.pipeline_engine._approval_futures.keys())) # Look up execution by approval event ID execution_id = self._pipeline_approval_events.get(event_id) if execution_id: resolved = self.cron_scheduler.pipeline_engine.resolve_approval(execution_id, response) if resolved: self._pipeline_approval_events.pop(event_id, None) logger.info("Pipeline approval resolved: %s -> %s", execution_id, response) return else: logger.warning("Failed to resolve approval %s (future not found or already done)", execution_id) # If not in local cache, check pending approvals from portal try: pending = await self.cron_scheduler._pipeline_state.fetch_pending_approvals() logger.info("Checking %d pending approvals from portal for event %s", len(pending), event_id) for execution in pending: if execution.get("approvalMsgId") == event_id: eid = execution["id"] self._pipeline_approval_events[event_id] = eid resolved = self.cron_scheduler.pipeline_engine.resolve_approval(eid, response) if resolved: self._pipeline_approval_events.pop(event_id, None) logger.info("Pipeline approval resolved (from portal): %s -> %s", eid, response) else: logger.warning("Portal found execution %s but future not active", eid) break except Exception: logger.debug("Failed to check pending approvals for reaction", exc_info=True) async def on_unknown(self, room, event: UnknownEvent): """Handle call member state events and in-room verification.""" # Route verification events if event.type.startswith("m.key.verification."): if event.sender != BOT_USER: await self._route_verification(room, event) return # Forward encryption key events to active voice sessions (skip our own) if event.type == ENCRYPTION_KEYS_TYPE: if event.sender == BOT_USER: return # ignore our own key events room_id = room.room_id content = event.source.get("content", {}) device_id = content.get("device_id", "") keys_list = content.get("keys", []) logger.info("Got encryption_keys timeline event from %s in %s (device=%s, keys=%d, content_keys=%s)", event.sender, room_id, device_id, len(keys_list), list(content.keys())) vs = self.voice_sessions.get(room_id) if vs: for k in keys_list: if "key" in k and "index" in k: key_b64 = k["key"] key_b64 += "=" * (-len(key_b64) % 4) key_bytes = base64.urlsafe_b64decode(key_b64) vs.on_encryption_key(event.sender, device_id, key_bytes, k["index"]) else: logger.warning("encryption_keys event missing key/index: %s", k) if not keys_list: logger.warning("encryption_keys event has empty keys list, full content: %s", content) else: logger.warning("No voice session for room %s to deliver encryption key", room_id) return if event.type != CALL_MEMBER_TYPE: return if event.sender == BOT_USER: return # ignore our own events # Non-empty content means someone started/is in a call if event.source.get("content", {}): room_id = room.room_id if room_id in self.active_calls: return logger.info("Call detected in %s from %s, joining...", room_id, event.sender) self.active_calls.add(room_id) self.active_callers.setdefault(room_id, set()).add(event.sender) # Get the foci_preferred from the caller's event content = event.source["content"] foci = content.get("foci_preferred", [{ "type": "livekit", "livekit_service_url": f"{HOMESERVER}/livekit-jwt-service", "livekit_alias": room_id, }]) # Compute LiveKit room name using same hash as lk-jwt-service # SHA256(room_id + "|" + "m.call#ROOM") encoded as unpadded base64 lk_room_hash = hashlib.sha256((room_id + "|m.call#ROOM").encode()).digest() lk_room_name = base64.b64encode(lk_room_hash).decode().rstrip("=") logger.info("LiveKit room name: %s (hashed from %s)", lk_room_name, room_id) # Send our own call member state event FIRST so Element Call # sends encryption_keys in response (before we start VoiceSession) call_content = { "application": "m.call", "call_id": "", "scope": "m.room", "device_id": BOT_DEVICE_ID, "expires": 7200000, "focus_active": { "type": "livekit", "focus_selection": "oldest_membership", }, "foci_preferred": foci, "m.call.intent": "audio", } state_key = f"_{BOT_USER}_{BOT_DEVICE_ID}_m.call" try: resp = await self.client.room_put_state( room_id, CALL_MEMBER_TYPE, call_content, state_key=state_key, ) logger.info("Sent call member event in %s: %s", room_id, resp) except Exception: logger.exception("Failed to send call member event in %s", room_id) # Now create VoiceSession — encryption_keys may arrive via sync # while VoiceSession waits for key (up to 10s) if room_id not in self.voice_sessions: try: model = self.room_models.get(room_id, DEFAULT_MODEL) caller_device_id = content.get("device_id", "") # Generate bot's own E2EE key (16 bytes like Element Call) import secrets bot_key = secrets.token_bytes(16) # Collect all recent document contexts (< 1 hour) doc_entries = [e for e in self._room_document_context.get(room_id, []) if time.time() - e["timestamp"] < 3600] document_context = None if doc_entries: parts = [] for e in doc_entries: label = {"pdf": "PDF", "image": "Bild", "text": "Datei"}.get(e["type"], "Dokument") text = e["text"][:40000] if e["type"] != "image" else e["text"][:2000] parts.append(f"[{label}: {e['filename']}]\n{text}") document_context = "\n\n".join(parts) logger.info("Passing %d document context(s) to voice session (%d chars total)", len(doc_entries), len(document_context)) vs = VoiceSession( nio_client=self.client, room_id=room_id, device_id=BOT_DEVICE_ID, lk_url=LK_URL, model=model, bot_key=bot_key, publish_key_cb=lambda key, rid=room_id: asyncio.ensure_future( self._publish_encryption_key(rid, key)), memory=self.memory, caller_user_id=event.sender, document_context=document_context, ) # Check timeline for caller's key caller_key = await self._get_call_encryption_key(room_id, event.sender, caller_device_id) if caller_key: vs.on_encryption_key(event.sender, caller_device_id, caller_key, 0) # Store BEFORE start so on_unknown handler can forward keys via sync self.voice_sessions[room_id] = vs await vs.start() logger.info("Voice session started for room %s (e2ee_key=%s)", room_id, "yes" if caller_key else "waiting for sync") except Exception: logger.exception("Voice session start failed for %s", room_id) self.voice_sessions.pop(room_id, None) else: # Empty content = someone left the call room_id = room.room_id if room_id in self.active_calls: # Remove this caller from active set callers = self.active_callers.get(room_id, set()) callers.discard(event.sender) if callers: logger.info("Participant %s left %s but %d other(s) still in call — keeping session", event.sender, room_id, len(callers)) return # No callers left — stop voice session logger.info("Last caller %s left %s — stopping session", event.sender, room_id) self.active_callers.pop(room_id, None) vs = self.voice_sessions.pop(room_id, None) if vs: transcript = vs.get_transcript() doc_context = vs.get_document_context() confluence_page_id = vs.get_confluence_page_id() try: await vs.stop() logger.info("Voice session stopped for %s", room_id) except Exception: logger.exception("Failed to stop voice session for %s", room_id) # Post call summary to room if transcript: try: summary = await self._summarize_call(transcript, room_id) await self._send_text(room_id, f"**Anruf-Zusammenfassung:**\n\n{summary}") except Exception: logger.exception("Failed to post call summary for %s", room_id) # Persist voice transcript as conversation chunks in memory service try: caller = event.sender for entry in transcript: if entry["role"] == "user": user_text = entry["text"] # Find the next assistant response idx = transcript.index(entry) ai_text = "" if idx + 1 < len(transcript) and transcript[idx + 1]["role"] == "assistant": ai_text = transcript[idx + 1]["text"] if user_text and ai_text: await self._store_conversation_chunk( user_text, ai_text, caller, room_id ) logger.info("Stored %d voice transcript chunks for %s", len(transcript) // 2, room_id) except Exception: logger.warning("Failed to store voice transcript chunks for %s", room_id, exc_info=True) # Extract and post document annotations if a document was discussed if doc_context: try: annotations = await self._extract_document_annotations( transcript, doc_context, room_id) if annotations: await self._send_text(room_id, f"**Dokument-Aenderungen:**\n\n{annotations}") except Exception: logger.exception("Failed to post document annotations for %s", room_id) # Leave the call too self.active_calls.discard(room_id) state_key = f"_{BOT_USER}_{BOT_DEVICE_ID}_m.call" try: await self.client.room_put_state( room_id, CALL_MEMBER_TYPE, {}, state_key=state_key, ) logger.info("Left call in %s", room_id) except Exception: logger.exception("Failed to leave call in %s", room_id) async def _load_room_settings(self, room_id: str): """Load persisted model and auto-rename settings from room state.""" if room_id in self._loaded_rooms: return self._loaded_rooms.add(room_id) for state_type, target in [ (MODEL_STATE_TYPE, "model"), (RENAME_STATE_TYPE, "rename"), ]: try: resp = await self.client.room_get_state_event(room_id, state_type, "") if hasattr(resp, "content"): content = resp.content if target == "model" and "model" in content: self.room_models[room_id] = content["model"] elif target == "rename" and content.get("enabled"): self.auto_rename_rooms.add(room_id) except Exception: pass # State event doesn't exist yet # --- User memory helpers --- @staticmethod def _format_memories(memories: list[dict]) -> str: """Format memory query results as a system prompt section.""" if not memories: return "" facts = [m["fact"] for m in memories] return "You have these memories about this user:\n" + "\n".join(f"- {f}" for f in facts) @staticmethod def _format_chunks(chunks: list[dict]) -> str: """Format conversation chunk results as a system prompt section.""" if not chunks: return "" parts = ["Relevant past conversations:"] for c in chunks: ts = c.get("original_ts", 0) date_str = time.strftime("%Y-%m-%d", time.gmtime(ts)) if ts else "unknown" summary = c.get("summary", "") text = c.get("chunk_text", "") # Truncate chunk text to ~500 chars for context window efficiency if len(text) > 500: text = text[:500] + "..." parts.append(f"\n### {summary} ({date_str})\n{text}") return "\n".join(parts) async def _store_conversation_chunk(self, user_message: str, ai_reply: str, sender: str, room_id: str): """Store a user-assistant exchange as a conversation chunk for RAG.""" if not self.llm or not self.memory.enabled: return chunk_text = f"User: {user_message}\nAssistant: {ai_reply}" try: resp = await self.llm.chat.completions.create( model="gemini-flash", messages=[ {"role": "system", "content": ( "Summarize this conversation exchange in 1-2 sentences for search indexing. " "Focus on the topic and key information discussed. Be concise. " "Write the summary in the same language as the conversation." )}, {"role": "user", "content": chunk_text[:2000]}, ], max_tokens=100, ) summary = resp.choices[0].message.content.strip() except Exception: logger.warning("Chunk summarization failed, using truncated message", exc_info=True) summary = user_message[:200] await self.memory.store_chunk( user_id=sender, room_id=room_id, chunk_text=chunk_text, summary=summary, original_ts=time.time(), ) # Regex for detecting personal facts worth extracting (pronouns, possessives, identity markers) _PERSONAL_FACT_RE = re.compile( r"\b(ich|mein|meine|meinem|i'm|i am|my |mine|we |our |" r"name is|hei[sß]e|wohne|arbeite|lebe|studier|born|live|work|" r"prefer|favorite|hobby|birthday|family|wife|husband|partner|child|dog|cat)\b", re.IGNORECASE, ) def _is_trivial_message(self, text: str) -> bool: """Return True if the message is too trivial for memory extraction.""" if len(text) >= 20: return False if self._PERSONAL_FACT_RE.search(text): return False return True async def _extract_and_store_memories(self, user_message: str, ai_reply: str, existing_facts: list[str], model: str, sender: str, room_id: str): """Use LLM to extract memorable facts, then store each via memory-service.""" if not self.llm: return # Skip extraction for trivial messages (saves ~2-3s + 1 LLM call) if self._is_trivial_message(user_message): logger.debug("Skipping memory extraction for trivial message: %s", user_message[:40]) return existing_text = "\n".join(f"- {f}" for f in existing_facts) if existing_facts else "(none)" logger.info("Memory extraction: user_msg=%s... (%d existing facts)", user_message[:80], len(existing_facts)) try: resp = await self.llm.chat.completions.create( model=model, messages=[ {"role": "system", "content": ( "You extract memorable facts about the USER from conversations. " "Return a JSON array of strings — each string is a concise fact worth remembering. " "Include: name, language preference, location, occupation, interests, preferences, " "family, pets, projects, important dates, or any personal detail shared. " "Do NOT include: the current question/topic, temporary info, or things the AI said. " "Do NOT extract facts from documents or search results mentioned in the AI reply — " "those are about OTHER people/entities, not the user. Only extract facts the user " "directly states about themselves. " "Do NOT duplicate existing memories (rephrase or skip if already known). " "Return [] if nothing new is worth remembering." )}, {"role": "user", "content": ( f"Existing memories:\n{existing_text}\n\n" f"User message: {user_message[:500]}\n" # Only include first 200 chars of AI reply to avoid document content pollution f"AI reply (summary only): {ai_reply[:200]}\n\n" "New facts to remember (JSON array of strings):" )}, ], max_tokens=300, ) raw = resp.choices[0].message.content.strip() logger.info("Memory extraction raw response: %s", raw[:200]) if raw.startswith("```"): raw = re.sub(r"^```\w*\n?", "", raw) raw = re.sub(r"\n?```$", "", raw) match = re.search(r"\[.*\]", raw, re.DOTALL) if match: raw = match.group(0) new_facts = json.loads(raw) if not isinstance(new_facts, list): logger.warning("Memory extraction returned non-list: %s", type(new_facts)) return logger.info("Memory extraction found %d new facts", len(new_facts)) for fact in new_facts: if isinstance(fact, str) and fact.strip(): await self.memory.store(sender, fact.strip(), room_id) except json.JSONDecodeError: logger.warning("Memory extraction JSON parse failed, raw: %s", raw[:200]) except Exception: logger.warning("Memory extraction failed", exc_info=True) async def on_text_message(self, room, event: RoomMessageText): """Handle text messages: commands and AI responses.""" if event.sender == BOT_USER: return if not self._sync_token_received: return # ignore messages from initial sync / backfill # Ignore old messages (>30s) to avoid replaying history server_ts = event.server_timestamp / 1000 if time.time() - server_ts > 30: return await self._load_room_settings(room.room_id) body = event.body.strip() # In DMs (2 members), respond to all messages; in groups, require @mention is_dm = room.member_count == 2 if not is_dm: bot_display = self.client.user_id.split(":")[0].lstrip("@") mentioned = ( BOT_USER in body or f"@{bot_display}" in body.lower() or bot_display.lower() in body.lower() ) if not mentioned: return if not self.llm: await self._send_text(room.room_id, "LLM not configured (LITELLM_BASE_URL not set).") return sender = event.sender # Check if a recent image was sent in this room (within 60s) image_data = None cached = self._recent_images.get(room.room_id) if cached: b64, mime, ts = cached if time.time() - ts < 60: image_data = (b64, mime) del self._recent_images[room.room_id] # If no cached image but user asks about screen/camera, try capturing from active call if not image_data and re.search( r'siehst du|bildschirm|screen|was siehst|kannst du sehen|schau mal|look at|can you see|zeig', body, re.IGNORECASE ): vs = self.voice_sessions.get(room.room_id) if vs and vs._video_track: try: stream = rtc.VideoStream(vs._video_track) frame = None async for f in stream: frame = f break try: await stream.aclose() except Exception: pass if frame: from PIL import Image vf = getattr(frame, 'frame', frame) rgba = vf.convert(rtc.VideoBufferType.RGBA) if rgba.width >= 64 and rgba.height >= 64: img = Image.frombytes("RGBA", (rgba.width, rgba.height), bytes(rgba.data)) buf = io.BytesIO() img.convert("RGB").save(buf, format="JPEG", quality=85) img_b64 = base64.b64encode(buf.getvalue()).decode() image_data = (img_b64, "image/jpeg") logger.info("Captured %dx%d frame from active call for text query", rgba.width, rgba.height) else: logger.warning("Frame too small (%dx%d) — E2EE video decryption likely failed", rgba.width, rgba.height) # Retry once after 2s — E2EE key may still be propagating await asyncio.sleep(2) try: stream2 = rtc.VideoStream(vs._video_track) frame2 = None async for f2 in stream2: frame2 = f2 break try: await stream2.aclose() except Exception: pass if frame2: vf2 = getattr(frame2, 'frame', frame2) rgba2 = vf2.convert(rtc.VideoBufferType.RGBA) if rgba2.width >= 64 and rgba2.height >= 64: img2 = Image.frombytes("RGBA", (rgba2.width, rgba2.height), bytes(rgba2.data)) buf2 = io.BytesIO() img2.convert("RGB").save(buf2, format="JPEG", quality=85) img_b64 = base64.b64encode(buf2.getvalue()).decode() image_data = (img_b64, "image/jpeg") logger.info("Retry captured %dx%d frame from call", rgba2.width, rgba2.height) else: logger.warning("Retry still too small (%dx%d)", rgba2.width, rgba2.height) except Exception as exc2: logger.warning("Frame retry failed: %s", exc2) except Exception as exc: logger.warning("Failed to capture frame from call: %s", exc) # Detect Confluence page links → store page ID for voice session context confluence_page_id = None conf_long = re.search(r'agiliton\.atlassian\.net/wiki/.*?pages/(\d+)', body) conf_short = re.search(r'agiliton\.atlassian\.net/wiki/x/([A-Za-z0-9_-]+)', body) if conf_long: confluence_page_id = conf_long.group(1) elif conf_short and CONFLUENCE_URL and CONFLUENCE_USER and CONFLUENCE_TOKEN: # Resolve short link via Confluence API tiny_id = conf_short.group(1) try: async with httpx.AsyncClient(timeout=10.0) as hc: resp = await hc.get( f"{CONFLUENCE_URL}/rest/api/content", params={"type": "page", "expand": "version", "limit": "1", "start": "0"}, auth=(CONFLUENCE_USER, CONFLUENCE_TOKEN), follow_redirects=True, ) # Try the tiny link redirect approach resp2 = await hc.get( f"{CONFLUENCE_URL}/x/{tiny_id}", auth=(CONFLUENCE_USER, CONFLUENCE_TOKEN), follow_redirects=True, ) # Extract page ID from the redirected URL redir_match = re.search(r'pages/(\d+)', str(resp2.url)) if redir_match: confluence_page_id = redir_match.group(1) else: # Parse page ID from HTML response id_match = re.search(r'"pageId"\s*:\s*"?(\d+)', resp2.text) if id_match: confluence_page_id = id_match.group(1) logger.info("Resolved Confluence short link /x/%s → page %s", tiny_id, confluence_page_id) except Exception as exc: logger.warning("Confluence short link resolution failed: %s", exc) if confluence_page_id: # Fetch page with section structure for targeted editing conf_text = f"confluence_page_id:{confluence_page_id}" conf_title = f"Confluence page {confluence_page_id}" if CONFLUENCE_URL and CONFLUENCE_USER and CONFLUENCE_TOKEN: try: from confluence_collab.client import Auth, get_page from confluence_collab.parser import parse_sections auth = Auth(base_url=CONFLUENCE_URL, username=CONFLUENCE_USER, api_token=CONFLUENCE_TOKEN) page = await get_page(confluence_page_id, auth) conf_title = page.title sections = parse_sections(page.body_html) section_outline = "\n".join( f"{' ' * (s.level - 1)}h{s.level}: {s.heading}" for s in sections ) # Strip HTML for plain text context plain = re.sub(r"<[^>]+>", " ", page.body_html) plain = re.sub(r"\s+", " ", plain).strip() conf_text = ( f"confluence_page_id:{confluence_page_id}\n\n" f"Title: {page.title}\n\n" f"Sections:\n{section_outline}\n\n" f"{plain}" ) logger.info("Fetched Confluence page %s: %s (%d chars, %d sections)", confluence_page_id, page.title, len(plain), len(sections)) except Exception as exc: logger.warning("Confluence page fetch failed for %s: %s", confluence_page_id, exc) # Fallback to voice.py reader try: from voice import _confluence_read_page title, plain, _ver = await _confluence_read_page(confluence_page_id) conf_title = title conf_text = f"confluence_page_id:{confluence_page_id}\n\nTitle: {title}\n\n{plain}" except Exception: pass docs = self._room_document_context.setdefault(room.room_id, []) docs.append({ "type": "confluence", "filename": conf_title, "text": conf_text, "timestamp": time.time(), }) logger.info("Confluence page %s detected in room %s", confluence_page_id, room.room_id) # Check article summary FSM (Blinkist-style audio summaries) if self.article_handler: summary_response = await self.article_handler.handle_message( room.room_id, sender, body ) if summary_response is not None: if summary_response == "__GENERATE__": await self.client.room_typing(room.room_id, typing_state=True) try: await self.article_handler.generate_and_post( self, room.room_id, sender ) finally: await self.client.room_typing(room.room_id, typing_state=False) return elif summary_response.startswith("__DISCUSS__"): # Extract article context, enrich the user message for AI article_info = summary_response[len("__DISCUSS__"):] body = f"[Article context: {article_info[:6000]}]\n\nUser message: {body}" # Fall through to normal AI handler with enriched context elif summary_response: await self._send_text(room.room_id, summary_response) return # Inject typed message into active voice session transcript for context sharing vs = self.voice_sessions.get(room.room_id) if vs: vs._transcript.append({"role": "user", "text": f"[typed in chat] {body}"}) await self.client.room_typing(room.room_id, typing_state=True) try: reply = await self._respond_with_ai(room, body, sender=sender, image_data=image_data) # Also inject bot's text reply into voice transcript if reply and vs: vs._transcript.append({"role": "assistant", "text": f"[replied in chat] {reply}"}) finally: await self.client.room_typing(room.room_id, typing_state=False) async def on_image_message(self, room, event: RoomMessageImage): """Handle image messages: download, encode, and send to AI for analysis.""" if event.sender == BOT_USER: return if not self._sync_token_received: return server_ts = event.server_timestamp / 1000 if time.time() - server_ts > 30: return # Check for pipeline file_upload triggers (before DM/mention check) source = event.source or {} content = source.get("content", {}) info = content.get("info", {}) img_mime = info.get("mimetype", "image/png") img_filename = content.get("body", "image.png") if self.cron_scheduler: await self._check_pipeline_file_trigger(room, event, img_filename, img_mime) await self._load_room_settings(room.room_id) # In DMs respond to all images; in groups only if bot was recently @mentioned is_dm = room.member_count == 2 if not is_dm: # Check if bot was @mentioned in the image body (caption) or skip body = (event.body or "").strip() bot_display = self.client.user_id.split(":")[0].lstrip("@") mentioned = ( BOT_USER in body or f"@{bot_display}" in body.lower() or bot_display.lower() in body.lower() ) if not mentioned: return if not self.llm: await self._send_text(room.room_id, "LLM not configured (LITELLM_BASE_URL not set).") return # Download image from Matrix homeserver mxc_url = event.url if not mxc_url: return try: resp = await self.client.download(mxc=mxc_url) if not hasattr(resp, "body"): logger.warning("Image download failed for %s", mxc_url) return img_bytes = resp.body except Exception: logger.exception("Failed to download image %s", mxc_url) return # Determine MIME type mime_type = getattr(event, "mimetype", None) or "image/png" b64_data = base64.b64encode(img_bytes).decode("utf-8") caption = (event.body or "").strip() # Treat filenames (contain dots or are very long) as no caption is_filename = not caption or caption == "image" or "." in caption or len(caption) > 100 text = "What's in this image?" if is_filename else caption # Cache image for follow-up text messages self._recent_images[room.room_id] = (b64_data, mime_type, time.time()) await self.client.room_typing(room.room_id, typing_state=True) try: reply = await self._respond_with_ai(room, text, sender=event.sender, image_data=(b64_data, mime_type)) if reply: docs = self._room_document_context.setdefault(room.room_id, []) docs.append({"type": "image", "filename": caption or "image", "text": reply, "timestamp": time.time()}) if len(docs) > 5: del docs[:-5] finally: await self.client.room_typing(room.room_id, typing_state=False) async def on_encrypted_image_message(self, room, event: RoomEncryptedImage): """Handle encrypted image messages: decrypt, encode, and send to AI.""" if event.sender == BOT_USER: return if not self._sync_token_received: return server_ts = event.server_timestamp / 1000 if time.time() - server_ts > 30: return await self._load_room_settings(room.room_id) is_dm = room.member_count == 2 if not is_dm: body = (event.body or "").strip() bot_display = self.client.user_id.split(":")[0].lstrip("@") mentioned = ( BOT_USER in body or f"@{bot_display}" in body.lower() or bot_display.lower() in body.lower() ) if not mentioned: return if not self.llm: await self._send_text(room.room_id, "LLM not configured (LITELLM_BASE_URL not set).") return mxc_url = event.url if not mxc_url: return try: resp = await self.client.download(mxc=mxc_url) if not hasattr(resp, "body"): logger.warning("Encrypted image download failed for %s", mxc_url) return # Decrypt the attachment img_bytes = decrypt_attachment(resp.body, event.key["k"], event.hashes["sha256"], event.iv) except Exception: logger.exception("Failed to download/decrypt encrypted image %s", mxc_url) return mime_type = getattr(event, "mimetype", None) or "image/png" b64_data = base64.b64encode(img_bytes).decode("utf-8") caption = (event.body or "").strip() is_filename = not caption or caption == "image" or "." in caption or len(caption) > 100 text = "What's in this image?" if is_filename else caption # Cache image for follow-up text messages self._recent_images[room.room_id] = (b64_data, mime_type, time.time()) await self.client.room_typing(room.room_id, typing_state=True) try: reply = await self._respond_with_ai(room, text, sender=event.sender, image_data=(b64_data, mime_type)) if reply: docs = self._room_document_context.setdefault(room.room_id, []) docs.append({"type": "image", "filename": caption or "image", "text": reply, "timestamp": time.time()}) if len(docs) > 5: del docs[:-5] finally: await self.client.room_typing(room.room_id, typing_state=False) # Supported text-based file extensions _TEXT_EXTENSIONS = frozenset({ ".txt", ".md", ".csv", ".json", ".xml", ".html", ".yaml", ".yml", ".log", }) def _on_pipeline_approval_registered(self, event_id: str, execution_id: str): """Track approval event -> execution mapping for reaction handling.""" self._pipeline_approval_events[event_id] = execution_id async def _check_pipeline_file_trigger(self, room, event, filename: str, mime_type: str): """Check if an uploaded file matches any pipeline file_upload trigger.""" pipelines = self.cron_scheduler.get_file_upload_pipelines() for pipeline in pipelines: target_room = pipeline.get("targetRoom", "") # Pipeline must target this room if target_room != room.room_id: continue # Check mime type match required_mime = pipeline.get("fileMimetype", "") if required_mime and not mime_type.startswith(required_mime): continue # Check filename pattern pattern = pipeline.get("filePattern", "") if pattern: try: if not re.match(pattern, filename): continue except re.error: continue # Download file content for trigger data mxc_url = event.url if hasattr(event, "url") else None file_text = "" image_b64 = "" if mxc_url: try: resp = await self.client.download(mxc=mxc_url) if hasattr(resp, "body"): file_bytes = resp.body ext = os.path.splitext(filename.lower())[1] if mime_type.startswith("image/"): # Encode image as base64 for vision analysis in claude_prompt steps image_b64 = base64.b64encode(file_bytes).decode("utf-8") elif ext == ".pdf": file_text = self._extract_pdf_text(file_bytes) elif ext == ".docx": file_text = self._extract_docx_text(file_bytes) else: file_text = self._extract_text_file(file_bytes) if file_text and len(file_text) > 50000: file_text = file_text[:50000] except Exception: logger.debug("Failed to extract file for pipeline trigger", exc_info=True) trigger_data = { "filename": filename, "mime_type": mime_type, "file_content": file_text, "image_b64": image_b64, "sender": event.sender, "room_id": room.room_id, } logger.info("File upload triggered pipeline: %s (file: %s)", pipeline["name"], filename) asyncio.create_task(self.cron_scheduler.pipeline_engine.run(pipeline, trigger_data)) async def on_file_message(self, room, event: RoomMessageFile): """Handle file messages: extract text from PDFs, docx, and text files.""" if event.sender == BOT_USER: return if not self._sync_token_received: return server_ts = event.server_timestamp / 1000 if time.time() - server_ts > 30: return source = event.source or {} content = source.get("content", {}) info = content.get("info", {}) mime_type = info.get("mimetype", "") filename = content.get("body", "file") ext = os.path.splitext(filename.lower())[1] # Determine file type is_pdf = mime_type == "application/pdf" or ext == ".pdf" is_docx = mime_type == "application/vnd.openxmlformats-officedocument.wordprocessingml.document" or ext == ".docx" is_text = ext in self._TEXT_EXTENSIONS or mime_type.startswith("text/") # Check for pipeline file_upload triggers (before DM/mention check) if self.cron_scheduler: await self._check_pipeline_file_trigger(room, event, filename, mime_type) if not (is_pdf or is_docx or is_text): return await self._load_room_settings(room.room_id) # In DMs respond to all files; in groups only if bot was recently @mentioned is_dm = room.member_count == 2 if not is_dm: body = (event.body or "").strip() bot_display = self.client.user_id.split(":")[0].lstrip("@") mentioned = ( BOT_USER in body or f"@{bot_display}" in body.lower() or bot_display.lower() in body.lower() ) if not mentioned: return if not self.llm: await self._send_text(room.room_id, "LLM not configured (LITELLM_BASE_URL not set).") return # Download file mxc_url = event.url if not mxc_url: return try: resp = await self.client.download(mxc=mxc_url) if not hasattr(resp, "body"): logger.warning("File download failed for %s", mxc_url) return file_bytes = resp.body except Exception: logger.exception("Failed to download file %s", mxc_url) return # Extract text based on file type if is_pdf: extracted = self._extract_pdf_text(file_bytes) doc_type = "pdf" elif is_docx: extracted = self._extract_docx_text(file_bytes) doc_type = "text" else: extracted = self._extract_text_file(file_bytes) doc_type = "text" # Scanned PDF fallback: render pages as images for vision analysis if not extracted and is_pdf: page_images = self._render_pdf_pages_as_images(file_bytes) if page_images: await self.client.room_typing(room.room_id, typing_state=True) try: user_message = f'The user sent a scanned PDF named "{filename}" ({len(page_images)} page(s)). Analyze the document content and summarize it.' reply = await self._respond_with_ai(room, user_message, sender=event.sender, image_data=page_images[0]) if reply: docs = self._room_document_context.setdefault(room.room_id, []) docs.append({"type": "pdf", "filename": filename, "text": reply, "timestamp": time.time()}) if len(docs) > 5: del docs[:-5] finally: await self.client.room_typing(room.room_id, typing_state=False) return if not extracted: await self._send_text(room.room_id, f"I couldn't extract any text from that file ({filename}).") return # Truncate to avoid token limits (roughly 50k chars ≈ 12k tokens) if len(extracted) > 50000: extracted = extracted[:50000] + "\n\n[... truncated, file too long ...]" # Store document context for voice session pickup docs = self._room_document_context.setdefault(room.room_id, []) docs.append({ "type": doc_type, "filename": filename, "text": extracted, "timestamp": time.time(), }) if len(docs) > 5: del docs[:-5] label = "PDF" if is_pdf else "Word document" if is_docx else "file" user_message = f'The user sent a {label} named "{filename}". Here is the extracted text:\n\n{extracted}\n\nPlease summarize or answer questions about this document.' await self.client.room_typing(room.room_id, typing_state=True) try: await self._respond_with_ai(room, user_message, sender=event.sender) finally: await self.client.room_typing(room.room_id, typing_state=False) async def on_encrypted_file_message(self, room, event: RoomEncryptedFile): """Handle encrypted file messages: decrypt and process like on_file_message.""" if event.sender == BOT_USER: return if not self._sync_token_received: return server_ts = event.server_timestamp / 1000 if time.time() - server_ts > 30: return source = event.source or {} content = source.get("content", {}) filename = content.get("body", "file") ext = os.path.splitext(filename.lower())[1] is_pdf = ext == ".pdf" is_docx = ext == ".docx" is_text = ext in self._TEXT_EXTENSIONS if not (is_pdf or is_docx or is_text): return await self._load_room_settings(room.room_id) is_dm = room.member_count == 2 if not is_dm: body = (event.body or "").strip() bot_display = self.client.user_id.split(":")[0].lstrip("@") mentioned = ( BOT_USER in body or f"@{bot_display}" in body.lower() or bot_display.lower() in body.lower() ) if not mentioned: return if not self.llm: await self._send_text(room.room_id, "LLM not configured (LITELLM_BASE_URL not set).") return mxc_url = event.url if not mxc_url: return try: resp = await self.client.download(mxc=mxc_url) if not hasattr(resp, "body"): logger.warning("Encrypted file download failed for %s", mxc_url) return file_bytes = decrypt_attachment(resp.body, event.key["k"], event.hashes["sha256"], event.iv) except Exception: logger.exception("Failed to download/decrypt encrypted file %s", mxc_url) return if is_pdf: extracted = self._extract_pdf_text(file_bytes) doc_type = "pdf" elif is_docx: extracted = self._extract_docx_text(file_bytes) doc_type = "text" else: extracted = self._extract_text_file(file_bytes) doc_type = "text" # Scanned PDF fallback: render pages as images for vision analysis if not extracted and is_pdf: page_images = self._render_pdf_pages_as_images(file_bytes) if page_images: await self.client.room_typing(room.room_id, typing_state=True) try: user_message = f'The user sent a scanned PDF named "{filename}" ({len(page_images)} page(s)). Analyze the document content and summarize it.' reply = await self._respond_with_ai(room, user_message, sender=event.sender, image_data=page_images[0]) if reply: docs = self._room_document_context.setdefault(room.room_id, []) docs.append({"type": "pdf", "filename": filename, "text": reply, "timestamp": time.time()}) if len(docs) > 5: del docs[:-5] finally: await self.client.room_typing(room.room_id, typing_state=False) return if not extracted: await self._send_text(room.room_id, f"I couldn't extract any text from that file ({filename}).") return if len(extracted) > 50000: extracted = extracted[:50000] + "\n\n[... truncated, file too long ...]" docs = self._room_document_context.setdefault(room.room_id, []) docs.append({ "type": doc_type, "filename": filename, "text": extracted, "timestamp": time.time(), }) if len(docs) > 5: del docs[:-5] label = "PDF" if is_pdf else "Word document" if is_docx else "file" user_message = f'The user sent a {label} named "{filename}". Here is the extracted text:\n\n{extracted}\n\nPlease summarize or answer questions about this document.' await self.client.room_typing(room.room_id, typing_state=True) try: await self._respond_with_ai(room, user_message, sender=event.sender) finally: await self.client.room_typing(room.room_id, typing_state=False) @staticmethod def _extract_pdf_text(pdf_bytes: bytes) -> str: """Extract text from PDF bytes using pymupdf.""" try: doc = fitz.open(stream=pdf_bytes, filetype="pdf") pages = [] for i, page in enumerate(doc): text = page.get_text().strip() if text: pages.append(f"--- Page {i + 1} ---\n{text}") doc.close() return "\n\n".join(pages) except Exception: logger.exception("PDF text extraction failed") return "" @staticmethod def _render_pdf_pages_as_images(pdf_bytes: bytes, max_pages: int = 5) -> list[tuple[str, str]]: """Render PDF pages to PNG images for vision fallback (scanned PDFs). Returns list of (base64_data, mime_type) tuples, one per page. """ try: doc = fitz.open(stream=pdf_bytes, filetype="pdf") images = [] for i, page in enumerate(doc): if i >= max_pages: break pix = page.get_pixmap(dpi=200) png_bytes = pix.tobytes("png") b64 = base64.b64encode(png_bytes).decode("utf-8") images.append((b64, "image/png")) doc.close() return images except Exception: logger.exception("PDF page rendering failed") return [] @staticmethod def _extract_docx_text(docx_bytes: bytes) -> str: """Extract text from .docx bytes using python-docx.""" try: doc = docx.Document(io.BytesIO(docx_bytes)) return "\n".join(p.text for p in doc.paragraphs if p.text.strip()) except Exception: logger.exception("DOCX text extraction failed") return "" @staticmethod def _extract_text_file(file_bytes: bytes) -> str: """Decode text file bytes as UTF-8 with fallback to latin-1.""" try: return file_bytes.decode("utf-8") except UnicodeDecodeError: try: return file_bytes.decode("latin-1") except Exception: logger.exception("Text file decode failed") return "" async def _brave_search(self, query: str, count: int = 5) -> str: """Call Brave Search API and return formatted results.""" if not BRAVE_API_KEY: return "Web search unavailable (no API key configured)." try: async with httpx.AsyncClient(timeout=10.0) as client: resp = await client.get( "https://api.search.brave.com/res/v1/web/search", headers={"Accept": "application/json", "X-Subscription-Token": BRAVE_API_KEY}, params={"q": query, "count": count, "text_decorations": False}, ) resp.raise_for_status() data = resp.json() results = data.get("web", {}).get("results", []) if not results: return "No results found." lines = [] for r in results[:count]: title = r.get("title", "") desc = r.get("description", "") url = r.get("url", "") lines.append(f"- [{title}]({url}): {desc}") return "\n".join(lines) except Exception as exc: logger.warning("Brave search error: %s", exc) return f"Search failed: {exc}" async def _fetch_webpage(self, url: str, max_chars: int = 8000) -> str: """Fetch a URL and extract clean text content using BeautifulSoup.""" try: from bs4 import BeautifulSoup async with httpx.AsyncClient(timeout=15.0, follow_redirects=True, headers={"User-Agent": "Mozilla/5.0 (compatible; AgilitonBot/1.0)"}) as client: resp = await client.get(url) resp.raise_for_status() ct = resp.headers.get("content-type", "") if "html" not in ct and "text" not in ct: return f"URL returned non-text content ({ct})." soup = BeautifulSoup(resp.text, "lxml") for tag in soup(["script", "style", "nav", "footer", "header", "aside", "iframe"]): tag.decompose() main = soup.find("article") or soup.find("main") or soup.find("body") text = main.get_text(separator="\n", strip=True) if main else soup.get_text(separator="\n", strip=True) text = re.sub(r'\n{3,}', '\n\n', text) if len(text) > max_chars: text = text[:max_chars] + "\n\n[... truncated]" return text if text.strip() else "Page loaded but no readable text content found." except httpx.HTTPStatusError as exc: return f"HTTP error {exc.response.status_code} fetching {url}" except Exception as exc: logger.warning("Webpage fetch error for %s: %s", url, exc) return f"Failed to fetch page: {exc}" async def _execute_tool(self, tool_name: str, args: dict, sender: str, room_id: str) -> str: """Execute a tool call and return the result as a string.""" # Image generation — no Atlassian token needed if tool_name == "generate_image": await self._generate_and_send_image(room_id, args.get("prompt", "")) return "Image generated and sent to the room." # Web search — no auth needed if tool_name == "web_search": return await self._brave_search(args.get("query", ""), args.get("count", 5)) # Browse URL — no auth needed, with DNS fallback via web search if tool_name == "browse_url": url = args.get("url", "") result = await self._fetch_webpage(url) if "Name or service not known" in result or "Failed to fetch" in result: from urllib.parse import urlparse domain = urlparse(url).netloc or url logger.info("Browse DNS fail for %s — trying web search", domain) search_result = await self._brave_search(f"{domain} website", count=3) if search_result and "No results" not in search_result: url_match = re.search(r'https?://[^\s\)]+', search_result) if url_match: corrected_url = url_match.group(0).rstrip('.,;') retry = await self._fetch_webpage(corrected_url) if "Failed to fetch" not in retry: return f"[Original URL {url} unreachable. Found correct site at {corrected_url}]\n\n{retry}" return f"Could not reach {url}. The domain may be misspelled — ask the user to clarify." return result # Room history search — no auth needed if tool_name == "search_room_history": return await self._search_room_history( room_id, args.get("query", ""), args.get("limit", 200) ) # Scheduler tools — no Atlassian auth needed if tool_name == "schedule_message": return await self._schedule_message( sender, room_id, args.get("message", ""), args.get("datetime_iso", ""), args.get("repeat", "once") ) if tool_name == "list_reminders": return await self._list_reminders(sender) if tool_name == "cancel_reminder": return await self._cancel_reminder(sender, args.get("reminder_id", 0)) # Atlassian tools — need per-user token token = await self.atlassian.get_token(sender) if sender else None if not token: return ATLASSIAN_NOT_CONNECTED_MSG if tool_name == "confluence_recent_pages": return await self.atlassian.confluence_recent_pages(token, args.get("limit", 5)) elif tool_name == "confluence_search": return await self.atlassian.confluence_search(token, args["query"], args.get("limit", 5)) elif tool_name == "confluence_read_page": return await self.atlassian.confluence_read_page(token, args["page_id"]) elif tool_name == "confluence_update_page": return await self.atlassian.confluence_update_page( token, args["page_id"], args["section_heading"], args["new_content"]) elif tool_name == "confluence_create_page": return await self.atlassian.confluence_create_page( token, args["title"], args["content"], args.get("space_key", "AG")) elif tool_name == "jira_search": return await self.atlassian.jira_search(token, args["jql"], args.get("limit", 10)) elif tool_name == "jira_get_issue": return await self.atlassian.jira_get_issue(token, args["issue_key"]) elif tool_name == "jira_create_issue": return await self.atlassian.jira_create_issue( token, args["project"], args["summary"], args.get("issue_type", "Task"), args.get("description", ""), ) elif tool_name == "jira_add_comment": return await self.atlassian.jira_add_comment(token, args["issue_key"], args["comment"]) elif tool_name == "jira_transition": return await self.atlassian.jira_transition(token, args["issue_key"], args["status"]) else: return f"Unknown tool: {tool_name}" async def _search_room_history(self, room_id: str, query: str, limit: int = 200) -> str: """Search room message history for messages matching a query string.""" limit = min(limit, 500) query_lower = query.lower() try: # Paginate through room history matches = [] token = self.client.next_batch or "" fetched = 0 while fetched < limit: batch_size = min(100, limit - fetched) resp = await self.client.room_messages( room_id, start=token, limit=batch_size ) if not hasattr(resp, "chunk") or not resp.chunk: break for evt in resp.chunk: if not hasattr(evt, "body"): continue if query_lower in evt.body.lower(): ts = evt.server_timestamp / 1000 date_str = time.strftime("%Y-%m-%d %H:%M", time.gmtime(ts)) sender_name = evt.sender.split(":")[0].lstrip("@") matches.append(f"[{date_str}] {sender_name}: {evt.body[:500]}") fetched += len(resp.chunk) token = resp.end if not token: break if not matches: return f"No messages found matching '{query}' in the last {fetched} messages." # Return newest first (matches are in reverse chronological from pagination) result = f"Found {len(matches)} message(s) matching '{query}' (scanned {fetched} messages):\n\n" result += "\n\n---\n\n".join(matches[:20]) # cap at 20 results if len(matches) > 20: result += f"\n\n... and {len(matches) - 20} more matches." return result except Exception: logger.warning("Room history search failed", exc_info=True) return "Failed to search room history." async def _schedule_message(self, sender: str, room_id: str, message: str, datetime_iso: str, repeat: str) -> str: """Parse datetime, validate, store via memory service.""" if not message: return "No reminder message provided." if not datetime_iso: return "No datetime provided." try: from datetime import datetime as dt, timezone parsed = dt.fromisoformat(datetime_iso) if parsed.tzinfo is None: # Default to Europe/Berlin import zoneinfo parsed = parsed.replace(tzinfo=zoneinfo.ZoneInfo("Europe/Berlin")) ts = parsed.timestamp() except Exception: return f"Could not parse datetime: {datetime_iso}. Use ISO 8601 format." if ts <= time.time(): return "That time has already passed. Please specify a future time." result = await self.memory.create_scheduled(sender, room_id, message, ts, repeat or "once") if "error" in result: return f"Failed to create reminder: {result['error']}" time_str = parsed.strftime("%B %d, %Y at %H:%M") tz_name = str(parsed.tzinfo) if parsed.tzinfo else "Europe/Berlin" repeat_str = f" (repeats {repeat})" if repeat and repeat != "once" else "" return f"Reminder #{result.get('id', '?')} set for {time_str} ({tz_name}){repeat_str}: {message}" async def _list_reminders(self, sender: str) -> str: """List user's active reminders.""" reminders = await self.memory.list_scheduled(sender) if not reminders: return "You have no active reminders." from datetime import datetime as dt, timezone lines = [] for r in reminders: t = dt.fromtimestamp(r["scheduled_at"], tz=timezone.utc) time_str = t.strftime("%Y-%m-%d %H:%M UTC") repeat = f" ({r['repeat_pattern']})" if r["repeat_pattern"] != "once" else "" lines.append(f"**#{r['id']}** — {time_str}{repeat}: {r['message_text']}") return f"**Your reminders ({len(lines)}):**\n" + "\n".join(lines) async def _cancel_reminder(self, sender: str, reminder_id: int) -> str: """Cancel a reminder by ID.""" if not reminder_id: return "Please provide a reminder ID to cancel." result = await self.memory.cancel_scheduled(sender, reminder_id) if "error" in result: return f"Could not cancel reminder #{reminder_id}: {result['error']}" return f"Reminder #{reminder_id} cancelled." # -- Escalation patterns for model routing -- _ESCALATION_KEYWORDS = re.compile( r"\b(debug|architecture|algorithm|regex|sql|refactor|optimize|migration" r"|explain\s+in\s+detail|explain\s+how|step.by.step)\b", re.IGNORECASE, ) _EXPLICIT_ESCALATION = re.compile( r"\b(think\s+harder|detailed|comprehensive|deep\s+dive|ausf[üu]hrlich|genau\s+erkl[äa]r)\b", re.IGNORECASE, ) def _check_escalation(self, user_message: str, image_data: tuple | None) -> str | None: """Return escalation reason string, or None if Haiku suffices.""" if image_data: return "multimodal" if len(user_message) > 500: return "long_message" if "```" in user_message: return "code_block" m = self._ESCALATION_KEYWORDS.search(user_message) if m: return f"technical_keyword:{m.group(0).lower()}" if user_message.count("?") >= 3: return "multi_question" if self._EXPLICIT_ESCALATION.search(user_message): return "explicit_request" return None def _select_model(self, room_id: str, user_message: str, image_data: tuple | None) -> tuple[str, str]: """Pick model: room override > escalation heuristics > BASE_MODEL.""" if room_id in self.room_models: return self.room_models[room_id], "room_override" reason = self._check_escalation(user_message, image_data) if reason: sentry_sdk.set_tag("escalated", "true") sentry_sdk.set_tag("escalation_reason", reason) sentry_sdk.add_breadcrumb(category="model", message=f"Escalated to {ESCALATION_MODEL}: {reason}") return ESCALATION_MODEL, reason sentry_sdk.set_tag("escalated", "false") return BASE_MODEL, "default" async def _respond_with_ai(self, room, user_message: str, sender: str = None, image_data: tuple = None) -> str | None: """Send AI response and return the reply text (or None on failure).""" model, escalation_reason = self._select_model(room.room_id, user_message, image_data) sentry_sdk.set_tag("model_used", model) sentry_sdk.set_context("ai_request", { "message_length": len(user_message), "has_images": bool(image_data), "escalation_reason": escalation_reason, "room_id": room.room_id[:30], }) logger.info("Model selected: %s (reason: %s) for room %s", model, escalation_reason, room.room_id[:30]) # Fetch conversation history FIRST (needed for query rewriting) history = [] try: resp = await self.client.room_messages( room.room_id, start=self.client.next_batch or "", limit=30 ) if hasattr(resp, "chunk"): for evt in reversed(resp.chunk): if not hasattr(evt, "body"): continue role = "assistant" if evt.sender == BOT_USER else "user" history.append({"role": role, "content": evt.body}) except Exception: logger.debug("Could not fetch room history, proceeding without context") # Rewrite query using conversation context for better RAG search search_query = await self._rewrite_query(user_message, history) # Run RAG search, memory query, and chunk query in parallel (independent) if sender: doc_results_coro = self.rag.search(search_query, matrix_user_id=sender) memories_coro = self.memory.query(sender, user_message, top_k=10) chunks_coro = self.memory.query_chunks(search_query, user_id=sender, room_id=room.room_id, top_k=5) doc_results, memories, chunks = await asyncio.gather( doc_results_coro, memories_coro, chunks_coro, return_exceptions=True, ) # Handle exceptions from gather if isinstance(doc_results, BaseException): logger.warning("RAG search failed: %s", doc_results) doc_results = [] if isinstance(memories, BaseException): logger.warning("Memory query failed: %s", memories) memories = [] if isinstance(chunks, BaseException): logger.warning("Chunk query failed: %s", chunks) chunks = [] else: doc_results, memories, chunks = [], [], [] doc_context = self.rag.format_context(doc_results) if doc_context: logger.info("RAG found %d docs for: %s (original: %s)", len(doc_results), search_query[:50], user_message[:50]) else: logger.info("RAG found 0 docs for: %s (original: %s)", search_query[:50], user_message[:50]) memory_context = self._format_memories(memories) chunk_context = self._format_chunks(chunks) # Include room document context (PDFs, Confluence pages, images uploaded to room) room_doc_context = "" room_docs = [e for e in self._room_document_context.get(room.room_id, []) if time.time() - e["timestamp"] < 3600] if room_docs: parts = [] for e in room_docs: label = {"pdf": "PDF", "image": "Image", "confluence": "Confluence", "text": "File", "docx": "Word"}.get(e["type"], "Document") text = e["text"][:20000] if e["type"] != "image" else e["text"][:2000] parts.append(f"[{label}: {e['filename']}]\n{text}") room_doc_context = ( "Documents available in this room (uploaded or linked by the user):\n\n" + "\n\n---\n\n".join(parts) + "\n\nUse these documents to answer questions. " "You CAN access and read these documents — never say you cannot." ) # Inject voice call transcript if there's an active call in this room voice_context = "" vs = self.voice_sessions.get(room.room_id) if vs: transcript = vs.get_transcript() if transcript: recent = transcript[-20:] # last 20 entries lines = [f"{'User' if e['role'] == 'user' else 'Assistant'}: {e['text']}" for e in recent] voice_context = ( "Active voice call transcript (recent conversation spoken aloud):\n" + "\n".join(lines) + "\n\nThe user is currently in a voice call and also typing in chat. " "Use the voice transcript as context for their text messages." ) # Build conversation context messages = [{"role": "system", "content": SYSTEM_PROMPT}] if memory_context: messages.append({"role": "system", "content": memory_context}) if chunk_context: messages.append({"role": "system", "content": chunk_context}) if room_doc_context: messages.append({"role": "system", "content": room_doc_context}) if voice_context: messages.append({"role": "system", "content": voice_context}) # When RAG returns documents, limit history to 4 messages (2 exchanges) to prevent # stale answer patterns from overriding fresh search results history_limit = 4 if doc_context else 10 messages.extend(history[-history_limit:]) if doc_context: messages.append({"role": "system", "content": doc_context}) # Add current user message (multimodal if image provided) if image_data: b64_str, mime_type = image_data user_content = [ {"type": "text", "text": user_message}, {"type": "image_url", "image_url": {"url": f"data:{mime_type};base64,{b64_str}"}} ] messages.append({"role": "user", "content": user_content}) else: messages.append({"role": "user", "content": user_message}) # Determine available tools (no tools when analyzing images) tools = ALL_TOOLS if not image_data else None try: reply = "" # Agentic tool-calling loop: iterate up to MAX_TOOL_ITERATIONS for iteration in range(MAX_TOOL_ITERATIONS): resp = await self.llm.chat.completions.create( model=model, messages=messages, max_tokens=2048, tools=tools, ) choice = resp.choices[0] reply = choice.message.content or "" sentry_sdk.add_breadcrumb( category="llm", message=f"LLM response: {model}", data={ "tokens_in": getattr(resp.usage, "prompt_tokens", 0) if resp.usage else 0, "tokens_out": getattr(resp.usage, "completion_tokens", 0) if resp.usage else 0, "tool_calls": len(choice.message.tool_calls or []), "iteration": iteration, }, ) if not choice.message.tool_calls: # No tool calls — final text response break # Process tool calls and feed results back # Append the assistant message with tool_calls assistant_msg = {"role": "assistant", "content": reply or None, "tool_calls": []} for tc in choice.message.tool_calls: assistant_msg["tool_calls"].append({ "id": tc.id, "type": "function", "function": {"name": tc.function.name, "arguments": tc.function.arguments}, }) messages.append(assistant_msg) # Execute tools in parallel when multiple are requested async def _run_tool(tc): try: args = json.loads(tc.function.arguments) except json.JSONDecodeError: args = {} result = await self._execute_tool(tc.function.name, args, sender, room.room_id) logger.info("Tool %s executed (iter %d) for %s", tc.function.name, iteration, sender) return {"role": "tool", "tool_call_id": tc.id, "content": result} tool_results = await asyncio.gather( *[_run_tool(tc) for tc in choice.message.tool_calls] ) messages.extend(tool_results) # Tag whether tools were used during the loop if iteration > 0: sentry_sdk.set_tag("used_tools", "true") # Send final reply if reply: await self._send_text(room.room_id, reply) # Extract and store new memories + conversation chunk (after reply sent) if sender and reply: existing_facts = [m["fact"] for m in memories] try: await asyncio.wait_for( asyncio.gather( self._extract_and_store_memories( user_message, reply, existing_facts, model, sender, room.room_id ), self._store_conversation_chunk( user_message, reply, sender, room.room_id ), ), timeout=20.0, ) except asyncio.TimeoutError: logger.warning("Memory/chunk extraction timed out for %s", sender) except Exception: logger.warning("Memory/chunk save failed", exc_info=True) # Auto-rename: only for group rooms with explicit opt-in (not DMs) if room.room_id in self.auto_rename_rooms: last_rename = self.renamed_rooms.get(room.room_id, 0) gap_seconds = time.time() - last_rename if last_rename else float("inf") if gap_seconds > 300: await self._auto_rename_room(room, user_message, reply) return reply except Exception: logger.exception("LLM call failed") await self._send_text(room.room_id, "Sorry, I couldn't generate a response.") return None async def _rewrite_query(self, user_message: str, history: list[dict]) -> str: """Rewrite user message into a standalone search query using conversation context.""" if not history or not self.llm: return user_message # Build a compact history summary (last 4 messages max) recent = history[-4:] context_lines = [] for msg in recent: prefix = "User" if msg["role"] == "user" else "Assistant" context_lines.append(f"{prefix}: {msg['content'][:200]}") context_text = "\n".join(context_lines) try: resp = await self.llm.chat.completions.create( model=BASE_MODEL, messages=[ {"role": "system", "content": ( "You are a search query rewriter. Given conversation history and a new user message, " "produce a single standalone search query that resolves all pronouns and references " "(like 'this house', 'that document', 'it') using context from the conversation. " "Reply with ONLY the rewritten search query in the same language as the user message. " "No explanation, no quotes. If the message is already self-contained, return it as-is." )}, {"role": "user", "content": f"Conversation:\n{context_text}\n\nNew message: {user_message}"}, ], max_tokens=100, ) rewritten = resp.choices[0].message.content.strip().strip('"\'') if rewritten and len(rewritten) < 500: logger.info("Query rewritten: '%s' -> '%s'", user_message[:50], rewritten[:50]) return rewritten except Exception: logger.debug("Query rewrite failed, using original", exc_info=True) return user_message async def _auto_rename_room(self, room, user_message: str, ai_reply: str): """Generate a short topic title and set it as the room name (Open WebUI style).""" # Skip rename check — always generate fresh title based on current conversation try: resp = await self.llm.chat.completions.create( model=BASE_MODEL, messages=[ {"role": "user", "content": user_message}, {"role": "assistant", "content": ai_reply[:300]}, {"role": "user", "content": ( "Generate a concise, 3-5 word title with an emoji as prefix " "that summarizes the conversation above. " "Use the same language as the conversation. " "Do not use quotation marks or formatting. " "Respond with ONLY the title, nothing else." )}, ], max_tokens=30, ) title = resp.choices[0].message.content.strip().strip('"\'') if not title or len(title) > 80 or len(title) < 3: return await self.client.room_put_state( room.room_id, "m.room.name", {"name": title}, state_key="", ) self.renamed_rooms[room.room_id] = time.time() logger.info("Auto-renamed room %s to: %s", room.room_id, title) except Exception: logger.debug("Auto-rename failed", exc_info=True) @staticmethod def _md_to_html(text: str) -> str: """Minimal markdown to HTML for Matrix formatted_body.""" import html as html_mod safe = html_mod.escape(text) # Code blocks (```...```) safe = re.sub(r"```(\w*)\n(.*?)```", r"\2", safe, flags=re.DOTALL)
# Inline code
safe = re.sub(r"`([^`]+)`", r"\1", safe)
# Bold
safe = re.sub(r"\*\*(.+?)\*\*", r"\1", safe)
# Italic
safe = re.sub(r"\*(.+?)\*", r"\1", safe)
# Markdown links [text](url) — must unescape the URL parts first
def _link_repl(m):
import html as _h
label = m.group(1)
url = _h.unescape(m.group(2))
return f'{label}'
safe = re.sub(r"\[([^\]]+)\]\(([^)]+)\)", _link_repl, safe)
# Bare URLs (not already in an tag)
safe = re.sub(r'(?)(https?://[^\s<]+)', r'\1', safe)
# Headings (### before ## before # to match longest first)
safe = re.sub(r"^### (.+)$", r"