feat(e2ee): Add HKDF E2EE support for Element Call compatibility

Element Call uses HKDF-SHA256 + AES-128-GCM for frame encryption,
while the LiveKit Rust SDK defaults to PBKDF2 + AES-256-GCM.

- Multi-stage Dockerfile builds patched Rust FFI from EC-compat fork
- Generates Python protobuf bindings with new fields
- patch_sdk.py modifies installed livekit-rtc for new proto fields
- agent.py passes E2EE options with HKDF to ctx.connect()
- bot.py exchanges encryption keys via Matrix state events
- Separate Dockerfile.bot for bot service (no Rust build needed)

Ref: livekit/rust-sdks#904, livekit/python-sdks#570

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Christian Gick
2026-02-20 16:28:56 +02:00
parent 578b6bb56f
commit fc3d915939
7 changed files with 309 additions and 8 deletions

View File

@@ -1,6 +1,65 @@
FROM python:3.11-slim # Stage 1: Build patched Rust FFI with HKDF support for Element Call E2EE
# Fork: onestacked/livekit-rust-sdks branch EC-compat-changes
# PR: https://github.com/livekit/rust-sdks/pull/904
FROM rust:1.82-slim-bookworm AS rust-build
RUN apt-get update && apt-get install -y --no-install-recommends \
git cmake g++ libssl-dev pkg-config protobuf-compiler \
&& rm -rf /var/lib/apt/lists/*
WORKDIR /build
RUN git clone --branch EC-compat-changes --depth 1 \
https://github.com/onestacked/livekit-rust-sdks.git
WORKDIR /build/livekit-rust-sdks/livekit-ffi
RUN cargo build --release
# Stage 2: Generate Python protobuf bindings from patched .proto files
FROM python:3.11-slim-bookworm AS proto-gen
RUN pip install --no-cache-dir protobuf grpcio-tools mypy-protobuf
COPY --from=rust-build /build/livekit-rust-sdks/livekit-ffi/protocol/ /proto/
RUN mkdir -p /gen && \
python -m grpc_tools.protoc \
-I/proto \
--python_out=/gen \
--mypy_out=/gen \
/proto/audio_frame.proto \
/proto/ffi.proto \
/proto/handle.proto \
/proto/participant.proto \
/proto/room.proto \
/proto/track.proto \
/proto/video_frame.proto \
/proto/e2ee.proto \
/proto/stats.proto \
/proto/track_publication.proto \
/proto/rpc.proto \
/proto/data_stream.proto && \
touch /gen/__init__.py && \
# Fix imports to be relative (same as upstream generate_proto.sh)
for f in /gen/*.py /gen/*.pyi; do \
perl -i -pe 's|^(import (audio_frame_pb2\|ffi_pb2\|handle_pb2\|participant_pb2\|room_pb2\|track_pb2\|video_frame_pb2\|e2ee_pb2\|stats_pb2\|rpc_pb2\|track_publication_pb2\|data_stream_pb2))|from . \1|g' "$f"; \
done
# Stage 3: Final image
FROM python:3.11-slim-bookworm
WORKDIR /app WORKDIR /app
RUN apt-get update && apt-get install -y --no-install-recommends ffmpeg libolm-dev && rm -rf /var/lib/apt/lists/* RUN apt-get update && apt-get install -y --no-install-recommends \
ffmpeg libolm-dev \
&& rm -rf /var/lib/apt/lists/*
COPY requirements.txt . COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt RUN pip install --no-cache-dir -r requirements.txt
# Overwrite installed FFI binary with patched version (HKDF + key_ring_size support)
COPY --from=rust-build /build/livekit-rust-sdks/target/release/liblivekit_ffi.so /patched/
ENV LIVEKIT_LIB_PATH=/patched/liblivekit_ffi.so
# Overwrite installed proto bindings with patched versions (new fields: key_ring_size, key_derivation_function)
COPY --from=proto-gen /gen/ /patched_proto/
RUN PROTO_DIR=$(python -c "import livekit.rtc._proto; import os; print(os.path.dirname(livekit.rtc._proto.__file__))") && \
cp /patched_proto/*.py "$PROTO_DIR/" && \
cp /patched_proto/*.pyi "$PROTO_DIR/" 2>/dev/null || true
# Patch SDK Python code to pass new fields through to proto (e2ee.py + room.py)
COPY patch_sdk.py /tmp/patch_sdk.py
RUN python /tmp/patch_sdk.py && rm /tmp/patch_sdk.py
COPY . . COPY . .

6
Dockerfile.bot Normal file
View File

@@ -0,0 +1,6 @@
FROM python:3.11-slim-bookworm
WORKDIR /app
RUN apt-get update && apt-get install -y --no-install-recommends ffmpeg libolm-dev && rm -rf /var/lib/apt/lists/*
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY . .

View File

@@ -1,8 +1,13 @@
import os import os
import json
import base64
import logging import logging
from livekit.agents import Agent, AgentSession, AgentServer, JobContext, JobProcess, cli from livekit.agents import Agent, AgentSession, AgentServer, JobContext, JobProcess, cli
from livekit.plugins import openai as lk_openai, elevenlabs, silero from livekit.plugins import openai as lk_openai, elevenlabs, silero
import livekit.rtc as rtc
from e2ee_patch import KDF_HKDF
logger = logging.getLogger("matrix-ai-agent") logger = logging.getLogger("matrix-ai-agent")
logging.basicConfig(level=logging.DEBUG) logging.basicConfig(level=logging.DEBUG)
@@ -27,12 +32,63 @@ def prewarm(proc: JobProcess):
server.setup_fnc = prewarm server.setup_fnc = prewarm
def build_e2ee_options(shared_key: bytes) -> rtc.E2EEOptions:
"""Build E2EE options with HKDF key derivation (Element Call compatible).
Uses patched KeyProviderOptions with key_ring_size and key_derivation_function
fields added by patch_sdk.py during Docker build.
"""
key_opts = rtc.KeyProviderOptions(
shared_key=shared_key,
ratchet_window_size=0,
ratchet_salt=b"LKFrameEncryptionKey",
failure_tolerance=-1,
key_ring_size=16,
key_derivation_function=KDF_HKDF,
)
return rtc.E2EEOptions(key_provider_options=key_opts)
def get_e2ee_key(ctx: JobContext) -> bytes | None:
"""Extract E2EE shared key from dispatch metadata or environment."""
# Try dispatch metadata first (set by bot.py)
metadata_str = getattr(ctx.job, "metadata", None) or ""
if metadata_str:
try:
meta = json.loads(metadata_str)
key_b64 = meta.get("e2ee_key")
if key_b64:
key = base64.b64decode(key_b64)
logger.info("E2EE key from dispatch metadata (%d bytes)", len(key))
return key
except (json.JSONDecodeError, Exception) as e:
logger.warning("Failed to parse dispatch metadata for E2EE key: %s", e)
# Fallback: environment variable (for testing)
env_key = os.environ.get("E2EE_SHARED_KEY")
if env_key:
key = base64.b64decode(env_key) if len(env_key) > 32 else env_key.encode()
logger.info("E2EE key from environment (%d bytes)", len(key))
return key
return None
@server.rtc_session(agent_name=os.environ.get("AGENT_NAME", "matrix-ai")) @server.rtc_session(agent_name=os.environ.get("AGENT_NAME", "matrix-ai"))
async def entrypoint(ctx: JobContext): async def entrypoint(ctx: JobContext):
logger.info("Job received for room %s", ctx.job.room.name) logger.info("Job received for room %s", ctx.job.room.name)
# Standard framework connection (handles audio pipeline properly) # Check for E2EE key
await ctx.connect() e2ee_key = get_e2ee_key(ctx)
e2ee_opts = None
if e2ee_key:
e2ee_opts = build_e2ee_options(e2ee_key)
logger.info("E2EE enabled with HKDF key derivation")
else:
logger.info("E2EE disabled (no key provided)")
# Connect to room with optional E2EE
await ctx.connect(e2ee=e2ee_opts)
logger.info("Connected to room, local identity: %s", ctx.room.local_participant.identity) logger.info("Connected to room, local identity: %s", ctx.room.local_participant.identity)
logger.info("Remote participants: %s", list(ctx.room.remote_participants.keys())) logger.info("Remote participants: %s", list(ctx.room.remote_participants.keys()))
@@ -53,7 +109,6 @@ async def entrypoint(ctx: JobContext):
vad=ctx.proc.userdata["vad"], vad=ctx.proc.userdata["vad"],
) )
# Debug: log pipeline events
@session.on("user_speech_committed") @session.on("user_speech_committed")
def on_speech(msg): def on_speech(msg):
logger.info("USER_SPEECH_COMMITTED: %s", msg.text_content) logger.info("USER_SPEECH_COMMITTED: %s", msg.text_content)

53
bot.py
View File

@@ -38,6 +38,7 @@ from livekit import api
BOT_DEVICE_ID = "AIBOT" BOT_DEVICE_ID = "AIBOT"
CALL_MEMBER_TYPE = "org.matrix.msc3401.call.member" CALL_MEMBER_TYPE = "org.matrix.msc3401.call.member"
ENCRYPTION_KEYS_TYPE = "io.element.call.encryption_keys"
MODEL_STATE_TYPE = "ai.agiliton.model" MODEL_STATE_TYPE = "ai.agiliton.model"
RENAME_STATE_TYPE = "ai.agiliton.auto_rename" RENAME_STATE_TYPE = "ai.agiliton.auto_rename"
@@ -398,14 +399,27 @@ class Bot:
if room_id not in self.dispatched_rooms: if room_id not in self.dispatched_rooms:
try: try:
# Collect E2EE encryption keys from room state
e2ee_key = await self._get_call_encryption_key(room_id, event.sender)
dispatch_metadata = ""
if e2ee_key:
# Generate agent's own key and publish it
agent_key = os.urandom(32)
await self._publish_encryption_key(room_id, agent_key)
dispatch_metadata = json.dumps({
"e2ee_key": base64.b64encode(agent_key).decode(),
})
logger.info("E2EE key prepared for agent dispatch")
await self.lkapi.agent_dispatch.create_dispatch( await self.lkapi.agent_dispatch.create_dispatch(
api.CreateAgentDispatchRequest( api.CreateAgentDispatchRequest(
agent_name=AGENT_NAME, agent_name=AGENT_NAME,
room=lk_room_name, room=lk_room_name,
metadata=dispatch_metadata,
) )
) )
self.dispatched_rooms.add(room_id) self.dispatched_rooms.add(room_id)
logger.info("Agent dispatched to LiveKit room %s", lk_room_name) logger.info("Agent dispatched to LiveKit room %s (e2ee=%s)", lk_room_name, bool(e2ee_key))
except Exception: except Exception:
logger.exception("Dispatch failed for %s", lk_room_name) logger.exception("Dispatch failed for %s", lk_room_name)
@@ -1395,6 +1409,43 @@ class Bot:
}, },
) )
async def _get_call_encryption_key(self, room_id: str, sender: str) -> bytes | None:
"""Read E2EE encryption key from io.element.call.encryption_keys state events."""
try:
resp = await self.client.room_get_state_event(
room_id, ENCRYPTION_KEYS_TYPE, sender,
)
if hasattr(resp, "content") and resp.content:
keys = resp.content.get("keys", [])
if keys:
key_b64 = keys[0].get("key", "")
if key_b64:
# Element Call uses base64url encoding
key_b64 += "=" * (-len(key_b64) % 4) # pad
key = base64.urlsafe_b64decode(key_b64)
logger.info("Got E2EE key from %s (%d bytes)", sender, len(key))
return key
except Exception as e:
logger.debug("No encryption key from %s in %s: %s", sender, room_id, e)
return None
async def _publish_encryption_key(self, room_id: str, key: bytes):
"""Publish bot's E2EE encryption key as io.element.call.encryption_keys state event."""
key_b64 = base64.urlsafe_b64encode(key).decode().rstrip("=")
content = {
"call_id": "",
"device_id": BOT_DEVICE_ID,
"keys": [{"index": 0, "key": key_b64}],
}
state_key = f"{BOT_USER}:{BOT_DEVICE_ID}"
try:
await self.client.room_put_state(
room_id, ENCRYPTION_KEYS_TYPE, content, state_key=state_key,
)
logger.info("Published E2EE key in %s", room_id)
except Exception:
logger.exception("Failed to publish E2EE key in %s", room_id)
async def _route_verification(self, room, event: UnknownEvent): async def _route_verification(self, room, event: UnknownEvent):
"""Route in-room verification events from UnknownEvent.""" """Route in-room verification events from UnknownEvent."""
source = event.source or {} source = event.source or {}

View File

@@ -1,13 +1,17 @@
services: services:
agent: agent:
build: . build:
context: .
dockerfile: Dockerfile
command: python agent.py start command: python agent.py start
env_file: .env env_file: .env
restart: unless-stopped restart: unless-stopped
network_mode: host network_mode: host
bot: bot:
build: . build:
context: .
dockerfile: Dockerfile.bot
command: python bot.py command: python bot.py
env_file: .env env_file: .env
restart: unless-stopped restart: unless-stopped

14
e2ee_patch.py Normal file
View File

@@ -0,0 +1,14 @@
"""
E2EE HKDF constants and helpers for Element Call compatibility.
The patched SDK (via patch_sdk.py + patched FFI binary) adds:
- key_ring_size (int, field 5 in proto)
- key_derivation_function (int, field 6: 0=PBKDF2, 1=HKDF)
This module provides constants and a convenience function for building
HKDF-compatible E2EE options.
"""
# Key derivation function constants matching proto enum KeyDerivationFunction
KDF_PBKDF2 = 0
KDF_HKDF = 1

112
patch_sdk.py Normal file
View File

@@ -0,0 +1,112 @@
"""
Patch the installed livekit-rtc SDK to support HKDF E2EE fields.
Run after pip install in the Docker build. Adds key_ring_size and
key_derivation_function fields to:
1. KeyProviderOptions dataclass (e2ee.py)
2. Proto conversion in Room.connect() (room.py)
These fields are added by the EC-compat Rust fork and are required
for Element Call E2EE compatibility.
"""
import os
import sys
def get_package_dir():
"""Find the installed livekit.rtc package directory."""
import livekit.rtc as rtc
return os.path.dirname(rtc.__file__)
def patch_e2ee(pkg_dir: str):
"""Add key_ring_size and key_derivation_function to KeyProviderOptions."""
e2ee_path = os.path.join(pkg_dir, "e2ee.py")
with open(e2ee_path) as f:
content = f.read()
if "key_ring_size" in content:
print("e2ee.py already patched, skipping")
return
# Add new fields after failure_tolerance
content = content.replace(
"failure_tolerance: int = DEFAULT_FAILURE_TOLERANCE",
"failure_tolerance: int = DEFAULT_FAILURE_TOLERANCE\n"
" key_ring_size: int = 16\n"
" key_derivation_function: int = 0 # 0=PBKDF2, 1=HKDF",
)
with open(e2ee_path, "w") as f:
f.write(content)
print(f"Patched {e2ee_path}: added key_ring_size, key_derivation_function")
def patch_room(pkg_dir: str):
"""Add key_ring_size and key_derivation_function to Room.connect() proto conversion."""
room_path = os.path.join(pkg_dir, "room.py")
with open(room_path) as f:
content = f.read()
if "key_ring_size" in content:
print("room.py already patched, skipping")
return
# Patch the deprecated e2ee path (used by livekit-agents)
old_e2ee = (
"req.connect.options.e2ee.key_provider_options.ratchet_window_size = (\n"
" options.e2ee.key_provider_options.ratchet_window_size\n"
" )"
)
new_e2ee = (
"req.connect.options.e2ee.key_provider_options.ratchet_window_size = (\n"
" options.e2ee.key_provider_options.ratchet_window_size\n"
" )\n"
" if hasattr(options.e2ee.key_provider_options, 'key_ring_size'):\n"
" req.connect.options.e2ee.key_provider_options.key_ring_size = (\n"
" options.e2ee.key_provider_options.key_ring_size\n"
" )\n"
" if hasattr(options.e2ee.key_provider_options, 'key_derivation_function'):\n"
" req.connect.options.e2ee.key_provider_options.key_derivation_function = (\n"
" options.e2ee.key_provider_options.key_derivation_function\n"
" )"
)
content = content.replace(old_e2ee, new_e2ee)
# Patch the current encryption path too
old_enc = (
"req.connect.options.encryption.key_provider_options.ratchet_window_size = (\n"
" options.encryption.key_provider_options.ratchet_window_size\n"
" )"
)
new_enc = (
"req.connect.options.encryption.key_provider_options.ratchet_window_size = (\n"
" options.encryption.key_provider_options.ratchet_window_size\n"
" )\n"
" if hasattr(options.encryption.key_provider_options, 'key_ring_size'):\n"
" req.connect.options.encryption.key_provider_options.key_ring_size = (\n"
" options.encryption.key_provider_options.key_ring_size\n"
" )\n"
" if hasattr(options.encryption.key_provider_options, 'key_derivation_function'):\n"
" req.connect.options.encryption.key_provider_options.key_derivation_function = (\n"
" options.encryption.key_provider_options.key_derivation_function\n"
" )"
)
content = content.replace(old_enc, new_enc)
with open(room_path, "w") as f:
f.write(content)
print(f"Patched {room_path}: added key_ring_size, key_derivation_function to proto conversion")
def main():
pkg_dir = get_package_dir()
print(f"Patching livekit-rtc at {pkg_dir}")
patch_e2ee(pkg_dir)
patch_room(pkg_dir)
print("SDK patching complete")
if __name__ == "__main__":
main()