Files
matrix-ai-agent/voice.py
Christian Gick c532f4678d fix(e2ee): consolidate key timing + noise filtering (MAT-40, MAT-41)
- set_key() only called after frame cryptor exists (on_track_subscribed / late arrival)
- Remove 10s blocking key rotation wait; keys applied asynchronously
- Add DEC_FAILED (state 3) to e2ee_state recovery triggers
- VAD watchdog re-applies all E2EE keys on >30s stuck as recovery
- Expand STT artifact patterns (English variants, double-asterisk)
- Add NOISE_LEAK diagnostic logging at STT level

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-23 08:33:40 +02:00

714 lines
34 KiB
Python

"""Voice session: LiveKit + STT/LLM/TTS pipeline.
E2EE via HKDF key derivation (Element Call compatible).
Requires patched livekit-rtc FFI binary from onestacked/livekit-rust-sdks."""
import asyncio
import base64
import datetime
import hashlib
import logging
import os
import zoneinfo
import json
import re
import aiohttp
import httpx
from livekit import rtc, api as lkapi
from livekit.agents import Agent, AgentSession, StopResponse, function_tool, room_io, llm
from livekit.plugins import openai as lk_openai, elevenlabs, silero
from openai import AsyncOpenAI
logger = logging.getLogger("matrix-ai-voice")
LITELLM_URL = os.environ.get("LITELLM_BASE_URL", "")
LITELLM_KEY = os.environ.get("LITELLM_API_KEY", "not-needed")
LK_API_KEY = os.environ.get("LIVEKIT_API_KEY", "")
LK_API_SECRET = os.environ.get("LIVEKIT_API_SECRET", "")
ELEVENLABS_KEY = os.environ.get("ELEVENLABS_API_KEY", "")
BRAVE_API_KEY = os.environ.get("BRAVE_API_KEY", "")
MEMORY_SERVICE_URL = os.environ.get("MEMORY_SERVICE_URL", "http://memory-service:8090")
DEFAULT_VOICE_ID = "ML23UVoFL5mI6APbRAeR" # Robert Ranger - Cool Storyteller, native German
_VOICE_PROMPT_TEMPLATE = """Du bist ein hilfreicher Sprachassistent in einem Matrix-Anruf.
Aktuelle Zeit: {datetime}
STRIKTE Regeln:
- Antworte in der Sprache des Nutzers (Deutsch oder Englisch)
- Halte JEDE Antwort auf MAXIMAL 1-2 kurze Saetze
- Sei direkt und praezise, keine Fuellwoerter
- Erfinde NICHTS - keine Geschichten, keine Musik, keine Fantasie
- Beantworte nur was gefragt wird
- Wenn niemand etwas fragt, sage nur kurz Hallo
- Schreibe Zahlen und Jahreszahlen IMMER als Woerter aus (z.B. "zweitausendundzwanzig" statt "2026", "zweiundzwanzigsten Februar" statt "22. Februar")
- IGNORIERE alle Texte in Sternchen wie *Störgeräusche*, *Schlechte Qualität*, *Fernsehgeräusche*, *Schrei* usw. — das sind KEINE echten Nutzereingaben sondern technische Annotationen. Antworte NIEMALS darauf und tue so als haette niemand etwas gesagt."""
def _build_voice_prompt() -> str:
tz_name = os.environ.get("VOICE_TIMEZONE", "Europe/Berlin")
try:
tz = zoneinfo.ZoneInfo(tz_name)
except Exception:
tz = datetime.timezone.utc
now = datetime.datetime.now(tz)
return _VOICE_PROMPT_TEMPLATE.format(
datetime=now.strftime("%A, %d. %B %Y %H:%M %Z")
)
# ElevenLabs scribe_v2_realtime produces two kinds of artifacts:
# 1. Noise annotations: *Störgeräusche*, *Schlechte Qualität*, etc.
# 2. Subtitle/metadata leaks: "Untertitel: ARD Text im Auftrag von Funk (2017)"
# Filter both via on_user_turn_completed (downstream of VAD+STT, no pipeline impact).
_NOISE_ANNOTATION_RE = re.compile(r'^\s*\*[^*]+\*\s*$')
_STT_ARTIFACT_PATTERNS = [
re.compile(r'(?i)^untertitel\b'), # subtitle metadata
re.compile(r'(?i)^copyright\b'), # copyright notices
re.compile(r'(?i)^musik\s*$'), # bare "Musik" annotation
re.compile(r'(?i)^\(.*\)\s*$'), # parenthetical annotations like (Applaus)
re.compile(r'(?i)^\[.*\]\s*$'), # bracketed annotations like [Musik]
re.compile(r'(?i)^(background noise|static|silence|applause|laughter|noise)\s*$'), # English variants
re.compile(r'^\s*\*\*[^*]+\*\*\s*$'), # double-asterisk: **Noise**
]
def _is_stt_artifact(text: str) -> bool:
"""Check if text is an STT artifact (noise annotation or metadata leak)."""
if _NOISE_ANNOTATION_RE.match(text):
return True
return any(p.match(text) for p in _STT_ARTIFACT_PATTERNS)
class _NoiseFilterAgent(Agent):
"""Agent that suppresses ElevenLabs STT artifacts before LLM sees them.
Uses on_user_turn_completed() which runs after VAD+STT, so no backpressure
risk to the audio pipeline. Raises StopResponse to silently discard noise.
"""
async def on_user_turn_completed(
self, turn_ctx: llm.ChatContext, new_message: llm.ChatMessage
) -> None:
text = (new_message.text_content or "").strip()
if text and _is_stt_artifact(text):
logger.info("STT artifact suppressed: %s", text)
# Remove the artifact from context so it doesn't accumulate
if turn_ctx.items and turn_ctx.items[-1] is new_message:
turn_ctx.items.pop()
raise StopResponse()
_vad = None
def _get_vad():
global _vad
if _vad is None:
_vad = silero.VAD.load(
activation_threshold=0.50,
min_speech_duration=0.2,
min_silence_duration=0.55,
)
return _vad
def _make_lk_identity(user_id, device_id):
return f"{user_id}:{device_id}"
def _compute_lk_room_name(room_id):
raw = f"{room_id}|m.call#ROOM"
return base64.b64encode(hashlib.sha256(raw.encode()).digest()).decode().rstrip("=")
def _generate_lk_jwt(room_id, user_id, device_id):
identity = _make_lk_identity(user_id, device_id)
lk_room = _compute_lk_room_name(room_id)
token = (
lkapi.AccessToken(LK_API_KEY, LK_API_SECRET)
.with_identity(identity)
.with_name(user_id)
.with_grants(lkapi.VideoGrants(
room_join=True, room=lk_room,
can_publish=True, can_subscribe=True))
.with_ttl(datetime.timedelta(hours=24))
)
logger.info("JWT: identity=%s room=%s", identity, lk_room)
return token.to_jwt()
KDF_HKDF = 1 # Rust FFI applies HKDF internally (proto enum value 1)
# NOTE: value 0 in the proto is PBKDF2, NOT raw/none mode — see e2ee_patch.py
def _hkdf_derive(ikm: bytes) -> bytes:
"""Pre-derive AES key via HKDF-SHA256 matching livekit-client-sdk-js deriveEncryptionKey().
JS params: hash=SHA-256, salt=encode("LKFrameEncryptionKey"), info=ArrayBuffer(128), length=128bit
We set this pre-derived key via set_shared_key() which bypasses Rust FFI KDF entirely.
"""
import hmac
salt = b"LKFrameEncryptionKey"
info = b"\x00" * 128
prk = hmac.new(salt, ikm, hashlib.sha256).digest()
t1 = hmac.new(prk, info + b"\x01", hashlib.sha256).digest()
return t1[:16]
def _ratchet_keys(base_raw: bytes, count: int = 6) -> dict[int, bytes]:
"""Pre-compute ratcheted keys 0..count-1 matching livekit-client-sdk-js ratchet().
EC JS ratchet: new_raw = HMAC(key=current_raw, data="LKFrameEncryptionKey")[:16]
Returns {index: raw_key} for all indices 0..count-1.
Set these via set_key(identity, raw, index) with KDF_HKDF=1 so Rust applies HKDF.
"""
import hmac as _hmac
keys = {}
raw = base_raw
for i in range(count):
keys[i] = raw
raw = _hmac.new(raw, b"LKFrameEncryptionKey", hashlib.sha256).digest()[:16]
return keys
async def _brave_search(query: str, count: int = 5) -> str:
"""Call Brave Search API and return formatted results."""
if not BRAVE_API_KEY:
return "Search unavailable (no API key configured)."
try:
async with httpx.AsyncClient(timeout=10.0) as client:
resp = await client.get(
"https://api.search.brave.com/res/v1/web/search",
headers={"Accept": "application/json", "X-Subscription-Token": BRAVE_API_KEY},
params={"q": query, "count": count, "text_decorations": False},
)
resp.raise_for_status()
data = resp.json()
results = data.get("web", {}).get("results", [])
if not results:
return "No results found."
lines = []
for r in results[:count]:
lines.append(f"- {r.get('title', '')}: {r.get('description', '')} ({r.get('url', '')})")
return "\n".join(lines)
except Exception as exc:
logger.warning("Brave search error: %s", exc)
return f"Search failed: {exc}"
async def _extract_voice_memories(user_text: str, agent_text: str,
user_id: str, room_id: str) -> None:
"""Extract memorable facts from a voice exchange and store them."""
if not LITELLM_URL or not MEMORY_SERVICE_URL:
return
try:
# Fetch existing facts to avoid duplicates
async with httpx.AsyncClient(timeout=10.0) as client:
resp = await client.post(
f"{MEMORY_SERVICE_URL}/memories/query",
json={"user_id": user_id, "query": "all facts", "top_k": 20},
)
existing = [m["fact"] for m in resp.json().get("results", [])] if resp.is_success else []
existing_text = "\n".join(f"- {f}" for f in existing) if existing else "(none)"
llm = AsyncOpenAI(base_url=LITELLM_URL, api_key=LITELLM_KEY)
resp = await llm.chat.completions.create(
model="claude-haiku",
messages=[
{"role": "system", "content": (
"Extract memorable facts about the user from this voice conversation snippet. "
"Return a JSON array of concise strings. Include: name, preferences, location, "
"occupation, interests, family, projects. Skip duplicate or temporary info. "
"Return [] if nothing new."
)},
{"role": "user", "content": (
f"Existing memories:\n{existing_text}\n\n"
f"User said: {user_text}\nAssistant replied: {agent_text}\n\n"
"New facts (JSON array):"
)},
],
max_tokens=200,
)
raw = resp.choices[0].message.content.strip()
if raw.startswith("```"):
raw = re.sub(r"^```\w*\n?", "", raw)
raw = re.sub(r"\n?```$", "", raw)
match = re.search(r"\[.*\]", raw, re.DOTALL)
if match:
raw = match.group(0)
new_facts = json.loads(raw)
if not isinstance(new_facts, list):
return
async with httpx.AsyncClient(timeout=10.0) as client:
for fact in new_facts:
if isinstance(fact, str) and fact.strip():
await client.post(
f"{MEMORY_SERVICE_URL}/memories/store",
json={"user_id": user_id, "fact": fact.strip(), "source_room": room_id},
)
logger.info("Memory stored for %s: %s", user_id, fact[:80])
except Exception as exc:
logger.warning("Voice memory extraction failed: %s", exc)
def _build_e2ee_options() -> rtc.E2EEOptions:
"""Build E2EE options — let Rust FFI apply HKDF internally (KDF_HKDF=1).
Pass raw base keys from Matrix key exchange events directly to set_key().
The Rust FFI derives the AES frame key via HKDF(base_key, ratchetSalt, ...) internally.
Element Call uses: ratchetWindowSize=10, keyringSize=256, ratchetSalt="LKFrameEncryptionKey"
NOTE: proto value 0 = PBKDF2 (not raw/none) — must use KDF_HKDF=1.
"""
key_opts = rtc.KeyProviderOptions(
shared_key=b"", # empty = per-participant mode
ratchet_window_size=10,
ratchet_salt=b"LKFrameEncryptionKey",
failure_tolerance=10,
key_ring_size=256,
key_derivation_function=KDF_HKDF, # Rust FFI applies HKDF; we pass raw base keys
)
return rtc.E2EEOptions(
encryption_type=rtc.EncryptionType.GCM,
key_provider_options=key_opts,
)
class VoiceSession:
def __init__(self, nio_client, room_id, device_id, lk_url, model="claude-sonnet",
publish_key_cb=None, bot_key: bytes | None = None,
memory=None, caller_user_id: str | None = None):
self.nio_client = nio_client
self.room_id = room_id
self.device_id = device_id
self.lk_url = lk_url
self.model = model
self.lk_room = None
self.session = None
self._task = None
self._http_session = None
self._caller_key: bytes | None = None
self._caller_identity: str | None = None
self._caller_all_keys: dict = {} # {index: bytes} — all caller keys by index
self._bot_key: bytes = bot_key or os.urandom(16)
self._publish_key_cb = publish_key_cb
self._memory = memory # MemoryClient instance from bot.py
self._caller_user_id = caller_user_id # Matrix user ID for memory lookup
def on_encryption_key(self, sender, device_id, key, index):
"""Receive E2EE key from Element Call participant.
Store-only: keys are applied in on_track_subscribed() / on_e2ee_state_changed()
where the frame cryptor is guaranteed to exist. If the track is already
subscribed (late key arrival / rotation), set the key immediately.
"""
if not key:
return
if not self._caller_key:
self._caller_key = key
self._caller_identity = f"{sender}:{device_id}"
self._caller_all_keys[index] = key
logger.info("E2EE key received from %s:%s (index=%d, %d bytes, raw=%s)",
sender, device_id, index, len(key), key.hex())
# Late key arrival: if track already subscribed, frame cryptor exists — set key now.
if self.lk_room and self._caller_identity:
caller_lk_id = self._caller_identity
for p in self.lk_room.remote_participants.values():
if p.identity == caller_lk_id:
has_subscribed = any(
pub.subscribed for pub in p.track_publications.values()
)
if has_subscribed:
try:
kp = self.lk_room.e2ee_manager.key_provider
ok = kp.set_key(p.identity, key, index)
logger.info("Late key set_key[%d] for %s (ok=%s)",
index, p.identity, ok)
except Exception as e:
logger.warning("Late key set_key failed: %s", e)
break
async def _fetch_encryption_key_http(self) -> bytes | None:
"""Fetch encryption key from room timeline (NOT state) via Matrix HTTP API.
Element Call distributes encryption keys as timeline events, not state.
"""
import httpx
homeserver = str(self.nio_client.homeserver)
token = self.nio_client.access_token
url = f"{homeserver}/_matrix/client/v3/rooms/{self.room_id}/messages"
try:
async with httpx.AsyncClient(timeout=10.0) as http:
resp = await http.get(
url,
headers={"Authorization": f"Bearer {token}"},
params={"dir": "b", "limit": "50"},
)
resp.raise_for_status()
data = resp.json()
events = data.get("chunk", [])
user_id = self.nio_client.user_id
for evt in events:
evt_type = evt.get("type", "")
if evt_type == "io.element.call.encryption_keys":
sender = evt.get("sender", "")
if sender == user_id:
continue # skip our own key
content = evt.get("content", {})
device = content.get("device_id", "")
logger.info("Found encryption_keys timeline event: sender=%s device=%s",
sender, device)
all_keys = {}
import base64 as b64
for k in content.get("keys", []):
key_b64 = k.get("key", "")
key_index = k.get("index", 0)
if key_b64:
key_b64 += "=" * (-len(key_b64) % 4)
key_bytes = b64.urlsafe_b64decode(key_b64)
all_keys[key_index] = key_bytes
if all_keys:
if device:
self._caller_identity = f"{sender}:{device}"
self._caller_all_keys.update(all_keys)
max_idx = max(all_keys.keys())
logger.info("Loaded caller keys at indices %s (using %d)",
sorted(all_keys.keys()), max_idx)
return all_keys[max_idx]
logger.info("No encryption_keys events in last %d timeline events", len(events))
except Exception as e:
logger.warning("HTTP encryption key fetch failed: %s", e)
return None
async def start(self):
self._task = asyncio.create_task(self._run())
async def stop(self):
for obj in [self.session, self.lk_room, self._http_session]:
if obj:
try:
if hasattr(obj, "aclose"):
await obj.aclose()
elif hasattr(obj, "disconnect"):
await obj.disconnect()
elif hasattr(obj, "close"):
await obj.close()
except Exception:
pass
if self._task and not self._task.done():
self._task.cancel()
try:
await self._task
except asyncio.CancelledError:
pass
async def _run(self):
try:
user_id = self.nio_client.user_id
bot_identity = _make_lk_identity(user_id, self.device_id)
jwt = _generate_lk_jwt(self.room_id, user_id, self.device_id)
# Publish bot's own key immediately so Element Call can decrypt us
if self._publish_key_cb:
self._publish_key_cb(self._bot_key)
logger.info("Published bot E2EE key (%d bytes)", len(self._bot_key))
# Check timeline for caller's encryption key
caller_key = await self._fetch_encryption_key_http()
if caller_key:
self._caller_key = caller_key
logger.info("Got caller E2EE key via timeline (%d bytes)", len(caller_key))
if not self._caller_key:
# Wait up to 15s for key via sync handler
logger.info("No key in timeline yet, waiting for sync...")
for _ in range(150):
if self._caller_key:
break
await asyncio.sleep(0.1)
# E2EE: configurable via E2EE_ENABLED env var (default true).
# When enabled, parameters MUST match Element Call JS SDK.
e2ee_enabled = os.environ.get("E2EE_ENABLED", "true").lower() in ("true", "1", "yes")
if e2ee_enabled:
e2ee_opts = _build_e2ee_options()
room_opts = rtc.RoomOptions(e2ee=e2ee_opts)
logger.info("E2EE enabled (HKDF mode)")
else:
e2ee_opts = None
room_opts = rtc.RoomOptions()
logger.warning("E2EE DISABLED — audio is unencrypted")
self.lk_room = rtc.Room()
@self.lk_room.on("participant_connected")
def on_p(p):
logger.info("Participant connected: %s", p.identity)
# Note: do NOT set keys here — frame cryptor not initialized yet at participant_connected.
# Keys are set in on_track_subscribed where the frame cryptor definitely exists.
@self.lk_room.on("track_published")
def on_tp(pub, p):
logger.info("Track pub: %s %s kind=%s", p.identity, pub.sid, pub.kind)
@self.lk_room.on("track_muted")
def on_mute(pub, p):
logger.info("Track MUTED: %s %s kind=%s", p.identity, pub.sid, pub.kind)
@self.lk_room.on("track_unmuted")
def on_unmute(pub, p):
logger.info("Track UNMUTED: %s %s kind=%s", p.identity, pub.sid, pub.kind)
@self.lk_room.on("track_subscribed")
def on_ts(t, pub, p):
logger.info("Track sub: %s %s kind=%s muted=%s", p.identity, pub.sid, t.kind, pub.muted)
# NOTE: Do NOT create rtc.AudioStream here — it competes with AgentSession's
# internal audio pipeline for event loop time, causing intermittent VAD failures
# (user_state stuck on "away"). See MAT-40. Use e2ee_state_changed for flow confirmation.
# *** KEY FIX: set_key() with KDF_HKDF only applies HKDF when the frame cryptor
# for this participant already exists. The frame cryptor is created at track
# subscription time. Calling set_key() BEFORE track subscription (at connect)
# skips HKDF derivation → raw key stored → DEC_FAILED.
# Solution: set caller key HERE, after frame cryptor is initialized.
if int(t.kind) == 1 and e2ee_opts is not None: # audio track only
caller_id = p.identity
logger.info("E2EE_DIAG: track_subscribed for %s, have %d caller keys",
caller_id, len(self._caller_all_keys))
try:
kp_local = self.lk_room.e2ee_manager.key_provider
if self._caller_all_keys:
for idx, base_k in sorted(self._caller_all_keys.items()):
ok = kp_local.set_key(caller_id, base_k, idx)
logger.info("on_ts: set_key[%d] for %s (ok=%s, %d bytes, raw=%s)",
idx, caller_id, ok, len(base_k), base_k.hex())
else:
logger.warning("on_ts: no caller keys yet — scheduling 0.5s retry")
async def _brief_key_retry(pid=caller_id):
await asyncio.sleep(0.5)
if self._caller_all_keys:
try:
kp_r = self.lk_room.e2ee_manager.key_provider
for idx, base_k in sorted(self._caller_all_keys.items()):
ok = kp_r.set_key(pid, base_k, idx)
logger.info("on_ts_retry: set_key[%d] for %s (ok=%s)", idx, pid, ok)
except Exception as exc:
logger.warning("on_ts_retry: set_key failed: %s", exc)
else:
logger.warning("on_ts_retry: still no caller keys for %s", pid)
asyncio.ensure_future(_brief_key_retry())
except Exception as exc:
logger.warning("on_ts: set_key failed: %s", exc)
_e2ee_state_names = {0:"NEW",1:"OK",2:"ENC_FAILED",3:"DEC_FAILED",4:"MISSING_KEY",5:"RATCHETED",6:"INTERNAL_ERR"}
@self.lk_room.on("e2ee_state_changed")
def on_e2ee_state(participant, state):
state_name = _e2ee_state_names.get(int(state), f"UNKNOWN_{state}")
p_id = participant.identity if participant else "local"
logger.info("E2EE_STATE: participant=%s state=%s", p_id, state_name)
# When remote participant frame cryptor is NEW or MISSING_KEY → set their key
if participant and p_id != bot_identity and int(state) in (0, 3, 4):
if self._caller_all_keys:
try:
kp_e = self.lk_room.e2ee_manager.key_provider
for idx, base_k in sorted(self._caller_all_keys.items()):
ok = kp_e.set_key(p_id, base_k, idx)
logger.info("e2ee_state set_key[%d] for %s (ok=%s)", idx, p_id, ok)
except Exception as exc:
logger.warning("e2ee_state set_key failed: %s", exc)
await self.lk_room.connect(self.lk_url, jwt, options=room_opts)
logger.info("Connected (E2EE=HKDF), remote=%d",
len(self.lk_room.remote_participants))
# Set bot's own key immediately after connect — local frame cryptor exists at connect time.
# CALLER keys are set in on_track_subscribed (NOT here) because the caller's frame cryptor
# is only created when their track arrives. Calling set_key() before that skips HKDF.
kp = self.lk_room.e2ee_manager.key_provider
ok = kp.set_key(bot_identity, self._bot_key, 0)
logger.info("Set bot key for %s (ok=%s, %d bytes)", bot_identity, ok, len(self._bot_key))
# Element Call may rotate keys when bot joins — handled asynchronously by
# on_encryption_key() (stores keys) + on_track_subscribed() / on_e2ee_state_changed()
# (applies keys when frame cryptor exists). No blocking wait needed.
logger.info("E2EE key state: %d keys stored (max_idx=%d)",
len(self._caller_all_keys),
max(self._caller_all_keys.keys()) if self._caller_all_keys else -1)
# Find the remote participant, wait up to 10s if not yet connected
remote_identity = None
for p in self.lk_room.remote_participants.values():
remote_identity = p.identity
break
if not remote_identity:
logger.info("No remote participant yet, waiting...")
for _ in range(100):
await asyncio.sleep(0.1)
for p in self.lk_room.remote_participants.values():
remote_identity = p.identity
break
if remote_identity:
break
# Set shared_key with pre-derived AES key for caller decryption.
# NOT using set_key() for caller — Rust HKDF may produce different result than EC's JS HKDF.
# set_shared_key() stores key raw (no KDF applied) — we pre-derive in Python.
# After key rotation wait: if track already subscribed, set rotated key.
# (Usually on_track_subscribed handles this, but if track arrived before rotation,
# the rotated key needs to be set here for the already-subscribed participant.)
if self._caller_all_keys and remote_identity:
try:
for idx, base_k in sorted(self._caller_all_keys.items()):
ok = kp.set_key(remote_identity, base_k, idx)
logger.info("Post-rotation set_key[%d] for %s (ok=%s)",
idx, remote_identity, ok)
except Exception as e:
logger.warning("Post-rotation set_key failed: %s", e)
elif not self._caller_all_keys:
logger.warning("No caller E2EE keys received — incoming audio will be silence")
if remote_identity:
logger.info("Linking to remote participant: %s", remote_identity)
# Load memories for this caller
memory_section = ""
if self._memory and self._caller_user_id:
try:
mems = await self._memory.query(self._caller_user_id, "voice call", top_k=10)
if mems:
memory_section = "\n\nKontext aus früheren Gesprächen mit diesem Nutzer:\n" + \
"\n".join(f"- {m['fact']}" for m in mems)
logger.info("Loaded %d memories for %s", len(mems), self._caller_user_id)
except Exception as exc:
logger.warning("Memory query failed: %s", exc)
# Voice pipeline — Jack Marlowe (native German male)
self._http_session = aiohttp.ClientSession()
voice_id = os.environ.get("ELEVENLABS_VOICE_ID", DEFAULT_VOICE_ID)
self.session = AgentSession(
stt=elevenlabs.STT(
api_key=ELEVENLABS_KEY,
model_id="scribe_v2_realtime",
language_code=os.environ.get("STT_LANGUAGE", "de"),
http_session=self._http_session,
),
llm=lk_openai.LLM(base_url=LITELLM_URL, api_key=LITELLM_KEY, model=self.model),
tts=elevenlabs.TTS(
voice_id=voice_id,
model="eleven_multilingual_v2",
language="de",
api_key=ELEVENLABS_KEY,
http_session=self._http_session,
),
vad=_get_vad(),
)
# Pipeline event logging (livekit-agents 1.4.2 event names)
_vad_state_log: dict = {"last": None, "speaking_count": 0, "away_since": None}
@self.session.on("user_state_changed")
def _on_user_state(ev):
state = ev.new_state
prev = _vad_state_log["last"]
_vad_state_log["last"] = state
if str(state) == "speaking":
_vad_state_log["speaking_count"] = _vad_state_log.get("speaking_count", 0) + 1
_vad_state_log["away_since"] = None
logger.info("VAD: user_state=%s (speaking_count=%d)", state, _vad_state_log["speaking_count"])
elif str(state) == "away" and prev and str(prev) != "away":
import time
_vad_state_log["away_since"] = time.monotonic()
logger.info("VAD: user_state=%s (was %s)", state, prev)
else:
logger.info("VAD: user_state=%s", state)
_last_user_speech: list[str] = []
@self.session.on("user_input_transcribed")
def _on_user_speech(ev):
text = ev.transcript or ""
if text and _is_stt_artifact(text):
logger.warning("NOISE_LEAK: artifact reached STT: %s", text)
else:
logger.info("USER_SPEECH: %s", text)
if ev.transcript:
_last_user_speech.append(ev.transcript)
@self.session.on("conversation_item_added")
def _on_conversation_item(ev):
role = getattr(ev.item, "role", "?")
text = getattr(ev.item, "text_content", "") or ""
if role == "assistant" and text:
logger.info("AGENT_SPEECH: %s", text)
if self._memory and self._caller_user_id and _last_user_speech:
user_text = " ".join(_last_user_speech)
_last_user_speech.clear()
asyncio.ensure_future(
_extract_voice_memories(user_text, text,
self._caller_user_id, self.room_id))
# Brave Search tool — lets the agent answer questions about current events
@function_tool
async def search_web(query: str) -> str:
"""Search the web for current information using Brave Search.
Use this when asked about recent news, current events, prices,
weather, or any information that may have changed recently.
"""
logger.info("SEARCH: %s", query)
result = await _brave_search(query)
logger.info("SEARCH_RESULT: %s", result[:200])
return result
agent = _NoiseFilterAgent(
instructions=_build_voice_prompt() + memory_section,
tools=[search_web],
)
io_opts = room_io.RoomOptions(
participant_identity=remote_identity,
close_on_disconnect=False,
) if remote_identity else room_io.RoomOptions(close_on_disconnect=False)
await self.session.start(
agent=agent,
room=self.lk_room,
room_options=io_opts,
)
logger.info("Voice pipeline started (voice=%s, linked_to=%s)", voice_id, remote_identity)
try:
await asyncio.wait_for(
self.session.generate_reply(
instructions="Sage nur: Hallo, wie kann ich helfen?"),
timeout=30.0)
logger.info("Greeting sent")
except asyncio.TimeoutError:
logger.error("Greeting timed out")
# VAD watchdog: log diagnostic and attempt E2EE key recovery if stuck
import time as _time
while True:
await asyncio.sleep(10)
away_since = _vad_state_log.get("away_since")
if away_since and (_time.monotonic() - away_since) > 30:
sc = _vad_state_log.get("speaking_count", 0)
e2ee_ok = any(
str(getattr(p, '_e2ee_state', '')) == 'OK'
for p in self.lk_room.remote_participants.values()
) if self.lk_room else False
n_remote = len(self.lk_room.remote_participants) if self.lk_room else 0
logger.warning(
"VAD_WATCHDOG: user_state=away for >30s (speaking_count=%d, "
"remote_participants=%d, e2ee_ok=%s) — attempting E2EE key recovery",
sc, n_remote, e2ee_ok,
)
# Recovery: re-apply all stored E2EE keys for all remote participants
if self.lk_room and self._caller_all_keys:
try:
kp_w = self.lk_room.e2ee_manager.key_provider
for p in self.lk_room.remote_participants.values():
for idx, base_k in sorted(self._caller_all_keys.items()):
ok = kp_w.set_key(p.identity, base_k, idx)
logger.info("VAD_WATCHDOG: recovery set_key[%d] for %s (ok=%s)",
idx, p.identity, ok)
except Exception as exc:
logger.warning("VAD_WATCHDOG: recovery set_key failed: %s", exc)
_vad_state_log["away_since"] = None # only warn once per stuck period
except asyncio.CancelledError:
logger.info("Session cancelled for %s", self.room_id)
except Exception:
logger.exception("Session failed for %s", self.room_id)