Files
matrix-ai-agent/voice.py
Christian Gick c11dd73ce3 fix: handle Element X MSC4143 v2 encryption key format (memberships array)
Element X embeds E2EE keys inside memberships[].encryption_keys,
not at the top level of the call.member state event content.
Bot was only checking content.encryption_keys, so it never found
the caller's key — causing 'Warten auf Medien' (waiting for media)
because encrypted audio couldn't be decrypted.

- Added _extract_enc_keys_from_content() helper handling both formats
- Updated on_unknown handler, VoiceSession creation, and key fetch
- Bot now publishes keys in both formats for compatibility
- Updated voice.py state fetch to check memberships[] fallback

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-24 08:57:24 +02:00

1508 lines
79 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""Voice session: LiveKit + STT/LLM/TTS pipeline.
E2EE via HKDF key derivation (Element Call compatible).
Requires patched livekit-rtc FFI binary from onestacked/livekit-rust-sdks."""
import asyncio
import base64
import datetime
import hashlib
import logging
import os
import time
import zoneinfo
import json
import re
import sentry_sdk
import aiohttp
import httpx
from livekit import rtc, api as lkapi
from livekit.agents import Agent, AgentSession, StopResponse, function_tool, room_io, llm
from activity_video import ActivityVideoPublisher
from livekit.plugins import openai as lk_openai, elevenlabs, silero
from openai import AsyncOpenAI
# Module-wide logger shared by every helper and by VoiceSession below.
logger = logging.getLogger("matrix-ai-voice")

# Sentry error tracking — only initialised when SENTRY_DSN is present in the environment.
_sentry_dsn = os.environ.get("SENTRY_DSN", "")
if _sentry_dsn:
    _sentry_env = os.environ.get("SENTRY_ENV", "production")
    sentry_sdk.init(
        dsn=_sentry_dsn,
        traces_sample_rate=0.1,
        environment=_sentry_env,
    )
    logger.info("Sentry initialized for voice")
# Runtime configuration, read once from the process environment at import time.
_env = os.environ.get

# LLM gateway
LITELLM_URL = _env("LITELLM_BASE_URL", "")
LITELLM_KEY = _env("LITELLM_API_KEY", "not-needed")

# LiveKit credentials used to sign access tokens
LK_API_KEY = _env("LIVEKIT_API_KEY", "")
LK_API_SECRET = _env("LIVEKIT_API_SECRET", "")

# Third-party service keys
ELEVENLABS_KEY = _env("ELEVENLABS_API_KEY", "")
BRAVE_API_KEY = _env("BRAVE_API_KEY", "")

# Memory service endpoint (defaults to the in-cluster service name)
MEMORY_SERVICE_URL = _env("MEMORY_SERVICE_URL", "http://memory-service:8090")

# Confluence REST access
CONFLUENCE_URL = _env("CONFLUENCE_BASE_URL", "")
CONFLUENCE_USER = _env("CONFLUENCE_USER", "")
CONFLUENCE_TOKEN = _env("CONFLUENCE_TOKEN", "")

DEFAULT_VOICE_ID = "ML23UVoFL5mI6APbRAeR"  # Robert Ranger - Cool Storyteller, native German
# System prompt for the voice agent (German). It is filled in with str.format()
# and expects exactly three placeholders: {datetime}, {timezone} and {model}.
# Fix: the spelled-out example for "2026" previously read "zweitausendundzwanzig",
# which is not 2026; it now reads "zweitausendsechsundzwanzig".
_VOICE_PROMPT_TEMPLATE = """Du bist ein hilfreicher Sprachassistent in einem Matrix-Anruf.
Aktuelle Zeit: {datetime}
Zeitzone des Nutzers: {timezone}
LLM-Modell: {model}
STRIKTE Regeln:
- Antworte in der Sprache des Nutzers (Deutsch oder Englisch)
- Halte Antworten normalerweise auf 1-2 kurze Saetze. AUSNAHME: Wenn ein Dokument geladen ist und der Nutzer Rollenspiele, Probegespraeche, Mock-Interviews oder ausfuehrliche Besprechungen wuenscht, antworte so ausfuehrlich wie noetig
- Sei direkt und praezise, keine Fuellwoerter
- Erfinde NICHTS ausser der Nutzer bittet explizit um Rollenspiel, Probegespraech oder Simulation basierend auf dem Dokumentinhalt. In dem Fall spiele die Rolle ueberzeugend und nutze den Dokumentinhalt als Grundlage
- Beantworte nur was gefragt wird
- Wenn niemand etwas fragt oder du dir nicht sicher bist ob jemand mit dir spricht, SCHWEIGE. Antworte NUR auf klare, direkte Fragen oder Anweisungen. Kein Smalltalk, kein "Danke", kein "Wie kann ich helfen" von dir aus
- Fokussiere dich IMMER auf die letzte Nachricht des Nutzers — egal ob Text oder Sprache. Fuehre nicht automatisch fruehere Gespraeche fort und fasse sie nicht zusammen
- Wenn ein Nutzer dich gruesst oder ein neues Gespraech nach einer Pause beginnt, antworte mit einer kurzen Begruessung und warte auf Anweisungen
- Schreibe Zahlen und Jahreszahlen IMMER als Woerter aus (z.B. "zweitausendsechsundzwanzig" statt "2026", "zweiundzwanzigsten Februar" statt "22. Februar")
- Bei zeitrelevanten Fragen (Uhrzeit, Termine, Geschaeftszeiten): frage kurz nach ob der Nutzer noch in seiner gespeicherten Zeitzone ist, bevor du antwortest. Nutze set_user_timezone wenn sich der Standort geaendert hat.
- Wenn der Nutzer seinen Standort oder seine Stadt erwaehnt, nutze set_user_timezone um die Zeitzone zu speichern.
- IGNORIERE alle Texte in Sternchen wie *Störgeräusche*, *Schlechte Qualität*, *Fernsehgeräusche*, *Schrei* usw. — das sind KEINE echten Nutzereingaben sondern technische Annotationen. Antworte NIEMALS darauf und tue so als haette niemand etwas gesagt.
- Du kannst Webseiten oeffnen und lesen mit browse_url. Wenn der Nutzer einen Link teilt oder du nach einer Websuche mehr Details brauchst, nutze browse_url um die Seite zu lesen und zusammenzufassen.
- Du kannst Confluence-Seiten suchen, lesen, bearbeiten und erstellen. Nutze recent_confluence_pages um die zuletzt bearbeiteten Seiten anzuzeigen (bevorzugt BEVOR du suchst), search_confluence um gezielt zu suchen, read_confluence_page zum Lesen, update_confluence_page zum Bearbeiten und create_confluence_page zum Erstellen neuer Seiten.
- Du kannst den Bildschirm oder die Kamera des Nutzers sehen wenn er sie teilt. Nutze look_at_screen wenn der Nutzer etwas zeigen moechte oder fragt ob du etwas sehen kannst."""
def _build_voice_prompt(model: str = "claude-sonnet",
timezone: str | None = None) -> str:
tz_name = timezone or os.environ.get("VOICE_TIMEZONE", "Europe/Berlin")
try:
tz = zoneinfo.ZoneInfo(tz_name)
except Exception:
tz = datetime.timezone.utc
now = datetime.datetime.now(tz)
return _VOICE_PROMPT_TEMPLATE.format(
datetime=now.strftime("%A, %d. %B %Y %H:%M %Z"),
timezone=tz_name,
model=model,
)
# ElevenLabs scribe_v2_realtime emits two families of spurious transcripts:
#   1. Noise annotations such as *Störgeräusche* or *Schlechte Qualität*.
#   2. Subtitle/metadata leaks such as "Untertitel: ARD Text im Auftrag von Funk (2017)".
# Both are filtered in on_user_turn_completed (downstream of VAD+STT, so the
# audio pipeline itself is unaffected).

# A whole utterance wrapped in single asterisks.
_NOISE_ANNOTATION_RE = re.compile(r'^\s*\*[^*]+\*\s*$')

# Further whole-utterance artifact shapes; any single match marks the text as an artifact.
_STT_ARTIFACT_PATTERNS = [
    re.compile(_pattern)
    for _pattern in (
        r'(?i)^untertitel\b',   # subtitle metadata
        r'(?i)^copyright\b',    # copyright notices
        r'(?i)^musik\s*$',      # bare "Musik" annotation
        r'(?i)^\(.*\)\s*$',     # parenthetical annotations like (Applaus)
        r'(?i)^\[.*\]\s*$',     # bracketed annotations like [Musik]
        r'(?i)^(background noise|static|silence|applause|laughter|noise)\s*$',  # English variants
        r'^\s*\*\*[^*]+\*\*\s*$',  # double-asterisk: **Noise**
    )
]

# Short filler utterances that are treated as phantom transcriptions.
_STT_GHOST_PHRASES = {
    # German
    "danke", "ja", "nein", "okay", "ok", "tschüss", "hallo",
    "mhm", "hmm", "aha", "oh", "ah", "ähm", "hm",
    "bitte", "genau", "gut", "so", "na", "ne", "ach", "doch",
    "vielen dank", "alles klar", "schön", "super", "richtig",
    # English
    "bye", "thanks", "thank you", "yes", "no", "bye bye", "hello", "hi",
}
def _is_stt_artifact(text: str) -> bool:
    """Return True when *text* looks like STT noise rather than real speech.

    Three checks are applied in order: the single-asterisk noise annotation,
    the list of artifact patterns, and finally the ghost-phrase set for
    utterances of at most two words.
    """
    if _NOISE_ANNOTATION_RE.match(text) is not None:
        return True
    for pattern in _STT_ARTIFACT_PATTERNS:
        if pattern.match(text):
            return True
    # Short phantom transcriptions: compare case-insensitively and ignore
    # trailing sentence punctuation before looking the phrase up.
    return (
        len(text.split()) <= 2
        and text.lower().strip().rstrip(".!?,") in _STT_GHOST_PHRASES
    )
_MAX_CHAT_CTX_ITEMS = 40  # Keep last N items in LLM context to prevent unbounded growth


class _NoiseFilterAgent(Agent):
    """Agent that drops STT artifacts and bounds the chat context.

    The hook below runs once a user turn has been transcribed. Artifact turns
    are removed from the context and answered with StopResponse (no reply).
    For real turns the context is trimmed once it exceeds the limit by more
    than five items.
    """

    async def on_user_turn_completed(
        self, turn_ctx: llm.ChatContext, new_message: llm.ChatMessage
    ) -> None:
        text = (new_message.text_content or "").strip()

        if text and _is_stt_artifact(text):
            logger.info("STT artifact suppressed: %s", text)
            # Drop the artifact itself so it does not pile up in the context.
            if turn_ctx.items and turn_ctx.items[-1] is new_message:
                turn_ctx.items.pop()
            raise StopResponse()

        old_len = len(turn_ctx.items)
        if old_len <= _MAX_CHAT_CTX_ITEMS + 5:
            return

        # Keep the first item (system prompt) plus the most recent N items.
        head = turn_ctx.items[:1]
        tail = turn_ctx.items[-_MAX_CHAT_CTX_ITEMS:]
        turn_ctx.items[:] = head + tail
        logger.info("CONTEXT_TRUNCATED: %d -> %d items", old_len, len(turn_ctx.items))
_vad = None  # lazily created, process-wide Silero VAD instance


def _get_vad():
    """Return the shared Silero VAD, loading it on first use."""
    global _vad
    if _vad is not None:
        return _vad
    _vad = silero.VAD.load(
        activation_threshold=0.75,
        min_speech_duration=0.6,
        min_silence_duration=0.65,
    )
    return _vad
def _make_lk_identity(user_id, device_id):
    """Build the LiveKit participant identity: ``<user_id>:<device_id>``."""
    return ":".join((format(user_id), format(device_id)))
def _compute_lk_room_name(room_id):
    """Derive the LiveKit room name from a Matrix room id.

    The name is the unpadded standard-base64 encoding of
    SHA-256("<room_id>|m.call#ROOM").
    """
    seed = f"{room_id}|m.call#ROOM".encode()
    digest = hashlib.sha256(seed).digest()
    encoded = base64.b64encode(digest).decode()
    return encoded.rstrip("=")
def _generate_lk_jwt(room_id, user_id, device_id):
    """Create a signed LiveKit access token for this bot in the call's room.

    The token carries the ``user_id:device_id`` identity, uses the Matrix
    user id as display name, grants join/publish/subscribe on the derived
    LiveKit room and is valid for 24 hours.
    """
    identity = _make_lk_identity(user_id, device_id)
    lk_room = _compute_lk_room_name(room_id)

    token = lkapi.AccessToken(LK_API_KEY, LK_API_SECRET)
    token = token.with_identity(identity)
    token = token.with_name(user_id)
    token = token.with_grants(
        lkapi.VideoGrants(
            room_join=True,
            room=lk_room,
            can_publish=True,
            can_subscribe=True,
        )
    )
    token = token.with_ttl(datetime.timedelta(hours=24))

    logger.info("JWT: identity=%s room=%s", identity, lk_room)
    return token.to_jwt()
# Key-derivation-function selector handed to rtc.KeyProviderOptions in
# _build_e2ee_options().
KDF_HKDF = 1 # Rust FFI applies HKDF internally (proto enum value 1)
# NOTE: value 0 in the proto is PBKDF2, NOT raw/none mode — see e2ee_patch.py
def _hkdf_derive(ikm: bytes) -> bytes:
    """Derive a 16-byte AES-128 key from *ikm* with HKDF-SHA256.

    Parameters mirror Element Call's JS deriveEncryptionKey():
    salt = b"LKFrameEncryptionKey", info = 128 zero bytes, output = 128 bit.
    Only the first HKDF-Expand block is needed for 16 bytes of output.

    NOTE(review): the part of this module visible here configures KDF_HKDF
    (the FFI derives keys itself) and passes raw keys to set_key(), so this
    pre-derivation helper may no longer be used — confirm before relying on it.
    """
    import hmac

    sha256 = hashlib.sha256
    # HKDF-Extract: PRK = HMAC(salt, IKM)
    pseudo_random_key = hmac.new(b"LKFrameEncryptionKey", ikm, sha256).digest()
    # HKDF-Expand, first block: T(1) = HMAC(PRK, info || 0x01)
    expand_input = bytes(128) + b"\x01"
    first_block = hmac.new(pseudo_random_key, expand_input, sha256).digest()
    return first_block[:16]
def _ratchet_keys(base_raw: bytes, count: int = 6) -> dict[int, bytes]:
    """Pre-compute the ratchet chain for indices ``0..count-1``.

    Index 0 maps to *base_raw*; each following entry is the first 16 bytes of
    HMAC-SHA256(key=previous_raw, data=b"LKFrameEncryptionKey"), matching the
    ratchet step described for livekit-client-sdk-js.

    Returns:
        ``{index: raw_key}``; empty when *count* is 0.
    """
    import hmac as _hmac

    chain: dict[int, bytes] = {}
    current = base_raw
    for step in range(count):
        chain[step] = current
        mac = _hmac.new(current, b"LKFrameEncryptionKey", hashlib.sha256)
        current = mac.digest()[:16]
    return chain
def _derive_and_set_key(kp, identity: str, raw_key: bytes, index: int) -> None:
    """Hand a raw base key to the KeyProvider and log the outcome.

    The key is passed through unchanged via ``kp.set_key(identity, raw_key, index)``.
    Only the first 8 hex characters of the key are written to the debug log
    (diagnostics for the MAT-144 investigation).
    """
    accepted = kp.set_key(identity, raw_key, index)
    key_prefix = raw_key.hex()[:8]
    logger.debug(
        "set_key[%d] %s: raw=%s (%d bytes, ok=%s)",
        index, identity, key_prefix, len(raw_key), accepted,
    )
async def _brave_search(query: str, count: int = 5) -> str:
    """Query the Brave Search API and return a bullet list of results.

    Returns one ``- title: description (url)`` line per hit (at most *count*),
    or a short human-readable message when no API key is configured, nothing
    was found, or the request failed. Never raises.
    """
    if not BRAVE_API_KEY:
        return "Search unavailable (no API key configured)."

    request_headers = {"Accept": "application/json", "X-Subscription-Token": BRAVE_API_KEY}
    request_params = {"q": query, "count": count, "text_decorations": False}
    try:
        async with httpx.AsyncClient(timeout=10.0) as client:
            resp = await client.get(
                "https://api.search.brave.com/res/v1/web/search",
                headers=request_headers,
                params=request_params,
            )
            resp.raise_for_status()
            payload = resp.json()
        hits = payload.get("web", {}).get("results", [])
        if not hits:
            return "No results found."
        return "\n".join(
            f"- {hit.get('title', '')}: {hit.get('description', '')} ({hit.get('url', '')})"
            for hit in hits[:count]
        )
    except Exception as exc:
        logger.warning("Brave search error: %s", exc)
        return f"Search failed: {exc}"
async def _fetch_webpage(url: str, max_chars: int = 8000) -> str:
    """Download *url* and return its readable text, truncated to *max_chars*.

    Script/style/navigation elements are removed and the first of
    ``<article>``, ``<main>`` or ``<body>`` is preferred as the text source.
    All failures are reported as a short message string; nothing is raised.

    NOTE(review): the URL is fetched exactly as given and redirects are
    followed. If it can come from untrusted input, consider restricting the
    reachable hosts — confirm against the callers.
    """
    try:
        from bs4 import BeautifulSoup

        request_headers = {"User-Agent": "Mozilla/5.0 (compatible; AgilitonBot/1.0)"}
        async with httpx.AsyncClient(timeout=15.0, follow_redirects=True,
                                     headers=request_headers) as client:
            resp = await client.get(url)
            resp.raise_for_status()
            content_type = resp.headers.get("content-type", "")
            if not ("html" in content_type or "text" in content_type):
                return f"URL returned non-text content ({content_type})."

            soup = BeautifulSoup(resp.text, "lxml")
            for unwanted in soup(["script", "style", "nav", "footer", "header", "aside", "iframe"]):
                unwanted.decompose()

            # Prefer article/main content, falling back to the whole document.
            root = soup.find("article") or soup.find("main") or soup.find("body")
            source = root if root else soup
            text = source.get_text(separator="\n", strip=True)

            # Collapse runs of three or more newlines into a single blank line.
            text = re.sub(r'\n{3,}', '\n\n', text)
            if len(text) > max_chars:
                text = text[:max_chars] + "\n\n[... truncated]"
            if text.strip():
                return text
            return "Page loaded but no readable text content found."
    except httpx.HTTPStatusError as exc:
        return f"HTTP error {exc.response.status_code} fetching {url}"
    except Exception as exc:
        logger.warning("Webpage fetch error for %s: %s", url, exc)
        return f"Failed to fetch page: {exc}"
async def _store_user_pref(user_id: str, key: str, value: str) -> None:
    """Store a user preference (e.g. timezone, language) in the memory service.

    The preference is posted as the fact ``[PREF:<key>] <value>`` with
    ``source_room="preferences"``. Best-effort: does nothing when
    MEMORY_SERVICE_URL is empty and never raises; failures are logged.
    """
    if not MEMORY_SERVICE_URL:
        return
    fact = f"[PREF:{key}] {value}"
    try:
        async with httpx.AsyncClient(timeout=10.0) as client:
            resp = await client.post(
                f"{MEMORY_SERVICE_URL}/memories/store",
                json={"user_id": user_id, "fact": fact, "source_room": "preferences"},
            )
            # Without this check a 4xx/5xx answer was logged as a successful store.
            resp.raise_for_status()
        logger.info("Preference stored for %s: %s=%s", user_id, key, value)
    except Exception as exc:
        logger.warning("Preference store failed: %s", exc)
async def _store_voice_exchange(user_text: str, agent_text: str,
                                user_id: str, room_id: str) -> None:
    """Store one user/assistant exchange verbatim in the memory service.

    The exchange is posted as ``User: ...\\nAssistant: ...`` with the Matrix
    room id as ``source_room`` (no LLM extraction). Best-effort: does nothing
    when MEMORY_SERVICE_URL is empty and never raises; failures are logged.
    """
    if not MEMORY_SERVICE_URL:
        return
    exchange = f"User: {user_text}\nAssistant: {agent_text}"
    try:
        async with httpx.AsyncClient(timeout=10.0) as client:
            resp = await client.post(
                f"{MEMORY_SERVICE_URL}/memories/store",
                json={"user_id": user_id, "fact": exchange, "source_room": room_id},
            )
            # Without this check a 4xx/5xx answer was logged as a successful store.
            resp.raise_for_status()
        logger.info("Memory stored for %s: %s", user_id, exchange[:120])
    except Exception as exc:
        logger.warning("Voice memory store failed: %s", exc)
async def _confluence_search(query: str, limit: int = 5) -> list[dict]:
    """Search Confluence pages whose title or text matches *query*.

    Args:
        query: Free-text search term (spoken/LLM-provided, so treated as untrusted).
        limit: Maximum number of results requested from the API.

    Returns:
        A list of ``{"id", "title", "space", "url"}`` dicts.

    Raises:
        RuntimeError: Confluence credentials are not configured.
        httpx.HTTPStatusError: The API answered with an error status.
    """
    if not CONFLUENCE_URL or not CONFLUENCE_USER or not CONFLUENCE_TOKEN:
        raise RuntimeError("Confluence credentials not configured")

    # The term is embedded in a double-quoted CQL string: escape backslashes
    # and quotes so a query containing them can neither break the CQL syntax
    # nor inject additional clauses.
    escaped = query.replace("\\", "\\\\").replace('"', '\\"')
    cql = f'type=page AND (title~"{escaped}" OR text~"{escaped}")'
    url = f"{CONFLUENCE_URL}/rest/api/content/search"

    async with httpx.AsyncClient(timeout=15.0) as client:
        resp = await client.get(
            url,
            params={"cql": cql, "limit": limit},
            auth=(CONFLUENCE_USER, CONFLUENCE_TOKEN),
        )
        resp.raise_for_status()
        data = resp.json()

    return [
        {
            "id": r["id"],
            "title": r.get("title", ""),
            "space": r.get("space", {}).get("name", ""),
            "url": f"{CONFLUENCE_URL}{r.get('_links', {}).get('webui', '')}",
        }
        for r in data.get("results", [])
    ]
async def _confluence_read_page(page_id: str) -> tuple[str, str, int]:
    """Read a Confluence page and return ``(title, plain_text, version_number)``.

    The storage-format body is reduced to plain text for voice readback: tags
    are replaced by spaces, HTML entities are decoded and whitespace is
    collapsed.

    Raises:
        RuntimeError: Confluence credentials are not configured.
        httpx.HTTPStatusError: The API answered with an error status.
    """
    from html import unescape

    if not CONFLUENCE_URL or not CONFLUENCE_USER or not CONFLUENCE_TOKEN:
        raise RuntimeError("Confluence credentials not configured")

    url = f"{CONFLUENCE_URL}/rest/api/content/{page_id}"
    params = {"expand": "body.storage,version,title"}
    async with httpx.AsyncClient(timeout=15.0) as client:
        resp = await client.get(
            url,
            params=params,
            auth=(CONFLUENCE_USER, CONFLUENCE_TOKEN),
        )
        resp.raise_for_status()
        data = resp.json()

    title = data.get("title", "")
    version = data.get("version", {}).get("number", 1)
    body_html = data.get("body", {}).get("storage", {}).get("value", "")

    # Strip tags first, then decode entities (&nbsp;, &amp;, &uuml;, ...) so they
    # are not read aloud literally. Decoding after tag removal keeps an escaped
    # "&lt;b&gt;" from being mistaken for markup. The whitespace collapse runs
    # last because a decoded &nbsp; is itself whitespace.
    plain = re.sub(r"<[^>]+>", " ", body_html)
    plain = unescape(plain)
    plain = re.sub(r"\s+", " ", plain).strip()
    return title, plain, version
async def _confluence_update_section(page_id: str, section_heading: str, new_html: str) -> str:
    """Replace one section of a Confluence page, identified by its heading.

    The work is delegated to ``confluence_collab.editor.section_update``.
    Returns a human-readable status message: a success text (with the number
    of conflict retries when there were any), the library's own message on
    failure, or a notice when credentials are missing.
    """
    if not (CONFLUENCE_URL and CONFLUENCE_USER and CONFLUENCE_TOKEN):
        return "Confluence credentials not configured."

    from confluence_collab.client import Auth
    from confluence_collab.editor import section_update

    credentials = Auth(base_url=CONFLUENCE_URL, username=CONFLUENCE_USER, api_token=CONFLUENCE_TOKEN)
    outcome = await section_update(page_id, section_heading, new_html, credentials)
    if not outcome.ok:
        return outcome.message

    retry_note = f" ({outcome.retries} conflict retries)" if outcome.retries > 0 else ""
    return f"Section '{section_heading}' updated successfully." + retry_note
async def _confluence_create_page(space_key: str, title: str, body_html: str,
                                  parent_id: str | None = None) -> dict:
    """Create a Confluence page through the REST API.

    Args:
        space_key: Key of the target space.
        title: Page title.
        body_html: Page body in Confluence storage format.
        parent_id: Optional id of the page to create this one under.

    Returns:
        ``{"id", "title", "url"}`` of the created page.

    Raises:
        RuntimeError: Confluence credentials are not configured.
        httpx.HTTPStatusError: The API answered with an error status.
    """
    if not (CONFLUENCE_URL and CONFLUENCE_USER and CONFLUENCE_TOKEN):
        raise RuntimeError("Confluence credentials not configured")

    payload: dict = {
        "type": "page",
        "title": title,
        "space": {"key": space_key},
        "body": {"storage": {"value": body_html, "representation": "storage"}},
    }
    if parent_id:
        payload["ancestors"] = [{"id": parent_id}]

    async with httpx.AsyncClient(timeout=15.0) as client:
        resp = await client.post(
            f"{CONFLUENCE_URL}/rest/api/content",
            json=payload,
            auth=(CONFLUENCE_USER, CONFLUENCE_TOKEN),
        )
        resp.raise_for_status()
        created = resp.json()

    new_id = created["id"]
    return {
        "id": new_id,
        "title": created["title"],
        "url": f"{CONFLUENCE_URL}/pages/viewpage.action?pageId={new_id}",
    }
async def _confluence_recent_pages(limit: int = 5) -> list[dict]:
    """Fetch the most recently modified Confluence pages.

    Returns:
        A list of ``{"id", "title", "space", "url"}`` dicts, newest first as
        ordered by the API (no modification timestamp is included).

    Raises:
        RuntimeError: Confluence credentials are not configured.
        httpx.HTTPStatusError: The API answered with an error status.
    """
    if not (CONFLUENCE_URL and CONFLUENCE_USER and CONFLUENCE_TOKEN):
        raise RuntimeError("Confluence credentials not configured")

    search_url = f"{CONFLUENCE_URL}/rest/api/content/search"
    query_params = {"cql": "type=page ORDER BY lastmodified DESC", "limit": limit}
    async with httpx.AsyncClient(timeout=15.0) as client:
        resp = await client.get(
            search_url,
            params=query_params,
            auth=(CONFLUENCE_USER, CONFLUENCE_TOKEN),
        )
        resp.raise_for_status()
        data = resp.json()

    pages = []
    for entry in data.get("results", []):
        space_name = entry["space"].get("name", "") if "space" in entry else ""
        web_path = entry.get("_links", {}).get("webui", "")
        pages.append({
            "id": entry["id"],
            "title": entry.get("title", ""),
            "space": space_name,
            "url": f"{CONFLUENCE_URL}{web_path}",
        })
    return pages
def _build_e2ee_options() -> rtc.E2EEOptions:
    """Build the LiveKit E2EE options used for Element Call interop.

    Per-participant keys (empty shared key), GCM encryption and
    ``KDF_HKDF`` as key-derivation function, so raw base keys from the Matrix
    key exchange are passed to set_key() unchanged.
    Element Call uses: ratchetWindowSize=10, keyringSize=256,
    ratchetSalt="LKFrameEncryptionKey".
    NOTE: proto value 0 = PBKDF2 (NOT raw) — must use KDF_HKDF=1.
    """
    provider_settings = dict(
        shared_key=b"",  # empty = per-participant mode
        ratchet_window_size=10,
        ratchet_salt=b"LKFrameEncryptionKey",
        failure_tolerance=10,
        key_ring_size=256,
        key_derivation_function=KDF_HKDF,  # FFI derives; we pass raw base keys
    )
    return rtc.E2EEOptions(
        encryption_type=rtc.EncryptionType.GCM,
        key_provider_options=rtc.KeyProviderOptions(**provider_settings),
    )
class VoiceSession:
def __init__(self, nio_client, room_id, device_id, lk_url, model="claude-sonnet",
publish_key_cb=None, bot_key: bytes | None = None,
memory=None, caller_user_id: str | None = None,
document_context: str | None = None):
self.nio_client = nio_client
self.room_id = room_id
self.device_id = device_id
self.lk_url = lk_url
self.model = model
self.lk_room = None
self.session = None
self._task = None
self._activity_video = None
self._activity_task = None
self._http_session = None
self._caller_key: bytes | None = None
self._caller_identity: str | None = None
self._caller_all_keys: dict = {} # {index: bytes} — all caller keys by index
self._bot_key: bytes = bot_key or os.urandom(16)
self._publish_key_cb = publish_key_cb
self._memory = memory # MemoryClient instance from bot.py
self._caller_user_id = caller_user_id # Matrix user ID for memory lookup
self._document_context = document_context # PDF text from room for voice context
self._transcript: list[dict] = [] # {"role": "user"|"assistant", "text": "..."}
self._video_track: rtc.Track | None = None # remote video track for on-demand vision
self._key_poller_task: asyncio.Task | None = None # continuous background key poller (MAT-164)
def on_encryption_key(self, sender, device_id, key, index):
"""Receive E2EE key from Element Call participant.
Store-only: keys are applied in on_track_subscribed() / on_e2ee_state_changed()
where the frame cryptor is guaranteed to exist. If the track is already
subscribed (late key arrival / rotation), set the key immediately.
Element Call may rotate keys by re-sending index 0 with a new value
(e.g. when screen share starts). The frame cryptor caches derived AES
keys per index, so we use an internal incrementing index to force
re-derivation.
"""
if not key:
return
if not self._caller_key:
self._caller_key = key
self._caller_identity = f"{sender}:{device_id}"
# Detect same-index key rotation: if index 0 changes value, use next internal index
old_key = self._caller_all_keys.get(index)
if old_key and old_key != key:
# Key rotated at same Element Call index — use next available internal index
internal_idx = max(self._caller_all_keys.keys()) + 1
logger.info("Key rotation detected: index %d changed value, mapping to internal index %d",
index, internal_idx)
self._caller_all_keys[internal_idx] = key
self._caller_key = key
effective_index = internal_idx
else:
self._caller_all_keys[index] = key
effective_index = index
logger.info("E2EE key received from %s:%s (index=%d, effective=%d, %d bytes, raw=%s)",
sender, device_id, index, effective_index, len(key), key.hex())
# Late key arrival: if track already subscribed, frame cryptor exists — set key now.
if self.lk_room and self._caller_identity:
caller_lk_id = self._caller_identity
for p in self.lk_room.remote_participants.values():
if p.identity == caller_lk_id:
has_subscribed = any(
pub.subscribed for pub in p.track_publications.values()
)
if has_subscribed:
try:
kp = self.lk_room.e2ee_manager.key_provider
# Set ALL keys (old + new) so cryptor can try both
for idx, k in sorted(self._caller_all_keys.items()):
_derive_and_set_key(kp, p.identity, k, idx)
logger.info("Late key rotation: set %d keys for %s",
len(self._caller_all_keys), p.identity)
except Exception as e:
logger.warning("Late key set_key failed: %s", e)
break
async def _fetch_encryption_key_http(self) -> bytes | None:
    """Fetch the caller's encryption key over the Matrix client-server API.

    Two sources are tried in order:
      1. MSC4143: ``org.matrix.msc3401.call.member`` state events, with
         ``encryption_keys`` either at the top level of the content or inside
         ``memberships[]`` (Element X).
      2. Legacy: ``io.element.call.encryption_keys`` events among the last 50
         timeline events, ignoring events older than 5 minutes.

    Every key found is routed through ``on_encryption_key()`` so rotation
    handling stays in one place; events sent by the bot itself are skipped.

    Returns:
        The key stored under the highest known index, or None when no key
        was found or both requests failed (failures are logged, not raised).
    """
    homeserver = str(self.nio_client.homeserver)
    token = self.nio_client.access_token
    user_id = self.nio_client.user_id
    auth_headers = {"Authorization": f"Bearer {token}"}

    def _decode_key(key_b64: str) -> bytes:
        # Keys arrive as unpadded base64; restore the padding before decoding.
        return base64.urlsafe_b64decode(key_b64 + "=" * (-len(key_b64) % 4))

    # MSC4143: check call.member state events first.
    try:
        state_url = f"{homeserver}/_matrix/client/v3/rooms/{self.room_id}/state"
        async with httpx.AsyncClient(timeout=10.0) as http:
            resp = await http.get(state_url, headers=auth_headers)
            if resp.status_code == 200:
                for evt in resp.json():
                    if evt.get("type") != "org.matrix.msc3401.call.member":
                        continue
                    sender = evt.get("sender", "")
                    if sender == user_id:
                        continue
                    content = evt.get("content", {})
                    enc_keys = content.get("encryption_keys", [])
                    device = content.get("device_id", "")
                    # MSC4143 v2 / Element X: keys live inside the memberships array;
                    # take the first membership that carries keys, and its device id.
                    if not enc_keys:
                        for m in content.get("memberships", []):
                            m_keys = m.get("encryption_keys", [])
                            if m_keys:
                                enc_keys = m_keys
                                device = m.get("device_id", device)
                                break
                    if not enc_keys:
                        continue

                    last_key: bytes | None = None
                    last_index = 0
                    for k in enc_keys:
                        key_b64 = k.get("key", "")
                        key_index = k.get("index", 0)
                        if not key_b64:
                            continue
                        key_bytes = _decode_key(key_b64)
                        if device:
                            self._caller_identity = f"{sender}:{device}"
                        self.on_encryption_key(sender, device, key_bytes, key_index)
                        last_key, last_index = key_bytes, key_index

                    # Entries without usable key data: previously this hit an
                    # unbound variable and aborted the whole state scan. Now
                    # the remaining member events are still examined.
                    if last_key is None:
                        continue

                    max_idx = max(self._caller_all_keys) if self._caller_all_keys else last_index
                    latest = self._caller_all_keys.get(max_idx, last_key)
                    logger.info("Got key from call.member state (sender=%s, device=%s, index=%d)",
                                sender, device, last_index)
                    return latest
    except Exception as e:
        logger.debug("call.member state key fetch failed: %s", e)

    # Legacy: scan the most recent timeline events.
    url = f"{homeserver}/_matrix/client/v3/rooms/{self.room_id}/messages"
    try:
        async with httpx.AsyncClient(timeout=10.0) as http:
            resp = await http.get(
                url,
                headers=auth_headers,
                params={"dir": "b", "limit": "50"},
            )
            resp.raise_for_status()
            data = resp.json()
        events = data.get("chunk", [])
        now_ms = int(time.time() * 1000)
        for evt in events:
            if evt.get("type", "") != "io.element.call.encryption_keys":
                continue
            sender = evt.get("sender", "")
            if sender == user_id:
                continue  # skip our own key
            content = evt.get("content", {})
            device = content.get("device_id", "")
            # Only accept keys from this call session. Events without any
            # timestamp are treated as stale (age 999s).
            sent_ts = content.get("sent_ts", evt.get("origin_server_ts", 0))
            age_s = (now_ms - sent_ts) / 1000 if sent_ts else 999
            logger.info("Found encryption_keys timeline event: sender=%s device=%s age=%.0fs",
                        sender, device, age_s)
            if age_s > 300:  # 5 min — covers key rotation during long calls
                logger.info("Skipping stale encryption_keys event (%.0fs old)", age_s)
                continue

            all_keys = {}
            for k in content.get("keys", []):
                key_b64 = k.get("key", "")
                if key_b64:
                    all_keys[k.get("index", 0)] = _decode_key(key_b64)
            if not all_keys:
                continue

            if device:
                self._caller_identity = f"{sender}:{device}"
            # Route each key through on_encryption_key() so same-index
            # rotation detection works correctly (MAT-164).
            for key_index, key_bytes in sorted(all_keys.items()):
                self.on_encryption_key(sender, device, key_bytes, key_index)
            highest_event_idx = max(all_keys)
            max_idx = max(self._caller_all_keys) if self._caller_all_keys else highest_event_idx
            latest_key = self._caller_all_keys.get(max_idx, all_keys[highest_event_idx])
            logger.info("Loaded caller keys at indices %s (using %d, key=%s)",
                        sorted(self._caller_all_keys.keys()), max_idx,
                        latest_key.hex()[:8])
            return latest_key
        logger.info("No encryption_keys events in last %d timeline events", len(events))
    except Exception as e:
        logger.warning("HTTP encryption key fetch failed: %s", e)
    return None
async def _continuous_key_poller(self):
    """Poll for E2EE key rotations every 3s while the room is set (MAT-164).

    Catches delayed rotations that the proactive poll and the sync path miss.
    A fetched key that differs from ``self._caller_key`` is routed through
    ``on_encryption_key()``. The loop ends when ``self.lk_room`` becomes falsy
    or the task is cancelled; errors of a single poll are logged and ignored.
    """
    logger.info("Background key poller started")
    try:
        while self.lk_room:
            await asyncio.sleep(3.0)
            if not self.lk_room:
                break
            try:
                prev_fingerprint = self._caller_key.hex() if self._caller_key else None
                new_key = await self._fetch_encryption_key_http()
                if new_key and (not self._caller_key or new_key != self._caller_key):
                    logger.info("Background poll: key rotated (%s -> %s)",
                                prev_fingerprint[:8] if prev_fingerprint else "none",
                                new_key.hex()[:8])
                    # The identity is "<user_id>:<device_id>" and Matrix user ids
                    # contain a colon themselves ("@user:server"), so split on the
                    # LAST colon; splitting on the first one cut the server part off.
                    sender, _, device = (self._caller_identity or "").rpartition(":")
                    # Route through on_encryption_key for proper rotation handling.
                    self.on_encryption_key(sender, device, new_key, 0)
            except Exception as exc:
                logger.debug("Background key poll error: %s", exc)
    except asyncio.CancelledError:
        pass
    logger.info("Background key poller stopped")
async def start(self):
self._task = asyncio.create_task(self._run())
async def stop(self):
for obj in [self.session, self.lk_room, self._http_session]:
if obj:
try:
if hasattr(obj, "aclose"):
await obj.aclose()
elif hasattr(obj, "disconnect"):
await obj.disconnect()
elif hasattr(obj, "close"):
await obj.close()
except Exception:
pass
if self._key_poller_task and not self._key_poller_task.done():
self._key_poller_task.cancel()
if self._activity_video:
self._activity_video.stop()
if self._activity_task and not self._activity_task.done():
self._activity_task.cancel()
if self._task and not self._task.done():
self._task.cancel()
try:
await self._task
except asyncio.CancelledError:
pass
def get_transcript(self) -> list[dict]:
"""Return the call transcript as a list of {role, text} dicts."""
return list(self._transcript)
def get_document_context(self) -> str | None:
"""Return the document context loaded for this call, if any."""
return self._document_context
def get_confluence_page_id(self) -> str | None:
"""Return the active Confluence page ID, if any."""
if not self._document_context:
return None
ids = re.findall(r'confluence_page_id:(\d+)', self._document_context)
return ids[0] if ids else None
async def _run(self):
try:
user_id = self.nio_client.user_id
bot_identity = _make_lk_identity(user_id, self.device_id)
jwt = _generate_lk_jwt(self.room_id, user_id, self.device_id)
# Publish bot's own key immediately so Element Call can decrypt us
if self._publish_key_cb:
self._publish_key_cb(self._bot_key)
logger.info("Published bot E2EE key (%d bytes)", len(self._bot_key))
# Check timeline for caller's encryption key
caller_key = await self._fetch_encryption_key_http()
if caller_key:
self._caller_key = caller_key
logger.info("Got caller E2EE key via timeline (%d bytes)", len(caller_key))
if not self._caller_key:
# Wait up to 15s for key via sync handler
logger.info("No key in timeline yet, waiting for sync...")
for _ in range(150):
if self._caller_key:
break
await asyncio.sleep(0.1)
# E2EE: configurable via E2EE_ENABLED env var (default true).
# When enabled, parameters MUST match Element Call JS SDK.
e2ee_enabled = os.environ.get("E2EE_ENABLED", "true").lower() in ("true", "1", "yes")
if e2ee_enabled:
e2ee_opts = _build_e2ee_options()
room_opts = rtc.RoomOptions(e2ee=e2ee_opts)
logger.info("E2EE enabled (HKDF mode)")
else:
e2ee_opts = None
room_opts = rtc.RoomOptions()
logger.warning("E2EE DISABLED — audio is unencrypted")
self.lk_room = rtc.Room()
@self.lk_room.on("participant_connected")
def on_p(p):
logger.info("Participant connected: %s", p.identity)
# Note: do NOT set keys here — frame cryptor not initialized yet at participant_connected.
# Keys are set in on_track_subscribed where the frame cryptor definitely exists.
@self.lk_room.on("track_published")
def on_tp(pub, p):
logger.info("Track pub: %s %s kind=%s", p.identity, pub.sid, pub.kind)
@self.lk_room.on("track_muted")
def on_mute(p, pub):
logger.info("Track MUTED: %s %s kind=%s", p.identity, pub.sid, pub.kind)
@self.lk_room.on("track_unmuted")
def on_unmute(p, pub):
logger.info("Track UNMUTED: %s %s kind=%s", p.identity, pub.sid, pub.kind)
@self.lk_room.on("track_subscribed")
def on_ts(t, pub, p):
    """Handle a newly subscribed remote track.

    Logs the subscription, remembers video tracks for the on-demand vision
    tool, starts a proactive E2EE key poll when a video track shows up, and
    applies every stored caller key to the owning participant when E2EE is
    enabled.

    Args:
        t: The subscribed track (``t.kind`` is compared as an int).
        pub: The track publication; ``encryption_type`` and ``source`` are
            read defensively via ``getattr``.
        p: The participant that owns the track.
    """
    enc_type = getattr(pub, 'encryption_type', 'N/A')
    logger.info("Track sub: %s %s kind=%s enc_type=%s source=%s",
                p.identity, pub.sid, t.kind, enc_type,
                getattr(pub, 'source', 'N/A'))
    track_kind = int(t.kind)  # LiveKit: 1=audio, 2=video

    # Store video track for on-demand vision (look_at_screen tool)
    if track_kind == 2:
        track_source = getattr(pub, 'source', None) or "unknown"
        self._video_track = t
        logger.info("Video track stored from %s source=%s for on-demand vision", p.identity, track_source)

        # Screen share starts → Element Call rotates E2EE key.
        # Proactively poll timeline for the new key instead of waiting
        # for DEC_FAILED (MAT-164).
        async def _proactive_key_poll():
            pre_key = self._caller_key
            for attempt in range(20):  # 20 × 500ms = 10s (MAT-164)
                await asyncio.sleep(0.5)
                if self._caller_key != pre_key:
                    logger.info("Proactive poll: key rotated via sync (attempt %d)", attempt + 1)
                    return
                try:
                    new_key = await self._fetch_encryption_key_http()
                except Exception as exc:
                    # A transient fetch failure must not end the whole poll
                    # (and must not surface as an unretrieved task exception);
                    # log it and try again on the next tick.
                    logger.warning("Proactive poll: key fetch failed (attempt %d): %s",
                                   attempt + 1, exc)
                    continue
                if new_key and new_key != pre_key:
                    logger.info("Proactive poll: got new key from timeline (attempt %d, %s)",
                                attempt + 1, new_key.hex()[:8])
                    self.on_encryption_key(
                        self._caller_identity.split(":")[0] if self._caller_identity else "",
                        self._caller_identity.split(":")[-1] if self._caller_identity else "",
                        new_key, 0)
                    return
            logger.info("Proactive poll: no key rotation after 10s")

        asyncio.ensure_future(_proactive_key_poll())

    if track_kind in (1, 2) and e2ee_opts is not None:  # audio + video tracks
        caller_id = p.identity
        track_type = "video" if track_kind == 2 else "audio"
        try:
            kp_local = self.lk_room.e2ee_manager.key_provider
            if self._caller_all_keys:
                for idx, base_k in sorted(self._caller_all_keys.items()):
                    _derive_and_set_key(kp_local, caller_id, base_k, idx)
                    logger.info("on_ts: set key[%d] for %s (%s track)",
                                idx, caller_id, track_type)
            else:
                logger.warning("on_ts: no caller keys yet — scheduling 0.5s retry")

                # One delayed retry: the caller key may arrive shortly after
                # the track does.
                async def _brief_key_retry(pid=caller_id):
                    await asyncio.sleep(0.5)
                    if self._caller_all_keys:
                        try:
                            kp_r = self.lk_room.e2ee_manager.key_provider
                            for idx, base_k in sorted(self._caller_all_keys.items()):
                                _derive_and_set_key(kp_r, pid, base_k, idx)
                                logger.info("on_ts_retry: set key[%d] for %s", idx, pid)
                        except Exception as exc:
                            logger.warning("on_ts_retry: set_key failed: %s", exc)
                    else:
                        logger.warning("on_ts_retry: still no caller keys for %s", pid)

                asyncio.ensure_future(_brief_key_retry())
        except Exception as exc:
            logger.warning("on_ts: set_key failed: %s", exc)
_e2ee_state_names = {0:"NEW",1:"OK",2:"ENC_FAILED",3:"DEC_FAILED",4:"MISSING_KEY",5:"RATCHETED",6:"INTERNAL_ERR"}
_last_rekey_time = {} # per-participant cooldown for DEC_FAILED re-keying
_dec_failed_count = {} # consecutive DEC_FAILED per participant
_last_refetch_time = 0.0 # timestamp throttle replaces boolean gate (MAT-164)
@self.lk_room.on("e2ee_state_changed")
def on_e2ee_state(participant, state):
    """React to an E2EE state change of a participant.

    Logs the new state, resets the DEC_FAILED counter on OK, and for a remote
    participant in state NEW (0), DEC_FAILED (3) or MISSING_KEY (4) re-applies
    all stored caller keys. On DEC_FAILED it additionally schedules a
    throttled background re-fetch of the key from the timeline.

    Args:
        participant: The participant whose state changed; a falsy value is
            logged as "local".
        state: The E2EE state, compared via ``int(state)``.
    """
    nonlocal _last_refetch_time
    state_name = _e2ee_state_names.get(int(state), f"UNKNOWN_{state}")
    p_id = participant.identity if participant else "local"
    logger.info("E2EE_STATE: participant=%s state=%s", p_id, state_name)
    if int(state) == 1:  # OK — reset failure counter
        _dec_failed_count.pop(p_id, None)
    # When remote participant needs key: NEW, MISSING_KEY, or DEC_FAILED (with cooldown)
    if participant and p_id != bot_identity and int(state) in (0, 3, 4):
        now = time.monotonic()
        if int(state) == 3:
            _dec_failed_count[p_id] = _dec_failed_count.get(p_id, 0) + 1
            # After 1+ DEC_FAILED: re-fetch key from timeline (key may have rotated)
            # Timestamp throttle: allow re-fetch if >500ms since last (MAT-164)
            # The throttle timestamp is shared by all participants, unlike the
            # per-participant re-key cooldown further down.
            if _dec_failed_count[p_id] >= 1 and (now - _last_refetch_time) > 0.5:
                _last_refetch_time = now
                _p_id_copy = p_id  # capture for closure
                async def _refetch_key():
                    # Background task: fetch the key over HTTP and, if it
                    # differs from the current one, re-apply the stored keys.
                    try:
                        logger.info("DEC_FAILED x%d — re-fetching key from timeline",
                                    _dec_failed_count.get(_p_id_copy, 0))
                        new_key = await self._fetch_encryption_key_http()
                        if new_key and new_key != self._caller_key:
                            logger.info("Got NEW key from timeline re-fetch (%s)",
                                        new_key.hex()[:8])
                            self._caller_key = new_key
                            kp_r = self.lk_room.e2ee_manager.key_provider
                            # NOTE(review): this loop applies _caller_all_keys, but
                            # new_key is not added to that dict anywhere in this
                            # block — presumably _fetch_encryption_key_http() updates
                            # it as a side effect; confirm, otherwise the fetched key
                            # is never handed to the key provider here.
                            for idx, base_k in sorted(self._caller_all_keys.items()):
                                _derive_and_set_key(kp_r, _p_id_copy, base_k, idx)
                            _dec_failed_count[_p_id_copy] = 0
                        elif new_key:
                            logger.info("Re-fetch returned same key — no rotation")
                        else:
                            logger.info("Re-fetch returned no fresh key")
                    except Exception as exc:
                        logger.warning("Key re-fetch failed: %s", exc)
                # Fire-and-forget: the returned future is not stored.
                asyncio.ensure_future(_refetch_key())
        # Cooldown: only re-key every 0.5s for fast recovery (MAT-164)
        last = _last_rekey_time.get(p_id, 0)
        if (now - last) < 0.5:
            return
        _last_rekey_time[p_id] = now
        if self._caller_all_keys:
            try:
                kp_e = self.lk_room.e2ee_manager.key_provider
                # Re-apply every stored key index, in ascending index order.
                for idx, base_k in sorted(self._caller_all_keys.items()):
                    _derive_and_set_key(kp_e, p_id, base_k, idx)
                    logger.info("e2ee_state: set key[%d] for %s on %s",
                                idx, p_id, state_name)
            except Exception as exc:
                logger.warning("e2ee_state set_key failed: %s", exc)
await self.lk_room.connect(self.lk_url, jwt, options=room_opts)
logger.info("Connected (E2EE=HKDF), remote=%d",
len(self.lk_room.remote_participants))
# Start continuous background key poller (MAT-164)
self._key_poller_task = asyncio.create_task(self._continuous_key_poller())
# Set bot's own key immediately after connect — local frame cryptor exists at connect time.
# Pre-derive via HKDF in Python since KDF_RAW is set (no Rust-side derivation).
kp = self.lk_room.e2ee_manager.key_provider
_derive_and_set_key(kp, bot_identity, self._bot_key, 0)
logger.info("Set bot key for %s (%d bytes)", bot_identity, len(self._bot_key))
# Element Call rotates its key when bot joins. Wait up to 3s for the
# rotated key to arrive via nio sync before proceeding. If it arrives,
# on_encryption_key() stores it and (if track already subscribed) sets it.
pre_key_count = len(self._caller_all_keys)
pre_max_idx = max(self._caller_all_keys.keys()) if self._caller_all_keys else -1
logger.info("Waiting up to 3s for key rotation (current: %d keys, max_idx=%d)...",
pre_key_count, pre_max_idx)
for _wait in range(6): # 6 × 0.5s = 3s
await asyncio.sleep(0.5)
new_count = len(self._caller_all_keys)
new_max = max(self._caller_all_keys.keys()) if self._caller_all_keys else -1
if new_max > pre_max_idx or new_count > pre_key_count:
self._caller_key = self._caller_all_keys[new_max]
logger.info("Key rotated: index %d%d, count %d%d",
pre_max_idx, new_max, pre_key_count, new_count)
break
else:
logger.info("No key rotation after 3s — proceeding with key[%d]", pre_max_idx)
# Find the remote participant, wait up to 10s if not yet connected
remote_identity = None
for p in self.lk_room.remote_participants.values():
remote_identity = p.identity
break
if not remote_identity:
logger.info("No remote participant yet, waiting...")
for _ in range(100):
await asyncio.sleep(0.1)
for p in self.lk_room.remote_participants.values():
remote_identity = p.identity
break
if remote_identity:
break
# Post-rotation: set caller keys with Python-derived HKDF (MAT-144).
# If track already subscribed, on_track_subscribed already set keys.
# This catches the case where track arrived before key rotation completed.
if self._caller_all_keys and remote_identity:
try:
for idx, base_k in sorted(self._caller_all_keys.items()):
_derive_and_set_key(kp, remote_identity, base_k, idx)
logger.info("Post-rotation derived+set key[%d] for %s", idx, remote_identity)
except Exception as e:
logger.warning("Post-rotation set_key failed: %s", e)
elif not self._caller_all_keys:
logger.warning("No caller E2EE keys received — incoming audio will be silence")
if remote_identity:
logger.info("Linking to remote participant: %s", remote_identity)
# Activity video disabled (MAT-149) — was causing lag
# TODO: re-enable when performance is investigated
# try:
# self._activity_video = ActivityVideoPublisher()
# video_track = rtc.LocalVideoTrack.create_video_track("activity", self._activity_video.source)
# pub_opts = rtc.TrackPublishOptions(source=rtc.TrackSource.SOURCE_CAMERA)
# await self.lk_room.local_participant.publish_track(video_track, pub_opts)
# self._activity_task = asyncio.create_task(self._activity_video.run())
# logger.info("Activity video track published")
# except Exception as e:
# logger.warning("Failed to publish activity video: %s", e)
# Load memories and user preferences for this caller
memory_section = ""
user_timezone = None
if self._memory and self._caller_user_id:
try:
mems = await self._memory.query(self._caller_user_id, "voice call", top_k=10)
if mems:
conversations = []
for m in mems:
fact = m['fact']
# Extract timezone preference from stored memories
if fact.startswith("[PREF:timezone]"):
user_timezone = fact.split("]", 1)[1].strip()
else:
conversations.append(fact)
if conversations:
memory_section = "\n\nFrühere Gespräche mit diesem Nutzer:\n" + \
"\n---\n".join(conversations)
logger.info("Loaded %d memories for %s (tz=%s)",
len(mems), self._caller_user_id, user_timezone)
except Exception as exc:
logger.warning("Memory query failed: %s", exc)
# Also query specifically for timezone preference if not found above
if not user_timezone and self._memory and self._caller_user_id:
try:
tz_mems = await self._memory.query(
self._caller_user_id, "timezone preference", top_k=3)
for m in (tz_mems or []):
if m['fact'].startswith("[PREF:timezone]"):
user_timezone = m['fact'].split("]", 1)[1].strip()
break
except Exception:
pass
# Voice pipeline — Jack Marlowe (native German male)
self._http_session = aiohttp.ClientSession()
voice_id = os.environ.get("ELEVENLABS_VOICE_ID", DEFAULT_VOICE_ID)
self.session = AgentSession(
stt=elevenlabs.STT(
api_key=ELEVENLABS_KEY,
model_id="scribe_v2_realtime",
language_code=os.environ.get("STT_LANGUAGE", "de"),
http_session=self._http_session,
),
llm=lk_openai.LLM(base_url=LITELLM_URL, api_key=LITELLM_KEY, model=self.model),
tts=elevenlabs.TTS(
voice_id=voice_id,
model="eleven_multilingual_v2",
language="de",
api_key=ELEVENLABS_KEY,
voice_settings=elevenlabs.VoiceSettings(
stability=0.5, similarity_boost=0.75, speed=1.15,
),
http_session=self._http_session,
),
vad=_get_vad(),
)
# Pipeline event logging (livekit-agents 1.4.2 event names)
_vad_state_log: dict = {"last": None, "speaking_count": 0, "away_since": None}
@self.session.on("user_state_changed")
def _on_user_state(ev):
    """Record and log user state transitions in ``_vad_state_log``.

    Counts "speaking" transitions, and stamps the monotonic time at which the
    user first switched to "away" so a later check can see how long the user
    has been away.
    """
    state = ev.new_state
    prev = _vad_state_log["last"]
    _vad_state_log["last"] = state
    state_name = str(state)

    if state_name == "speaking":
        turns = _vad_state_log.get("speaking_count", 0) + 1
        _vad_state_log["speaking_count"] = turns
        _vad_state_log["away_since"] = None
        logger.info("VAD: user_state=%s (speaking_count=%d)", state, turns)
        return

    # Only the transition INTO "away" (from a known, different state) is stamped.
    if state_name == "away" and prev and str(prev) != "away":
        import time
        _vad_state_log["away_since"] = time.monotonic()
        logger.info("VAD: user_state=%s (was %s)", state, prev)
        return

    logger.info("VAD: user_state=%s", state)
_last_user_speech: list[str] = []
@self.session.on("user_input_transcribed")
def _on_user_speech(ev):
    """Record a user transcript, dropping STT artifacts.

    Non-empty transcripts that are not artifacts are logged, queued in
    ``_last_user_speech`` and appended to the session transcript, which is
    trimmed once it exceeds twice ``_MAX_CHAT_CTX_ITEMS``.
    """
    text = ev.transcript or ""
    if not text:
        return
    if _is_stt_artifact(text):
        logger.warning("NOISE_LEAK: artifact reached STT: %s", text)
        # Artifacts are NOT added to the transcript — they inflate context.
        return

    logger.info("USER_SPEECH: %s", text)
    spoken = ev.transcript
    _last_user_speech.append(spoken)
    self._transcript.append({"role": "user", "text": spoken})

    # Bound memory use: keep only the newest items, trimming in place.
    limit = _MAX_CHAT_CTX_ITEMS
    if len(self._transcript) > limit * 2:
        self._transcript[:] = self._transcript[-limit:]
@self.session.on("conversation_item_added")
def _on_conversation_item(ev):
    """Record assistant speech and persist the completed voice exchange.

    Non-assistant or empty items are ignored. When a memory backend, a caller
    user id and pending user speech are all present, the joined user speech and
    the assistant reply are handed to ``_store_voice_exchange`` in the
    background.
    """
    item = ev.item
    role = getattr(item, "role", "?")
    text = getattr(item, "text_content", "") or ""
    if role != "assistant" or not text:
        return

    logger.info("AGENT_SPEECH: %s", text)
    self._transcript.append({"role": "assistant", "text": text})

    if not (self._memory and self._caller_user_id and _last_user_speech):
        return
    user_text = " ".join(_last_user_speech)
    _last_user_speech.clear()
    store_coro = _store_voice_exchange(user_text, text,
                                       self._caller_user_id, self.room_id)
    asyncio.ensure_future(store_coro)
# Brave Search tool — lets the agent answer questions about current events
@function_tool
async def search_web(query: str) -> str:
    """Search the web for current information using Brave Search.
    Use this when asked about recent news, current events, prices,
    weather, or any information that may have changed recently.
    """
    # The docstring is kept verbatim: presumably `function_tool` exposes it to
    # the LLM as the tool description.
    logger.info("SEARCH: %s", query)
    hits = await _brave_search(query)
    preview = hits[:200]  # log only the head of the result
    logger.info("SEARCH_RESULT: %s", preview)
    return hits
@function_tool
async def browse_url(url: str) -> str:
    """Open a web page and read its content. Use this when:
    - The user shares a URL and wants you to read/summarize it
    - You found a relevant URL from search_web and need more details
    - The user asks to "open", "read", or "check" a link/website
    Returns the page text content."""
    # Failures are detected by substring checks on the text returned by
    # _fetch_webpage (no exception handling in this function).
    logger.info("BROWSE: %s", url)
    result = await _fetch_webpage(url)
    # On DNS failure, try to find the correct URL via web search
    if "Name or service not known" in result or "Failed to fetch" in result:
        from urllib.parse import urlparse
        # Fall back to the raw input when it has no netloc (e.g. a bare domain).
        domain = urlparse(url).netloc or url
        logger.info("BROWSE_DNS_FAIL: %s — trying web search for correct domain", domain)
        search_result = await _brave_search(f"{domain} website")
        if search_result and "No results" not in search_result:
            # Extract first URL from search results
            import re as _re
            url_match = _re.search(r'https?://[^\s\)]+', search_result)
            if url_match:
                # Strip trailing punctuation picked up from the search text.
                corrected_url = url_match.group(0).rstrip('.,;')
                logger.info("BROWSE_RETRY: trying %s (from search)", corrected_url)
                result = await _fetch_webpage(corrected_url)
                if "Failed to fetch" not in result:
                    logger.info("BROWSE_OK: %d chars from %s (corrected)", len(result), corrected_url)
                    return f"[Note: Original URL {url} was unreachable. Found correct site at {corrected_url}]\n\n{result}"
        # Every fallback path failed: tell the model to ask the user.
        return f"Could not reach {url} and web search did not find an alternative. The domain name may be misspelled — ask the user to clarify."
    logger.info("BROWSE_OK: %d chars from %s", len(result), url)
    return result
# Tool: set user timezone — called by the LLM when user mentions their location
caller_uid = self._caller_user_id
@function_tool
async def set_user_timezone(iana_timezone: str) -> str:
    """Store the user's timezone. Call this when the user mentions their city
    or timezone (e.g. 'I'm in Nicosia' -> 'Europe/Nicosia',
    'I live in New York' -> 'America/New_York').
    Use IANA timezone names like Europe/Berlin, America/New_York, Asia/Tokyo.
    """
    tz_name = (iana_timezone or "").strip()
    # Validate before persisting so an invalid name is never stored as a
    # preference. ZoneInfoNotFoundError is a KeyError; malformed keys raise
    # ValueError, and some Python versions raise OSError for directory-like
    # keys such as "Europe".
    try:
        zoneinfo.ZoneInfo(tz_name)
    except (KeyError, ValueError, OSError):
        logger.warning("SET_TIMEZONE: rejected invalid timezone %r", iana_timezone)
        return (f"'{iana_timezone}' is not a valid IANA timezone name. "
                "Use a name like Europe/Berlin or America/New_York.")
    if not caller_uid:
        # Nothing can be persisted without a caller id — say so instead of
        # claiming the preference was saved.
        return f"Timezone {tz_name} is valid but could not be saved: caller is unknown."
    await _store_user_pref(caller_uid, "timezone", tz_name)
    return f"Timezone set to {tz_name}"
# Extract active Confluence page ID from document context (if any)
_active_conf_id = None
if self._document_context:
_conf_ids = re.findall(r'confluence_page_id:(\d+)', self._document_context)
if _conf_ids:
_active_conf_id = _conf_ids[0]
@function_tool
async def search_confluence(query: str) -> str:
    """Search Confluence for pages matching a query. Use when user asks
    to find, search, or look up documents or pages in Confluence.
    Returns a list of matching pages with titles and IDs."""
    # Docstring kept verbatim (presumably the LLM-facing tool description).
    logger.info("CONFLUENCE_SEARCH: query=%s", query)
    try:
        pages = await _confluence_search(query, limit=5)
        if not pages:
            return f"No Confluence pages found for '{query}'."
        header = f"Found {len(pages)} pages:"
        entries = [
            f"- {pg['title']} (ID: {pg['id']}, Space: {pg['space']})"
            for pg in pages
        ]
        return "\n".join([header, *entries])
    except Exception as exc:
        # Failures are reported back to the model as text rather than raised.
        logger.warning("CONFLUENCE_SEARCH_FAIL: %s", exc)
        return f"Search failed: {exc}"
@function_tool
async def read_confluence_page(page_id: str = "") -> str:
    """Read a Confluence page. Use when user asks to read, review,
    or check a document. Returns page title and content as text.
    Leave page_id empty to use the active document from the room."""
    # Docstring kept verbatim (presumably the LLM-facing tool description).
    # An explicit page_id wins; otherwise fall back to the room's active page.
    pid = page_id if page_id else _active_conf_id
    if not pid:
        return "No Confluence page ID available. Ask the user to share a Confluence link first."
    logger.info("CONFLUENCE_READ: page_id=%s", pid)
    try:
        title, text, _ver = await _confluence_read_page(pid)
        logger.info("CONFLUENCE_READ_OK: %s (%d chars)", title, len(text))
        return f"Page: {title}\n\n{text}"
    except Exception as exc:
        logger.warning("CONFLUENCE_READ_FAIL: %s", exc)
        return f"Failed to read page: {exc}"
@function_tool
async def update_confluence_page(section_heading: str, new_content: str, page_id: str = "") -> str:
    """Update a section of a Confluence page. Use when user asks to
    change, update, or rewrite part of a document.
    - section_heading: heading text of the section to update
    - new_content: new plain text for the section (paragraphs separated by newlines)
    - page_id: leave empty to use the active document from the room
    Human sees changes instantly in their browser via Live Docs."""
    import html  # local import: only needed for escaping below

    pid = page_id or _active_conf_id
    if not pid:
        return "No Confluence page ID available. Ask the user to share a Confluence link first."
    logger.info("CONFLUENCE_UPDATE: page=%s section='%s'", pid, section_heading)
    try:
        # new_content is plain text: escape &, < and > so that text such as
        # "R&D" or "a < b" cannot produce malformed markup, then wrap each
        # paragraph in <p> tags for proper formatting.
        paragraphs = [p.strip() for p in new_content.split("\n") if p.strip()]
        if paragraphs:
            new_html = "".join(f"<p>{html.escape(p, quote=False)}</p>" for p in paragraphs)
        else:
            new_html = f"<p>{html.escape(new_content, quote=False)}</p>"
        result = await _confluence_update_section(pid, section_heading, new_html)
        logger.info("CONFLUENCE_UPDATE_OK: %s", result)
        return result
    except Exception as exc:
        # Failures are reported back to the model as text rather than raised.
        logger.warning("CONFLUENCE_UPDATE_FAIL: %s", exc)
        return f"Failed to update page: {exc}"
@function_tool
async def recent_confluence_pages() -> str:
    """List recently modified Confluence pages. Use this FIRST when the user
    mentions Confluence, documents, or wiki pages — before searching.
    Shows the last 5 recently edited pages so the user can pick one directly."""
    # Docstring kept verbatim (presumably the LLM-facing tool description).
    logger.info("CONFLUENCE_RECENT")
    try:
        pages = await _confluence_recent_pages(limit=5)
        if not pages:
            return "No recent Confluence pages found."
        numbered = [
            f"{pos}. {pg['title']} (ID: {pg['id']}, Space: {pg['space']})"
            for pos, pg in enumerate(pages, 1)
        ]
        return "\n".join(["Recently modified pages:"] + numbered)
    except Exception as exc:
        logger.warning("CONFLUENCE_RECENT_FAIL: %s", exc)
        return f"Failed to fetch recent pages: {exc}"
@function_tool
async def create_confluence_page(title: str, content: str, space_key: str = "AG") -> str:
    """Create a new Confluence page. Use when user asks to create a new document,
    write a new page, or make a new wiki entry.
    - title: page title
    - content: page body text (paragraphs separated by newlines)
    - space_key: Confluence space key (default: AG for Agiliton)"""
    import html  # local import: only needed for escaping below

    logger.info("CONFLUENCE_CREATE: title='%s' space=%s", title, space_key)
    try:
        # content is plain text: escape &, < and > before wrapping each
        # paragraph in <p> tags so the generated body is well-formed.
        paragraphs = [p.strip() for p in content.split("\n") if p.strip()]
        if paragraphs:
            body_html = "".join(f"<p>{html.escape(p, quote=False)}</p>" for p in paragraphs)
        else:
            body_html = f"<p>{html.escape(content, quote=False)}</p>"
        result = await _confluence_create_page(space_key, title, body_html)
        logger.info("CONFLUENCE_CREATE_OK: id=%s title='%s'", result["id"], result["title"])
        return f"Page created: {result['title']} (ID: {result['id']})\nURL: {result['url']}"
    except Exception as exc:
        # Failures are reported back to the model as text rather than raised.
        logger.warning("CONFLUENCE_CREATE_FAIL: %s", exc)
        return f"Failed to create page: {exc}"
# Deep thinking tool — escalates to Opus for complex questions
_transcript_ref = self._transcript
_doc_context_ref = self._document_context
@function_tool
async def think_deeper(question: str) -> str:
    """Denke intensiver ueber eine komplexe Frage nach mit einem staerkeren Modell.
    Nutze dieses Tool wenn:
    - Der Nutzer sagt "denk genauer nach", "think harder", "nimm opus", "use opus",
    "ueberleg nochmal", "analysier das genauer", "bist du dir sicher", "are you sure",
    "stimmt das wirklich", "check nochmal", "pruef das nochmal"
    - Du dir bei einer komplexen Analyse, Code-Review oder Dokumentinterpretation unsicher bist
    - Der Nutzer deine letzte Antwort anzweifelt oder eine Gegenprüfung wuenscht
    - Eine Frage mehrere Schritte logisches Denken erfordert
    Beschreibe die Frage so praezise wie moeglich — der Kontext (Transkript + Dokument)
    wird automatisch mitgeliefert."""
    # Docstring kept verbatim (German trigger phrases; presumably the LLM-facing
    # tool description).
    # Assemble the prompt: document excerpt, last 10 transcript turns, question.
    sections = []
    if _doc_context_ref:
        sections.append(f"Dokument-Kontext:\n{_doc_context_ref[:12000]}")
    history = _transcript_ref[-10:] if _transcript_ref else []
    if history:
        turns = [
            f"{'Nutzer' if entry['role'] == 'user' else 'Assistent'}: {entry['text']}"
            for entry in history
        ]
        sections.append("Gespraechsverlauf:\n" + "\n".join(turns))
    sections.append(f"Frage: {question}")
    full_prompt = "\n\n---\n\n".join(sections)
    logger.info("THINK_DEEPER: %s (context=%d chars)", question[:100], len(full_prompt))

    system_msg = (
        "Du bist ein Experte fuer tiefgehende Analyse. "
        "Beantworte die Frage praezise und ausfuehrlich basierend auf dem Kontext. "
        "Antworte in der Sprache der Frage."
    )
    payload = {
        "model": "claude-opus",
        "messages": [
            {"role": "system", "content": system_msg},
            {"role": "user", "content": full_prompt},
        ],
        "max_tokens": 1500,
    }
    try:
        async with httpx.AsyncClient(timeout=60.0) as client:
            resp = await client.post(
                f"{LITELLM_URL}/chat/completions",
                headers={"Authorization": f"Bearer {LITELLM_KEY}"},
                json=payload,
            )
            resp.raise_for_status()
            answer = resp.json()["choices"][0]["message"]["content"]
            logger.info("THINK_DEEPER_OK: %s", answer[:200])
            return answer
    except Exception as exc:
        # Failures are reported back to the model as text rather than raised.
        logger.warning("THINK_DEEPER_FAIL: %s", exc)
        return f"Tiefere Analyse fehlgeschlagen: {exc}"
# Vision tool — capture video frame and analyze with vision model
_video_track_ref = self # reference to VoiceSession for video track access
_lk_room_ref = self.lk_room
_session_ref = self.session # for say() in tools
@function_tool
async def look_at_screen(question: str) -> str:
    """Schau dir an was der Nutzer auf dem Bildschirm oder per Kamera zeigt.
    Nutze dieses Tool wenn:
    - Der Nutzer sagt "schau mal", "siehst du das", "was siehst du", "look at this",
    "can you see", "zeig dir was", "schau auf meinen Bildschirm", "kannst du das sehen"
    - Der Nutzer seinen Bildschirm teilt und eine Frage dazu stellt
    - Der Nutzer seine Kamera aktiviert hat und etwas zeigen moechte
    Beschreibe was du sehen moechtest oder stelle eine Frage zum Bild."""
    # Docstring kept verbatim (German trigger phrases; presumably the LLM-facing
    # tool description).
    # Flow: grab one frame from the stored video track (bounded by a timeout),
    # encode it as JPEG, and send it with the recent context to the vision model.
    frame_timeout_s = 5.0  # upper bound for waiting on the first video frame

    video_track = _video_track_ref._video_track
    if not video_track:
        return ("Kein Video verfuegbar. Der Nutzer muss seine Kamera oder "
                "Bildschirmfreigabe aktivieren bevor ich etwas sehen kann.")
    # Instant filler so user knows bot is looking (best effort).
    try:
        await _session_ref.say("Einen Moment, ich schaue mir das an.",
                               allow_interruptions=True, add_to_chat_ctx=False)
    except Exception as exc:
        logger.debug("LOOK_AT_SCREEN: filler say() failed: %s", exc)
    try:
        # Capture single frame from video track
        stream = rtc.VideoStream(video_track)

        async def _first_frame():
            async for f in stream:
                return f
            return None

        try:
            # Without a bound this would wait forever when the track never
            # delivers a frame; on timeout the asyncio.TimeoutError handler
            # below answers the user.
            frame = await asyncio.wait_for(_first_frame(), timeout=frame_timeout_s)
        finally:
            # Always release the stream, including on timeout or error.
            try:
                await stream.aclose()
            except Exception as exc:
                logger.debug("LOOK_AT_SCREEN: stream close failed: %s", exc)
        if frame is None:
            return "Konnte kein Bild aufnehmen — kein Frame verfuegbar."
        # Handle both VideoFrameEvent (.frame) and direct VideoFrame
        vf = getattr(frame, 'frame', frame)
        rgba = vf.convert(rtc.VideoBufferType.RGBA)
        # A tiny frame is treated as a failed E2EE decryption; bail out before
        # spending time on JPEG encoding.
        if rgba.width <= 16 or rgba.height <= 16:
            logger.warning("LOOK_AT_SCREEN: frame %dx%d — E2EE decryption likely failed (garbage frame)",
                           rgba.width, rgba.height)
            return ("E2EE Video-Entschluesselung fehlgeschlagen — das Bild ist nur "
                    f"{rgba.width}x{rgba.height} Pixel. Bitte Bildschirmfreigabe neu starten.")
        # Convert the RGBA buffer to JPEG and base64-encode it
        from PIL import Image
        import io
        img = Image.frombytes("RGBA", (rgba.width, rgba.height), bytes(rgba.data))
        buf = io.BytesIO()
        img.convert("RGB").save(buf, format="JPEG", quality=85)
        jpeg_bytes = buf.getvalue()
        img_b64 = base64.b64encode(jpeg_bytes).decode()
        logger.info("LOOK_AT_SCREEN: captured %dx%d frame (%d KB JPEG)",
                    rgba.width, rgba.height, len(jpeg_bytes) // 1024)
        # Build context: transcript + document + question
        context_parts = []
        if _doc_context_ref:
            context_parts.append(f"Dokument-Kontext:\n{_doc_context_ref[:8000]}")
        recent = _transcript_ref[-10:] if _transcript_ref else []
        if recent:
            lines = [f"{'Nutzer' if e['role'] == 'user' else 'Assistent'}: {e['text']}"
                     for e in recent]
            context_parts.append("Gespraechsverlauf:\n" + "\n".join(lines))
        context_parts.append(f"Frage zum Bild: {question}")
        text_prompt = "\n\n---\n\n".join(context_parts)
        # Send to vision model via LiteLLM (OpenAI-compatible multimodal format)
        async with httpx.AsyncClient(timeout=30.0) as client:
            resp = await client.post(
                f"{LITELLM_URL}/chat/completions",
                headers={"Authorization": f"Bearer {LITELLM_KEY}"},
                json={
                    "model": "claude-sonnet",
                    "messages": [
                        {"role": "system", "content": (
                            "Du analysierst Bilder von Bildschirm oder Kamera eines Nutzers. "
                            "Antworte praezise und hilfreich in der Sprache der Frage. "
                            "Beschreibe was du siehst und beantworte die Frage des Nutzers."
                        )},
                        {"role": "user", "content": [
                            {"type": "image_url", "image_url": {
                                "url": f"data:image/jpeg;base64,{img_b64}"}},
                            {"type": "text", "text": text_prompt},
                        ]},
                    ],
                    "max_tokens": 1500,
                },
            )
            resp.raise_for_status()
            data = resp.json()
            answer = data["choices"][0]["message"]["content"]
            logger.info("LOOK_AT_SCREEN_OK: %s", answer[:200])
            return answer
    except asyncio.TimeoutError:
        return "Konnte kein Bild aufnehmen — Timeout. Ist die Kamera/Bildschirmfreigabe aktiv?"
    except Exception as exc:
        logger.warning("LOOK_AT_SCREEN_FAIL: %s", exc)
        return f"Bildanalyse fehlgeschlagen: {exc}"
instructions = _build_voice_prompt(model=self.model, timezone=user_timezone) + memory_section
if self._document_context:
instructions += f"\n\nDokument-Kontext (im Raum hochgeladen):\n{self._document_context}"
if _active_conf_id:
instructions += f"\n\nAktive Confluence-Seite: {_active_conf_id}. Du brauchst den Nutzer NICHT nach der page_id zu fragen — nutze automatisch diese ID fuer read_confluence_page und update_confluence_page."
agent = _NoiseFilterAgent(
instructions=instructions,
tools=[search_web, browse_url, set_user_timezone, recent_confluence_pages, search_confluence, read_confluence_page, update_confluence_page, create_confluence_page, think_deeper, look_at_screen],
)
io_opts = room_io.RoomOptions(
participant_identity=remote_identity,
close_on_disconnect=False,
) if remote_identity else room_io.RoomOptions(close_on_disconnect=False)
await self.session.start(
agent=agent,
room=self.lk_room,
room_options=io_opts,
)
logger.info("Voice pipeline started (voice=%s, linked_to=%s)", voice_id, remote_identity)
# Wire agent state to activity video animation
if self._activity_video:
@self.session.on("agent_state_changed")
def _on_state_changed(ev):
    """Forward the agent's new state to the activity video publisher."""
    new_state = ev.new_state
    self._activity_video.set_state(new_state)
try:
await asyncio.wait_for(
self.session.generate_reply(
instructions="Sage nur: Hallo, wie kann ich helfen?"),
timeout=30.0)
logger.info("Greeting sent")
except asyncio.TimeoutError:
logger.error("Greeting timed out")
# VAD watchdog: log diagnostic, attempt E2EE key recovery, and recover from LLM failures
import time as _time
_last_agent_speech_time = _time.monotonic()
_llm_recovery_attempted = False
@self.session.on("conversation_item_added")
def _track_agent_speech(ev):
nonlocal _last_agent_speech_time, _llm_recovery_attempted
role = getattr(ev.item, "role", "?")
if role == "assistant":
_last_agent_speech_time = _time.monotonic()
_llm_recovery_attempted = False # reset on successful speech
while True:
await asyncio.sleep(10)
# LLM timeout recovery: if user has been speaking but agent
# hasn't responded in >60s, the LLM pipeline is likely stuck
sc = _vad_state_log.get("speaking_count", 0)
agent_silent_secs = _time.monotonic() - _last_agent_speech_time
if sc > 0 and agent_silent_secs > 60 and not _llm_recovery_attempted:
_llm_recovery_attempted = True
logger.warning(
"LLM_RECOVERY: agent silent for %.0fs after %d user turns "
"— sending recovery reply", agent_silent_secs, sc)
try:
await asyncio.wait_for(
self.session.generate_reply(
instructions="Entschuldigung, ich hatte kurz ein technisches Problem. "
"Kannst du deine letzte Frage bitte wiederholen?"),
timeout=30.0)
logger.info("LLM_RECOVERY: recovery reply sent successfully")
except Exception as exc:
logger.error("LLM_RECOVERY: recovery reply failed: %s", exc)
away_since = _vad_state_log.get("away_since")
if away_since and (_time.monotonic() - away_since) > 30:
e2ee_ok = any(
str(getattr(p, '_e2ee_state', '')) == 'OK'
for p in self.lk_room.remote_participants.values()
) if self.lk_room else False
n_remote = len(self.lk_room.remote_participants) if self.lk_room else 0
logger.warning(
"VAD_WATCHDOG: user_state=away for >30s (speaking_count=%d, "
"remote_participants=%d, e2ee_ok=%s) — attempting E2EE key recovery",
sc, n_remote, e2ee_ok,
)
# Recovery: re-apply all stored E2EE keys for all remote participants
if self.lk_room and self._caller_all_keys:
try:
kp_w = self.lk_room.e2ee_manager.key_provider
for p in self.lk_room.remote_participants.values():
for idx, base_k in sorted(self._caller_all_keys.items()):
_derive_and_set_key(kp_w, p.identity, base_k, idx)
logger.info("VAD_WATCHDOG: recovery derived+set key[%d] for %s",
idx, p.identity)
except Exception as exc:
logger.warning("VAD_WATCHDOG: recovery set_key failed: %s", exc)
_vad_state_log["away_since"] = None # only warn once per stuck period
except asyncio.CancelledError:
logger.info("Session cancelled for %s", self.room_id)
except Exception:
logger.exception("Session failed for %s", self.room_id)