diff --git a/bot.py b/bot.py index 135f3fc..29a4a0e 100644 --- a/bot.py +++ b/bot.py @@ -463,6 +463,8 @@ class Bot: bot_key=bot_key, publish_key_cb=lambda key, rid=room_id: asyncio.ensure_future( self._publish_encryption_key(rid, key)), + memory=self.memory, + caller_user_id=event.sender, ) # Check timeline for caller's key diff --git a/voice.py b/voice.py index e5ea5a5..e4ebb97 100644 --- a/voice.py +++ b/voice.py @@ -10,10 +10,15 @@ import os import zoneinfo +import json +import re + import aiohttp +import httpx from livekit import rtc, api as lkapi -from livekit.agents import Agent, AgentSession, room_io +from livekit.agents import Agent, AgentSession, function_tool, room_io from livekit.plugins import openai as lk_openai, elevenlabs, silero +from openai import AsyncOpenAI logger = logging.getLogger("matrix-ai-voice") @@ -23,6 +28,8 @@ LITELLM_KEY = os.environ.get("LITELLM_API_KEY", "not-needed") LK_API_KEY = os.environ.get("LIVEKIT_API_KEY", "") LK_API_SECRET = os.environ.get("LIVEKIT_API_SECRET", "") ELEVENLABS_KEY = os.environ.get("ELEVENLABS_API_KEY", "") +BRAVE_API_KEY = os.environ.get("BRAVE_API_KEY", "") +MEMORY_SERVICE_URL = os.environ.get("MEMORY_SERVICE_URL", "http://memory-service:8090") DEFAULT_VOICE_ID = "JBFqnCBsd6RMkjVDRZzb" # George - warm, British male, multilingual _VOICE_PROMPT_TEMPLATE = """Du bist ein hilfreicher Sprachassistent in einem Matrix-Anruf. @@ -112,6 +119,86 @@ def _ratchet_keys(base_raw: bytes, count: int = 6) -> dict[int, bytes]: return keys +async def _brave_search(query: str, count: int = 5) -> str: + """Call Brave Search API and return formatted results.""" + if not BRAVE_API_KEY: + return "Search unavailable (no API key configured)." + try: + async with httpx.AsyncClient(timeout=10.0) as client: + resp = await client.get( + "https://api.search.brave.com/res/v1/web/search", + headers={"Accept": "application/json", "X-Subscription-Token": BRAVE_API_KEY}, + params={"q": query, "count": count, "text_decorations": False}, + ) + resp.raise_for_status() + data = resp.json() + results = data.get("web", {}).get("results", []) + if not results: + return "No results found." + lines = [] + for r in results[:count]: + lines.append(f"- {r.get('title', '')}: {r.get('description', '')} ({r.get('url', '')})") + return "\n".join(lines) + except Exception as exc: + logger.warning("Brave search error: %s", exc) + return f"Search failed: {exc}" + + +async def _extract_voice_memories(user_text: str, agent_text: str, + user_id: str, room_id: str) -> None: + """Extract memorable facts from a voice exchange and store them.""" + if not LITELLM_URL or not MEMORY_SERVICE_URL: + return + try: + # Fetch existing facts to avoid duplicates + async with httpx.AsyncClient(timeout=10.0) as client: + resp = await client.post( + f"{MEMORY_SERVICE_URL}/memories/query", + json={"user_id": user_id, "query": "all facts", "top_k": 20}, + ) + existing = [m["fact"] for m in resp.json().get("results", [])] if resp.is_success else [] + + existing_text = "\n".join(f"- {f}" for f in existing) if existing else "(none)" + llm = AsyncOpenAI(base_url=LITELLM_URL, api_key=LITELLM_KEY) + resp = await llm.chat.completions.create( + model="claude-haiku", + messages=[ + {"role": "system", "content": ( + "Extract memorable facts about the user from this voice conversation snippet. " + "Return a JSON array of concise strings. Include: name, preferences, location, " + "occupation, interests, family, projects. Skip duplicate or temporary info. " + "Return [] if nothing new." + )}, + {"role": "user", "content": ( + f"Existing memories:\n{existing_text}\n\n" + f"User said: {user_text}\nAssistant replied: {agent_text}\n\n" + "New facts (JSON array):" + )}, + ], + max_tokens=200, + ) + raw = resp.choices[0].message.content.strip() + if raw.startswith("```"): + raw = re.sub(r"^```\w*\n?", "", raw) + raw = re.sub(r"\n?```$", "", raw) + match = re.search(r"\[.*\]", raw, re.DOTALL) + if match: + raw = match.group(0) + new_facts = json.loads(raw) + if not isinstance(new_facts, list): + return + async with httpx.AsyncClient(timeout=10.0) as client: + for fact in new_facts: + if isinstance(fact, str) and fact.strip(): + await client.post( + f"{MEMORY_SERVICE_URL}/memories/store", + json={"user_id": user_id, "fact": fact.strip(), "source_room": room_id}, + ) + logger.info("Memory stored for %s: %s", user_id, fact[:80]) + except Exception as exc: + logger.warning("Voice memory extraction failed: %s", exc) + + def _build_e2ee_options() -> rtc.E2EEOptions: """Build E2EE options — let Rust FFI apply HKDF internally (KDF_HKDF=1). @@ -133,7 +220,8 @@ def _build_e2ee_options() -> rtc.E2EEOptions: class VoiceSession: def __init__(self, nio_client, room_id, device_id, lk_url, model="claude-sonnet", - publish_key_cb=None, bot_key: bytes | None = None): + publish_key_cb=None, bot_key: bytes | None = None, + memory=None, caller_user_id: str | None = None): self.nio_client = nio_client self.room_id = room_id self.device_id = device_id @@ -149,6 +237,8 @@ class VoiceSession: self._caller_all_keys: dict = {} # {index: bytes} — all caller keys by index self._bot_key: bytes = bot_key or os.urandom(16) self._publish_key_cb = publish_key_cb + self._memory = memory # MemoryClient instance from bot.py + self._caller_user_id = caller_user_id # Matrix user ID for memory lookup def on_encryption_key(self, sender, device_id, key, index): """Receive E2EE key from Element Call participant.""" @@ -451,6 +541,18 @@ class VoiceSession: if remote_identity: logger.info("Linking to remote participant: %s", remote_identity) + # Load memories for this caller + memory_section = "" + if self._memory and self._caller_user_id: + try: + mems = await self._memory.query(self._caller_user_id, "voice call", top_k=10) + if mems: + memory_section = "\n\nKontext aus früheren Gesprächen mit diesem Nutzer:\n" + \ + "\n".join(f"- {m['fact']}" for m in mems) + logger.info("Loaded %d memories for %s", len(mems), self._caller_user_id) + except Exception as exc: + logger.warning("Memory query failed: %s", exc) + # Voice pipeline — George (British male, multilingual DE/EN) self._http_session = aiohttp.ClientSession() self._stt_session = aiohttp.ClientSession() # separate session avoids WS/HTTP conflicts @@ -468,9 +570,13 @@ class VoiceSession: def _on_user_state(ev): logger.info("VAD: user_state=%s", ev.new_state) + _last_user_speech: list[str] = [] + @self.session.on("user_input_transcribed") def _on_user_speech(ev): logger.info("USER_SPEECH: %s", ev.transcript) + if ev.transcript: + _last_user_speech.append(ev.transcript) @self.session.on("conversation_item_added") def _on_conversation_item(ev): @@ -478,8 +584,30 @@ class VoiceSession: text = getattr(ev.item, "text_content", "") or "" if role == "assistant" and text: logger.info("AGENT_SPEECH: %s", text) + if self._memory and self._caller_user_id and _last_user_speech: + user_text = " ".join(_last_user_speech) + _last_user_speech.clear() + asyncio.ensure_future( + _extract_voice_memories(user_text, text, + self._caller_user_id, self.room_id)) - agent = Agent(instructions=_build_voice_prompt()) + # Brave Search tool — lets the agent answer questions about current events + @function_tool + async def search_web(query: str) -> str: + """Search the web for current information using Brave Search. + + Use this when asked about recent news, current events, prices, + weather, or any information that may have changed recently. + """ + logger.info("SEARCH: %s", query) + result = await _brave_search(query) + logger.info("SEARCH_RESULT: %s", result[:200]) + return result + + agent = Agent( + instructions=_build_voice_prompt() + memory_section, + tools=[search_web], + ) io_opts = room_io.RoomOptions( participant_identity=remote_identity, close_on_disconnect=False,