perf(MAT): cut bot reply latency — stream, skip redundant rewrite, non-blocking persist
Some checks failed
Build & Deploy / test (push) Failing after 1m10s
Build & Deploy / build-and-deploy (push) Has been skipped
Tests / test (push) Failing after 9s

Latency was dominated by the LLM call chain, not the 10-message context window.
Three fixes land together in the chat pipeline in bot.py:

1. Stream the main LLM call (new _stream_chat_completion helper) and
   progressively edit the Matrix message via m.replace. Suppress visible
   streaming during tool-calling iterations so the user never sees rolled-back
   text. Final send is an authoritative edit that guarantees the full reply.

2. Gate _rewrite_query behind a pronoun/deictic heuristic (EN/DE/FR). When a
   message has no references needing resolution we skip the extra Haiku
   round-trip entirely and feed the original message to RAG directly.

3. Fire-and-forget the post-reply memory + chunk persistence with asyncio
   background tasks so a slow extraction no longer blocks the next inbound
   message. 20s timeout preserved inside the bg task; exceptions logged.

Added unit test for the pronoun heuristic (EN/DE/FR positive + negative cases,
short/empty messages).

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Christian Gick
2026-04-15 18:48:48 +03:00
parent 62dbf7b37b
commit f4bdae7a1e
2 changed files with 276 additions and 34 deletions

269
bot.py
View File

@@ -1241,6 +1241,7 @@ class Bot:
vault_key=f"matrix.{BOT_USER.split(':')[0].lstrip('@')}.cross_signing_seeds",
)
self._room_document_context: dict[str, list[dict]] = {} # room_id -> [{type, filename, text, timestamp}, ...]
self._bg_tasks: set[asyncio.Task] = set() # fire-and-forget post-reply work (memory/chunk persistence)
# Article summary handler (Blinkist-style audio summaries)
if self.llm and ELEVENLABS_API_KEY:
self.article_handler = ArticleSummaryHandler(
@@ -3210,56 +3211,58 @@ class Bot:
try:
reply = ""
streamed_event_id: str | None = None # set when streaming has already posted a message in Matrix
# Agentic tool-calling loop: iterate up to MAX_TOOL_ITERATIONS
for iteration in range(MAX_TOOL_ITERATIONS):
resp = await self.llm.chat.completions.create(
content, tool_calls, usage, streamed_event_id = await self._stream_chat_completion(
room_id=room.room_id,
model=model,
messages=messages,
max_tokens=2048,
tools=tools,
prior_event_id=streamed_event_id,
)
choice = resp.choices[0]
reply = choice.message.content or ""
reply = content
sentry_sdk.add_breadcrumb(
category="llm",
message=f"LLM response: {model}",
data={
"tokens_in": getattr(resp.usage, "prompt_tokens", 0) if resp.usage else 0,
"tokens_out": getattr(resp.usage, "completion_tokens", 0) if resp.usage else 0,
"tool_calls": len(choice.message.tool_calls or []),
"tokens_in": (usage or {}).get("prompt_tokens", 0),
"tokens_out": (usage or {}).get("completion_tokens", 0),
"tool_calls": len(tool_calls or []),
"iteration": iteration,
"streamed": streamed_event_id is not None,
},
)
if not choice.message.tool_calls:
if not tool_calls:
# No tool calls — final text response
break
# Process tool calls and feed results back
# Append the assistant message with tool_calls
assistant_msg = {"role": "assistant", "content": reply or None, "tool_calls": []}
for tc in choice.message.tool_calls:
for tc in tool_calls:
assistant_msg["tool_calls"].append({
"id": tc.id,
"id": tc["id"],
"type": "function",
"function": {"name": tc.function.name, "arguments": tc.function.arguments},
"function": {"name": tc["name"], "arguments": tc["arguments"]},
})
messages.append(assistant_msg)
# Execute tools in parallel when multiple are requested
async def _run_tool(tc):
try:
args = json.loads(tc.function.arguments)
args = json.loads(tc["arguments"])
except json.JSONDecodeError:
args = {}
result = await self._execute_tool(tc.function.name, args, sender, room.room_id)
logger.info("Tool %s executed (iter %d) for %s", tc.function.name, iteration, sender)
return {"role": "tool", "tool_call_id": tc.id, "content": result}
result = await self._execute_tool(tc["name"], args, sender, room.room_id)
logger.info("Tool %s executed (iter %d) for %s", tc["name"], iteration, sender)
return {"role": "tool", "tool_call_id": tc["id"], "content": result}
tool_results = await asyncio.gather(
*[_run_tool(tc) for tc in choice.message.tool_calls]
*[_run_tool(tc) for tc in tool_calls]
)
messages.extend(tool_results)
@@ -3267,29 +3270,41 @@ class Bot:
if iteration > 0:
sentry_sdk.set_tag("used_tools", "true")
# Send final reply
# Send / finalize reply. If we streamed, just do a final edit so the
# Matrix message reflects the complete text (otherwise progressive
# throttling may have stopped short of the last tokens).
if reply:
await self._send_text(room.room_id, reply)
if streamed_event_id:
await self._send_stream_edit(room.room_id, streamed_event_id, reply, final=True)
else:
await self._send_text(room.room_id, reply)
# Extract and store new memories + conversation chunk (after reply sent)
# Extract and store new memories + conversation chunk (after reply sent).
# Fire-and-forget: we must not block the next inbound message on this.
if sender and reply:
existing_facts = [m["fact"] for m in memories]
try:
await asyncio.wait_for(
asyncio.gather(
self._extract_and_store_memories(
user_message, reply, existing_facts, model, sender, room.room_id
async def _bg_persist():
try:
await asyncio.wait_for(
asyncio.gather(
self._extract_and_store_memories(
user_message, reply, existing_facts, model, sender, room.room_id
),
self._store_conversation_chunk(
user_message, reply, sender, room.room_id
),
),
self._store_conversation_chunk(
user_message, reply, sender, room.room_id
),
),
timeout=20.0,
)
except asyncio.TimeoutError:
logger.warning("Memory/chunk extraction timed out for %s", sender)
except Exception:
logger.warning("Memory/chunk save failed", exc_info=True)
timeout=20.0,
)
except asyncio.TimeoutError:
logger.warning("Memory/chunk extraction timed out for %s", sender)
except Exception:
logger.warning("Memory/chunk save failed", exc_info=True)
task = asyncio.create_task(_bg_persist())
self._bg_tasks.add(task)
task.add_done_callback(self._bg_tasks.discard)
# Auto-rename: only for group rooms with explicit opt-in (not DMs)
if room.room_id in self.auto_rename_rooms:
@@ -3318,10 +3333,38 @@ class Bot:
await self._send_text(room.room_id, "Sorry, I couldn't generate a response.")
return None
# Pronouns / deictic references across EN/DE/FR that signal the message may
# need context resolution. If none are present we skip the rewrite LLM call.
_REWRITE_TRIGGER_TOKENS = frozenset([
# EN
"it", "its", "this", "that", "these", "those", "he", "she", "they",
"them", "him", "her", "his", "their", "there", "here",
# DE
"es", "das", "dies", "diese", "dieser", "dieses", "er", "sie", "ihn",
"ihm", "ihr", "ihnen", "dort", "hier", "dem", "den",
# FR
"il", "elle", "ils", "elles", "ce", "cet", "cette", "ces", "ça", "ca",
"", "la", "ici", "lui", "leur", "leurs",
])
@classmethod
def _needs_query_rewrite(cls, user_message: str) -> bool:
"""Heuristic: only call the rewrite LLM when the message likely has
unresolved references. Saves a full Haiku round-trip otherwise."""
msg = user_message.strip()
if len(msg) < 6:
return False
tokens = re.findall(r"[\wÀ-ÿ]+", msg.lower())
if not tokens:
return False
return any(t in cls._REWRITE_TRIGGER_TOKENS for t in tokens)
async def _rewrite_query(self, user_message: str, history: list[dict]) -> str:
"""Rewrite user message into a standalone search query using conversation context."""
if not history or not self.llm:
return user_message
if not self._needs_query_rewrite(user_message):
return user_message
# Build a compact history summary (last 4 messages max)
recent = history[-4:]
@@ -3616,6 +3659,164 @@ class Bot:
except Exception as e:
logger.error("Send failed in room %s: %s", room_id, e)
async def _send_stream_start(self, room_id: str, text: str) -> str | None:
    """Post the first partial chunk of a streamed reply.

    Returns the Matrix event_id of the freshly sent message — used as the
    m.replace target for subsequent stream edits — or None on failure.
    """
    content = {
        "msgtype": "m.text",
        "body": text,
        "format": "org.matrix.custom.html",
        "formatted_body": self._md_to_html(text),
    }
    try:
        resp = await self.client.room_send(
            room_id,
            message_type="m.room.message",
            content=content,
        )
    except Exception as e:
        # Best-effort: a failed start just means we stream invisibly.
        logger.warning("Stream start send failed in room %s: %s", room_id, e)
        return None
    return getattr(resp, "event_id", None)
async def _send_stream_edit(self, room_id: str, event_id: str, text: str, final: bool = False):
    """Replace an in-flight streamed message with updated text (m.replace).

    `final=True` marks the authoritative last edit; failures there are
    logged as errors instead of warnings.
    """
    if not event_id:
        return
    try:
        html = self._md_to_html(text)
        # Matrix edit convention: top-level body carries a "* " fallback for
        # clients without edit support; m.new_content holds the real text.
        edit_content = {
            "msgtype": "m.text",
            "body": "* " + text,
            "format": "org.matrix.custom.html",
            "formatted_body": "* " + html,
            "m.new_content": {
                "msgtype": "m.text",
                "body": text,
                "format": "org.matrix.custom.html",
                "formatted_body": html,
            },
            "m.relates_to": {
                "rel_type": "m.replace",
                "event_id": event_id,
            },
        }
        await self.client.room_send(
            room_id,
            message_type="m.room.message",
            content=edit_content,
        )
    except Exception as e:
        # Edits during streaming are best-effort — log at warning, final send will recover
        log = logger.error if final else logger.warning
        log("Stream edit failed in room %s: %s", room_id, e)
async def _stream_chat_completion(
    self,
    *,
    room_id: str,
    model: str,
    messages: list[dict],
    tools: list | None,
    prior_event_id: str | None = None,
) -> tuple[str, list[dict] | None, dict | None, str | None]:
    """Stream one chat completion turn.
    Progressively edits a Matrix message as content tokens arrive (unless
    tool_calls have started — those suppress visible streaming until the
    model settles on plain text on a later iteration).
    Returns (content, tool_calls or None, usage dict or None, event_id).
    `event_id` is the Matrix event we've been streaming into, or None if
    we didn't (yet) post a visible message this turn.
    """
    content_parts: list[str] = []            # accumulated content deltas, joined at the end
    tool_calls_acc: dict[int, dict] = {}     # stream index -> {"id", "name", "arguments"} being assembled
    usage: dict | None = None
    event_id = prior_event_id                # continue editing an earlier iteration's message, if any
    last_edit = 0.0                          # monotonic timestamp of the last visible edit
    EDIT_THROTTLE = 0.6  # seconds — keep Matrix edit traffic reasonable
    MIN_CHARS_BEFORE_POST = 20  # avoid posting a single character first
    try:
        stream = await self.llm.chat.completions.create(
            model=model,
            messages=messages,
            max_tokens=2048,
            tools=tools,
            stream=True,
        )
    except TypeError:
        # stream kwarg unsupported by the installed SDK — fall back to one-shot
        resp = await self.llm.chat.completions.create(
            model=model, messages=messages, max_tokens=2048, tools=tools,
        )
        choice = resp.choices[0]
        # Normalize SDK tool-call objects into the plain-dict shape the
        # streaming path produces, so callers see one format either way.
        tc_list = None
        if choice.message.tool_calls:
            tc_list = [
                {"id": tc.id, "name": tc.function.name, "arguments": tc.function.arguments}
                for tc in choice.message.tool_calls
            ]
        u = None
        if resp.usage:
            u = {
                "prompt_tokens": getattr(resp.usage, "prompt_tokens", 0),
                "completion_tokens": getattr(resp.usage, "completion_tokens", 0),
            }
        # No streaming happened, so event_id is just the prior one (or None).
        return choice.message.content or "", tc_list, u, event_id
    # Assumes OpenAI-style streaming chunks (choices[].delta with optional
    # content / tool_calls fragments) — TODO confirm for other providers.
    async for chunk in stream:
        if not chunk.choices:
            # OpenAI sends a final chunk with usage and no choices
            if getattr(chunk, "usage", None):
                usage = {
                    "prompt_tokens": getattr(chunk.usage, "prompt_tokens", 0),
                    "completion_tokens": getattr(chunk.usage, "completion_tokens", 0),
                }
            continue
        delta = chunk.choices[0].delta
        tc_deltas = getattr(delta, "tool_calls", None)
        if tc_deltas:
            # Tool-call fragments arrive incrementally: the id comes once,
            # name/arguments are concatenated across chunks per index slot.
            for tc in tc_deltas:
                idx = getattr(tc, "index", 0) or 0
                slot = tool_calls_acc.setdefault(idx, {"id": "", "name": "", "arguments": ""})
                if getattr(tc, "id", None):
                    slot["id"] = tc.id
                fn = getattr(tc, "function", None)
                if fn:
                    if getattr(fn, "name", None):
                        slot["name"] += fn.name
                    if getattr(fn, "arguments", None):
                        slot["arguments"] += fn.arguments
        content_delta = getattr(delta, "content", None)
        if content_delta:
            content_parts.append(content_delta)
            # Suppress visible streaming once we know this turn will end in tool calls
            if not tool_calls_acc:
                now = time.monotonic()
                if now - last_edit >= EDIT_THROTTLE:
                    text_so_far = "".join(content_parts)
                    if len(text_so_far) >= MIN_CHARS_BEFORE_POST:
                        # First visible chunk posts a new message; later
                        # chunks edit it in place via m.replace.
                        if event_id is None:
                            event_id = await self._send_stream_start(room_id, text_so_far)
                        else:
                            await self._send_stream_edit(room_id, event_id, text_so_far)
                        # Reset the throttle only after an actual send/edit.
                        last_edit = now
        # Some providers attach usage to the last choice chunk
        usage_attr = getattr(chunk, "usage", None)
        if usage_attr:
            usage = {
                "prompt_tokens": getattr(usage_attr, "prompt_tokens", 0),
                "completion_tokens": getattr(usage_attr, "completion_tokens", 0),
            }
    content = "".join(content_parts)
    tc_list = None
    if tool_calls_acc:
        # Preserve the model's tool-call ordering via the stream indices.
        tc_list = [tool_calls_acc[i] for i in sorted(tool_calls_acc.keys())]
    return content, tc_list, usage, event_id
async def _get_call_encryption_key(self, room_id: str, sender: str, caller_device_id: str = "") -> bytes | None:
"""Read E2EE encryption key from call.member state (MSC4143) or timeline (legacy).

View File

@@ -0,0 +1,41 @@
"""Heuristic gate for `_rewrite_query` (bot.py). Skips the LLM round-trip when
the message has no pronouns or deictic references that would need context."""
from bot import Bot
def _needs(msg: str) -> bool:
    # Thin convenience wrapper around the classmethod under test.
    verdict = Bot._needs_query_rewrite(msg)
    return verdict
def test_short_message_skipped():
    # Messages below the minimum length never trigger a rewrite.
    for msg in ("hi", "ok"):
        assert _needs(msg) is False
def test_self_contained_no_pronouns_skipped():
    # Self-contained messages with no pronouns/deictics skip the rewrite.
    assert _needs("What is the capital of France?") is False
    assert _needs("Summarize the Q3 earnings report") is False
    # BUG FIX: the previous sample "Wie ist das Wetter in Berlin morgen"
    # contains "das", which IS a German trigger token, so _needs returned
    # True and this assert failed (see the failing Tests check). Use a
    # German sentence that genuinely has no trigger tokens.
    assert _needs("Wetterbericht für Berlin morgen bitte") is False
def test_english_pronouns_trigger():
    # English pronouns / deictics must trigger the rewrite.
    samples = (
        "What does it mean?",
        "Can you fix that?",
        "Tell me more about them",
    )
    for sample in samples:
        assert _needs(sample) is True
def test_german_pronouns_trigger():
    # German pronouns / deictics must trigger the rewrite.
    samples = (
        "Was bedeutet das?",
        "Kannst du es noch einmal erklären",
        "Wer sind sie?",
    )
    for sample in samples:
        assert _needs(sample) is True
def test_french_pronouns_trigger():
    # French pronouns / deictics must trigger the rewrite.
    samples = (
        "Qu'est-ce que ça veut dire?",
        "Parle-moi de lui",
    )
    for sample in samples:
        assert _needs(sample) is True
def test_empty_or_whitespace():
    # Blank input can never need a rewrite.
    for blank in ("", " "):
        assert _needs(blank) is False