]*>',
re.IGNORECASE,
)
next_match = next_heading.search(body_html, section_start)
section_end = next_match.start() if next_match else len(body_html)
# Replace section content
new_body = body_html[:section_start] + new_html + body_html[section_end:]
# PUT updated page
put_data = {
"version": {"number": version + 1},
"title": title,
"type": "page",
"body": {
"storage": {
"value": new_body,
"representation": "storage",
}
},
}
async with httpx.AsyncClient(timeout=15.0) as client:
resp = await client.put(
url,
json=put_data,
auth=(CONFLUENCE_USER, CONFLUENCE_TOKEN),
)
resp.raise_for_status()
return f"Section '{section_heading}' updated successfully."
def _build_e2ee_options() -> rtc.E2EEOptions:
    """Return the E2EE configuration used when joining an Element Call room.

    The key provider is configured for per-participant keys with HKDF
    derivation (KDF_HKDF=1), so the raw base keys taken from Matrix key
    exchange events are handed to set_key() unchanged and the Rust FFI
    derives the AES frame key via HKDF(base_key, ratchetSalt, ...) itself.

    Element Call uses: ratchetWindowSize=10, keyringSize=256,
    ratchetSalt="LKFrameEncryptionKey".

    NOTE: proto value 0 = PBKDF2 (not raw/none) — must use KDF_HKDF=1.
    """
    provider_settings = {
        "shared_key": b"",  # empty = per-participant mode
        "ratchet_window_size": 10,
        "ratchet_salt": b"LKFrameEncryptionKey",
        "failure_tolerance": 10,
        "key_ring_size": 256,
        # Rust FFI applies HKDF; we pass raw base keys
        "key_derivation_function": KDF_HKDF,
    }
    provider_options = rtc.KeyProviderOptions(**provider_settings)
    return rtc.E2EEOptions(
        encryption_type=rtc.EncryptionType.GCM,
        key_provider_options=provider_options,
    )
class VoiceSession:
def __init__(self, nio_client, room_id, device_id, lk_url, model="claude-sonnet",
publish_key_cb=None, bot_key: bytes | None = None,
memory=None, caller_user_id: str | None = None,
document_context: str | None = None):
self.nio_client = nio_client
self.room_id = room_id
self.device_id = device_id
self.lk_url = lk_url
self.model = model
self.lk_room = None
self.session = None
self._task = None
self._http_session = None
self._caller_key: bytes | None = None
self._caller_identity: str | None = None
self._caller_all_keys: dict = {} # {index: bytes} — all caller keys by index
self._bot_key: bytes = bot_key or os.urandom(16)
self._publish_key_cb = publish_key_cb
self._memory = memory # MemoryClient instance from bot.py
self._caller_user_id = caller_user_id # Matrix user ID for memory lookup
self._document_context = document_context # PDF text from room for voice context
self._transcript: list[dict] = [] # {"role": "user"|"assistant", "text": "..."}
def on_encryption_key(self, sender, device_id, key, index):
    """Receive E2EE key from Element Call participant.

    Store-only: keys are applied in on_track_subscribed() / on_e2ee_state_changed()
    where the frame cryptor is guaranteed to exist. If the track is already
    subscribed (late key arrival / rotation), set the key immediately.

    Args:
        sender: Matrix user ID that sent the key event.
        device_id: Device ID of the sender; "<sender>:<device_id>" is used as
            the LiveKit participant identity.
        key: Raw base key bytes. Empty/None keys are ignored.
        index: Key index the key is stored (and later applied) under.
    """
    if not key:
        return

    # The first key seen defines who "the caller" is for this session.
    if not self._caller_key:
        self._caller_key = key
        self._caller_identity = f"{sender}:{device_id}"
    self._caller_all_keys[index] = key

    # SECURITY: log only metadata. The key bytes must never reach the logs —
    # anyone with log access could otherwise decrypt the call media.
    logger.info("E2EE key received from %s:%s (index=%d, %d bytes)",
                sender, device_id, index, len(key))

    # Late key arrival: if track already subscribed, frame cryptor exists — set key now.
    if not (self.lk_room and self._caller_identity):
        return
    caller_lk_id = self._caller_identity
    for p in self.lk_room.remote_participants.values():
        if p.identity != caller_lk_id:
            continue
        has_subscribed = any(
            pub.subscribed for pub in p.track_publications.values()
        )
        if has_subscribed:
            try:
                kp = self.lk_room.e2ee_manager.key_provider
                ok = kp.set_key(p.identity, key, index)
                logger.info("Late key set_key[%d] for %s (ok=%s)",
                            index, p.identity, ok)
            except Exception as e:
                # Best effort: the key stays stored and is re-applied by the
                # track/e2ee-state handlers.
                logger.warning("Late key set_key failed: %s", e)
        break  # the caller was found; no other participant can match
async def _fetch_encryption_key_http(self) -> bytes | None:
    """Fetch encryption key from room timeline (NOT state) via Matrix HTTP API.

    Element Call distributes encryption keys as timeline events, not state.
    The last 50 timeline events are scanned backwards; the first
    ``io.element.call.encryption_keys`` event from another user that carries
    at least one decodable key wins.

    Side effects: merges the decoded keys into ``self._caller_all_keys`` and,
    when the event names a device, sets ``self._caller_identity``.

    Returns:
        The key with the highest index from that event, or None when no key
        was found or the request failed (failures are logged, never raised).
    """
    import base64
    import httpx

    homeserver = str(self.nio_client.homeserver)
    token = self.nio_client.access_token
    url = f"{homeserver}/_matrix/client/v3/rooms/{self.room_id}/messages"
    try:
        async with httpx.AsyncClient(timeout=10.0) as http:
            resp = await http.get(
                url,
                headers={"Authorization": f"Bearer {token}"},
                params={"dir": "b", "limit": "50"},
            )
            resp.raise_for_status()
            data = resp.json()

        events = data.get("chunk", [])
        user_id = self.nio_client.user_id
        for evt in events:
            if evt.get("type", "") != "io.element.call.encryption_keys":
                continue
            sender = evt.get("sender", "")
            if sender == user_id:
                continue  # skip our own key
            content = evt.get("content", {})
            device = content.get("device_id", "")
            logger.info("Found encryption_keys timeline event: sender=%s device=%s",
                        sender, device)

            all_keys = {}
            for k in content.get("keys", []):
                key_b64 = k.get("key", "")
                key_index = k.get("index", 0)
                if not key_b64:
                    continue
                try:
                    # Keys arrive as unpadded base64; restore the padding first.
                    padded = key_b64 + "=" * (-len(key_b64) % 4)
                    all_keys[key_index] = base64.urlsafe_b64decode(padded)
                except (ValueError, TypeError) as exc:
                    # binascii.Error is a ValueError. One malformed entry must
                    # not abort the scan and discard the remaining valid keys.
                    logger.warning("Skipping undecodable key at index %s from %s: %s",
                                   key_index, sender, exc)

            if all_keys:
                if device:
                    self._caller_identity = f"{sender}:{device}"
                self._caller_all_keys.update(all_keys)
                max_idx = max(all_keys.keys())
                logger.info("Loaded caller keys at indices %s (using %d)",
                            sorted(all_keys.keys()), max_idx)
                return all_keys[max_idx]
        logger.info("No encryption_keys events in last %d timeline events", len(events))
    except Exception as e:
        # Best effort: the caller falls back to waiting for the key via sync.
        logger.warning("HTTP encryption key fetch failed: %s", e)
    return None
async def start(self):
    """Launch the session's main coroutine as a background task and return at once."""
    runner = self._run()
    self._task = asyncio.create_task(runner)
async def stop(self):
    """Shut the call down: close session, room and HTTP session, then cancel the task.

    Each resource is closed through the first of ``aclose`` / ``disconnect`` /
    ``close`` that it exposes. Shutdown is best effort: an error while closing
    one resource is ignored so the remaining ones are still released.
    """
    closer_names = ("aclose", "disconnect", "close")
    for resource in (self.session, self.lk_room, self._http_session):
        if not resource:
            continue
        try:
            chosen = next(
                (name for name in closer_names if hasattr(resource, name)),
                None,
            )
            if chosen is not None:
                await getattr(resource, chosen)()
        except Exception:
            pass

    task = self._task
    if not task or task.done():
        return
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
def get_transcript(self) -> list[dict]:
    """Return a shallow copy of the call transcript ({role, text} dicts, in order)."""
    return [*self._transcript]
def get_document_context(self) -> str | None:
"""Return the document context loaded for this call, if any."""
return self._document_context
def get_confluence_page_id(self) -> str | None:
"""Return the active Confluence page ID, if any."""
if not self._document_context:
return None
ids = re.findall(r'confluence_page_id:(\d+)', self._document_context)
return ids[0] if ids else None
async def _run(self):
    """Join the LiveKit room, wire up E2EE and run the voice agent until cancelled.

    Steps, in order:
      1. Publish the bot key and obtain the caller key (timeline, then sync).
      2. Connect to LiveKit, registering handlers that apply caller keys once
         the caller's frame cryptor exists.
      3. Wait briefly for Element Call's key rotation and for a remote participant.
      4. Load caller memories/timezone, build the STT/LLM/TTS pipeline and tools.
      5. Start the agent, greet, then loop as a VAD watchdog.

    Never raises: cancellation and failures are logged and the coroutine returns.
    """
    import html
    import time

    # asyncio only keeps weak references to tasks; hold strong ones here so
    # fire-and-forget work cannot be garbage-collected before it finishes.
    background_tasks = set()

    def _spawn(coro):
        """Schedule *coro* in the background and keep it alive until done."""
        task = asyncio.ensure_future(coro)
        background_tasks.add(task)
        task.add_done_callback(background_tasks.discard)
        return task

    try:
        user_id = self.nio_client.user_id
        bot_identity = _make_lk_identity(user_id, self.device_id)
        jwt = _generate_lk_jwt(self.room_id, user_id, self.device_id)

        # Publish bot's own key immediately so Element Call can decrypt us
        if self._publish_key_cb:
            self._publish_key_cb(self._bot_key)
            logger.info("Published bot E2EE key (%d bytes)", len(self._bot_key))

        # Check timeline for caller's encryption key
        caller_key = await self._fetch_encryption_key_http()
        if caller_key:
            self._caller_key = caller_key
            logger.info("Got caller E2EE key via timeline (%d bytes)", len(caller_key))
        if not self._caller_key:
            # Wait up to 15s for key via sync handler
            logger.info("No key in timeline yet, waiting for sync...")
            for _ in range(150):
                if self._caller_key:
                    break
                await asyncio.sleep(0.1)

        # E2EE: configurable via E2EE_ENABLED env var (default true).
        # When enabled, parameters MUST match Element Call JS SDK.
        e2ee_enabled = os.environ.get("E2EE_ENABLED", "true").lower() in ("true", "1", "yes")
        if e2ee_enabled:
            e2ee_opts = _build_e2ee_options()
            room_opts = rtc.RoomOptions(e2ee=e2ee_opts)
            logger.info("E2EE enabled (HKDF mode)")
        else:
            e2ee_opts = None
            room_opts = rtc.RoomOptions()
            logger.warning("E2EE DISABLED — audio is unencrypted")

        self.lk_room = rtc.Room()

        @self.lk_room.on("participant_connected")
        def on_p(p):
            logger.info("Participant connected: %s", p.identity)
            # Note: do NOT set keys here — frame cryptor not initialized yet at participant_connected.
            # Keys are set in on_track_subscribed where the frame cryptor definitely exists.

        @self.lk_room.on("track_published")
        def on_tp(pub, p):
            logger.info("Track pub: %s %s kind=%s", p.identity, pub.sid, pub.kind)

        @self.lk_room.on("track_muted")
        def on_mute(p, pub):
            logger.info("Track MUTED: %s %s kind=%s", p.identity, pub.sid, pub.kind)

        @self.lk_room.on("track_unmuted")
        def on_unmute(p, pub):
            logger.info("Track UNMUTED: %s %s kind=%s", p.identity, pub.sid, pub.kind)

        @self.lk_room.on("track_subscribed")
        def on_ts(t, pub, p):
            logger.info("Track sub: %s %s kind=%s muted=%s", p.identity, pub.sid, t.kind, pub.muted)
            # NOTE: Do NOT create rtc.AudioStream here — it competes with AgentSession's
            # internal audio pipeline for event loop time, causing intermittent VAD failures
            # (user_state stuck on "away"). See MAT-40. Use e2ee_state_changed for flow confirmation.
            # *** KEY FIX: set_key() with KDF_HKDF only applies HKDF when the frame cryptor
            # for this participant already exists. The frame cryptor is created at track
            # subscription time. Calling set_key() BEFORE track subscription (at connect)
            # skips HKDF derivation → raw key stored → DEC_FAILED.
            # Solution: set caller key HERE, after frame cryptor is initialized.
            if int(t.kind) == 1 and e2ee_opts is not None:  # audio track only
                caller_id = p.identity
                logger.info("E2EE_DIAG: track_subscribed for %s, have %d caller keys",
                            caller_id, len(self._caller_all_keys))
                try:
                    kp_local = self.lk_room.e2ee_manager.key_provider
                    if self._caller_all_keys:
                        for idx, base_k in sorted(self._caller_all_keys.items()):
                            ok = kp_local.set_key(caller_id, base_k, idx)
                            # SECURITY: key bytes are deliberately not logged.
                            logger.info("on_ts: set_key[%d] for %s (ok=%s, %d bytes)",
                                        idx, caller_id, ok, len(base_k))
                    else:
                        logger.warning("on_ts: no caller keys yet — scheduling 0.5s retry")

                        async def _brief_key_retry(pid=caller_id):
                            await asyncio.sleep(0.5)
                            if self._caller_all_keys:
                                try:
                                    kp_r = self.lk_room.e2ee_manager.key_provider
                                    for idx, base_k in sorted(self._caller_all_keys.items()):
                                        ok = kp_r.set_key(pid, base_k, idx)
                                        logger.info("on_ts_retry: set_key[%d] for %s (ok=%s)", idx, pid, ok)
                                except Exception as exc:
                                    logger.warning("on_ts_retry: set_key failed: %s", exc)
                            else:
                                logger.warning("on_ts_retry: still no caller keys for %s", pid)

                        _spawn(_brief_key_retry())
                except Exception as exc:
                    logger.warning("on_ts: set_key failed: %s", exc)

        _e2ee_state_names = {0: "NEW", 1: "OK", 2: "ENC_FAILED", 3: "DEC_FAILED",
                             4: "MISSING_KEY", 5: "RATCHETED", 6: "INTERNAL_ERR"}

        @self.lk_room.on("e2ee_state_changed")
        def on_e2ee_state(participant, state):
            state_name = _e2ee_state_names.get(int(state), f"UNKNOWN_{state}")
            p_id = participant.identity if participant else "local"
            logger.info("E2EE_STATE: participant=%s state=%s", p_id, state_name)
            # When a remote participant's frame cryptor is NEW, DEC_FAILED or
            # MISSING_KEY → (re)apply their keys.
            if participant and p_id != bot_identity and int(state) in (0, 3, 4):
                if self._caller_all_keys:
                    try:
                        kp_e = self.lk_room.e2ee_manager.key_provider
                        for idx, base_k in sorted(self._caller_all_keys.items()):
                            ok = kp_e.set_key(p_id, base_k, idx)
                            logger.info("e2ee_state set_key[%d] for %s (ok=%s)", idx, p_id, ok)
                    except Exception as exc:
                        logger.warning("e2ee_state set_key failed: %s", exc)

        await self.lk_room.connect(self.lk_url, jwt, options=room_opts)
        logger.info("Connected (E2EE=%s), remote=%d",
                    "HKDF" if e2ee_opts is not None else "off",
                    len(self.lk_room.remote_participants))

        # Set bot's own key immediately after connect — local frame cryptor exists at connect time.
        # CALLER keys are set in on_track_subscribed (NOT here) because the caller's frame cryptor
        # is only created when their track arrives. Calling set_key() before that skips HKDF.
        # With E2EE disabled there is no key provider to talk to, so skip this
        # entirely instead of failing the whole session.
        kp = None
        if e2ee_opts is not None:
            kp = self.lk_room.e2ee_manager.key_provider
            ok = kp.set_key(bot_identity, self._bot_key, 0)
            logger.info("Set bot key for %s (ok=%s, %d bytes)", bot_identity, ok, len(self._bot_key))

        # Element Call rotates its key when bot joins. Wait up to 3s for the
        # rotated key to arrive via nio sync before proceeding. If it arrives,
        # on_encryption_key() stores it and (if track already subscribed) sets it.
        pre_key_count = len(self._caller_all_keys)
        pre_max_idx = max(self._caller_all_keys.keys()) if self._caller_all_keys else -1
        logger.info("Waiting up to 3s for key rotation (current: %d keys, max_idx=%d)...",
                    pre_key_count, pre_max_idx)
        for _wait in range(6):  # 6 × 0.5s = 3s
            await asyncio.sleep(0.5)
            new_count = len(self._caller_all_keys)
            new_max = max(self._caller_all_keys.keys()) if self._caller_all_keys else -1
            if new_max > pre_max_idx or new_count > pre_key_count:
                self._caller_key = self._caller_all_keys[new_max]
                logger.info("Key rotated: index %d→%d, count %d→%d",
                            pre_max_idx, new_max, pre_key_count, new_count)
                break
        else:
            logger.info("No key rotation after 3s — proceeding with key[%d]", pre_max_idx)

        # Find the remote participant, wait up to 10s if not yet connected
        remote_identity = None
        for p in self.lk_room.remote_participants.values():
            remote_identity = p.identity
            break
        if not remote_identity:
            logger.info("No remote participant yet, waiting...")
            for _ in range(100):
                await asyncio.sleep(0.1)
                for p in self.lk_room.remote_participants.values():
                    remote_identity = p.identity
                    break
                if remote_identity:
                    break

        # After the key rotation wait: re-apply every stored caller key via set_key().
        # (Usually on_track_subscribed handles this, but if the track arrived before
        # the rotation, the rotated key needs to be set here for the
        # already-subscribed participant.)
        if kp is not None and self._caller_all_keys and remote_identity:
            try:
                for idx, base_k in sorted(self._caller_all_keys.items()):
                    ok = kp.set_key(remote_identity, base_k, idx)
                    logger.info("Post-rotation set_key[%d] for %s (ok=%s)",
                                idx, remote_identity, ok)
            except Exception as e:
                logger.warning("Post-rotation set_key failed: %s", e)
        elif e2ee_opts is not None and not self._caller_all_keys:
            logger.warning("No caller E2EE keys received — incoming audio will be silence")

        if remote_identity:
            logger.info("Linking to remote participant: %s", remote_identity)

        # Load memories and user preferences for this caller
        memory_section = ""
        user_timezone = None
        if self._memory and self._caller_user_id:
            try:
                mems = await self._memory.query(self._caller_user_id, "voice call", top_k=10)
                if mems:
                    conversations = []
                    for m in mems:
                        fact = m['fact']
                        # Extract timezone preference from stored memories
                        if fact.startswith("[PREF:timezone]"):
                            user_timezone = fact.split("]", 1)[1].strip()
                        else:
                            conversations.append(fact)
                    if conversations:
                        memory_section = "\n\nFrühere Gespräche mit diesem Nutzer:\n" + \
                            "\n---\n".join(conversations)
                    logger.info("Loaded %d memories for %s (tz=%s)",
                                len(mems), self._caller_user_id, user_timezone)
            except Exception as exc:
                logger.warning("Memory query failed: %s", exc)

        # Also query specifically for timezone preference if not found above
        if not user_timezone and self._memory and self._caller_user_id:
            try:
                tz_mems = await self._memory.query(
                    self._caller_user_id, "timezone preference", top_k=3)
                for m in (tz_mems or []):
                    if m['fact'].startswith("[PREF:timezone]"):
                        user_timezone = m['fact'].split("]", 1)[1].strip()
                        break
            except Exception as exc:
                # Optional lookup: continue without a timezone, but leave a trace.
                logger.debug("Timezone preference query failed: %s", exc)

        # Voice pipeline — Jack Marlowe (native German male)
        self._http_session = aiohttp.ClientSession()
        voice_id = os.environ.get("ELEVENLABS_VOICE_ID", DEFAULT_VOICE_ID)
        self.session = AgentSession(
            stt=elevenlabs.STT(
                api_key=ELEVENLABS_KEY,
                model_id="scribe_v2_realtime",
                language_code=os.environ.get("STT_LANGUAGE", "de"),
                http_session=self._http_session,
            ),
            llm=lk_openai.LLM(base_url=LITELLM_URL, api_key=LITELLM_KEY, model=self.model),
            tts=elevenlabs.TTS(
                voice_id=voice_id,
                model="eleven_multilingual_v2",
                language="de",
                api_key=ELEVENLABS_KEY,
                voice_settings=elevenlabs.VoiceSettings(
                    stability=0.5, similarity_boost=0.75, speed=1.15,
                ),
                http_session=self._http_session,
            ),
            vad=_get_vad(),
        )

        # Pipeline event logging (livekit-agents 1.4.2 event names)
        _vad_state_log: dict = {"last": None, "speaking_count": 0, "away_since": None}

        @self.session.on("user_state_changed")
        def _on_user_state(ev):
            state = ev.new_state
            prev = _vad_state_log["last"]
            _vad_state_log["last"] = state
            if str(state) == "speaking":
                _vad_state_log["speaking_count"] = _vad_state_log.get("speaking_count", 0) + 1
                _vad_state_log["away_since"] = None
                logger.info("VAD: user_state=%s (speaking_count=%d)", state, _vad_state_log["speaking_count"])
            elif str(state) == "away" and prev and str(prev) != "away":
                # Remember when the user went away; the watchdog below reads this.
                _vad_state_log["away_since"] = time.monotonic()
                logger.info("VAD: user_state=%s (was %s)", state, prev)
            else:
                logger.info("VAD: user_state=%s", state)

        # User utterances collected since the last assistant reply.
        _last_user_speech: list[str] = []

        @self.session.on("user_input_transcribed")
        def _on_user_speech(ev):
            text = ev.transcript or ""
            if text and _is_stt_artifact(text):
                logger.warning("NOISE_LEAK: artifact reached STT: %s", text)
            else:
                logger.info("USER_SPEECH: %s", text)
                if ev.transcript:
                    _last_user_speech.append(ev.transcript)
                    self._transcript.append({"role": "user", "text": ev.transcript})

        @self.session.on("conversation_item_added")
        def _on_conversation_item(ev):
            role = getattr(ev.item, "role", "?")
            text = getattr(ev.item, "text_content", "") or ""
            if role == "assistant" and text:
                logger.info("AGENT_SPEECH: %s", text)
                self._transcript.append({"role": "assistant", "text": text})
                if self._memory and self._caller_user_id and _last_user_speech:
                    user_text = " ".join(_last_user_speech)
                    _last_user_speech.clear()
                    _spawn(
                        _store_voice_exchange(user_text, text,
                                              self._caller_user_id, self.room_id))

        # Brave Search tool — lets the agent answer questions about current events
        @function_tool
        async def search_web(query: str) -> str:
            """Search the web for current information using Brave Search.
            Use this when asked about recent news, current events, prices,
            weather, or any information that may have changed recently.
            """
            logger.info("SEARCH: %s", query)
            result = await _brave_search(query)
            logger.info("SEARCH_RESULT: %s", result[:200])
            return result

        # Tool: set user timezone — called by the LLM when user mentions their location
        caller_uid = self._caller_user_id

        @function_tool
        async def set_user_timezone(iana_timezone: str) -> str:
            """Store the user's timezone. Call this when the user mentions their city
            or timezone (e.g. 'I'm in Nicosia' → 'Europe/Nicosia',
            'I live in New York' → 'America/New_York').
            Use IANA timezone names like Europe/Berlin, America/New_York, Asia/Tokyo.
            """
            if caller_uid:
                await _store_user_pref(caller_uid, "timezone", iana_timezone)
            return f"Timezone set to {iana_timezone}"

        # Extract active Confluence page ID from document context (if any)
        _active_conf_id = self.get_confluence_page_id()

        @function_tool
        async def read_confluence_page(page_id: str = "") -> str:
            """Read a Confluence page. Use when user asks to read, review,
            or check a document. Returns page title and content as text.
            Leave page_id empty to use the active document from the room."""
            pid = page_id or _active_conf_id
            if not pid:
                return "No Confluence page ID available. Ask the user to share a Confluence link first."
            logger.info("CONFLUENCE_READ: page_id=%s", pid)
            try:
                title, text, _ver = await _confluence_read_page(pid)
                result = f"Page: {title}\n\n{text}"
                logger.info("CONFLUENCE_READ_OK: %s (%d chars)", title, len(text))
                return result
            except Exception as exc:
                logger.warning("CONFLUENCE_READ_FAIL: %s", exc)
                return f"Failed to read page: {exc}"

        @function_tool
        async def update_confluence_page(section_heading: str, new_content: str, page_id: str = "") -> str:
            """Update a section of a Confluence page. Use when user asks to
            change, update, or rewrite part of a document.
            - section_heading: heading text of the section to update
            - new_content: new plain text for the section (will be wrapped in <p> tags)
            - page_id: leave empty to use the active document from the room
            Human sees changes instantly in their browser via Live Docs."""
            pid = page_id or _active_conf_id
            if not pid:
                return "No Confluence page ID available. Ask the user to share a Confluence link first."
            logger.info("CONFLUENCE_UPDATE: page=%s section='%s'", pid, section_heading)
            try:
                # new_content is plain text from the LLM: escape it so characters
                # such as & and < cannot break or inject markup into the
                # page's storage-format body, then wrap it in one paragraph.
                new_html = f"<p>{html.escape(new_content)}</p>"
                result = await _confluence_update_section(pid, section_heading, new_html)
                logger.info("CONFLUENCE_UPDATE_OK: %s", result)
                return result
            except Exception as exc:
                logger.warning("CONFLUENCE_UPDATE_FAIL: %s", exc)
                return f"Failed to update page: {exc}"

        instructions = _build_voice_prompt(model=self.model, timezone=user_timezone) + memory_section
        if self._document_context:
            instructions += f"\n\nDokument-Kontext (im Raum hochgeladen):\n{self._document_context}"
        if _active_conf_id:
            instructions += f"\n\nAktive Confluence-Seite: {_active_conf_id}. Du brauchst den Nutzer NICHT nach der page_id zu fragen — nutze automatisch diese ID fuer read_confluence_page und update_confluence_page."

        agent = _NoiseFilterAgent(
            instructions=instructions,
            tools=[search_web, set_user_timezone, read_confluence_page, update_confluence_page],
        )
        io_opts = room_io.RoomOptions(
            participant_identity=remote_identity,
            close_on_disconnect=False,
        ) if remote_identity else room_io.RoomOptions(close_on_disconnect=False)
        await self.session.start(
            agent=agent,
            room=self.lk_room,
            room_options=io_opts,
        )
        logger.info("Voice pipeline started (voice=%s, linked_to=%s)", voice_id, remote_identity)

        try:
            await asyncio.wait_for(
                self.session.generate_reply(
                    instructions="Sage nur: Hallo, wie kann ich helfen?"),
                timeout=30.0)
            logger.info("Greeting sent")
        except asyncio.TimeoutError:
            logger.error("Greeting timed out")

        # VAD watchdog: log diagnostic and attempt E2EE key recovery if stuck
        while True:
            await asyncio.sleep(10)
            away_since = _vad_state_log.get("away_since")
            if away_since and (time.monotonic() - away_since) > 30:
                sc = _vad_state_log.get("speaking_count", 0)
                e2ee_ok = any(
                    str(getattr(p, '_e2ee_state', '')) == 'OK'
                    for p in self.lk_room.remote_participants.values()
                ) if self.lk_room else False
                n_remote = len(self.lk_room.remote_participants) if self.lk_room else 0
                logger.warning(
                    "VAD_WATCHDOG: user_state=away for >30s (speaking_count=%d, "
                    "remote_participants=%d, e2ee_ok=%s) — attempting E2EE key recovery",
                    sc, n_remote, e2ee_ok,
                )
                # Recovery: re-apply all stored E2EE keys for all remote participants
                if self.lk_room and self._caller_all_keys:
                    try:
                        kp_w = self.lk_room.e2ee_manager.key_provider
                        for p in self.lk_room.remote_participants.values():
                            for idx, base_k in sorted(self._caller_all_keys.items()):
                                ok = kp_w.set_key(p.identity, base_k, idx)
                                logger.info("VAD_WATCHDOG: recovery set_key[%d] for %s (ok=%s)",
                                            idx, p.identity, ok)
                    except Exception as exc:
                        logger.warning("VAD_WATCHDOG: recovery set_key failed: %s", exc)
                _vad_state_log["away_since"] = None  # only warn once per stuck period
    except asyncio.CancelledError:
        logger.info("Session cancelled for %s", self.room_id)
    except Exception:
        logger.exception("Session failed for %s", self.room_id)