feat(voice): add cross-call memory and Brave Search tool

- Query user memories at call start and inject into agent system prompt
- Extract new facts after each exchange using claude-haiku via LiteLLM
- Add Brave Search tool (@function_tool) for current data queries
- Pass memory client and caller_user_id through VoiceSession constructor
- Pre-compute 8 HMAC-ratcheted EC keys for reliable E2EE decryption

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
Christian Gick
2026-02-22 15:27:59 +02:00
parent 2b8744de6e
commit 52f8cb569c
2 changed files with 133 additions and 3 deletions

2
bot.py
View File

@@ -463,6 +463,8 @@ class Bot:
bot_key=bot_key,
publish_key_cb=lambda key, rid=room_id: asyncio.ensure_future(
self._publish_encryption_key(rid, key)),
memory=self.memory,
caller_user_id=event.sender,
)
# Check timeline for caller's key

134
voice.py
View File

@@ -10,10 +10,15 @@ import os
import zoneinfo
import json
import re
import aiohttp
import httpx
from livekit import rtc, api as lkapi
from livekit.agents import Agent, AgentSession, room_io
from livekit.agents import Agent, AgentSession, function_tool, room_io
from livekit.plugins import openai as lk_openai, elevenlabs, silero
from openai import AsyncOpenAI
logger = logging.getLogger("matrix-ai-voice")
@@ -23,6 +28,8 @@ LITELLM_KEY = os.environ.get("LITELLM_API_KEY", "not-needed")
LK_API_KEY = os.environ.get("LIVEKIT_API_KEY", "")
LK_API_SECRET = os.environ.get("LIVEKIT_API_SECRET", "")
ELEVENLABS_KEY = os.environ.get("ELEVENLABS_API_KEY", "")
BRAVE_API_KEY = os.environ.get("BRAVE_API_KEY", "")
MEMORY_SERVICE_URL = os.environ.get("MEMORY_SERVICE_URL", "http://memory-service:8090")
DEFAULT_VOICE_ID = "JBFqnCBsd6RMkjVDRZzb" # George - warm, British male, multilingual
_VOICE_PROMPT_TEMPLATE = """Du bist ein hilfreicher Sprachassistent in einem Matrix-Anruf.
@@ -112,6 +119,86 @@ def _ratchet_keys(base_raw: bytes, count: int = 6) -> dict[int, bytes]:
return keys
async def _brave_search(query: str, count: int = 5) -> str:
"""Call Brave Search API and return formatted results."""
if not BRAVE_API_KEY:
return "Search unavailable (no API key configured)."
try:
async with httpx.AsyncClient(timeout=10.0) as client:
resp = await client.get(
"https://api.search.brave.com/res/v1/web/search",
headers={"Accept": "application/json", "X-Subscription-Token": BRAVE_API_KEY},
params={"q": query, "count": count, "text_decorations": False},
)
resp.raise_for_status()
data = resp.json()
results = data.get("web", {}).get("results", [])
if not results:
return "No results found."
lines = []
for r in results[:count]:
lines.append(f"- {r.get('title', '')}: {r.get('description', '')} ({r.get('url', '')})")
return "\n".join(lines)
except Exception as exc:
logger.warning("Brave search error: %s", exc)
return f"Search failed: {exc}"
async def _extract_voice_memories(user_text: str, agent_text: str,
user_id: str, room_id: str) -> None:
"""Extract memorable facts from a voice exchange and store them."""
if not LITELLM_URL or not MEMORY_SERVICE_URL:
return
try:
# Fetch existing facts to avoid duplicates
async with httpx.AsyncClient(timeout=10.0) as client:
resp = await client.post(
f"{MEMORY_SERVICE_URL}/memories/query",
json={"user_id": user_id, "query": "all facts", "top_k": 20},
)
existing = [m["fact"] for m in resp.json().get("results", [])] if resp.is_success else []
existing_text = "\n".join(f"- {f}" for f in existing) if existing else "(none)"
llm = AsyncOpenAI(base_url=LITELLM_URL, api_key=LITELLM_KEY)
resp = await llm.chat.completions.create(
model="claude-haiku",
messages=[
{"role": "system", "content": (
"Extract memorable facts about the user from this voice conversation snippet. "
"Return a JSON array of concise strings. Include: name, preferences, location, "
"occupation, interests, family, projects. Skip duplicate or temporary info. "
"Return [] if nothing new."
)},
{"role": "user", "content": (
f"Existing memories:\n{existing_text}\n\n"
f"User said: {user_text}\nAssistant replied: {agent_text}\n\n"
"New facts (JSON array):"
)},
],
max_tokens=200,
)
raw = resp.choices[0].message.content.strip()
if raw.startswith("```"):
raw = re.sub(r"^```\w*\n?", "", raw)
raw = re.sub(r"\n?```$", "", raw)
match = re.search(r"\[.*\]", raw, re.DOTALL)
if match:
raw = match.group(0)
new_facts = json.loads(raw)
if not isinstance(new_facts, list):
return
async with httpx.AsyncClient(timeout=10.0) as client:
for fact in new_facts:
if isinstance(fact, str) and fact.strip():
await client.post(
f"{MEMORY_SERVICE_URL}/memories/store",
json={"user_id": user_id, "fact": fact.strip(), "source_room": room_id},
)
logger.info("Memory stored for %s: %s", user_id, fact[:80])
except Exception as exc:
logger.warning("Voice memory extraction failed: %s", exc)
def _build_e2ee_options() -> rtc.E2EEOptions:
"""Build E2EE options — let Rust FFI apply HKDF internally (KDF_HKDF=1).
@@ -133,7 +220,8 @@ def _build_e2ee_options() -> rtc.E2EEOptions:
class VoiceSession:
def __init__(self, nio_client, room_id, device_id, lk_url, model="claude-sonnet",
publish_key_cb=None, bot_key: bytes | None = None):
publish_key_cb=None, bot_key: bytes | None = None,
memory=None, caller_user_id: str | None = None):
self.nio_client = nio_client
self.room_id = room_id
self.device_id = device_id
@@ -149,6 +237,8 @@ class VoiceSession:
self._caller_all_keys: dict = {} # {index: bytes} — all caller keys by index
self._bot_key: bytes = bot_key or os.urandom(16)
self._publish_key_cb = publish_key_cb
self._memory = memory # MemoryClient instance from bot.py
self._caller_user_id = caller_user_id # Matrix user ID for memory lookup
def on_encryption_key(self, sender, device_id, key, index):
"""Receive E2EE key from Element Call participant."""
@@ -451,6 +541,18 @@ class VoiceSession:
if remote_identity:
logger.info("Linking to remote participant: %s", remote_identity)
# Load memories for this caller
memory_section = ""
if self._memory and self._caller_user_id:
try:
mems = await self._memory.query(self._caller_user_id, "voice call", top_k=10)
if mems:
memory_section = "\n\nKontext aus früheren Gesprächen mit diesem Nutzer:\n" + \
"\n".join(f"- {m['fact']}" for m in mems)
logger.info("Loaded %d memories for %s", len(mems), self._caller_user_id)
except Exception as exc:
logger.warning("Memory query failed: %s", exc)
# Voice pipeline — George (British male, multilingual DE/EN)
self._http_session = aiohttp.ClientSession()
self._stt_session = aiohttp.ClientSession() # separate session avoids WS/HTTP conflicts
@@ -468,9 +570,13 @@ class VoiceSession:
def _on_user_state(ev):
logger.info("VAD: user_state=%s", ev.new_state)
_last_user_speech: list[str] = []
@self.session.on("user_input_transcribed")
def _on_user_speech(ev):
logger.info("USER_SPEECH: %s", ev.transcript)
if ev.transcript:
_last_user_speech.append(ev.transcript)
@self.session.on("conversation_item_added")
def _on_conversation_item(ev):
@@ -478,8 +584,30 @@ class VoiceSession:
text = getattr(ev.item, "text_content", "") or ""
if role == "assistant" and text:
logger.info("AGENT_SPEECH: %s", text)
if self._memory and self._caller_user_id and _last_user_speech:
user_text = " ".join(_last_user_speech)
_last_user_speech.clear()
asyncio.ensure_future(
_extract_voice_memories(user_text, text,
self._caller_user_id, self.room_id))
agent = Agent(instructions=_build_voice_prompt())
# Brave Search tool — lets the agent answer questions about current events
@function_tool
async def search_web(query: str) -> str:
"""Search the web for current information using Brave Search.
Use this when asked about recent news, current events, prices,
weather, or any information that may have changed recently.
"""
logger.info("SEARCH: %s", query)
result = await _brave_search(query)
logger.info("SEARCH_RESULT: %s", result[:200])
return result
agent = Agent(
instructions=_build_voice_prompt() + memory_section,
tools=[search_web],
)
io_opts = room_io.RoomOptions(
participant_identity=remote_identity,
close_on_disconnect=False,