flash_v2_5 had audible compression artifacts; multilingual_v2 has higher fidelity, while speed=1.15 via VoiceSettings still gives snappier delivery. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
717 lines
34 KiB
Python
717 lines
34 KiB
Python
"""Voice session: LiveKit + STT/LLM/TTS pipeline.
|
|
E2EE via HKDF key derivation (Element Call compatible).
|
|
Requires patched livekit-rtc FFI binary from onestacked/livekit-rust-sdks."""
|
|
import asyncio
|
|
import base64
|
|
import datetime
|
|
import hashlib
|
|
import logging
|
|
import os
|
|
|
|
import zoneinfo
|
|
|
|
import json
|
|
import re
|
|
|
|
import aiohttp
|
|
import httpx
|
|
from livekit import rtc, api as lkapi
|
|
from livekit.agents import Agent, AgentSession, StopResponse, function_tool, room_io, llm
|
|
from livekit.plugins import openai as lk_openai, elevenlabs, silero
|
|
from openai import AsyncOpenAI
|
|
|
|
logger = logging.getLogger("matrix-ai-voice")
|
|
|
|
|
|
# Runtime configuration — all read from the environment at import time.
LITELLM_URL = os.environ.get("LITELLM_BASE_URL", "")  # OpenAI-compatible LLM proxy base URL
LITELLM_KEY = os.environ.get("LITELLM_API_KEY", "not-needed")  # LiteLLM auth token
LK_API_KEY = os.environ.get("LIVEKIT_API_KEY", "")  # LiveKit API key for JWT minting
LK_API_SECRET = os.environ.get("LIVEKIT_API_SECRET", "")  # LiveKit API secret for JWT minting
ELEVENLABS_KEY = os.environ.get("ELEVENLABS_API_KEY", "")  # used by both STT and TTS plugins
BRAVE_API_KEY = os.environ.get("BRAVE_API_KEY", "")  # empty → search tool reports "unavailable"
MEMORY_SERVICE_URL = os.environ.get("MEMORY_SERVICE_URL", "http://memory-service:8090")
DEFAULT_VOICE_ID = "ML23UVoFL5mI6APbRAeR" # Robert Ranger - Cool Storyteller, native German
|
|
|
|
_VOICE_PROMPT_TEMPLATE = """Du bist ein hilfreicher Sprachassistent in einem Matrix-Anruf.
|
|
Aktuelle Zeit: {datetime}
|
|
|
|
STRIKTE Regeln:
|
|
- Antworte in der Sprache des Nutzers (Deutsch oder Englisch)
|
|
- Halte JEDE Antwort auf MAXIMAL 1-2 kurze Saetze
|
|
- Sei direkt und praezise, keine Fuellwoerter
|
|
- Erfinde NICHTS - keine Geschichten, keine Musik, keine Fantasie
|
|
- Beantworte nur was gefragt wird
|
|
- Wenn niemand etwas fragt, sage nur kurz Hallo
|
|
- Schreibe Zahlen und Jahreszahlen IMMER als Woerter aus (z.B. "zweitausendundzwanzig" statt "2026", "zweiundzwanzigsten Februar" statt "22. Februar")
|
|
- IGNORIERE alle Texte in Sternchen wie *Störgeräusche*, *Schlechte Qualität*, *Fernsehgeräusche*, *Schrei* usw. — das sind KEINE echten Nutzereingaben sondern technische Annotationen. Antworte NIEMALS darauf und tue so als haette niemand etwas gesagt."""
|
|
|
|
|
|
def _build_voice_prompt() -> str:
    """Render the system prompt template with the current local timestamp.

    The timezone comes from VOICE_TIMEZONE (default Europe/Berlin); an
    unknown zone name degrades gracefully to UTC.
    """
    zone_name = os.environ.get("VOICE_TIMEZONE", "Europe/Berlin")
    try:
        zone = zoneinfo.ZoneInfo(zone_name)
    except Exception:
        zone = datetime.timezone.utc
    stamp = datetime.datetime.now(zone).strftime("%A, %d. %B %Y %H:%M %Z")
    return _VOICE_PROMPT_TEMPLATE.format(datetime=stamp)
|
|
|
|
# ElevenLabs scribe_v2_realtime produces two kinds of artifacts:
|
|
# 1. Noise annotations: *Störgeräusche*, *Schlechte Qualität*, etc.
|
|
# 2. Subtitle/metadata leaks: "Untertitel: ARD Text im Auftrag von Funk (2017)"
|
|
# Filter both via on_user_turn_completed (downstream of VAD+STT, no pipeline impact).
|
|
_NOISE_ANNOTATION_RE = re.compile(r'^\s*\*[^*]+\*\s*$')
|
|
_STT_ARTIFACT_PATTERNS = [
|
|
re.compile(r'(?i)^untertitel\b'), # subtitle metadata
|
|
re.compile(r'(?i)^copyright\b'), # copyright notices
|
|
re.compile(r'(?i)^musik\s*$'), # bare "Musik" annotation
|
|
re.compile(r'(?i)^\(.*\)\s*$'), # parenthetical annotations like (Applaus)
|
|
re.compile(r'(?i)^\[.*\]\s*$'), # bracketed annotations like [Musik]
|
|
re.compile(r'(?i)^(background noise|static|silence|applause|laughter|noise)\s*$'), # English variants
|
|
re.compile(r'^\s*\*\*[^*]+\*\*\s*$'), # double-asterisk: **Noise**
|
|
]
|
|
|
|
|
|
def _is_stt_artifact(text: str) -> bool:
|
|
"""Check if text is an STT artifact (noise annotation or metadata leak)."""
|
|
if _NOISE_ANNOTATION_RE.match(text):
|
|
return True
|
|
return any(p.match(text) for p in _STT_ARTIFACT_PATTERNS)
|
|
|
|
|
|
class _NoiseFilterAgent(Agent):
    """Agent that suppresses ElevenLabs STT artifacts before the LLM sees them.

    Uses on_user_turn_completed(), which runs downstream of VAD+STT, so there
    is no backpressure risk to the audio pipeline. Raising StopResponse
    silently discards the noisy turn.
    """

    async def on_user_turn_completed(
        self, turn_ctx: llm.ChatContext, new_message: llm.ChatMessage
    ) -> None:
        content = (new_message.text_content or "").strip()
        if not content or not _is_stt_artifact(content):
            return
        logger.info("STT artifact suppressed: %s", content)
        # Drop the artifact from the chat context so it never accumulates.
        items = turn_ctx.items
        if items and items[-1] is new_message:
            items.pop()
        raise StopResponse()
|
|
|
|
|
|
# Process-wide Silero VAD instance, loaded lazily on first use.
_vad = None


def _get_vad():
    """Return the shared Silero VAD, loading the model exactly once."""
    global _vad
    if _vad is not None:
        return _vad
    _vad = silero.VAD.load(
        activation_threshold=0.50,
        min_speech_duration=0.2,
        min_silence_duration=0.55,
    )
    return _vad
|
|
|
|
def _make_lk_identity(user_id, device_id):
|
|
return f"{user_id}:{device_id}"
|
|
|
|
def _compute_lk_room_name(room_id):
|
|
raw = f"{room_id}|m.call#ROOM"
|
|
return base64.b64encode(hashlib.sha256(raw.encode()).digest()).decode().rstrip("=")
|
|
|
|
def _generate_lk_jwt(room_id, user_id, device_id):
    """Mint a 24h LiveKit access token for this bot in the derived call room."""
    identity = _make_lk_identity(user_id, device_id)
    lk_room = _compute_lk_room_name(room_id)
    grants = lkapi.VideoGrants(
        room_join=True,
        room=lk_room,
        can_publish=True,
        can_subscribe=True,
    )
    token = (
        lkapi.AccessToken(LK_API_KEY, LK_API_SECRET)
        .with_identity(identity)
        .with_name(user_id)
        .with_grants(grants)
        .with_ttl(datetime.timedelta(hours=24))
    )
    logger.info("JWT: identity=%s room=%s", identity, lk_room)
    return token.to_jwt()
|
|
|
|
|
|
# Key-derivation selector passed to rtc.KeyProviderOptions in _build_e2ee_options().
KDF_HKDF = 1 # Rust FFI applies HKDF internally (proto enum value 1)
# NOTE: value 0 in the proto is PBKDF2, NOT raw/none mode — see e2ee_patch.py
|
|
|
|
|
|
def _hkdf_derive(ikm: bytes) -> bytes:
|
|
"""Pre-derive AES key via HKDF-SHA256 matching livekit-client-sdk-js deriveEncryptionKey().
|
|
|
|
JS params: hash=SHA-256, salt=encode("LKFrameEncryptionKey"), info=ArrayBuffer(128), length=128bit
|
|
We set this pre-derived key via set_shared_key() which bypasses Rust FFI KDF entirely.
|
|
"""
|
|
import hmac
|
|
salt = b"LKFrameEncryptionKey"
|
|
info = b"\x00" * 128
|
|
prk = hmac.new(salt, ikm, hashlib.sha256).digest()
|
|
t1 = hmac.new(prk, info + b"\x01", hashlib.sha256).digest()
|
|
return t1[:16]
|
|
|
|
|
|
def _ratchet_keys(base_raw: bytes, count: int = 6) -> dict[int, bytes]:
|
|
"""Pre-compute ratcheted keys 0..count-1 matching livekit-client-sdk-js ratchet().
|
|
|
|
EC JS ratchet: new_raw = HMAC(key=current_raw, data="LKFrameEncryptionKey")[:16]
|
|
Returns {index: raw_key} for all indices 0..count-1.
|
|
Set these via set_key(identity, raw, index) with KDF_HKDF=1 so Rust applies HKDF.
|
|
"""
|
|
import hmac as _hmac
|
|
keys = {}
|
|
raw = base_raw
|
|
for i in range(count):
|
|
keys[i] = raw
|
|
raw = _hmac.new(raw, b"LKFrameEncryptionKey", hashlib.sha256).digest()[:16]
|
|
return keys
|
|
|
|
|
|
async def _brave_search(query: str, count: int = 5) -> str:
    """Query the Brave Search web API and format the top hits as bullet lines.

    Best-effort: any failure is logged and reported as a string so the voice
    agent can relay it instead of crashing the tool call.
    """
    if not BRAVE_API_KEY:
        return "Search unavailable (no API key configured)."
    endpoint = "https://api.search.brave.com/res/v1/web/search"
    headers = {"Accept": "application/json", "X-Subscription-Token": BRAVE_API_KEY}
    params = {"q": query, "count": count, "text_decorations": False}
    try:
        async with httpx.AsyncClient(timeout=10.0) as client:
            resp = await client.get(endpoint, headers=headers, params=params)
            resp.raise_for_status()
            payload = resp.json()
        hits = payload.get("web", {}).get("results", [])
        if not hits:
            return "No results found."
        formatted = [
            f"- {hit.get('title', '')}: {hit.get('description', '')} ({hit.get('url', '')})"
            for hit in hits[:count]
        ]
        return "\n".join(formatted)
    except Exception as exc:
        logger.warning("Brave search error: %s", exc)
        return f"Search failed: {exc}"
|
|
|
|
|
|
async def _extract_voice_memories(user_text: str, agent_text: str,
                                  user_id: str, room_id: str) -> None:
    """Extract memorable facts from a voice exchange and store them.

    Best-effort background task: fetches existing memories (to let the model
    skip duplicates), asks a small LLM for new facts as a JSON array, and
    POSTs each fact to the memory service. Any failure is logged and
    swallowed — memory extraction must never break the voice session.
    """
    if not LITELLM_URL or not MEMORY_SERVICE_URL:
        return
    try:
        # Fetch existing facts to avoid duplicates
        async with httpx.AsyncClient(timeout=10.0) as client:
            resp = await client.post(
                f"{MEMORY_SERVICE_URL}/memories/query",
                json={"user_id": user_id, "query": "all facts", "top_k": 20},
            )
            existing = [m["fact"] for m in resp.json().get("results", [])] if resp.is_success else []

        existing_text = "\n".join(f"- {f}" for f in existing) if existing else "(none)"
        # Named `llm_client` (was `llm`) to avoid shadowing the
        # `livekit.agents.llm` module imported at the top of this file.
        llm_client = AsyncOpenAI(base_url=LITELLM_URL, api_key=LITELLM_KEY)
        resp = await llm_client.chat.completions.create(
            model="claude-haiku",
            messages=[
                {"role": "system", "content": (
                    "Extract memorable facts about the user from this voice conversation snippet. "
                    "Return a JSON array of concise strings. Include: name, preferences, location, "
                    "occupation, interests, family, projects. Skip duplicate or temporary info. "
                    "Return [] if nothing new."
                )},
                {"role": "user", "content": (
                    f"Existing memories:\n{existing_text}\n\n"
                    f"User said: {user_text}\nAssistant replied: {agent_text}\n\n"
                    "New facts (JSON array):"
                )},
            ],
            max_tokens=200,
        )
        # Guard: completion content can be None — don't let .strip() raise.
        raw = (resp.choices[0].message.content or "").strip()
        if not raw:
            return
        # Strip a Markdown code fence if the model wrapped its JSON in one.
        if raw.startswith("```"):
            raw = re.sub(r"^```\w*\n?", "", raw)
            raw = re.sub(r"\n?```$", "", raw)
        # Tolerate prose around the array: keep only the outermost [...] span.
        match = re.search(r"\[.*\]", raw, re.DOTALL)
        if match:
            raw = match.group(0)
        new_facts = json.loads(raw)
        if not isinstance(new_facts, list):
            return
        async with httpx.AsyncClient(timeout=10.0) as client:
            for fact in new_facts:
                if isinstance(fact, str) and fact.strip():
                    await client.post(
                        f"{MEMORY_SERVICE_URL}/memories/store",
                        json={"user_id": user_id, "fact": fact.strip(), "source_room": room_id},
                    )
                    logger.info("Memory stored for %s: %s", user_id, fact[:80])
    except Exception as exc:
        logger.warning("Voice memory extraction failed: %s", exc)
|
|
|
|
|
|
def _build_e2ee_options() -> rtc.E2EEOptions:
    """Build E2EE options — let the Rust FFI apply HKDF internally (KDF_HKDF=1).

    Raw base keys from Matrix key-exchange events are passed straight to
    set_key(); the Rust FFI derives the AES frame key via
    HKDF(base_key, ratchetSalt, ...) itself. Element Call uses:
    ratchetWindowSize=10, keyringSize=256, ratchetSalt="LKFrameEncryptionKey".
    NOTE: proto value 0 = PBKDF2 (not raw/none) — must use KDF_HKDF=1.
    """
    provider_options = rtc.KeyProviderOptions(
        shared_key=b"",  # empty = per-participant mode
        ratchet_window_size=10,
        ratchet_salt=b"LKFrameEncryptionKey",
        failure_tolerance=10,
        key_ring_size=256,
        key_derivation_function=KDF_HKDF,  # Rust FFI applies HKDF; we pass raw base keys
    )
    return rtc.E2EEOptions(
        encryption_type=rtc.EncryptionType.GCM,
        key_provider_options=provider_options,
    )
|
|
|
|
|
|
class VoiceSession:
    """One voice call: joins the LiveKit room with Element Call compatible
    E2EE, runs the STT → LLM → TTS agent pipeline, and persists caller
    memories.

    Lifecycle: start() spawns _run() as an asyncio task; stop() tears down
    the agent session, LiveKit room, and HTTP session, then cancels the task.
    Caller E2EE keys arrive from the room timeline
    (_fetch_encryption_key_http) and/or the sync handler (on_encryption_key)
    and are (re)applied wherever the remote frame cryptor is known to exist.
    """

    def __init__(self, nio_client, room_id, device_id, lk_url, model="claude-sonnet",
                 publish_key_cb=None, bot_key: bytes | None = None,
                 memory=None, caller_user_id: str | None = None):
        self.nio_client = nio_client  # matrix-nio client for homeserver HTTP calls
        self.room_id = room_id  # Matrix room ID of the call
        self.device_id = device_id  # this bot's Matrix device ID
        self.lk_url = lk_url  # LiveKit server URL
        self.model = model  # LLM model name routed through LiteLLM
        self.lk_room = None  # rtc.Room once connected
        self.session = None  # AgentSession once the pipeline is started
        self._task = None  # asyncio.Task running _run()
        self._http_session = None  # aiohttp session shared by STT/TTS plugins
        self._caller_key: bytes | None = None  # first caller key seen
        self._caller_identity: str | None = None  # caller's "user:device" LiveKit identity
        self._caller_all_keys: dict = {} # {index: bytes} — all caller keys by index
        self._bot_key: bytes = bot_key or os.urandom(16)  # our own E2EE base key
        self._publish_key_cb = publish_key_cb  # callback announcing our key over Matrix
        self._memory = memory # MemoryClient instance from bot.py
        self._caller_user_id = caller_user_id # Matrix user ID for memory lookup

    def on_encryption_key(self, sender, device_id, key, index):
        """Receive E2EE key from Element Call participant.

        Store-only: keys are applied in on_track_subscribed() / on_e2ee_state_changed()
        where the frame cryptor is guaranteed to exist. If the track is already
        subscribed (late key arrival / rotation), set the key immediately.
        """
        if not key:
            return
        # First key seen wins the "primary caller" slot.
        if not self._caller_key:
            self._caller_key = key
            self._caller_identity = f"{sender}:{device_id}"
        self._caller_all_keys[index] = key
        logger.info("E2EE key received from %s:%s (index=%d, %d bytes, raw=%s)",
                    sender, device_id, index, len(key), key.hex())
        # Late key arrival: if track already subscribed, frame cryptor exists — set key now.
        if self.lk_room and self._caller_identity:
            caller_lk_id = self._caller_identity
            for p in self.lk_room.remote_participants.values():
                if p.identity == caller_lk_id:
                    has_subscribed = any(
                        pub.subscribed for pub in p.track_publications.values()
                    )
                    if has_subscribed:
                        try:
                            kp = self.lk_room.e2ee_manager.key_provider
                            ok = kp.set_key(p.identity, key, index)
                            logger.info("Late key set_key[%d] for %s (ok=%s)",
                                        index, p.identity, ok)
                        except Exception as e:
                            logger.warning("Late key set_key failed: %s", e)
                    break

    async def _fetch_encryption_key_http(self) -> bytes | None:
        """Fetch encryption key from room timeline (NOT state) via Matrix HTTP API.

        Element Call distributes encryption keys as timeline events, not state.
        Returns the highest-index key of the newest matching event, or None.
        Side effect: populates self._caller_all_keys / self._caller_identity.
        """
        import httpx
        homeserver = str(self.nio_client.homeserver)
        token = self.nio_client.access_token
        url = f"{homeserver}/_matrix/client/v3/rooms/{self.room_id}/messages"
        try:
            async with httpx.AsyncClient(timeout=10.0) as http:
                # Newest-first scan of the last 50 timeline events.
                resp = await http.get(
                    url,
                    headers={"Authorization": f"Bearer {token}"},
                    params={"dir": "b", "limit": "50"},
                )
                resp.raise_for_status()
                data = resp.json()
                events = data.get("chunk", [])
                user_id = self.nio_client.user_id
                for evt in events:
                    evt_type = evt.get("type", "")
                    if evt_type == "io.element.call.encryption_keys":
                        sender = evt.get("sender", "")
                        if sender == user_id:
                            continue  # skip our own key
                        content = evt.get("content", {})
                        device = content.get("device_id", "")
                        logger.info("Found encryption_keys timeline event: sender=%s device=%s",
                                    sender, device)
                        all_keys = {}
                        import base64 as b64
                        for k in content.get("keys", []):
                            key_b64 = k.get("key", "")
                            key_index = k.get("index", 0)
                            if key_b64:
                                # Element Call sends unpadded urlsafe base64 — re-pad before decoding.
                                key_b64 += "=" * (-len(key_b64) % 4)
                                key_bytes = b64.urlsafe_b64decode(key_b64)
                                all_keys[key_index] = key_bytes
                        if all_keys:
                            if device:
                                self._caller_identity = f"{sender}:{device}"
                            self._caller_all_keys.update(all_keys)
                            max_idx = max(all_keys.keys())
                            logger.info("Loaded caller keys at indices %s (using %d)",
                                        sorted(all_keys.keys()), max_idx)
                            return all_keys[max_idx]
                logger.info("No encryption_keys events in last %d timeline events", len(events))
        except Exception as e:
            logger.warning("HTTP encryption key fetch failed: %s", e)
        return None

    async def start(self):
        """Launch the session as a background task (returns immediately)."""
        self._task = asyncio.create_task(self._run())

    async def stop(self):
        """Tear down agent session, LiveKit room, and HTTP session, then cancel _run()."""
        # Each object exposes a different shutdown method — probe in order.
        for obj in [self.session, self.lk_room, self._http_session]:
            if obj:
                try:
                    if hasattr(obj, "aclose"):
                        await obj.aclose()
                    elif hasattr(obj, "disconnect"):
                        await obj.disconnect()
                    elif hasattr(obj, "close"):
                        await obj.close()
                except Exception:
                    pass  # best-effort shutdown; never raise from stop()
        if self._task and not self._task.done():
            self._task.cancel()
            try:
                await self._task
            except asyncio.CancelledError:
                pass

    async def _run(self):
        """Main session body: key exchange, LiveKit connect, pipeline, watchdog.

        Runs until cancelled by stop(); all errors are logged, never raised.
        """
        try:
            user_id = self.nio_client.user_id
            bot_identity = _make_lk_identity(user_id, self.device_id)
            jwt = _generate_lk_jwt(self.room_id, user_id, self.device_id)

            # Publish bot's own key immediately so Element Call can decrypt us
            if self._publish_key_cb:
                self._publish_key_cb(self._bot_key)
                logger.info("Published bot E2EE key (%d bytes)", len(self._bot_key))

            # Check timeline for caller's encryption key
            caller_key = await self._fetch_encryption_key_http()
            if caller_key:
                self._caller_key = caller_key
                logger.info("Got caller E2EE key via timeline (%d bytes)", len(caller_key))

            if not self._caller_key:
                # Wait up to 15s for key via sync handler
                logger.info("No key in timeline yet, waiting for sync...")
                for _ in range(150):
                    if self._caller_key:
                        break
                    await asyncio.sleep(0.1)

            # E2EE: configurable via E2EE_ENABLED env var (default true).
            # When enabled, parameters MUST match Element Call JS SDK.
            e2ee_enabled = os.environ.get("E2EE_ENABLED", "true").lower() in ("true", "1", "yes")
            if e2ee_enabled:
                e2ee_opts = _build_e2ee_options()
                room_opts = rtc.RoomOptions(e2ee=e2ee_opts)
                logger.info("E2EE enabled (HKDF mode)")
            else:
                e2ee_opts = None
                room_opts = rtc.RoomOptions()
                logger.warning("E2EE DISABLED — audio is unencrypted")
            self.lk_room = rtc.Room()

            @self.lk_room.on("participant_connected")
            def on_p(p):
                logger.info("Participant connected: %s", p.identity)
                # Note: do NOT set keys here — frame cryptor not initialized yet at participant_connected.
                # Keys are set in on_track_subscribed where the frame cryptor definitely exists.

            @self.lk_room.on("track_published")
            def on_tp(pub, p):
                logger.info("Track pub: %s %s kind=%s", p.identity, pub.sid, pub.kind)

            @self.lk_room.on("track_muted")
            def on_mute(pub, p):
                logger.info("Track MUTED: %s %s kind=%s", p.identity, pub.sid, pub.kind)

            @self.lk_room.on("track_unmuted")
            def on_unmute(pub, p):
                logger.info("Track UNMUTED: %s %s kind=%s", p.identity, pub.sid, pub.kind)

            @self.lk_room.on("track_subscribed")
            def on_ts(t, pub, p):
                logger.info("Track sub: %s %s kind=%s muted=%s", p.identity, pub.sid, t.kind, pub.muted)
                # NOTE: Do NOT create rtc.AudioStream here — it competes with AgentSession's
                # internal audio pipeline for event loop time, causing intermittent VAD failures
                # (user_state stuck on "away"). See MAT-40. Use e2ee_state_changed for flow confirmation.
                # *** KEY FIX: set_key() with KDF_HKDF only applies HKDF when the frame cryptor
                # for this participant already exists. The frame cryptor is created at track
                # subscription time. Calling set_key() BEFORE track subscription (at connect)
                # skips HKDF derivation → raw key stored → DEC_FAILED.
                # Solution: set caller key HERE, after frame cryptor is initialized.
                if int(t.kind) == 1 and e2ee_opts is not None: # audio track only
                    caller_id = p.identity
                    logger.info("E2EE_DIAG: track_subscribed for %s, have %d caller keys",
                                caller_id, len(self._caller_all_keys))
                    try:
                        kp_local = self.lk_room.e2ee_manager.key_provider
                        if self._caller_all_keys:
                            for idx, base_k in sorted(self._caller_all_keys.items()):
                                ok = kp_local.set_key(caller_id, base_k, idx)
                                logger.info("on_ts: set_key[%d] for %s (ok=%s, %d bytes, raw=%s)",
                                            idx, caller_id, ok, len(base_k), base_k.hex())
                        else:
                            logger.warning("on_ts: no caller keys yet — scheduling 0.5s retry")
                            # pid bound as a default to freeze the identity at schedule time.
                            async def _brief_key_retry(pid=caller_id):
                                await asyncio.sleep(0.5)
                                if self._caller_all_keys:
                                    try:
                                        kp_r = self.lk_room.e2ee_manager.key_provider
                                        for idx, base_k in sorted(self._caller_all_keys.items()):
                                            ok = kp_r.set_key(pid, base_k, idx)
                                            logger.info("on_ts_retry: set_key[%d] for %s (ok=%s)", idx, pid, ok)
                                    except Exception as exc:
                                        logger.warning("on_ts_retry: set_key failed: %s", exc)
                                else:
                                    logger.warning("on_ts_retry: still no caller keys for %s", pid)
                            asyncio.ensure_future(_brief_key_retry())
                    except Exception as exc:
                        logger.warning("on_ts: set_key failed: %s", exc)

            # Human-readable names for the FFI e2ee state enum (logging only).
            _e2ee_state_names = {0:"NEW",1:"OK",2:"ENC_FAILED",3:"DEC_FAILED",4:"MISSING_KEY",5:"RATCHETED",6:"INTERNAL_ERR"}
            @self.lk_room.on("e2ee_state_changed")
            def on_e2ee_state(participant, state):
                state_name = _e2ee_state_names.get(int(state), f"UNKNOWN_{state}")
                p_id = participant.identity if participant else "local"
                logger.info("E2EE_STATE: participant=%s state=%s", p_id, state_name)
                # When remote participant frame cryptor is NEW or MISSING_KEY → set their key
                if participant and p_id != bot_identity and int(state) in (0, 3, 4):
                    if self._caller_all_keys:
                        try:
                            kp_e = self.lk_room.e2ee_manager.key_provider
                            for idx, base_k in sorted(self._caller_all_keys.items()):
                                ok = kp_e.set_key(p_id, base_k, idx)
                                logger.info("e2ee_state set_key[%d] for %s (ok=%s)", idx, p_id, ok)
                        except Exception as exc:
                            logger.warning("e2ee_state set_key failed: %s", exc)

            await self.lk_room.connect(self.lk_url, jwt, options=room_opts)
            logger.info("Connected (E2EE=HKDF), remote=%d",
                        len(self.lk_room.remote_participants))

            # Set bot's own key immediately after connect — local frame cryptor exists at connect time.
            # CALLER keys are set in on_track_subscribed (NOT here) because the caller's frame cryptor
            # is only created when their track arrives. Calling set_key() before that skips HKDF.
            kp = self.lk_room.e2ee_manager.key_provider
            ok = kp.set_key(bot_identity, self._bot_key, 0)
            logger.info("Set bot key for %s (ok=%s, %d bytes)", bot_identity, ok, len(self._bot_key))

            # Element Call may rotate keys when bot joins — handled asynchronously by
            # on_encryption_key() (stores keys) + on_track_subscribed() / on_e2ee_state_changed()
            # (applies keys when frame cryptor exists). No blocking wait needed.
            logger.info("E2EE key state: %d keys stored (max_idx=%d)",
                        len(self._caller_all_keys),
                        max(self._caller_all_keys.keys()) if self._caller_all_keys else -1)

            # Find the remote participant, wait up to 10s if not yet connected
            remote_identity = None
            for p in self.lk_room.remote_participants.values():
                remote_identity = p.identity
                break
            if not remote_identity:
                logger.info("No remote participant yet, waiting...")
                for _ in range(100):
                    await asyncio.sleep(0.1)
                    for p in self.lk_room.remote_participants.values():
                        remote_identity = p.identity
                        break
                    if remote_identity:
                        break

            # Set shared_key with pre-derived AES key for caller decryption.
            # NOT using set_key() for caller — Rust HKDF may produce different result than EC's JS HKDF.
            # set_shared_key() stores key raw (no KDF applied) — we pre-derive in Python.
            # After key rotation wait: if track already subscribed, set rotated key.
            # (Usually on_track_subscribed handles this, but if track arrived before rotation,
            # the rotated key needs to be set here for the already-subscribed participant.)
            if self._caller_all_keys and remote_identity:
                try:
                    for idx, base_k in sorted(self._caller_all_keys.items()):
                        ok = kp.set_key(remote_identity, base_k, idx)
                        logger.info("Post-rotation set_key[%d] for %s (ok=%s)",
                                    idx, remote_identity, ok)
                except Exception as e:
                    logger.warning("Post-rotation set_key failed: %s", e)
            elif not self._caller_all_keys:
                logger.warning("No caller E2EE keys received — incoming audio will be silence")

            if remote_identity:
                logger.info("Linking to remote participant: %s", remote_identity)

            # Load memories for this caller
            memory_section = ""
            if self._memory and self._caller_user_id:
                try:
                    mems = await self._memory.query(self._caller_user_id, "voice call", top_k=10)
                    if mems:
                        memory_section = "\n\nKontext aus früheren Gesprächen mit diesem Nutzer:\n" + \
                            "\n".join(f"- {m['fact']}" for m in mems)
                        logger.info("Loaded %d memories for %s", len(mems), self._caller_user_id)
                except Exception as exc:
                    logger.warning("Memory query failed: %s", exc)

            # Voice pipeline — Jack Marlowe (native German male)
            self._http_session = aiohttp.ClientSession()
            voice_id = os.environ.get("ELEVENLABS_VOICE_ID", DEFAULT_VOICE_ID)
            self.session = AgentSession(
                stt=elevenlabs.STT(
                    api_key=ELEVENLABS_KEY,
                    model_id="scribe_v2_realtime",
                    language_code=os.environ.get("STT_LANGUAGE", "de"),
                    http_session=self._http_session,
                ),
                llm=lk_openai.LLM(base_url=LITELLM_URL, api_key=LITELLM_KEY, model=self.model),
                tts=elevenlabs.TTS(
                    voice_id=voice_id,
                    model="eleven_multilingual_v2",
                    language="de",
                    api_key=ELEVENLABS_KEY,
                    voice_settings=elevenlabs.VoiceSettings(
                        stability=0.5, similarity_boost=0.75, speed=1.15,
                    ),
                    http_session=self._http_session,
                ),
                vad=_get_vad(),
            )

            # Pipeline event logging (livekit-agents 1.4.2 event names)
            _vad_state_log: dict = {"last": None, "speaking_count": 0, "away_since": None}

            @self.session.on("user_state_changed")
            def _on_user_state(ev):
                state = ev.new_state
                prev = _vad_state_log["last"]
                _vad_state_log["last"] = state
                if str(state) == "speaking":
                    _vad_state_log["speaking_count"] = _vad_state_log.get("speaking_count", 0) + 1
                    _vad_state_log["away_since"] = None
                    logger.info("VAD: user_state=%s (speaking_count=%d)", state, _vad_state_log["speaking_count"])
                elif str(state) == "away" and prev and str(prev) != "away":
                    # Record when the user first went away; watchdog below uses this.
                    import time
                    _vad_state_log["away_since"] = time.monotonic()
                    logger.info("VAD: user_state=%s (was %s)", state, prev)
                else:
                    logger.info("VAD: user_state=%s", state)

            # Recent user utterances, consumed by memory extraction on each agent reply.
            _last_user_speech: list[str] = []

            @self.session.on("user_input_transcribed")
            def _on_user_speech(ev):
                text = ev.transcript or ""
                if text and _is_stt_artifact(text):
                    logger.warning("NOISE_LEAK: artifact reached STT: %s", text)
                else:
                    logger.info("USER_SPEECH: %s", text)
                    if ev.transcript:
                        _last_user_speech.append(ev.transcript)

            @self.session.on("conversation_item_added")
            def _on_conversation_item(ev):
                role = getattr(ev.item, "role", "?")
                text = getattr(ev.item, "text_content", "") or ""
                if role == "assistant" and text:
                    logger.info("AGENT_SPEECH: %s", text)
                    # Fire-and-forget memory extraction for the completed exchange.
                    if self._memory and self._caller_user_id and _last_user_speech:
                        user_text = " ".join(_last_user_speech)
                        _last_user_speech.clear()
                        asyncio.ensure_future(
                            _extract_voice_memories(user_text, text,
                                                    self._caller_user_id, self.room_id))

            # Brave Search tool — lets the agent answer questions about current events
            # NOTE: this docstring is sent to the LLM as the tool description — keep as-is.
            @function_tool
            async def search_web(query: str) -> str:
                """Search the web for current information using Brave Search.

                Use this when asked about recent news, current events, prices,
                weather, or any information that may have changed recently.
                """
                logger.info("SEARCH: %s", query)
                result = await _brave_search(query)
                logger.info("SEARCH_RESULT: %s", result[:200])
                return result

            agent = _NoiseFilterAgent(
                instructions=_build_voice_prompt() + memory_section,
                tools=[search_web],
            )
            io_opts = room_io.RoomOptions(
                participant_identity=remote_identity,
                close_on_disconnect=False,
            ) if remote_identity else room_io.RoomOptions(close_on_disconnect=False)
            await self.session.start(
                agent=agent,
                room=self.lk_room,
                room_options=io_opts,
            )
            logger.info("Voice pipeline started (voice=%s, linked_to=%s)", voice_id, remote_identity)

            try:
                await asyncio.wait_for(
                    self.session.generate_reply(
                        instructions="Sage nur: Hallo, wie kann ich helfen?"),
                    timeout=30.0)
                logger.info("Greeting sent")
            except asyncio.TimeoutError:
                logger.error("Greeting timed out")

            # VAD watchdog: log diagnostic and attempt E2EE key recovery if stuck
            import time as _time
            while True:
                await asyncio.sleep(10)
                away_since = _vad_state_log.get("away_since")
                if away_since and (_time.monotonic() - away_since) > 30:
                    sc = _vad_state_log.get("speaking_count", 0)
                    e2ee_ok = any(
                        str(getattr(p, '_e2ee_state', '')) == 'OK'
                        for p in self.lk_room.remote_participants.values()
                    ) if self.lk_room else False
                    n_remote = len(self.lk_room.remote_participants) if self.lk_room else 0
                    logger.warning(
                        "VAD_WATCHDOG: user_state=away for >30s (speaking_count=%d, "
                        "remote_participants=%d, e2ee_ok=%s) — attempting E2EE key recovery",
                        sc, n_remote, e2ee_ok,
                    )
                    # Recovery: re-apply all stored E2EE keys for all remote participants
                    if self.lk_room and self._caller_all_keys:
                        try:
                            kp_w = self.lk_room.e2ee_manager.key_provider
                            for p in self.lk_room.remote_participants.values():
                                for idx, base_k in sorted(self._caller_all_keys.items()):
                                    ok = kp_w.set_key(p.identity, base_k, idx)
                                    logger.info("VAD_WATCHDOG: recovery set_key[%d] for %s (ok=%s)",
                                                idx, p.identity, ok)
                        except Exception as exc:
                            logger.warning("VAD_WATCHDOG: recovery set_key failed: %s", exc)
                    _vad_state_log["away_since"] = None # only warn once per stuck period

        except asyncio.CancelledError:
            logger.info("Session cancelled for %s", self.room_id)
        except Exception:
            logger.exception("Session failed for %s", self.room_id)