- Bot tells which model it uses when asked - Injects current UTC datetime into prompt - Responds in user's language instead of always German Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
347 lines
15 KiB
Python
347 lines
15 KiB
Python
"""Voice session: LiveKit + STT/LLM/TTS pipeline.
|
|
E2EE via HKDF key derivation (Element Call compatible).
|
|
Requires patched livekit-rtc FFI binary from onestacked/livekit-rust-sdks."""
|
|
import asyncio
|
|
import base64
|
|
import datetime
|
|
import hashlib
|
|
import logging
|
|
import os
|
|
|
|
import aiohttp
|
|
from livekit import rtc, api as lkapi
|
|
from livekit.agents import Agent, AgentSession, room_io
|
|
from livekit.plugins import openai as lk_openai, elevenlabs, silero
|
|
|
|
# Module-wide logger. The name is a fixed string rather than __name__.
logger = logging.getLogger("matrix-ai-voice")

# Enable debug logging for agents pipeline to diagnose audio issues
# NOTE: these two calls run at import time and affect the whole process.
logging.getLogger("livekit.agents").setLevel(logging.DEBUG)
logging.getLogger("livekit.plugins").setLevel(logging.DEBUG)

# Endpoints and credentials, read once from the environment at import time.
# Missing variables fall back to the defaults given here (empty string, or
# "not-needed" for the LLM API key).
LITELLM_URL = os.environ.get("LITELLM_BASE_URL", "")
LITELLM_KEY = os.environ.get("LITELLM_API_KEY", "not-needed")
LK_API_KEY = os.environ.get("LIVEKIT_API_KEY", "")
LK_API_SECRET = os.environ.get("LIVEKIT_API_SECRET", "")
ELEVENLABS_KEY = os.environ.get("ELEVENLABS_API_KEY", "")
# Voice used for TTS when ELEVENLABS_VOICE_ID is not set in the environment.
DEFAULT_VOICE_ID = "JBFqnCBsd6RMkjVDRZzb" # George - warm, captivating, British male
|
|
|
|
VOICE_PROMPT_TEMPLATE = """Du bist ein hilfreicher Sprachassistent von Agiliton in einem Matrix-Anruf.
|
|
Du heisst "Agiliton Assistant". Du basierst auf dem Modell {model}.
|
|
Wenn jemand fragt welches Modell du bist, sei transparent und sage es.
|
|
|
|
Aktuelle Zeit: {datetime}
|
|
|
|
STRIKTE Regeln:
|
|
- Antworte in der Sprache in der der Nutzer spricht
|
|
- Halte JEDE Antwort auf MAXIMAL 1-2 kurze Saetze
|
|
- Sei direkt und praezise, keine Fuellwoerter
|
|
- Erfinde NICHTS - keine Geschichten, keine Musik, keine Fantasie
|
|
- Beantworte nur was gefragt wird
|
|
- Wenn niemand etwas fragt, sage nur kurz Hallo"""
|
|
|
|
# Process-wide cache for the Silero VAD model; filled on first use.
_vad = None


def _get_vad():
    """Return the shared Silero VAD instance, loading it on the first call."""
    global _vad
    if _vad is not None:
        return _vad
    _vad = silero.VAD.load()
    return _vad
|
|
|
|
def _make_lk_identity(user_id, device_id):
    """Return the LiveKit participant identity: ``<user_id>:<device_id>``."""
    return "{}:{}".format(user_id, device_id)
|
|
|
|
def _compute_lk_room_name(room_id):
    """Derive the LiveKit room name for a Matrix room.

    The name is the unpadded standard-base64 encoding of
    SHA-256(``"<room_id>|m.call#ROOM"``).
    """
    digest = hashlib.sha256(f"{room_id}|m.call#ROOM".encode()).digest()
    encoded = base64.b64encode(digest).decode()
    return encoded.rstrip("=")
|
|
|
|
def _generate_lk_jwt(room_id, user_id, device_id):
    """Create a LiveKit access token (JWT) for the bot in the call room.

    The token is signed with LK_API_KEY / LK_API_SECRET, is valid for 24
    hours, and grants join, publish and subscribe rights on the LiveKit room
    derived from ``room_id``. The participant identity is built from
    ``user_id`` and ``device_id``; the display name is ``user_id``.
    """
    lk_room = _compute_lk_room_name(room_id)
    identity = _make_lk_identity(user_id, device_id)

    grants = lkapi.VideoGrants(
        room_join=True,
        room=lk_room,
        can_publish=True,
        can_subscribe=True,
    )

    token = lkapi.AccessToken(LK_API_KEY, LK_API_SECRET)
    token = token.with_identity(identity)
    token = token.with_name(user_id)
    token = token.with_grants(grants)
    token = token.with_ttl(datetime.timedelta(hours=24))

    logger.info("JWT: identity=%s room=%s", identity, lk_room)
    return token.to_jwt()
|
|
|
|
|
|
# Value passed as ``key_derivation_function`` below.
# NOTE(review): presumably the patched FFI's selector for HKDF — confirm.
KDF_HKDF = 1


def _build_e2ee_options(shared_key: bytes) -> rtc.E2EEOptions:
    """Build HKDF E2EE options matching Element Call's key derivation.

    Element Call uses: ratchetWindowSize=10, keyringSize=256,
    salt="LKFrameEncryptionKey". ``shared_key`` is passed through as the key
    provider's shared key.
    """
    provider_settings = {
        "shared_key": shared_key,
        "ratchet_window_size": 10,
        "ratchet_salt": b"LKFrameEncryptionKey",
        "failure_tolerance": 10,
        "key_ring_size": 256,
        "key_derivation_function": KDF_HKDF,
    }
    provider_options = rtc.KeyProviderOptions(**provider_settings)
    return rtc.E2EEOptions(key_provider_options=provider_options)
|
|
|
|
|
|
class VoiceSession:
|
|
def __init__(self, nio_client, room_id, device_id, lk_url, model="claude-sonnet",
|
|
publish_key_cb=None, bot_key: bytes | None = None):
|
|
self.nio_client = nio_client
|
|
self.room_id = room_id
|
|
self.device_id = device_id
|
|
self.lk_url = lk_url
|
|
self.model = model
|
|
self.lk_room = None
|
|
self.session = None
|
|
self._task = None
|
|
self._http_session = None
|
|
self._caller_key: bytes | None = None
|
|
self._caller_identity: str | None = None # "sender:device_id" format
|
|
self._bot_key: bytes = bot_key or os.urandom(16)
|
|
self._publish_key_cb = publish_key_cb
|
|
|
|
def on_encryption_key(self, sender, device_id, key, index):
|
|
"""Receive E2EE key from Element Call participant.
|
|
|
|
If the room is already connected, immediately set the key on the
|
|
key provider so we can decrypt the caller's audio.
|
|
"""
|
|
if not key:
|
|
return
|
|
identity = _make_lk_identity(sender, device_id)
|
|
self._caller_key = key
|
|
self._caller_identity = identity
|
|
logger.info("E2EE key received from %s:%s (identity=%s, index=%d, %d bytes)",
|
|
sender, device_id, identity, index, len(key))
|
|
|
|
# If already connected, set key on the key provider immediately
|
|
if self.lk_room:
|
|
try:
|
|
kp = self.lk_room.e2ee_manager.key_provider
|
|
kp.set_key(identity, key, key_index=index)
|
|
logger.info("Live-updated caller E2EE key for %s", identity)
|
|
except Exception:
|
|
logger.warning("Could not live-update caller E2EE key", exc_info=True)
|
|
|
|
async def _fetch_encryption_key_http(self) -> bytes | None:
|
|
"""Fetch encryption key from room timeline (NOT state) via Matrix HTTP API.
|
|
|
|
Element Call distributes encryption keys as timeline events, not state.
|
|
Also sets self._caller_identity from the event sender + device_id.
|
|
"""
|
|
import httpx
|
|
homeserver = str(self.nio_client.homeserver)
|
|
token = self.nio_client.access_token
|
|
url = f"{homeserver}/_matrix/client/v3/rooms/{self.room_id}/messages"
|
|
try:
|
|
async with httpx.AsyncClient(timeout=10.0) as http:
|
|
resp = await http.get(
|
|
url,
|
|
headers={"Authorization": f"Bearer {token}"},
|
|
params={"dir": "b", "limit": "50"},
|
|
)
|
|
resp.raise_for_status()
|
|
data = resp.json()
|
|
events = data.get("chunk", [])
|
|
user_id = self.nio_client.user_id
|
|
for evt in events:
|
|
evt_type = evt.get("type", "")
|
|
if evt_type == "io.element.call.encryption_keys":
|
|
sender = evt.get("sender", "")
|
|
if sender == user_id:
|
|
continue # skip our own key
|
|
content = evt.get("content", {})
|
|
caller_device = content.get("device_id", "")
|
|
logger.info("Found encryption_keys timeline event: sender=%s device=%s",
|
|
sender, caller_device)
|
|
if caller_device:
|
|
self._caller_identity = _make_lk_identity(sender, caller_device)
|
|
for k in content.get("keys", []):
|
|
key_b64 = k.get("key", "")
|
|
if key_b64:
|
|
key_b64 += "=" * (-len(key_b64) % 4)
|
|
return base64.urlsafe_b64decode(key_b64)
|
|
logger.info("No encryption_keys events in last %d timeline events", len(events))
|
|
except Exception as e:
|
|
logger.warning("HTTP encryption key fetch failed: %s", e)
|
|
return None
|
|
|
|
async def start(self):
|
|
self._task = asyncio.create_task(self._run())
|
|
|
|
async def stop(self):
|
|
for obj in [self.session, self.lk_room, self._http_session]:
|
|
if obj:
|
|
try:
|
|
if hasattr(obj, "aclose"):
|
|
await obj.aclose()
|
|
elif hasattr(obj, "disconnect"):
|
|
await obj.disconnect()
|
|
elif hasattr(obj, "close"):
|
|
await obj.close()
|
|
except Exception:
|
|
pass
|
|
if self._task and not self._task.done():
|
|
self._task.cancel()
|
|
try:
|
|
await self._task
|
|
except asyncio.CancelledError:
|
|
pass
|
|
|
|
async def _run(self):
|
|
try:
|
|
user_id = self.nio_client.user_id
|
|
jwt = _generate_lk_jwt(self.room_id, user_id, self.device_id)
|
|
|
|
# Check timeline for caller's encryption key
|
|
caller_key = await self._fetch_encryption_key_http()
|
|
if caller_key:
|
|
self._caller_key = caller_key
|
|
logger.info("Got caller E2EE key via timeline (%d bytes)", len(caller_key))
|
|
|
|
if not self._caller_key:
|
|
# Wait up to 15s for key via sync handler (bot.py forwards
|
|
# encryption_keys timeline events to on_encryption_key)
|
|
logger.info("No key in timeline yet, waiting for sync...")
|
|
for _ in range(150):
|
|
if self._caller_key:
|
|
break
|
|
await asyncio.sleep(0.1)
|
|
|
|
# Publish bot's own key so caller can decrypt our audio
|
|
if self._publish_key_cb:
|
|
self._publish_key_cb(self._bot_key)
|
|
|
|
# Build E2EE options with empty shared key — we set per-participant
|
|
# keys after connect via e2ee_manager.key_provider.set_key()
|
|
e2ee_opts = _build_e2ee_options(b"")
|
|
|
|
room_opts = rtc.RoomOptions(e2ee=e2ee_opts)
|
|
self.lk_room = rtc.Room()
|
|
|
|
@self.lk_room.on("participant_connected")
|
|
def on_p(p):
|
|
logger.info("Participant connected: %s", p.identity)
|
|
|
|
@self.lk_room.on("track_published")
|
|
def on_tp(pub, p):
|
|
logger.info("Track pub: %s %s kind=%s", p.identity, pub.sid, pub.kind)
|
|
|
|
@self.lk_room.on("track_subscribed")
|
|
def on_ts(t, pub, p):
|
|
logger.info("Track sub: %s %s kind=%s", p.identity, pub.sid, t.kind)
|
|
|
|
await self.lk_room.connect(self.lk_url, jwt, options=room_opts)
|
|
logger.info("Connected (E2EE=per-participant), remote=%d",
|
|
len(self.lk_room.remote_participants))
|
|
|
|
# Set per-participant E2EE keys via key provider
|
|
bot_identity = _make_lk_identity(user_id, self.device_id)
|
|
try:
|
|
kp = self.lk_room.e2ee_manager.key_provider
|
|
|
|
# Set bot's own key (encrypts outgoing audio)
|
|
kp.set_key(bot_identity, self._bot_key, key_index=0)
|
|
logger.info("Set bot E2EE key for identity=%s (%d bytes)",
|
|
bot_identity, len(self._bot_key))
|
|
|
|
# Set caller's key (decrypts incoming audio)
|
|
if self._caller_key and self._caller_identity:
|
|
kp.set_key(self._caller_identity, self._caller_key, key_index=0)
|
|
logger.info("Set caller E2EE key for identity=%s (%d bytes)",
|
|
self._caller_identity, len(self._caller_key))
|
|
elif self._caller_key:
|
|
# Caller identity not yet known — try to get from remote participants
|
|
for p in self.lk_room.remote_participants.values():
|
|
kp.set_key(p.identity, self._caller_key, key_index=0)
|
|
logger.info("Set caller E2EE key for identity=%s (%d bytes)",
|
|
p.identity, len(self._caller_key))
|
|
break
|
|
else:
|
|
logger.warning("No caller E2EE key available — caller audio will be silent")
|
|
except AttributeError:
|
|
logger.warning("e2ee_manager.key_provider not available — "
|
|
"falling back to shared key mode")
|
|
# Fallback: set shared key after connect if per-participant isn't supported
|
|
if self._caller_key:
|
|
try:
|
|
kp = self.lk_room.e2ee_manager.key_provider
|
|
kp.set_shared_key(self._caller_key, key_index=0)
|
|
logger.info("Fallback: set shared E2EE key (%d bytes)",
|
|
len(self._caller_key))
|
|
except Exception:
|
|
logger.exception("Fallback shared key also failed")
|
|
|
|
# Find the remote participant, wait up to 10s if not yet connected
|
|
remote_identity = None
|
|
for p in self.lk_room.remote_participants.values():
|
|
remote_identity = p.identity
|
|
break
|
|
if not remote_identity:
|
|
logger.info("No remote participant yet, waiting...")
|
|
for _ in range(100):
|
|
await asyncio.sleep(0.1)
|
|
for p in self.lk_room.remote_participants.values():
|
|
remote_identity = p.identity
|
|
break
|
|
if remote_identity:
|
|
break
|
|
if remote_identity:
|
|
logger.info("Linking to remote participant: %s", remote_identity)
|
|
|
|
# Voice pipeline — German male voice (Daniel)
|
|
self._http_session = aiohttp.ClientSession()
|
|
voice_id = os.environ.get("ELEVENLABS_VOICE_ID", DEFAULT_VOICE_ID)
|
|
self.session = AgentSession(
|
|
stt=elevenlabs.STT(api_key=ELEVENLABS_KEY, http_session=self._http_session),
|
|
llm=lk_openai.LLM(base_url=LITELLM_URL, api_key=LITELLM_KEY, model=self.model),
|
|
tts=elevenlabs.TTS(voice_id=voice_id, model="eleven_multilingual_v2",
|
|
api_key=ELEVENLABS_KEY, http_session=self._http_session),
|
|
vad=_get_vad(),
|
|
)
|
|
|
|
# Debug: log speech events
|
|
@self.session.on("user_speech_committed")
|
|
def _on_user_speech(msg):
|
|
logger.info("USER_SPEECH: %s", msg.text_content)
|
|
|
|
@self.session.on("agent_speech_committed")
|
|
def _on_agent_speech(msg):
|
|
logger.info("AGENT_SPEECH: %s", msg.text_content)
|
|
|
|
now = datetime.datetime.now(datetime.timezone.utc).strftime("%A, %B %d, %Y %H:%M UTC")
|
|
prompt = VOICE_PROMPT_TEMPLATE.format(model=self.model, datetime=now)
|
|
agent = Agent(instructions=prompt)
|
|
io_opts = room_io.RoomOptions(
|
|
participant_identity=remote_identity,
|
|
) if remote_identity else room_io.RoomOptions()
|
|
await self.session.start(
|
|
agent=agent,
|
|
room=self.lk_room,
|
|
room_options=io_opts,
|
|
)
|
|
logger.info("Voice pipeline started (voice=%s, linked_to=%s)", voice_id, remote_identity)
|
|
|
|
try:
|
|
await asyncio.wait_for(
|
|
self.session.generate_reply(
|
|
instructions="Sage nur: Hallo, wie kann ich helfen?"),
|
|
timeout=30.0)
|
|
logger.info("Greeting sent")
|
|
except asyncio.TimeoutError:
|
|
logger.error("Greeting timed out")
|
|
|
|
while True:
|
|
await asyncio.sleep(1)
|
|
|
|
except asyncio.CancelledError:
|
|
logger.info("Session cancelled for %s", self.room_id)
|
|
except Exception:
|
|
logger.exception("Session failed for %s", self.room_id)
|