"bist du dir sicher" / "are you sure" / "stimmt das wirklich" now also trigger Opus escalation for fact-checking the previous answer. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
989 lines
48 KiB
Python
989 lines
48 KiB
Python
"""Voice session: LiveKit + STT/LLM/TTS pipeline.
|
||
E2EE via HKDF key derivation (Element Call compatible).
|
||
Requires patched livekit-rtc FFI binary from onestacked/livekit-rust-sdks."""
|
||
import asyncio
|
||
import base64
|
||
import datetime
|
||
import hashlib
|
||
import logging
|
||
import os
|
||
|
||
import zoneinfo
|
||
|
||
import json
|
||
import re
|
||
|
||
import aiohttp
|
||
import httpx
|
||
from livekit import rtc, api as lkapi
|
||
from livekit.agents import Agent, AgentSession, StopResponse, function_tool, room_io, llm
|
||
from livekit.plugins import openai as lk_openai, elevenlabs, silero
|
||
from openai import AsyncOpenAI
|
||
|
||
# Module logger shared by every helper in this file.
logger = logging.getLogger("matrix-ai-voice")


# --- Service endpoints and credentials (read once at import time) ---------
# LiteLLM proxy used as the OpenAI-compatible LLM backend.
LITELLM_URL = os.getenv("LITELLM_BASE_URL", "")
LITELLM_KEY = os.getenv("LITELLM_API_KEY", "not-needed")
# LiveKit API credentials used to mint room-join JWTs.
LK_API_KEY = os.getenv("LIVEKIT_API_KEY", "")
LK_API_SECRET = os.getenv("LIVEKIT_API_SECRET", "")
# ElevenLabs key shared by the STT and TTS plugins.
ELEVENLABS_KEY = os.getenv("ELEVENLABS_API_KEY", "")
# Brave Search key; when empty the web-search helper reports it is unavailable.
BRAVE_API_KEY = os.getenv("BRAVE_API_KEY", "")
# Memory service used to store voice exchanges and user preferences.
MEMORY_SERVICE_URL = os.getenv("MEMORY_SERVICE_URL", "http://memory-service:8090")
# Confluence REST access; all three must be set for the Confluence helpers.
CONFLUENCE_URL = os.getenv("CONFLUENCE_BASE_URL", "")
CONFLUENCE_USER = os.getenv("CONFLUENCE_USER", "")
CONFLUENCE_TOKEN = os.getenv("CONFLUENCE_TOKEN", "")

# ElevenLabs voice used when ELEVENLABS_VOICE_ID is not set.
DEFAULT_VOICE_ID = "ML23UVoFL5mI6APbRAeR"  # Robert Ranger - Cool Storyteller, native German
|
||
|
||
_VOICE_PROMPT_TEMPLATE = """Du bist ein hilfreicher Sprachassistent in einem Matrix-Anruf.
|
||
Aktuelle Zeit: {datetime}
|
||
Zeitzone des Nutzers: {timezone}
|
||
LLM-Modell: {model}
|
||
|
||
STRIKTE Regeln:
|
||
- Antworte in der Sprache des Nutzers (Deutsch oder Englisch)
|
||
- Halte Antworten normalerweise auf 1-2 kurze Saetze. AUSNAHME: Wenn ein Dokument geladen ist und der Nutzer Rollenspiele, Probegespraeche, Mock-Interviews oder ausfuehrliche Besprechungen wuenscht, antworte so ausfuehrlich wie noetig
|
||
- Sei direkt und praezise, keine Fuellwoerter
|
||
- Erfinde NICHTS ausser der Nutzer bittet explizit um Rollenspiel, Probegespraech oder Simulation basierend auf dem Dokumentinhalt. In dem Fall spiele die Rolle ueberzeugend und nutze den Dokumentinhalt als Grundlage
|
||
- Beantworte nur was gefragt wird
|
||
- Wenn niemand etwas fragt oder du dir nicht sicher bist ob jemand mit dir spricht, SCHWEIGE. Antworte NUR auf klare, direkte Fragen oder Anweisungen. Kein Smalltalk, kein "Danke", kein "Wie kann ich helfen" von dir aus
|
||
- Schreibe Zahlen und Jahreszahlen IMMER als Woerter aus (z.B. "zweitausendundzwanzig" statt "2026", "zweiundzwanzigsten Februar" statt "22. Februar")
|
||
- Bei zeitrelevanten Fragen (Uhrzeit, Termine, Geschaeftszeiten): frage kurz nach ob der Nutzer noch in seiner gespeicherten Zeitzone ist, bevor du antwortest. Nutze set_user_timezone wenn sich der Standort geaendert hat.
|
||
- Wenn der Nutzer seinen Standort oder seine Stadt erwaehnt, nutze set_user_timezone um die Zeitzone zu speichern.
|
||
- IGNORIERE alle Texte in Sternchen wie *Störgeräusche*, *Schlechte Qualität*, *Fernsehgeräusche*, *Schrei* usw. — das sind KEINE echten Nutzereingaben sondern technische Annotationen. Antworte NIEMALS darauf und tue so als haette niemand etwas gesagt.
|
||
- Du kannst Confluence-Seiten lesen und bearbeiten. Nutze read_confluence_page und update_confluence_page wenn der Nutzer Dokumente besprechen oder aendern moechte."""
|
||
|
||
|
||
def _build_voice_prompt(model: str = "claude-sonnet",
                        timezone: str | None = None) -> str:
    """Render the voice-agent system prompt for *model* in the user's zone.

    Args:
        model: LLM model name shown to the assistant in the prompt.
        timezone: IANA zone name such as "Europe/Berlin". When falsy, the
            VOICE_TIMEZONE environment variable is used, then "Europe/Berlin".

    Returns:
        _VOICE_PROMPT_TEMPLATE with the current time, zone name and model
        filled in. If the zone cannot be loaded, the time is rendered in UTC
        and the prompt names "UTC", so the stated zone matches the clock.
    """
    tz_name = timezone or os.environ.get("VOICE_TIMEZONE", "Europe/Berlin")
    try:
        tz = zoneinfo.ZoneInfo(tz_name)
    except Exception as exc:  # ZoneInfoNotFoundError, ValueError, OSError, ...
        # The zone name is caller-supplied and may be invalid. Report UTC
        # instead of the unusable name; otherwise the prompt would claim one
        # zone while showing a UTC clock.
        logger.warning("Unknown timezone %r, falling back to UTC: %s", tz_name, exc)
        tz = datetime.timezone.utc
        tz_name = "UTC"
    now = datetime.datetime.now(tz)
    return _VOICE_PROMPT_TEMPLATE.format(
        datetime=now.strftime("%A, %d. %B %Y %H:%M %Z"),
        timezone=tz_name,
        model=model,
    )
|
||
|
||
# ElevenLabs scribe_v2_realtime produces two kinds of artifacts:
|
||
# 1. Noise annotations: *Störgeräusche*, *Schlechte Qualität*, etc.
|
||
# 2. Subtitle/metadata leaks: "Untertitel: ARD Text im Auftrag von Funk (2017)"
|
||
# Filter both via on_user_turn_completed (downstream of VAD+STT, no pipeline impact).
|
||
_NOISE_ANNOTATION_RE = re.compile(r'^\s*\*[^*]+\*\s*$')
|
||
_STT_ARTIFACT_PATTERNS = [
|
||
re.compile(r'(?i)^untertitel\b'), # subtitle metadata
|
||
re.compile(r'(?i)^copyright\b'), # copyright notices
|
||
re.compile(r'(?i)^musik\s*$'), # bare "Musik" annotation
|
||
re.compile(r'(?i)^\(.*\)\s*$'), # parenthetical annotations like (Applaus)
|
||
re.compile(r'(?i)^\[.*\]\s*$'), # bracketed annotations like [Musik]
|
||
re.compile(r'(?i)^(background noise|static|silence|applause|laughter|noise)\s*$'), # English variants
|
||
re.compile(r'^\s*\*\*[^*]+\*\*\s*$'), # double-asterisk: **Noise**
|
||
]
|
||
|
||
|
||
_STT_GHOST_PHRASES = {
|
||
"danke", "ja", "nein", "okay", "ok", "tschüss", "hallo", "bye",
|
||
"mhm", "hmm", "aha", "oh", "ah", "ähm", "hm", "tschüss",
|
||
"thanks", "thank you", "yes", "no", "bye bye", "hello", "hi",
|
||
"bitte", "genau", "gut", "so", "na", "ne", "ach", "doch",
|
||
"vielen dank", "alles klar", "schön", "super", "richtig",
|
||
}
|
||
|
||
|
||
def _is_stt_artifact(text: str) -> bool:
|
||
"""Check if text is an STT artifact (noise annotation, metadata leak, or ghost phrase)."""
|
||
if _NOISE_ANNOTATION_RE.match(text):
|
||
return True
|
||
if any(p.match(text) for p in _STT_ARTIFACT_PATTERNS):
|
||
return True
|
||
# Short phantom transcriptions from ambient noise — ElevenLabs hallucinates
|
||
# common German/English filler words when it hears background sounds
|
||
words = text.split()
|
||
if len(words) <= 2 and text.lower().strip().rstrip(".!?,") in _STT_GHOST_PHRASES:
|
||
return True
|
||
return False
|
||
|
||
|
||
class _NoiseFilterAgent(Agent):
    """Agent that drops ElevenLabs STT artifacts before the LLM sees them.

    Filtering happens in on_user_turn_completed(), which runs once VAD and
    STT have produced a finished user turn, so the audio pipeline itself is
    never held up. Artifacts are discarded silently by raising StopResponse.
    """

    async def on_user_turn_completed(
        self, turn_ctx: llm.ChatContext, new_message: llm.ChatMessage
    ) -> None:
        transcript = (new_message.text_content or "").strip()
        if not transcript or not _is_stt_artifact(transcript):
            return  # real speech: continue with the normal reply flow

        logger.info("STT artifact suppressed: %s", transcript)
        # Drop the artifact from the turn context (only if it is the last
        # item) so noise does not accumulate in the conversation history.
        history = turn_ctx.items
        if history and history[-1] is new_message:
            history.pop()
        raise StopResponse()
|
||
|
||
|
||
# Lazily loaded Silero VAD, cached at module level by _get_vad().
_vad = None


def _get_vad():
    """Return the cached Silero VAD, loading it on the first call.

    Load parameters: activation_threshold=0.65, min_speech_duration=0.4,
    min_silence_duration=0.55. The instance is kept in the module global
    ``_vad`` so every later call returns the same object.
    """
    global _vad
    if _vad is not None:
        return _vad
    _vad = silero.VAD.load(
        activation_threshold=0.65,
        min_speech_duration=0.4,
        min_silence_duration=0.55,
    )
    return _vad
|
||
|
||
def _make_lk_identity(user_id, device_id):
|
||
return f"{user_id}:{device_id}"
|
||
|
||
def _compute_lk_room_name(room_id):
|
||
raw = f"{room_id}|m.call#ROOM"
|
||
return base64.b64encode(hashlib.sha256(raw.encode()).digest()).decode().rstrip("=")
|
||
|
||
def _generate_lk_jwt(room_id, user_id, device_id):
    """Mint a LiveKit access token that lets this device join the call room.

    The token uses ``"<user_id>:<device_id>"`` as identity and *user_id* as
    participant name. It grants join/publish/subscribe on the room derived
    from *room_id* and is valid for 24 hours.
    """
    identity = _make_lk_identity(user_id, device_id)
    lk_room = _compute_lk_room_name(room_id)
    grants = lkapi.VideoGrants(
        room_join=True,
        room=lk_room,
        can_publish=True,
        can_subscribe=True,
    )
    access = lkapi.AccessToken(LK_API_KEY, LK_API_SECRET)
    access = access.with_identity(identity).with_name(user_id)
    access = access.with_grants(grants).with_ttl(datetime.timedelta(hours=24))
    logger.info("JWT: identity=%s room=%s", identity, lk_room)
    return access.to_jwt()
|
||
|
||
|
||
# Value for rtc.KeyProviderOptions(key_derivation_function=...): proto enum
# value 1 makes the Rust FFI apply HKDF internally. Value 0 in the proto is
# PBKDF2, NOT a raw/no-KDF mode — see e2ee_patch.py.
KDF_HKDF: int = 1
|
||
|
||
|
||
def _hkdf_derive(ikm: bytes) -> bytes:
|
||
"""Pre-derive AES key via HKDF-SHA256 matching livekit-client-sdk-js deriveEncryptionKey().
|
||
|
||
JS params: hash=SHA-256, salt=encode("LKFrameEncryptionKey"), info=ArrayBuffer(128), length=128bit
|
||
We set this pre-derived key via set_shared_key() which bypasses Rust FFI KDF entirely.
|
||
"""
|
||
import hmac
|
||
salt = b"LKFrameEncryptionKey"
|
||
info = b"\x00" * 128
|
||
prk = hmac.new(salt, ikm, hashlib.sha256).digest()
|
||
t1 = hmac.new(prk, info + b"\x01", hashlib.sha256).digest()
|
||
return t1[:16]
|
||
|
||
|
||
def _ratchet_keys(base_raw: bytes, count: int = 6) -> dict[int, bytes]:
|
||
"""Pre-compute ratcheted keys 0..count-1 matching livekit-client-sdk-js ratchet().
|
||
|
||
EC JS ratchet: new_raw = HMAC(key=current_raw, data="LKFrameEncryptionKey")[:16]
|
||
Returns {index: raw_key} for all indices 0..count-1.
|
||
Set these via set_key(identity, raw, index) with KDF_HKDF=1 so Rust applies HKDF.
|
||
"""
|
||
import hmac as _hmac
|
||
keys = {}
|
||
raw = base_raw
|
||
for i in range(count):
|
||
keys[i] = raw
|
||
raw = _hmac.new(raw, b"LKFrameEncryptionKey", hashlib.sha256).digest()[:16]
|
||
return keys
|
||
|
||
|
||
async def _brave_search(query: str, count: int = 5) -> str:
    """Query the Brave web-search API and format the hits for the LLM.

    Returns one ``"- title: description (url)"`` line per result (at most
    *count*). Otherwise returns a short message when no API key is
    configured or nothing was found. Request and parsing errors are logged
    and returned as ``"Search failed: ..."`` text instead of being raised.
    """
    if not BRAVE_API_KEY:
        return "Search unavailable (no API key configured)."
    headers = {"Accept": "application/json", "X-Subscription-Token": BRAVE_API_KEY}
    params = {"q": query, "count": count, "text_decorations": False}
    try:
        async with httpx.AsyncClient(timeout=10.0) as client:
            response = await client.get(
                "https://api.search.brave.com/res/v1/web/search",
                headers=headers,
                params=params,
            )
            response.raise_for_status()
            hits = response.json().get("web", {}).get("results", [])
        if not hits:
            return "No results found."
        return "\n".join(
            f"- {hit.get('title', '')}: {hit.get('description', '')} ({hit.get('url', '')})"
            for hit in hits[:count]
        )
    except Exception as exc:
        logger.warning("Brave search error: %s", exc)
        return f"Search failed: {exc}"
|
||
|
||
|
||
async def _store_user_pref(user_id: str, key: str, value: str) -> None:
    """Store a user preference (e.g. timezone, language) in the memory service.

    The preference is written as the fact ``"[PREF:<key>] <value>"`` with
    ``source_room="preferences"``; the voice session looks for this prefix
    (e.g. ``[PREF:timezone]``) when it loads memories.

    Best-effort: any failure, including a non-2xx HTTP response, is logged
    as a warning and not raised. Does nothing when MEMORY_SERVICE_URL is empty.
    """
    if not MEMORY_SERVICE_URL:
        return
    fact = f"[PREF:{key}] {value}"
    try:
        async with httpx.AsyncClient(timeout=10.0) as client:
            resp = await client.post(
                f"{MEMORY_SERVICE_URL}/memories/store",
                json={"user_id": user_id, "fact": fact, "source_room": "preferences"},
            )
            # httpx does not raise on HTTP error statuses by itself; without
            # this check a rejected write would still be logged as stored.
            resp.raise_for_status()
        logger.info("Preference stored for %s: %s=%s", user_id, key, value)
    except Exception as exc:
        logger.warning("Preference store failed: %s", exc)
|
||
|
||
|
||
async def _store_voice_exchange(user_text: str, agent_text: str,
                                user_id: str, room_id: str) -> None:
    """Store one user/assistant exchange verbatim as a memory (no LLM extraction).

    The fact is ``"User: <user_text>\\nAssistant: <agent_text>"`` with
    *room_id* as source room. Best-effort: any failure, including a non-2xx
    HTTP response, is logged as a warning and not raised. Does nothing when
    MEMORY_SERVICE_URL is empty.
    """
    if not MEMORY_SERVICE_URL:
        return
    exchange = f"User: {user_text}\nAssistant: {agent_text}"
    try:
        async with httpx.AsyncClient(timeout=10.0) as client:
            resp = await client.post(
                f"{MEMORY_SERVICE_URL}/memories/store",
                json={"user_id": user_id, "fact": exchange, "source_room": room_id},
            )
            # httpx does not raise on HTTP error statuses by itself; without
            # this check a rejected write would still be logged as stored.
            resp.raise_for_status()
        logger.info("Memory stored for %s: %s", user_id, exchange[:120])
    except Exception as exc:
        logger.warning("Voice memory store failed: %s", exc)
|
||
|
||
|
||
def _storage_html_to_text(storage_html: str) -> str:
    """Flatten Confluence storage-format HTML into plain text for voice readback.

    Tags become spaces, HTML entities (``&amp;``, ``&nbsp;``, ``&auml;``, ...)
    are decoded so TTS does not read them out literally, and whitespace runs
    collapse to single spaces.
    """
    import html

    text = re.sub(r"<[^>]+>", " ", storage_html)
    # Decode entities only after stripping tags, so escaped markup such as
    # "&lt;b&gt;" stays literal text instead of being removed as a tag.
    text = html.unescape(text)
    return re.sub(r"\s+", " ", text).strip()


async def _confluence_read_page(page_id: str) -> tuple[str, str, int]:
    """Read a Confluence page and return ``(title, plain_text, version_number)``.

    Missing fields default to ``""`` for title and body and ``1`` for version.

    Raises:
        RuntimeError: if the Confluence URL, user or token is not configured.
        httpx.HTTPStatusError: if the page request returns a non-2xx status.
    """
    if not CONFLUENCE_URL or not CONFLUENCE_USER or not CONFLUENCE_TOKEN:
        raise RuntimeError("Confluence credentials not configured")
    url = f"{CONFLUENCE_URL}/rest/api/content/{page_id}"
    params = {"expand": "body.storage,version,title"}
    async with httpx.AsyncClient(timeout=15.0) as client:
        resp = await client.get(
            url,
            params=params,
            auth=(CONFLUENCE_USER, CONFLUENCE_TOKEN),
        )
        resp.raise_for_status()
        data = resp.json()
    title = data.get("title", "")
    version = data.get("version", {}).get("number", 1)
    storage_html = data.get("body", {}).get("storage", {}).get("value", "")
    return title, _storage_html_to_text(storage_html), version
|
||
|
||
|
||
def _find_section_span(body_html: str, section_heading: str) -> tuple[int, int] | None:
    """Locate the content of the section under the first matching heading.

    Headings ``<h1>``..``<h6>`` are scanned in document order. A heading
    matches when its text (inner tags removed, entities decoded, whitespace
    collapsed) contains *section_heading*, ignoring case. The section content
    runs from just after that heading's closing tag up to the next heading of
    the same or a higher level, or to the end of the document.

    Returns:
        ``(start, end)`` offsets into *body_html*, or None when no heading
        matches or *section_heading* is blank.
    """
    import html

    def _normalize(text: str) -> str:
        return re.sub(r"\s+", " ", text).strip().casefold()

    needle = _normalize(section_heading)
    if not needle:
        return None
    # Each match covers exactly one heading element: the lazy body stops at
    # the first closing tag of the same level, so it cannot run across
    # neighbouring headings and pick the wrong section.
    heading_re = re.compile(r"<h([1-6])\b[^>]*>(.*?)</h\1\s*>", re.IGNORECASE | re.DOTALL)
    for heading in heading_re.finditer(body_html):
        heading_text = html.unescape(re.sub(r"<[^>]+>", "", heading.group(2)))
        if needle not in _normalize(heading_text):
            continue
        level = heading.group(1)
        next_heading_re = re.compile(rf"<h[1-{level}]\b[^>]*>", re.IGNORECASE)
        following = next_heading_re.search(body_html, heading.end())
        return heading.end(), (following.start() if following else len(body_html))
    return None


async def _confluence_update_section(page_id: str, section_heading: str, new_html: str) -> str:
    """Replace one section of a Confluence page, identified by its heading.

    The page is fetched, the content between the matching heading and the
    next heading of the same or higher level is replaced by *new_html*
    (see ``_find_section_span``), and the page is written back with its
    version number incremented.

    Returns:
        A status message: credentials missing, section not found, or success.

    Raises:
        httpx.HTTPStatusError: if fetching or writing the page returns a
            non-2xx status.
    """
    if not CONFLUENCE_URL or not CONFLUENCE_USER or not CONFLUENCE_TOKEN:
        return "Confluence credentials not configured."

    url = f"{CONFLUENCE_URL}/rest/api/content/{page_id}"
    auth = (CONFLUENCE_USER, CONFLUENCE_TOKEN)
    async with httpx.AsyncClient(timeout=15.0) as client:
        # Read the current page; title and version are needed for the PUT.
        resp = await client.get(url, params={"expand": "body.storage,version,title"}, auth=auth)
        resp.raise_for_status()
        data = resp.json()
        title = data["title"]
        version = data["version"]["number"]
        body_html = data["body"]["storage"]["value"]

        span = _find_section_span(body_html, section_heading)
        if span is None:
            return f"Section '{section_heading}' not found on page."
        section_start, section_end = span
        new_body = body_html[:section_start] + new_html + body_html[section_end:]

        # Write back with the version number bumped by one.
        put_data = {
            "version": {"number": version + 1},
            "title": title,
            "type": "page",
            "body": {
                "storage": {
                    "value": new_body,
                    "representation": "storage",
                }
            },
        }
        resp = await client.put(url, json=put_data, auth=auth)
        resp.raise_for_status()
    return f"Section '{section_heading}' updated successfully."
|
||
|
||
|
||
def _build_e2ee_options() -> rtc.E2EEOptions:
    """Return AES-GCM E2EE options for a per-participant key provider.

    Raw base keys from the Matrix key-exchange events are handed to
    set_key() unchanged; because key_derivation_function is KDF_HKDF, the
    Rust FFI derives the AES frame key itself via HKDF with the ratchet salt.
    Settings follow Element Call: ratchetWindowSize=10, keyringSize=256,
    ratchetSalt="LKFrameEncryptionKey".
    Reminder: proto value 0 means PBKDF2 (not raw/none), hence KDF_HKDF=1.
    """
    provider_settings = dict(
        shared_key=b"",  # empty shared key -> keys are set per participant
        ratchet_window_size=10,
        ratchet_salt=b"LKFrameEncryptionKey",
        failure_tolerance=10,
        key_ring_size=256,
        key_derivation_function=KDF_HKDF,  # HKDF runs inside the Rust FFI
    )
    return rtc.E2EEOptions(
        encryption_type=rtc.EncryptionType.GCM,
        key_provider_options=rtc.KeyProviderOptions(**provider_settings),
    )
|
||
|
||
|
||
class VoiceSession:
|
||
def __init__(self, nio_client, room_id, device_id, lk_url, model="claude-sonnet",
|
||
publish_key_cb=None, bot_key: bytes | None = None,
|
||
memory=None, caller_user_id: str | None = None,
|
||
document_context: str | None = None):
|
||
self.nio_client = nio_client
|
||
self.room_id = room_id
|
||
self.device_id = device_id
|
||
self.lk_url = lk_url
|
||
self.model = model
|
||
self.lk_room = None
|
||
self.session = None
|
||
self._task = None
|
||
self._http_session = None
|
||
self._caller_key: bytes | None = None
|
||
self._caller_identity: str | None = None
|
||
self._caller_all_keys: dict = {} # {index: bytes} — all caller keys by index
|
||
self._bot_key: bytes = bot_key or os.urandom(16)
|
||
self._publish_key_cb = publish_key_cb
|
||
self._memory = memory # MemoryClient instance from bot.py
|
||
self._caller_user_id = caller_user_id # Matrix user ID for memory lookup
|
||
self._document_context = document_context # PDF text from room for voice context
|
||
self._transcript: list[dict] = [] # {"role": "user"|"assistant", "text": "..."}
|
||
|
||
def on_encryption_key(self, sender, device_id, key, index):
    """Handle an E2EE key announced by an Element Call participant.

    The key is stored in ``self._caller_all_keys[index]``. The first key
    received also sets ``self._caller_key`` and the caller identity
    ``"<sender>:<device_id>"``. Keys are normally applied later, once the
    caller's track is subscribed and the frame cryptor exists. If the caller
    already has a subscribed track (late arrival / key rotation), the key is
    set on the LiveKit key provider immediately.

    Only a short SHA-256 fingerprint of the key is logged, never the key
    itself, so logs do not expose the call's media encryption keys.
    """
    if not key:
        return
    if not self._caller_key:
        self._caller_key = key
        self._caller_identity = f"{sender}:{device_id}"
    self._caller_all_keys[index] = key
    logger.info("E2EE key received from %s:%s (index=%d, %d bytes, fp=%s)",
                sender, device_id, index, len(key),
                hashlib.sha256(key).hexdigest()[:12])

    # Late key arrival: if the caller's track is already subscribed, its
    # frame cryptor exists and the key can be applied right away.
    if not (self.lk_room and self._caller_identity):
        return
    for participant in self.lk_room.remote_participants.values():
        if participant.identity != self._caller_identity:
            continue
        if any(pub.subscribed for pub in participant.track_publications.values()):
            try:
                provider = self.lk_room.e2ee_manager.key_provider
                ok = provider.set_key(participant.identity, key, index)
                logger.info("Late key set_key[%d] for %s (ok=%s)",
                            index, participant.identity, ok)
            except Exception as e:
                logger.warning("Late key set_key failed: %s", e)
        break
|
||
|
||
async def _fetch_encryption_key_http(self) -> bytes | None:
    """Fetch the caller's encryption key from the room timeline (not state).

    Element Call distributes encryption keys as timeline events of type
    ``io.element.call.encryption_keys``. The newest 50 events are requested
    (``dir=b``) and scanned in order. The first such event from a sender
    other than this bot that carries at least one key is used:
      * all its keys are merged into ``self._caller_all_keys``,
      * ``self._caller_identity`` is set to ``"<sender>:<device_id>"`` when
        the event names a device.

    Returns:
        The key with the highest index, or None if no usable event was found
        or the request failed (failures are logged, not raised).
    """
    from urllib.parse import quote

    # Tolerate a homeserver URL configured with a trailing slash, and
    # percent-encode the room ID as the Matrix spec requires for path params.
    homeserver = str(self.nio_client.homeserver).rstrip("/")
    url = f"{homeserver}/_matrix/client/v3/rooms/{quote(self.room_id, safe='')}/messages"
    headers = {"Authorization": f"Bearer {self.nio_client.access_token}"}
    try:
        async with httpx.AsyncClient(timeout=10.0) as http:
            resp = await http.get(url, headers=headers, params={"dir": "b", "limit": "50"})
            resp.raise_for_status()
            data = resp.json()
        events = data.get("chunk", [])
        user_id = self.nio_client.user_id
        for evt in events:
            if evt.get("type", "") != "io.element.call.encryption_keys":
                continue
            sender = evt.get("sender", "")
            if sender == user_id:
                continue  # skip our own key
            content = evt.get("content", {})
            device = content.get("device_id", "")
            logger.info("Found encryption_keys timeline event: sender=%s device=%s",
                        sender, device)
            all_keys = {}
            for k in content.get("keys", []):
                key_b64 = k.get("key", "")
                if not key_b64:
                    continue
                # Keys arrive unpadded; restore the padding. urlsafe_b64decode
                # maps '-'/'_' to '+'/'/' first, so either alphabet decodes.
                key_b64 += "=" * (-len(key_b64) % 4)
                all_keys[k.get("index", 0)] = base64.urlsafe_b64decode(key_b64)
            if not all_keys:
                continue
            if device:
                self._caller_identity = f"{sender}:{device}"
            self._caller_all_keys.update(all_keys)
            max_idx = max(all_keys)
            logger.info("Loaded caller keys at indices %s (using %d)",
                        sorted(all_keys), max_idx)
            return all_keys[max_idx]
        logger.info("No encryption_keys events in last %d timeline events", len(events))
    except Exception as e:
        logger.warning("HTTP encryption key fetch failed: %s", e)
    return None
|
||
|
||
async def start(self):
    """Launch the session's main coroutine ``_run()`` as a background task."""
    loop = asyncio.get_running_loop()
    self._task = loop.create_task(self._run())
|
||
|
||
async def stop(self):
    """Tear down the call, then cancel and await the background run task.

    The agent session, LiveKit room and aiohttp session are closed in that
    order, each via whichever of ``aclose()`` / ``disconnect()`` / ``close()``
    it provides. Teardown is best-effort: a failure closing one resource is
    logged and does not stop the remaining ones from being closed.
    """
    for resource in (self.session, self.lk_room, self._http_session):
        if not resource:
            continue
        try:
            if hasattr(resource, "aclose"):
                await resource.aclose()
            elif hasattr(resource, "disconnect"):
                await resource.disconnect()
            elif hasattr(resource, "close"):
                await resource.close()
        except Exception:
            # Keep tearing down the rest, but leave a trace instead of
            # hiding the error.
            logger.warning("Error while closing %s during stop()",
                           type(resource).__name__, exc_info=True)
    if self._task and not self._task.done():
        self._task.cancel()
        try:
            await self._task
        except asyncio.CancelledError:
            pass
|
||
|
||
def get_transcript(self) -> list[dict]:
    """Return a shallow copy of the call transcript.

    Entries are ``{"role": "user" | "assistant", "text": ...}`` dicts in
    recording order; mutating the returned list does not affect the session.
    """
    return [*self._transcript]
|
||
|
||
def get_document_context(self) -> str | None:
|
||
"""Return the document context loaded for this call, if any."""
|
||
return self._document_context
|
||
|
||
def get_confluence_page_id(self) -> str | None:
|
||
"""Return the active Confluence page ID, if any."""
|
||
if not self._document_context:
|
||
return None
|
||
ids = re.findall(r'confluence_page_id:(\d+)', self._document_context)
|
||
return ids[0] if ids else None
|
||
|
||
async def _run(self):
|
||
try:
|
||
user_id = self.nio_client.user_id
|
||
bot_identity = _make_lk_identity(user_id, self.device_id)
|
||
jwt = _generate_lk_jwt(self.room_id, user_id, self.device_id)
|
||
|
||
# Publish bot's own key immediately so Element Call can decrypt us
|
||
if self._publish_key_cb:
|
||
self._publish_key_cb(self._bot_key)
|
||
logger.info("Published bot E2EE key (%d bytes)", len(self._bot_key))
|
||
|
||
# Check timeline for caller's encryption key
|
||
caller_key = await self._fetch_encryption_key_http()
|
||
if caller_key:
|
||
self._caller_key = caller_key
|
||
logger.info("Got caller E2EE key via timeline (%d bytes)", len(caller_key))
|
||
|
||
if not self._caller_key:
|
||
# Wait up to 15s for key via sync handler
|
||
logger.info("No key in timeline yet, waiting for sync...")
|
||
for _ in range(150):
|
||
if self._caller_key:
|
||
break
|
||
await asyncio.sleep(0.1)
|
||
|
||
# E2EE: configurable via E2EE_ENABLED env var (default true).
|
||
# When enabled, parameters MUST match Element Call JS SDK.
|
||
e2ee_enabled = os.environ.get("E2EE_ENABLED", "true").lower() in ("true", "1", "yes")
|
||
if e2ee_enabled:
|
||
e2ee_opts = _build_e2ee_options()
|
||
room_opts = rtc.RoomOptions(e2ee=e2ee_opts)
|
||
logger.info("E2EE enabled (HKDF mode)")
|
||
else:
|
||
e2ee_opts = None
|
||
room_opts = rtc.RoomOptions()
|
||
logger.warning("E2EE DISABLED — audio is unencrypted")
|
||
self.lk_room = rtc.Room()
|
||
|
||
@self.lk_room.on("participant_connected")
|
||
def on_p(p):
|
||
logger.info("Participant connected: %s", p.identity)
|
||
# Note: do NOT set keys here — frame cryptor not initialized yet at participant_connected.
|
||
# Keys are set in on_track_subscribed where the frame cryptor definitely exists.
|
||
|
||
@self.lk_room.on("track_published")
|
||
def on_tp(pub, p):
|
||
logger.info("Track pub: %s %s kind=%s", p.identity, pub.sid, pub.kind)
|
||
|
||
@self.lk_room.on("track_muted")
|
||
def on_mute(p, pub):
|
||
logger.info("Track MUTED: %s %s kind=%s", p.identity, pub.sid, pub.kind)
|
||
|
||
@self.lk_room.on("track_unmuted")
|
||
def on_unmute(p, pub):
|
||
logger.info("Track UNMUTED: %s %s kind=%s", p.identity, pub.sid, pub.kind)
|
||
|
||
@self.lk_room.on("track_subscribed")
|
||
def on_ts(t, pub, p):
|
||
logger.info("Track sub: %s %s kind=%s muted=%s", p.identity, pub.sid, t.kind, pub.muted)
|
||
# NOTE: Do NOT create rtc.AudioStream here — it competes with AgentSession's
|
||
# internal audio pipeline for event loop time, causing intermittent VAD failures
|
||
# (user_state stuck on "away"). See MAT-40. Use e2ee_state_changed for flow confirmation.
|
||
# *** KEY FIX: set_key() with KDF_HKDF only applies HKDF when the frame cryptor
|
||
# for this participant already exists. The frame cryptor is created at track
|
||
# subscription time. Calling set_key() BEFORE track subscription (at connect)
|
||
# skips HKDF derivation → raw key stored → DEC_FAILED.
|
||
# Solution: set caller key HERE, after frame cryptor is initialized.
|
||
if int(t.kind) == 1 and e2ee_opts is not None: # audio track only
|
||
caller_id = p.identity
|
||
logger.info("E2EE_DIAG: track_subscribed for %s, have %d caller keys",
|
||
caller_id, len(self._caller_all_keys))
|
||
try:
|
||
kp_local = self.lk_room.e2ee_manager.key_provider
|
||
if self._caller_all_keys:
|
||
for idx, base_k in sorted(self._caller_all_keys.items()):
|
||
ok = kp_local.set_key(caller_id, base_k, idx)
|
||
logger.info("on_ts: set_key[%d] for %s (ok=%s, %d bytes, raw=%s)",
|
||
idx, caller_id, ok, len(base_k), base_k.hex())
|
||
else:
|
||
logger.warning("on_ts: no caller keys yet — scheduling 0.5s retry")
|
||
async def _brief_key_retry(pid=caller_id):
|
||
await asyncio.sleep(0.5)
|
||
if self._caller_all_keys:
|
||
try:
|
||
kp_r = self.lk_room.e2ee_manager.key_provider
|
||
for idx, base_k in sorted(self._caller_all_keys.items()):
|
||
ok = kp_r.set_key(pid, base_k, idx)
|
||
logger.info("on_ts_retry: set_key[%d] for %s (ok=%s)", idx, pid, ok)
|
||
except Exception as exc:
|
||
logger.warning("on_ts_retry: set_key failed: %s", exc)
|
||
else:
|
||
logger.warning("on_ts_retry: still no caller keys for %s", pid)
|
||
asyncio.ensure_future(_brief_key_retry())
|
||
except Exception as exc:
|
||
logger.warning("on_ts: set_key failed: %s", exc)
|
||
|
||
_e2ee_state_names = {0:"NEW",1:"OK",2:"ENC_FAILED",3:"DEC_FAILED",4:"MISSING_KEY",5:"RATCHETED",6:"INTERNAL_ERR"}
|
||
@self.lk_room.on("e2ee_state_changed")
|
||
def on_e2ee_state(participant, state):
|
||
state_name = _e2ee_state_names.get(int(state), f"UNKNOWN_{state}")
|
||
p_id = participant.identity if participant else "local"
|
||
logger.info("E2EE_STATE: participant=%s state=%s", p_id, state_name)
|
||
# When remote participant frame cryptor is NEW or MISSING_KEY → set their key
|
||
if participant and p_id != bot_identity and int(state) in (0, 3, 4):
|
||
if self._caller_all_keys:
|
||
try:
|
||
kp_e = self.lk_room.e2ee_manager.key_provider
|
||
for idx, base_k in sorted(self._caller_all_keys.items()):
|
||
ok = kp_e.set_key(p_id, base_k, idx)
|
||
logger.info("e2ee_state set_key[%d] for %s (ok=%s)", idx, p_id, ok)
|
||
except Exception as exc:
|
||
logger.warning("e2ee_state set_key failed: %s", exc)
|
||
|
||
await self.lk_room.connect(self.lk_url, jwt, options=room_opts)
|
||
logger.info("Connected (E2EE=HKDF), remote=%d",
|
||
len(self.lk_room.remote_participants))
|
||
|
||
# Set bot's own key immediately after connect — local frame cryptor exists at connect time.
|
||
# CALLER keys are set in on_track_subscribed (NOT here) because the caller's frame cryptor
|
||
# is only created when their track arrives. Calling set_key() before that skips HKDF.
|
||
kp = self.lk_room.e2ee_manager.key_provider
|
||
ok = kp.set_key(bot_identity, self._bot_key, 0)
|
||
logger.info("Set bot key for %s (ok=%s, %d bytes)", bot_identity, ok, len(self._bot_key))
|
||
|
||
# Element Call rotates its key when bot joins. Wait up to 3s for the
|
||
# rotated key to arrive via nio sync before proceeding. If it arrives,
|
||
# on_encryption_key() stores it and (if track already subscribed) sets it.
|
||
pre_key_count = len(self._caller_all_keys)
|
||
pre_max_idx = max(self._caller_all_keys.keys()) if self._caller_all_keys else -1
|
||
logger.info("Waiting up to 3s for key rotation (current: %d keys, max_idx=%d)...",
|
||
pre_key_count, pre_max_idx)
|
||
for _wait in range(6): # 6 × 0.5s = 3s
|
||
await asyncio.sleep(0.5)
|
||
new_count = len(self._caller_all_keys)
|
||
new_max = max(self._caller_all_keys.keys()) if self._caller_all_keys else -1
|
||
if new_max > pre_max_idx or new_count > pre_key_count:
|
||
self._caller_key = self._caller_all_keys[new_max]
|
||
logger.info("Key rotated: index %d→%d, count %d→%d",
|
||
pre_max_idx, new_max, pre_key_count, new_count)
|
||
break
|
||
else:
|
||
logger.info("No key rotation after 3s — proceeding with key[%d]", pre_max_idx)
|
||
|
||
# Find the remote participant, wait up to 10s if not yet connected
|
||
remote_identity = None
|
||
for p in self.lk_room.remote_participants.values():
|
||
remote_identity = p.identity
|
||
break
|
||
if not remote_identity:
|
||
logger.info("No remote participant yet, waiting...")
|
||
for _ in range(100):
|
||
await asyncio.sleep(0.1)
|
||
for p in self.lk_room.remote_participants.values():
|
||
remote_identity = p.identity
|
||
break
|
||
if remote_identity:
|
||
break
|
||
|
||
# Set shared_key with pre-derived AES key for caller decryption.
|
||
# NOT using set_key() for caller — Rust HKDF may produce different result than EC's JS HKDF.
|
||
# set_shared_key() stores key raw (no KDF applied) — we pre-derive in Python.
|
||
# After key rotation wait: if track already subscribed, set rotated key.
|
||
# (Usually on_track_subscribed handles this, but if track arrived before rotation,
|
||
# the rotated key needs to be set here for the already-subscribed participant.)
|
||
if self._caller_all_keys and remote_identity:
|
||
try:
|
||
for idx, base_k in sorted(self._caller_all_keys.items()):
|
||
ok = kp.set_key(remote_identity, base_k, idx)
|
||
logger.info("Post-rotation set_key[%d] for %s (ok=%s)",
|
||
idx, remote_identity, ok)
|
||
except Exception as e:
|
||
logger.warning("Post-rotation set_key failed: %s", e)
|
||
elif not self._caller_all_keys:
|
||
logger.warning("No caller E2EE keys received — incoming audio will be silence")
|
||
|
||
if remote_identity:
|
||
logger.info("Linking to remote participant: %s", remote_identity)
|
||
|
||
# Load memories and user preferences for this caller
|
||
memory_section = ""
|
||
user_timezone = None
|
||
if self._memory and self._caller_user_id:
|
||
try:
|
||
mems = await self._memory.query(self._caller_user_id, "voice call", top_k=10)
|
||
if mems:
|
||
conversations = []
|
||
for m in mems:
|
||
fact = m['fact']
|
||
# Extract timezone preference from stored memories
|
||
if fact.startswith("[PREF:timezone]"):
|
||
user_timezone = fact.split("]", 1)[1].strip()
|
||
else:
|
||
conversations.append(fact)
|
||
if conversations:
|
||
memory_section = "\n\nFrühere Gespräche mit diesem Nutzer:\n" + \
|
||
"\n---\n".join(conversations)
|
||
logger.info("Loaded %d memories for %s (tz=%s)",
|
||
len(mems), self._caller_user_id, user_timezone)
|
||
except Exception as exc:
|
||
logger.warning("Memory query failed: %s", exc)
|
||
# Also query specifically for timezone preference if not found above
|
||
if not user_timezone and self._memory and self._caller_user_id:
|
||
try:
|
||
tz_mems = await self._memory.query(
|
||
self._caller_user_id, "timezone preference", top_k=3)
|
||
for m in (tz_mems or []):
|
||
if m['fact'].startswith("[PREF:timezone]"):
|
||
user_timezone = m['fact'].split("]", 1)[1].strip()
|
||
break
|
||
except Exception:
|
||
pass
|
||
|
||
# Voice pipeline — Jack Marlowe (native German male)
|
||
self._http_session = aiohttp.ClientSession()
|
||
voice_id = os.environ.get("ELEVENLABS_VOICE_ID", DEFAULT_VOICE_ID)
|
||
self.session = AgentSession(
|
||
stt=elevenlabs.STT(
|
||
api_key=ELEVENLABS_KEY,
|
||
model_id="scribe_v2_realtime",
|
||
language_code=os.environ.get("STT_LANGUAGE", "de"),
|
||
http_session=self._http_session,
|
||
),
|
||
llm=lk_openai.LLM(base_url=LITELLM_URL, api_key=LITELLM_KEY, model=self.model),
|
||
tts=elevenlabs.TTS(
|
||
voice_id=voice_id,
|
||
model="eleven_multilingual_v2",
|
||
language="de",
|
||
api_key=ELEVENLABS_KEY,
|
||
voice_settings=elevenlabs.VoiceSettings(
|
||
stability=0.5, similarity_boost=0.75, speed=1.15,
|
||
),
|
||
http_session=self._http_session,
|
||
),
|
||
vad=_get_vad(),
|
||
)
|
||
|
||
# Pipeline event logging (livekit-agents 1.4.2 event names)
|
||
_vad_state_log: dict = {"last": None, "speaking_count": 0, "away_since": None}
|
||
|
||
@self.session.on("user_state_changed")
def _on_user_state(ev):
    # Mirrors the session's user state into _vad_state_log for diagnostics:
    #   - "last":           most recently observed state
    #   - "speaking_count": number of transitions into "speaking"
    #   - "away_since":     time.monotonic() when the user entered "away",
    #                       or None; the VAD watchdog loop reads this to
    #                       trigger E2EE key recovery after >30s away.
    state = ev.new_state
    prev = _vad_state_log["last"]
    _vad_state_log["last"] = state
    if str(state) == "speaking":
        _vad_state_log["speaking_count"] = _vad_state_log.get("speaking_count", 0) + 1
        _vad_state_log["away_since"] = None
        logger.info("VAD: user_state=%s (speaking_count=%d)", state, _vad_state_log["speaking_count"])
    elif str(state) == "away":
        # prev is None for the first event we observe. That is exactly the
        # "caller never heard since the call started" case (e.g. broken E2EE),
        # so it must start the away timer too — previously it did not, and
        # the watchdog never fired for calls that were stuck from the start.
        if str(prev) != "away":
            import time
            _vad_state_log["away_since"] = time.monotonic()
        logger.info("VAD: user_state=%s (was %s)", state, prev)
    else:
        # Any non-away state ends the away period; don't let the watchdog act
        # on a stale timestamp.
        _vad_state_log["away_since"] = None
        logger.info("VAD: user_state=%s", state)
|
||
|
||
_last_user_speech: list[str] = []
|
||
|
||
@self.session.on("user_input_transcribed")
def _on_user_speech(ev):
    # Logs each STT result for the caller and buffers FINAL transcripts:
    #   - _last_user_speech: utterances awaiting the next assistant reply
    #     (drained by the conversation_item_added handler for memory storage)
    #   - self._transcript: running call transcript (also used as context by
    #     the think_deeper tool)
    text = ev.transcript or ""
    # This event is also emitted for interim hypotheses (is_final=False).
    # Buffering those would store every partial repeat of an utterance, so
    # only final results are kept. Events without the attribute are treated
    # as final, matching the previous behavior.
    is_final = getattr(ev, "is_final", True)
    if text and _is_stt_artifact(text):
        logger.warning("NOISE_LEAK: artifact reached STT: %s", text)
    elif is_final:
        logger.info("USER_SPEECH: %s", text)
    else:
        logger.debug("USER_SPEECH (interim): %s", text)
    if text and is_final:
        _last_user_speech.append(text)
        self._transcript.append({"role": "user", "text": text})
|
||
|
||
@self.session.on("conversation_item_added")
def _on_conversation_item(ev):
    # For every non-empty assistant message:
    #   - log it and append it to self._transcript
    #   - pair it with the buffered user utterances (_last_user_speech) and,
    #     when a memory backend and caller id are available, persist the
    #     exchange in the background via _store_voice_exchange().
    role = getattr(ev.item, "role", "?")
    text = getattr(ev.item, "text_content", "") or ""
    if role != "assistant" or not text:
        return
    logger.info("AGENT_SPEECH: %s", text)
    self._transcript.append({"role": "assistant", "text": text})
    if not _last_user_speech:
        return
    user_text = " ".join(_last_user_speech)
    # Drain the buffer on every assistant reply, even without a memory
    # backend. Previously it was only cleared when memory was enabled, so
    # otherwise it grew for the whole call.
    _last_user_speech.clear()
    if not (self._memory and self._caller_user_id):
        return
    task = asyncio.ensure_future(
        _store_voice_exchange(user_text, text,
                              self._caller_user_id, self.room_id))
    # The event loop only keeps weak references to tasks, so hold a strong one
    # until the store completes, and surface failures instead of leaving a
    # "Task exception was never retrieved" warning at shutdown.
    pending = getattr(self, "_memory_store_tasks", None)
    if pending is None:
        pending = set()
        self._memory_store_tasks = pending
    pending.add(task)

    def _on_store_done(done):
        pending.discard(done)
        if not done.cancelled() and done.exception() is not None:
            logger.warning("Memory store failed: %s", done.exception())

    task.add_done_callback(_on_store_done)
|
||
|
||
# Brave Search tool — lets the agent answer questions about current events
|
||
@function_tool
async def search_web(query: str) -> str:
    """Search the web for current information using Brave Search.

    Use this when asked about recent news, current events, prices,
    weather, or any information that may have changed recently.
    """
    # LLM-callable tool: the docstring above is the tool description the
    # model sees, so its wording is kept as-is. Delegates to _brave_search()
    # and logs the query plus the first 200 characters of the result.
    logger.info("SEARCH: %s", query)
    answer = await _brave_search(query)
    preview = answer[:200]
    logger.info("SEARCH_RESULT: %s", preview)
    return answer
|
||
|
||
# Tool: set user timezone — called by the LLM when user mentions their location
|
||
caller_uid = self._caller_user_id
|
||
|
||
@function_tool
async def set_user_timezone(iana_timezone: str) -> str:
    """Store the user's timezone. Call this when the user mentions their city
    or timezone (e.g. 'I'm in Nicosia' → 'Europe/Nicosia',
    'I live in New York' → 'America/New_York').
    Use IANA timezone names like Europe/Berlin, America/New_York, Asia/Tokyo.
    """
    # LLM-callable tool (docstring = tool description; keep wording stable).
    # Persists the timezone as a user preference via _store_user_pref().
    #
    # The value comes straight from the LLM, so it is validated against the
    # tz database first. A bogus name ("Nicosia", "CET+2") would otherwise be
    # stored and reloaded as a [PREF:timezone] memory on every later call.
    # Returning an error string lets the model retry with a proper name.
    tz_name = iana_timezone.strip()
    try:
        zoneinfo.ZoneInfo(tz_name)
    except (zoneinfo.ZoneInfoNotFoundError, ValueError, OSError):
        logger.warning("Rejected invalid timezone from LLM: %r", iana_timezone)
        return (f"Unknown timezone '{iana_timezone}'. "
                "Use an IANA timezone name like Europe/Berlin or America/New_York.")
    if not caller_uid:
        # Nothing to attach the preference to; don't claim it was saved.
        return f"Timezone {tz_name} not stored: caller unknown"
    await _store_user_pref(caller_uid, "timezone", tz_name)
    return f"Timezone set to {tz_name}"
|
||
|
||
# Extract active Confluence page ID from document context (if any)
|
||
_active_conf_id = None
|
||
if self._document_context:
|
||
_conf_ids = re.findall(r'confluence_page_id:(\d+)', self._document_context)
|
||
if _conf_ids:
|
||
_active_conf_id = _conf_ids[0]
|
||
|
||
@function_tool
async def read_confluence_page(page_id: str = "") -> str:
    """Read a Confluence page. Use when user asks to read, review,
    or check a document. Returns page title and content as text.
    Leave page_id empty to use the active document from the room."""
    # LLM-callable tool (docstring = tool description; keep wording stable).
    # Falls back to the Confluence page id found in the room's document
    # context. Failures are reported to the model as text rather than raised.
    target = page_id if page_id else _active_conf_id
    if not target:
        return "No Confluence page ID available. Ask the user to share a Confluence link first."
    logger.info("CONFLUENCE_READ: page_id=%s", target)
    try:
        page_title, body_text, _version = await _confluence_read_page(target)
        rendered = f"Page: {page_title}\n\n{body_text}"
        logger.info("CONFLUENCE_READ_OK: %s (%d chars)", page_title, len(body_text))
    except Exception as err:
        logger.warning("CONFLUENCE_READ_FAIL: %s", err)
        return f"Failed to read page: {err}"
    else:
        return rendered
|
||
|
||
@function_tool
async def update_confluence_page(section_heading: str, new_content: str, page_id: str = "") -> str:
    """Update a section of a Confluence page. Use when user asks to
    change, update, or rewrite part of a document.
    - section_heading: heading text of the section to update
    - new_content: new plain text for the section (will be wrapped in <p> tags)
    - page_id: leave empty to use the active document from the room
    Human sees changes instantly in their browser via Live Docs."""
    # LLM-callable tool (docstring = tool description; keep wording stable).
    # Replaces the named section via _confluence_update_section(); falls back
    # to the page id found in the room's document context. Failures are
    # reported to the model as text rather than raised.
    import html

    pid = page_id or _active_conf_id
    if not pid:
        return "No Confluence page ID available. Ask the user to share a Confluence link first."
    logger.info("CONFLUENCE_UPDATE: page=%s section='%s'", pid, section_heading)
    try:
        # new_content is documented as plain text but is embedded in
        # (X)HTML storage markup. Escape it so "&", "<" or ">" in dictated
        # text cannot produce invalid markup or inject tags into the page.
        new_html = f"<p>{html.escape(new_content, quote=False)}</p>"
        result = await _confluence_update_section(pid, section_heading, new_html)
        logger.info("CONFLUENCE_UPDATE_OK: %s", result)
        return result
    except Exception as exc:
        logger.warning("CONFLUENCE_UPDATE_FAIL: %s", exc)
        return f"Failed to update page: {exc}"
|
||
|
||
# Deep thinking tool — escalates to Opus for complex questions
|
||
_transcript_ref = self._transcript
|
||
_doc_context_ref = self._document_context
|
||
|
||
@function_tool
async def think_deeper(question: str) -> str:
    """Denke intensiver ueber eine komplexe Frage nach mit einem staerkeren Modell.
    Nutze dieses Tool wenn:
    - Der Nutzer sagt "denk genauer nach", "think harder", "nimm opus", "use opus",
    "ueberleg nochmal", "analysier das genauer", "bist du dir sicher", "are you sure",
    "stimmt das wirklich", "check nochmal", "pruef das nochmal"
    - Du dir bei einer komplexen Analyse, Code-Review oder Dokumentinterpretation unsicher bist
    - Der Nutzer deine letzte Antwort anzweifelt oder eine Gegenprüfung wuenscht
    - Eine Frage mehrere Schritte logisches Denken erfordert

    Beschreibe die Frage so praezise wie moeglich — der Kontext (Transkript + Dokument)
    wird automatisch mitgeliefert."""
    # LLM-callable tool. The German docstring above is the tool description
    # the voice model sees, so it is intentionally left unchanged.
    #
    # Escalates a question to a stronger model through the LiteLLM
    # /chat/completions endpoint. The prompt combines:
    #   - the room's document context (first 12000 chars, captured when the
    #     session was set up)
    #   - the last 10 transcript entries (read live from the shared list)
    #   - the question itself
    # The model defaults to "claude-opus" and can be overridden with the
    # THINK_DEEPER_MODEL environment variable. Errors are returned as text so
    # the voice model can tell the user.
    context_parts = []
    if _doc_context_ref:
        context_parts.append(f"Dokument-Kontext:\n{_doc_context_ref[:12000]}")
    recent = _transcript_ref[-10:] if _transcript_ref else []
    if recent:
        lines = []
        for e in recent:
            role = "Nutzer" if e["role"] == "user" else "Assistent"
            lines.append(f"{role}: {e['text']}")
        context_parts.append("Gespraechsverlauf:\n" + "\n".join(lines))
    context_parts.append(f"Frage: {question}")
    full_prompt = "\n\n---\n\n".join(context_parts)

    model = os.environ.get("THINK_DEEPER_MODEL", "claude-opus")
    # Tolerate a configured base URL with a trailing slash.
    endpoint = f"{LITELLM_URL.rstrip('/')}/chat/completions"

    logger.info("THINK_DEEPER: %s (context=%d chars)", question[:100], len(full_prompt))
    try:
        async with httpx.AsyncClient(timeout=60.0) as client:
            resp = await client.post(
                endpoint,
                headers={"Authorization": f"Bearer {LITELLM_KEY}"},
                json={
                    "model": model,
                    "messages": [
                        {"role": "system", "content": (
                            "Du bist ein Experte fuer tiefgehende Analyse. "
                            "Beantworte die Frage praezise und ausfuehrlich basierend auf dem Kontext. "
                            "Antworte in der Sprache der Frage."
                        )},
                        {"role": "user", "content": full_prompt},
                    ],
                    "max_tokens": 1500,
                },
            )
            resp.raise_for_status()
            data = resp.json()
        # OpenAI-compatible responses may carry content=None (e.g. refusals).
        # Report that explicitly instead of failing on a None slice.
        answer = data["choices"][0]["message"].get("content") or ""
        if not answer.strip():
            logger.warning("THINK_DEEPER_FAIL: empty answer from %s", model)
            return "Tiefere Analyse fehlgeschlagen: leere Antwort"
        logger.info("THINK_DEEPER_OK: %s", answer[:200])
        return answer
    except Exception as exc:
        logger.warning("THINK_DEEPER_FAIL: %s", exc)
        return f"Tiefere Analyse fehlgeschlagen: {exc}"
|
||
|
||
instructions = _build_voice_prompt(model=self.model, timezone=user_timezone) + memory_section
|
||
if self._document_context:
|
||
instructions += f"\n\nDokument-Kontext (im Raum hochgeladen):\n{self._document_context}"
|
||
if _active_conf_id:
|
||
instructions += f"\n\nAktive Confluence-Seite: {_active_conf_id}. Du brauchst den Nutzer NICHT nach der page_id zu fragen — nutze automatisch diese ID fuer read_confluence_page und update_confluence_page."
|
||
agent = _NoiseFilterAgent(
|
||
instructions=instructions,
|
||
tools=[search_web, set_user_timezone, read_confluence_page, update_confluence_page, think_deeper],
|
||
)
|
||
io_opts = room_io.RoomOptions(
|
||
participant_identity=remote_identity,
|
||
close_on_disconnect=False,
|
||
) if remote_identity else room_io.RoomOptions(close_on_disconnect=False)
|
||
await self.session.start(
|
||
agent=agent,
|
||
room=self.lk_room,
|
||
room_options=io_opts,
|
||
)
|
||
logger.info("Voice pipeline started (voice=%s, linked_to=%s)", voice_id, remote_identity)
|
||
|
||
try:
|
||
await asyncio.wait_for(
|
||
self.session.generate_reply(
|
||
instructions="Sage nur: Hallo, wie kann ich helfen?"),
|
||
timeout=30.0)
|
||
logger.info("Greeting sent")
|
||
except asyncio.TimeoutError:
|
||
logger.error("Greeting timed out")
|
||
|
||
# VAD watchdog: log diagnostic and attempt E2EE key recovery if stuck
|
||
import time as _time
|
||
while True:
|
||
await asyncio.sleep(10)
|
||
away_since = _vad_state_log.get("away_since")
|
||
if away_since and (_time.monotonic() - away_since) > 30:
|
||
sc = _vad_state_log.get("speaking_count", 0)
|
||
e2ee_ok = any(
|
||
str(getattr(p, '_e2ee_state', '')) == 'OK'
|
||
for p in self.lk_room.remote_participants.values()
|
||
) if self.lk_room else False
|
||
n_remote = len(self.lk_room.remote_participants) if self.lk_room else 0
|
||
logger.warning(
|
||
"VAD_WATCHDOG: user_state=away for >30s (speaking_count=%d, "
|
||
"remote_participants=%d, e2ee_ok=%s) — attempting E2EE key recovery",
|
||
sc, n_remote, e2ee_ok,
|
||
)
|
||
# Recovery: re-apply all stored E2EE keys for all remote participants
|
||
if self.lk_room and self._caller_all_keys:
|
||
try:
|
||
kp_w = self.lk_room.e2ee_manager.key_provider
|
||
for p in self.lk_room.remote_participants.values():
|
||
for idx, base_k in sorted(self._caller_all_keys.items()):
|
||
ok = kp_w.set_key(p.identity, base_k, idx)
|
||
logger.info("VAD_WATCHDOG: recovery set_key[%d] for %s (ok=%s)",
|
||
idx, p.identity, ok)
|
||
except Exception as exc:
|
||
logger.warning("VAD_WATCHDOG: recovery set_key failed: %s", exc)
|
||
_vad_state_log["away_since"] = None # only warn once per stuck period
|
||
|
||
except asyncio.CancelledError:
|
||
logger.info("Session cancelled for %s", self.room_id)
|
||
except Exception:
|
||
logger.exception("Session failed for %s", self.room_id)
|