fix(e2ee): Add E2EE HKDF to voice.py, bot uses patched Dockerfile
voice.py runs in bot container, not agent container. - Wait 3s for encryption key before connecting - Build E2EE options with HKDF when key received - Bot container now uses patched Dockerfile (needs FFI) Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -11,7 +11,7 @@ services:
|
||||
bot:
|
||||
build:
|
||||
context: .
|
||||
dockerfile: Dockerfile.bot
|
||||
dockerfile: Dockerfile
|
||||
command: python bot.py
|
||||
env_file: .env
|
||||
restart: unless-stopped
|
||||
|
||||
187
voice.py
Normal file
187
voice.py
Normal file
@@ -0,0 +1,187 @@
|
||||
"""Voice session: LiveKit + STT/LLM/TTS pipeline.
|
||||
E2EE via HKDF key derivation (Element Call compatible).
|
||||
Requires patched livekit-rtc FFI binary from onestacked/livekit-rust-sdks."""
|
||||
import asyncio
|
||||
import base64
|
||||
import datetime
|
||||
import hashlib
|
||||
import logging
|
||||
import os
|
||||
|
||||
import aiohttp
|
||||
from livekit import rtc, api as lkapi
|
||||
from livekit.agents import Agent, AgentSession
|
||||
from livekit.plugins import openai as lk_openai, elevenlabs, silero
|
||||
|
||||
logger = logging.getLogger("matrix-ai-voice")
|
||||
|
||||
LITELLM_URL = os.environ.get("LITELLM_BASE_URL", "")
|
||||
LITELLM_KEY = os.environ.get("LITELLM_API_KEY", "not-needed")
|
||||
LK_API_KEY = os.environ.get("LIVEKIT_API_KEY", "")
|
||||
LK_API_SECRET = os.environ.get("LIVEKIT_API_SECRET", "")
|
||||
ELEVENLABS_KEY = os.environ.get("ELEVENLABS_API_KEY", "")
|
||||
DEFAULT_VOICE_ID = "onwK4e9ZLuTAKqWW03F9" # Daniel - male, free tier
|
||||
|
||||
VOICE_PROMPT = """Du bist ein hilfreicher Sprachassistent in einem Matrix-Anruf.
|
||||
Regeln:
|
||||
- Halte Antworten KURZ - 1-3 Saetze maximal
|
||||
- Sei direkt, keine Fuellwoerter
|
||||
- Antworte immer auf Deutsch"""
|
||||
|
||||
_vad = None
|
||||
def _get_vad():
|
||||
global _vad
|
||||
if _vad is None:
|
||||
_vad = silero.VAD.load()
|
||||
return _vad
|
||||
|
||||
def _make_lk_identity(user_id, device_id):
|
||||
return f"{user_id}:{device_id}"
|
||||
|
||||
def _compute_lk_room_name(room_id):
|
||||
raw = f"{room_id}|m.call#ROOM"
|
||||
return base64.b64encode(hashlib.sha256(raw.encode()).digest()).decode().rstrip("=")
|
||||
|
||||
def _generate_lk_jwt(room_id, user_id, device_id):
|
||||
identity = _make_lk_identity(user_id, device_id)
|
||||
lk_room = _compute_lk_room_name(room_id)
|
||||
token = (
|
||||
lkapi.AccessToken(LK_API_KEY, LK_API_SECRET)
|
||||
.with_identity(identity)
|
||||
.with_name(user_id)
|
||||
.with_grants(lkapi.VideoGrants(
|
||||
room_join=True, room=lk_room,
|
||||
can_publish=True, can_subscribe=True))
|
||||
.with_ttl(datetime.timedelta(hours=24))
|
||||
)
|
||||
logger.info("JWT: identity=%s room=%s", identity, lk_room)
|
||||
return token.to_jwt()
|
||||
|
||||
|
||||
KDF_HKDF = 1
|
||||
|
||||
|
||||
def _build_e2ee_options(shared_key: bytes) -> rtc.E2EEOptions:
|
||||
"""Build HKDF E2EE options matching Element Call's key derivation."""
|
||||
key_opts = rtc.KeyProviderOptions(
|
||||
shared_key=shared_key,
|
||||
ratchet_window_size=0,
|
||||
ratchet_salt=b"LKFrameEncryptionKey",
|
||||
failure_tolerance=-1,
|
||||
key_ring_size=16,
|
||||
key_derivation_function=KDF_HKDF,
|
||||
)
|
||||
return rtc.E2EEOptions(key_provider_options=key_opts)
|
||||
|
||||
|
||||
class VoiceSession:
|
||||
def __init__(self, nio_client, room_id, device_id, lk_url, model="claude-sonnet"):
|
||||
self.nio_client = nio_client
|
||||
self.room_id = room_id
|
||||
self.device_id = device_id
|
||||
self.lk_url = lk_url
|
||||
self.model = model
|
||||
self.lk_room = None
|
||||
self.session = None
|
||||
self._task = None
|
||||
self._http_session = None
|
||||
self._e2ee_key: bytes | None = None
|
||||
|
||||
def on_encryption_key(self, sender, device_id, key, index):
|
||||
"""Receive E2EE key from Element Call participant."""
|
||||
if key and not self._e2ee_key:
|
||||
self._e2ee_key = key
|
||||
logger.info("E2EE key received from %s:%s (index=%d, %d bytes)",
|
||||
sender, device_id, index, len(key))
|
||||
|
||||
async def start(self):
|
||||
self._task = asyncio.create_task(self._run())
|
||||
|
||||
async def stop(self):
|
||||
for obj in [self.session, self.lk_room, self._http_session]:
|
||||
if obj:
|
||||
try:
|
||||
if hasattr(obj, "aclose"):
|
||||
await obj.aclose()
|
||||
elif hasattr(obj, "disconnect"):
|
||||
await obj.disconnect()
|
||||
elif hasattr(obj, "close"):
|
||||
await obj.close()
|
||||
except Exception:
|
||||
pass
|
||||
if self._task and not self._task.done():
|
||||
self._task.cancel()
|
||||
try:
|
||||
await self._task
|
||||
except asyncio.CancelledError:
|
||||
pass
|
||||
|
||||
async def _run(self):
|
||||
try:
|
||||
user_id = self.nio_client.user_id
|
||||
jwt = _generate_lk_jwt(self.room_id, user_id, self.device_id)
|
||||
|
||||
# Wait up to 3s for E2EE encryption key from Element Call
|
||||
for _ in range(30):
|
||||
if self._e2ee_key:
|
||||
break
|
||||
await asyncio.sleep(0.1)
|
||||
if not self._e2ee_key:
|
||||
logger.warning("No E2EE key received after 3s, connecting without encryption")
|
||||
|
||||
# Connect with E2EE if key available
|
||||
e2ee_opts = None
|
||||
if self._e2ee_key:
|
||||
e2ee_opts = _build_e2ee_options(self._e2ee_key)
|
||||
logger.info("E2EE enabled with HKDF (%d byte key)", len(self._e2ee_key))
|
||||
|
||||
room_opts = rtc.RoomOptions(e2ee=e2ee_opts)
|
||||
self.lk_room = rtc.Room()
|
||||
|
||||
@self.lk_room.on("participant_connected")
|
||||
def on_p(p):
|
||||
logger.info("Participant connected: %s", p.identity)
|
||||
|
||||
@self.lk_room.on("track_published")
|
||||
def on_tp(pub, p):
|
||||
logger.info("Track pub: %s %s kind=%s", p.identity, pub.sid, pub.kind)
|
||||
|
||||
@self.lk_room.on("track_subscribed")
|
||||
def on_ts(t, pub, p):
|
||||
logger.info("Track sub: %s %s kind=%s", p.identity, pub.sid, t.kind)
|
||||
|
||||
await self.lk_room.connect(self.lk_url, jwt, options=room_opts)
|
||||
logger.info("Connected (E2EE=%s), remote=%d",
|
||||
"HKDF" if self._e2ee_key else "off",
|
||||
len(self.lk_room.remote_participants))
|
||||
|
||||
# Voice pipeline — German male voice (Daniel)
|
||||
self._http_session = aiohttp.ClientSession()
|
||||
voice_id = os.environ.get("ELEVENLABS_VOICE_ID", DEFAULT_VOICE_ID)
|
||||
self.session = AgentSession(
|
||||
stt=elevenlabs.STT(api_key=ELEVENLABS_KEY, http_session=self._http_session),
|
||||
llm=lk_openai.LLM(base_url=LITELLM_URL, api_key=LITELLM_KEY, model=self.model),
|
||||
tts=elevenlabs.TTS(voice_id=voice_id, model="eleven_multilingual_v2",
|
||||
api_key=ELEVENLABS_KEY, http_session=self._http_session),
|
||||
vad=_get_vad(),
|
||||
)
|
||||
agent = Agent(instructions=VOICE_PROMPT)
|
||||
await self.session.start(agent=agent, room=self.lk_room)
|
||||
logger.info("Voice pipeline started (voice=%s)", voice_id)
|
||||
|
||||
try:
|
||||
await asyncio.wait_for(
|
||||
self.session.generate_reply(
|
||||
instructions="Begruesse den Nutzer kurz auf Deutsch."),
|
||||
timeout=30.0)
|
||||
logger.info("Greeting sent")
|
||||
except asyncio.TimeoutError:
|
||||
logger.error("Greeting timed out")
|
||||
|
||||
while True:
|
||||
await asyncio.sleep(1)
|
||||
|
||||
except asyncio.CancelledError:
|
||||
logger.info("Session cancelled for %s", self.room_id)
|
||||
except Exception:
|
||||
logger.exception("Session failed for %s", self.room_id)
|
||||
Reference in New Issue
Block a user