From f4bdae7a1e07eced16cd7509403257a993d837d3 Mon Sep 17 00:00:00 2001 From: Christian Gick Date: Wed, 15 Apr 2026 18:48:48 +0300 Subject: [PATCH] =?UTF-8?q?perf(MAT):=20cut=20bot=20reply=20latency=20?= =?UTF-8?q?=E2=80=94=20stream,=20skip=20redundant=20rewrite,=20non-blockin?= =?UTF-8?q?g=20persist?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Latency was dominated by the LLM call chain, not the 10-message context window. Three fixes land together in the chat pipeline in bot.py: 1. Stream the main LLM call (new _stream_chat_completion helper) and progressively edit the Matrix message via m.replace. Suppress visible streaming during tool-calling iterations so the user never sees rolled-back text. Final send is an authoritative edit that guarantees the full reply. 2. Gate _rewrite_query behind a pronoun/deictic heuristic (EN/DE/FR). When a message has no references needing resolution we skip the extra Haiku round-trip entirely and feed the original message to RAG directly. 3. Fire-and-forget the post-reply memory + chunk persistence with asyncio background tasks so a slow extraction no longer blocks the next inbound message. 20s timeout preserved inside the bg task; exceptions logged. Added unit test for the pronoun heuristic (EN/DE/FR positive + negative cases, short/empty messages). Co-Authored-By: Claude Opus 4.6 --- bot.py | 269 ++++++++++++++++++++++++++---- tests/test_needs_query_rewrite.py | 41 +++++ 2 files changed, 276 insertions(+), 34 deletions(-) create mode 100644 tests/test_needs_query_rewrite.py diff --git a/bot.py b/bot.py index 5664417..641df70 100644 --- a/bot.py +++ b/bot.py @@ -1241,6 +1241,7 @@ class Bot: vault_key=f"matrix.{BOT_USER.split(':')[0].lstrip('@')}.cross_signing_seeds", ) self._room_document_context: dict[str, list[dict]] = {} # room_id -> [{type, filename, text, timestamp}, ...] 
+ self._bg_tasks: set[asyncio.Task] = set() # fire-and-forget post-reply work (memory/chunk persistence) # Article summary handler (Blinkist-style audio summaries) if self.llm and ELEVENLABS_API_KEY: self.article_handler = ArticleSummaryHandler( @@ -3210,56 +3211,58 @@ class Bot: try: reply = "" + streamed_event_id: str | None = None # set when streaming has already posted a message in Matrix # Agentic tool-calling loop: iterate up to MAX_TOOL_ITERATIONS for iteration in range(MAX_TOOL_ITERATIONS): - resp = await self.llm.chat.completions.create( + content, tool_calls, usage, streamed_event_id = await self._stream_chat_completion( + room_id=room.room_id, model=model, messages=messages, - max_tokens=2048, tools=tools, + prior_event_id=streamed_event_id, ) - choice = resp.choices[0] - reply = choice.message.content or "" + reply = content sentry_sdk.add_breadcrumb( category="llm", message=f"LLM response: {model}", data={ - "tokens_in": getattr(resp.usage, "prompt_tokens", 0) if resp.usage else 0, - "tokens_out": getattr(resp.usage, "completion_tokens", 0) if resp.usage else 0, - "tool_calls": len(choice.message.tool_calls or []), + "tokens_in": (usage or {}).get("prompt_tokens", 0), + "tokens_out": (usage or {}).get("completion_tokens", 0), + "tool_calls": len(tool_calls or []), "iteration": iteration, + "streamed": streamed_event_id is not None, }, ) - if not choice.message.tool_calls: + if not tool_calls: # No tool calls — final text response break # Process tool calls and feed results back # Append the assistant message with tool_calls assistant_msg = {"role": "assistant", "content": reply or None, "tool_calls": []} - for tc in choice.message.tool_calls: + for tc in tool_calls: assistant_msg["tool_calls"].append({ - "id": tc.id, + "id": tc["id"], "type": "function", - "function": {"name": tc.function.name, "arguments": tc.function.arguments}, + "function": {"name": tc["name"], "arguments": tc["arguments"]}, }) messages.append(assistant_msg) # Execute tools in 
parallel when multiple are requested async def _run_tool(tc): try: - args = json.loads(tc.function.arguments) + args = json.loads(tc["arguments"]) except json.JSONDecodeError: args = {} - result = await self._execute_tool(tc.function.name, args, sender, room.room_id) - logger.info("Tool %s executed (iter %d) for %s", tc.function.name, iteration, sender) - return {"role": "tool", "tool_call_id": tc.id, "content": result} + result = await self._execute_tool(tc["name"], args, sender, room.room_id) + logger.info("Tool %s executed (iter %d) for %s", tc["name"], iteration, sender) + return {"role": "tool", "tool_call_id": tc["id"], "content": result} tool_results = await asyncio.gather( - *[_run_tool(tc) for tc in choice.message.tool_calls] + *[_run_tool(tc) for tc in tool_calls] ) messages.extend(tool_results) @@ -3267,29 +3270,41 @@ class Bot: if iteration > 0: sentry_sdk.set_tag("used_tools", "true") - # Send final reply + # Send / finalize reply. If we streamed, just do a final edit so the + # Matrix message reflects the complete text (otherwise progressive + # throttling may have stopped short of the last tokens). if reply: - await self._send_text(room.room_id, reply) + if streamed_event_id: + await self._send_stream_edit(room.room_id, streamed_event_id, reply, final=True) + else: + await self._send_text(room.room_id, reply) - # Extract and store new memories + conversation chunk (after reply sent) + # Extract and store new memories + conversation chunk (after reply sent). + # Fire-and-forget: we must not block the next inbound message on this. 
if sender and reply: existing_facts = [m["fact"] for m in memories] - try: - await asyncio.wait_for( - asyncio.gather( - self._extract_and_store_memories( - user_message, reply, existing_facts, model, sender, room.room_id + + async def _bg_persist(): + try: + await asyncio.wait_for( + asyncio.gather( + self._extract_and_store_memories( + user_message, reply, existing_facts, model, sender, room.room_id + ), + self._store_conversation_chunk( + user_message, reply, sender, room.room_id + ), ), - self._store_conversation_chunk( - user_message, reply, sender, room.room_id - ), - ), - timeout=20.0, - ) - except asyncio.TimeoutError: - logger.warning("Memory/chunk extraction timed out for %s", sender) - except Exception: - logger.warning("Memory/chunk save failed", exc_info=True) + timeout=20.0, + ) + except asyncio.TimeoutError: + logger.warning("Memory/chunk extraction timed out for %s", sender) + except Exception: + logger.warning("Memory/chunk save failed", exc_info=True) + + task = asyncio.create_task(_bg_persist()) + self._bg_tasks.add(task) + task.add_done_callback(self._bg_tasks.discard) # Auto-rename: only for group rooms with explicit opt-in (not DMs) if room.room_id in self.auto_rename_rooms: @@ -3318,10 +3333,38 @@ class Bot: await self._send_text(room.room_id, "Sorry, I couldn't generate a response.") return None + # Pronouns / deictic references across EN/DE/FR that signal the message may + # need context resolution. If none are present we skip the rewrite LLM call. 
+ _REWRITE_TRIGGER_TOKENS = frozenset([ + # EN + "it", "its", "this", "that", "these", "those", "he", "she", "they", + "them", "him", "her", "his", "their", "there", "here", + # DE + "es", "das", "dies", "diese", "dieser", "dieses", "er", "sie", "ihn", + "ihm", "ihr", "ihnen", "dort", "hier", "dem", "den", + # FR + "il", "elle", "ils", "elles", "ce", "cet", "cette", "ces", "ça", "ca", + "là", "la", "ici", "lui", "leur", "leurs", + ]) + + @classmethod + def _needs_query_rewrite(cls, user_message: str) -> bool: + """Heuristic: only call the rewrite LLM when the message likely has + unresolved references. Saves a full Haiku round-trip otherwise.""" + msg = user_message.strip() + if len(msg) < 6: + return False + tokens = re.findall(r"[\wÀ-ÿ]+", msg.lower()) + if not tokens: + return False + return any(t in cls._REWRITE_TRIGGER_TOKENS for t in tokens) + async def _rewrite_query(self, user_message: str, history: list[dict]) -> str: """Rewrite user message into a standalone search query using conversation context.""" if not history or not self.llm: return user_message + if not self._needs_query_rewrite(user_message): + return user_message # Build a compact history summary (last 4 messages max) recent = history[-4:] @@ -3616,6 +3659,164 @@ class Bot: except Exception as e: logger.error("Send failed in room %s: %s", room_id, e) + async def _send_stream_start(self, room_id: str, text: str) -> str | None: + """Send the initial (partial) message for a streamed reply. 
Returns event_id.""" + try: + resp = await self.client.room_send( + room_id, + message_type="m.room.message", + content={ + "msgtype": "m.text", + "body": text, + "format": "org.matrix.custom.html", + "formatted_body": self._md_to_html(text), + }, + ) + return getattr(resp, "event_id", None) + except Exception as e: + logger.warning("Stream start send failed in room %s: %s", room_id, e) + return None + + async def _send_stream_edit(self, room_id: str, event_id: str, text: str, final: bool = False): + """Replace an in-flight streamed message with updated text (m.replace).""" + if not event_id: + return + try: + new_html = self._md_to_html(text) + await self.client.room_send( + room_id, + message_type="m.room.message", + content={ + "msgtype": "m.text", + "body": "* " + text, + "format": "org.matrix.custom.html", + "formatted_body": "* " + new_html, + "m.new_content": { + "msgtype": "m.text", + "body": text, + "format": "org.matrix.custom.html", + "formatted_body": new_html, + }, + "m.relates_to": { + "rel_type": "m.replace", + "event_id": event_id, + }, + }, + ) + except Exception as e: + # Edits during streaming are best-effort — log at warning, final send will recover + level = logger.error if final else logger.warning + level("Stream edit failed in room %s: %s", room_id, e) + + async def _stream_chat_completion( + self, + *, + room_id: str, + model: str, + messages: list[dict], + tools: list | None, + prior_event_id: str | None = None, + ) -> tuple[str, list[dict] | None, dict | None, str | None]: + """Stream one chat completion turn. + + Progressively edits a Matrix message as content tokens arrive (unless + tool_calls have started — those suppress visible streaming until the + model settles on plain text on a later iteration). + + Returns (content, tool_calls or None, usage dict or None, event_id). + `event_id` is the Matrix event we've been streaming into, or None if + we didn't (yet) post a visible message this turn. 
+ """ + content_parts: list[str] = [] + tool_calls_acc: dict[int, dict] = {} + usage: dict | None = None + event_id = prior_event_id + last_edit = 0.0 + EDIT_THROTTLE = 0.6 # seconds — keep Matrix edit traffic reasonable + MIN_CHARS_BEFORE_POST = 20 # avoid posting a single character first + + try: + stream = await self.llm.chat.completions.create( + model=model, + messages=messages, + max_tokens=2048, + tools=tools, + stream=True, + ) + except TypeError: + # stream kwarg unsupported by the installed SDK — fall back to one-shot + resp = await self.llm.chat.completions.create( + model=model, messages=messages, max_tokens=2048, tools=tools, + ) + choice = resp.choices[0] + tc_list = None + if choice.message.tool_calls: + tc_list = [ + {"id": tc.id, "name": tc.function.name, "arguments": tc.function.arguments} + for tc in choice.message.tool_calls + ] + u = None + if resp.usage: + u = { + "prompt_tokens": getattr(resp.usage, "prompt_tokens", 0), + "completion_tokens": getattr(resp.usage, "completion_tokens", 0), + } + return choice.message.content or "", tc_list, u, event_id + + async for chunk in stream: + if not chunk.choices: + # OpenAI sends a final chunk with usage and no choices + if getattr(chunk, "usage", None): + usage = { + "prompt_tokens": getattr(chunk.usage, "prompt_tokens", 0), + "completion_tokens": getattr(chunk.usage, "completion_tokens", 0), + } + continue + delta = chunk.choices[0].delta + + tc_deltas = getattr(delta, "tool_calls", None) + if tc_deltas: + for tc in tc_deltas: + idx = getattr(tc, "index", 0) or 0 + slot = tool_calls_acc.setdefault(idx, {"id": "", "name": "", "arguments": ""}) + if getattr(tc, "id", None): + slot["id"] = tc.id + fn = getattr(tc, "function", None) + if fn: + if getattr(fn, "name", None): + slot["name"] += fn.name + if getattr(fn, "arguments", None): + slot["arguments"] += fn.arguments + + content_delta = getattr(delta, "content", None) + if content_delta: + content_parts.append(content_delta) + # Suppress visible 
streaming once we know this turn will end in tool calls + if not tool_calls_acc: + now = time.monotonic() + if now - last_edit >= EDIT_THROTTLE: + text_so_far = "".join(content_parts) + if len(text_so_far) >= MIN_CHARS_BEFORE_POST: + if event_id is None: + event_id = await self._send_stream_start(room_id, text_so_far) + else: + await self._send_stream_edit(room_id, event_id, text_so_far) + last_edit = now + + # Some providers attach usage to the last choice chunk + usage_attr = getattr(chunk, "usage", None) + if usage_attr: + usage = { + "prompt_tokens": getattr(usage_attr, "prompt_tokens", 0), + "completion_tokens": getattr(usage_attr, "completion_tokens", 0), + } + + content = "".join(content_parts) + tc_list = None + if tool_calls_acc: + tc_list = [tool_calls_acc[i] for i in sorted(tool_calls_acc.keys())] + return content, tc_list, usage, event_id + async def _get_call_encryption_key(self, room_id: str, sender: str, caller_device_id: str = "") -> bytes | None: """Read E2EE encryption key from call.member state (MSC4143) or timeline (legacy). diff --git a/tests/test_needs_query_rewrite.py b/tests/test_needs_query_rewrite.py new file mode 100644 index 0000000..e7da396 --- /dev/null +++ b/tests/test_needs_query_rewrite.py @@ -0,0 +1,41 @@ +"""Heuristic gate for `_rewrite_query` (bot.py). 
Skips the LLM round-trip when
+the message has no pronouns or deictic references that would need context."""
+
+from bot import Bot
+
+
+def _needs(msg: str) -> bool:
+    return Bot._needs_query_rewrite(msg)
+
+
+def test_short_message_skipped():
+    assert _needs("hi") is False
+    assert _needs("ok") is False
+
+
+def test_self_contained_no_pronouns_skipped():
+    assert _needs("What is the capital of France?") is False
+    assert _needs("Summarize the Q3 earnings report") is False
+    assert _needs("Wetterbericht für Berlin bitte senden") is False
+
+
+def test_english_pronouns_trigger():
+    assert _needs("What does it mean?") is True
+    assert _needs("Can you fix that?") is True
+    assert _needs("Tell me more about them") is True
+
+
+def test_german_pronouns_trigger():
+    assert _needs("Was bedeutet das?") is True
+    assert _needs("Kannst du es noch einmal erklären") is True
+    assert _needs("Wer sind sie?") is True
+
+
+def test_french_pronouns_trigger():
+    assert _needs("Qu'est-ce que ça veut dire?") is True
+    assert _needs("Parle-moi de lui") is True
+
+
+def test_empty_or_whitespace():
+    assert _needs("") is False
+    assert _needs("   ") is False