diff --git a/bot.py b/bot.py index 8f9fdf2..237542a 100644 --- a/bot.py +++ b/bot.py @@ -1317,6 +1317,24 @@ class Bot: await self._send_text(room_id, f"**Anruf-Zusammenfassung:**\n\n{summary}") except Exception: logger.exception("Failed to post call summary for %s", room_id) + # Persist voice transcript as conversation chunks in memory service + try: + caller = event.sender + for entry in transcript: + if entry["role"] == "user": + user_text = entry["text"] + # Find the next assistant response + idx = transcript.index(entry) + ai_text = "" + if idx + 1 < len(transcript) and transcript[idx + 1]["role"] == "assistant": + ai_text = transcript[idx + 1]["text"] + if user_text and ai_text: + await self._store_conversation_chunk( + user_text, ai_text, caller, room_id + ) + logger.info("Stored %d voice transcript chunks for %s", len(transcript) // 2, room_id) + except Exception: + logger.warning("Failed to store voice transcript chunks for %s", room_id, exc_info=True) # Extract and post document annotations if a document was discussed if doc_context: try: @@ -1415,6 +1433,22 @@ class Bot: summary=summary, original_ts=time.time(), ) + # Regex for detecting personal facts worth extracting (pronouns, possessives, identity markers) + _PERSONAL_FACT_RE = re.compile( + r"\b(ich|mein|meine|meinem|i'm|i am|my |mine|we |our |" + r"name is|hei[sß]e|wohne|arbeite|lebe|studier|born|live|work|" + r"prefer|favorite|hobby|birthday|family|wife|husband|partner|child|dog|cat)\b", + re.IGNORECASE, + ) + + def _is_trivial_message(self, text: str) -> bool: + """Return True if the message is too trivial for memory extraction.""" + if len(text) >= 20: + return False + if self._PERSONAL_FACT_RE.search(text): + return False + return True + async def _extract_and_store_memories(self, user_message: str, ai_reply: str, existing_facts: list[str], model: str, sender: str, room_id: str): @@ -1422,6 +1456,11 @@ class Bot: if not self.llm: return + # Skip extraction for trivial messages (saves ~2-3s + 1 LLM call) + if self._is_trivial_message(user_message): + logger.debug("Skipping memory extraction for trivial message: %s", user_message[:40]) + return + existing_text = "\n".join(f"- {f}" for f in existing_facts) if existing_facts else "(none)" logger.info("Memory extraction: user_msg=%s... (%d existing facts)", user_message[:80], len(existing_facts)) @@ -2221,23 +2260,34 @@ class Bot: # Rewrite query using conversation context for better RAG search search_query = await self._rewrite_query(user_message, history) - # Document context via MatrixHost API - doc_results = await self.rag.search(search_query, matrix_user_id=sender) if sender else [] + # Run RAG search, memory query, and chunk query in parallel (independent) + if sender: + doc_results_coro = self.rag.search(search_query, matrix_user_id=sender) + memories_coro = self.memory.query(sender, user_message, top_k=10) + chunks_coro = self.memory.query_chunks(search_query, user_id=sender, room_id=room.room_id, top_k=5) + doc_results, memories, chunks = await asyncio.gather( + doc_results_coro, memories_coro, chunks_coro, + return_exceptions=True, + ) + # Handle exceptions from gather + if isinstance(doc_results, BaseException): + logger.warning("RAG search failed: %s", doc_results) + doc_results = [] + if isinstance(memories, BaseException): + logger.warning("Memory query failed: %s", memories) + memories = [] + if isinstance(chunks, BaseException): + logger.warning("Chunk query failed: %s", chunks) + chunks = [] + else: + doc_results, memories, chunks = [], [], [] + doc_context = self.rag.format_context(doc_results) if doc_context: logger.info("RAG found %d docs for: %s (original: %s)", len(doc_results), search_query[:50], user_message[:50]) else: logger.info("RAG found 0 docs for: %s (original: %s)", search_query[:50], user_message[:50]) - - # Query relevant memories via semantic search - memories = await self.memory.query(sender, user_message, top_k=10) if sender else [] memory_context = self._format_memories(memories) - - # Query relevant conversation chunks (RAG over chat history) - if sender: - chunks = await self.memory.query_chunks(search_query, user_id=sender, room_id=room.room_id, top_k=5) - else: - chunks = [] chunk_context = self._format_chunks(chunks) # Include room document context (PDFs, Confluence pages, images uploaded to room) @@ -2327,19 +2377,20 @@ class Bot: }) messages.append(assistant_msg) - # Execute each tool and append results - for tc in choice.message.tool_calls: + # Execute tools in parallel when multiple are requested + async def _run_tool(tc): try: args = json.loads(tc.function.arguments) except json.JSONDecodeError: args = {} result = await self._execute_tool(tc.function.name, args, sender, room.room_id) - messages.append({ - "role": "tool", - "tool_call_id": tc.id, - "content": result, - }) logger.info("Tool %s executed (iter %d) for %s", tc.function.name, iteration, sender) + return {"role": "tool", "tool_call_id": tc.id, "content": result} + + tool_results = await asyncio.gather( + *[_run_tool(tc) for tc in choice.message.tool_calls] + ) + messages.extend(tool_results) # Tag whether tools were used during the loop if iteration > 0: diff --git a/memory-service/main.py b/memory-service/main.py index 2ea0c46..b1d56ff 100644 --- a/memory-service/main.py +++ b/memory-service/main.py @@ -29,6 +29,7 @@ MEMORY_SERVICE_TOKEN = os.environ.get("MEMORY_SERVICE_TOKEN", "") app = FastAPI(title="Memory Service") pool: asyncpg.Pool | None = None owner_pool: asyncpg.Pool | None = None +_pool_healthy = True async def verify_token(authorization: str | None = Header(None)): @@ -161,9 +162,36 @@ async def _set_rls_user(conn, user_id: str): await conn.execute("SELECT set_config('app.current_user_id', $1, false)", user_id) +async def _ensure_pool(): + """Recreate the connection pool if it was lost.""" + global pool, owner_pool, _pool_healthy + if pool and _pool_healthy: + return + logger.warning("Reconnecting asyncpg pools (healthy=%s, pool=%s)", _pool_healthy, pool is not None) + try: + if pool: + try: + await pool.close() + except Exception: + pass + if owner_pool: + try: + await owner_pool.close() + except Exception: + pass + pool = await asyncpg.create_pool(DB_DSN, min_size=2, max_size=10) + owner_pool = await asyncpg.create_pool(OWNER_DSN, min_size=1, max_size=2) + _pool_healthy = True + logger.info("asyncpg pools reconnected successfully") + except Exception: + _pool_healthy = False + logger.exception("Failed to reconnect asyncpg pools") + raise + + async def _init_db(): """Create pgvector extension and memories table if not exists.""" - global pool + global pool, _pool_healthy # Use owner connection for DDL (CREATE TABLE/INDEX), then create restricted pool owner_conn = await asyncpg.connect(OWNER_DSN) conn = owner_conn @@ -185,7 +213,7 @@ async def _init_db(): await conn.execute(f""" CREATE INDEX IF NOT EXISTS idx_memories_embedding ON memories USING ivfflat (embedding vector_cosine_ops) - WITH (lists = 10) + WITH (lists = 100) """) # Conversation chunks table for RAG over chat history await conn.execute(f""" @@ -216,6 +244,7 @@ async def _init_db(): # Owner pool for admin queries (bypasses RLS) — 1 connection only global owner_pool owner_pool = await asyncpg.create_pool(OWNER_DSN, min_size=1, max_size=2) + _pool_healthy = True logger.info("Database initialized (dims=%d, encryption=%s)", EMBED_DIMS, "ON" if ENCRYPTION_KEY else "OFF") @@ -234,7 +263,9 @@ async def shutdown(): @app.get("/health") async def health(): - if owner_pool: + global _pool_healthy + try: + await _ensure_pool() async with owner_pool.acquire() as conn: mem_count = await conn.fetchval("SELECT count(*) FROM memories") chunk_count = await conn.fetchval("SELECT count(*) FROM conversation_chunks") @@ -244,7 +275,10 @@ async def health(): "total_chunks": chunk_count, "encryption": "on" if ENCRYPTION_KEY else "off", } - return {"status": "no_db"} + except Exception as e: + _pool_healthy = False + logger.error("Health check failed: %s", e) + return {"status": "unhealthy", "error": str(e)} @app.post("/memories/store") @@ -256,6 +290,7 @@ async def store_memory(req: StoreRequest, _: None = Depends(verify_token)): embedding = await _embed(req.fact) vec_literal = "[" + ",".join(str(v) for v in embedding) + "]" + await _ensure_pool() async with pool.acquire() as conn: await _set_rls_user(conn, req.user_id) @@ -291,6 +326,7 @@ async def query_memories(req: QueryRequest, _: None = Depends(verify_token)): embedding = await _embed(req.query) vec_literal = "[" + ",".join(str(v) for v in embedding) + "]" + await _ensure_pool() async with pool.acquire() as conn: await _set_rls_user(conn, req.user_id) @@ -321,6 +357,7 @@ async def query_memories(req: QueryRequest, _: None = Depends(verify_token)): @app.delete("/memories/{user_id}") async def delete_user_memories(user_id: str, _: None = Depends(verify_token)): """GDPR delete — remove all memories for a user.""" + await _ensure_pool() async with pool.acquire() as conn: await _set_rls_user(conn, user_id) result = await conn.execute("DELETE FROM memories WHERE user_id = $1", user_id) @@ -332,6 +369,7 @@ async def delete_user_memories(user_id: str, _: None = Depends(verify_token)): @app.get("/memories/{user_id}") async def list_user_memories(user_id: str, _: None = Depends(verify_token)): """List all memories for a user (for UI/debug).""" + await _ensure_pool() async with pool.acquire() as conn: await _set_rls_user(conn, user_id) rows = await conn.fetch( @@ -369,6 +407,7 @@ async def store_chunk(req: ChunkStoreRequest, _: None = Depends(verify_token)): encrypted_text = _encrypt(req.chunk_text, req.user_id) encrypted_summary = _encrypt(req.summary, req.user_id) + await _ensure_pool() async with pool.acquire() as conn: await _set_rls_user(conn, req.user_id) await conn.execute( @@ -403,6 +442,7 @@ async def query_chunks(req: ChunkQueryRequest, _: None = Depends(verify_token)): where = f"WHERE {' AND '.join(conditions)}" params.append(req.top_k) + await _ensure_pool() async with pool.acquire() as conn: await _set_rls_user(conn, req.user_id) @@ -452,6 +492,7 @@ async def bulk_store_chunks(req: ChunkBulkStoreRequest, _: None = Depends(verify logger.error("Batch embed failed for chunks %d-%d", i, i + len(batch), exc_info=True) continue + await _ensure_pool() async with pool.acquire() as conn: for chunk, embedding in zip(batch, embeddings): await _set_rls_user(conn, chunk.user_id) @@ -477,6 +518,7 @@ async def bulk_store_chunks(req: ChunkBulkStoreRequest, _: None = Depends(verify @app.get("/chunks/{user_id}/count") async def count_user_chunks(user_id: str, _: None = Depends(verify_token)): """Count conversation chunks for a user.""" + await _ensure_pool() async with pool.acquire() as conn: await _set_rls_user(conn, user_id) count = await conn.fetchval(