diff --git a/Dockerfile b/Dockerfile
index 53f704c..9045ccc 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -1,6 +1,66 @@
-FROM python:3.11-slim
+# Stage 1: Build patched Rust FFI with HKDF support for Element Call E2EE
+# Fork: onestacked/livekit-rust-sdks branch EC-compat-changes
+# PR: https://github.com/livekit/rust-sdks/pull/904
+FROM rust:1.82-slim-bookworm AS rust-build
+RUN apt-get update && apt-get install -y --no-install-recommends \
+    git cmake g++ libssl-dev pkg-config protobuf-compiler \
+    && rm -rf /var/lib/apt/lists/*
+WORKDIR /build
+RUN git clone --branch EC-compat-changes --depth 1 \
+    https://github.com/onestacked/livekit-rust-sdks.git
+WORKDIR /build/livekit-rust-sdks/livekit-ffi
+RUN cargo build --release
+
+# Stage 2: Generate Python protobuf bindings from patched .proto files
+FROM python:3.11-slim-bookworm AS proto-gen
+RUN pip install --no-cache-dir protobuf grpcio-tools mypy-protobuf
+COPY --from=rust-build /build/livekit-rust-sdks/livekit-ffi/protocol/ /proto/
+RUN mkdir -p /gen && \
+    python -m grpc_tools.protoc \
+        -I/proto \
+        --python_out=/gen \
+        --mypy_out=/gen \
+        /proto/audio_frame.proto \
+        /proto/ffi.proto \
+        /proto/handle.proto \
+        /proto/participant.proto \
+        /proto/room.proto \
+        /proto/track.proto \
+        /proto/video_frame.proto \
+        /proto/e2ee.proto \
+        /proto/stats.proto \
+        /proto/track_publication.proto \
+        /proto/rpc.proto \
+        /proto/data_stream.proto && \
+    touch /gen/__init__.py && \
+    # Fix imports to be relative (same as upstream generate_proto.sh)
+    for f in /gen/*.py /gen/*.pyi; do \
+        perl -i -pe 's|^(import (audio_frame_pb2\|ffi_pb2\|handle_pb2\|participant_pb2\|room_pb2\|track_pb2\|video_frame_pb2\|e2ee_pb2\|stats_pb2\|rpc_pb2\|track_publication_pb2\|data_stream_pb2))|from . \1|g' "$f"; \
+    done
+
+# Stage 3: Final image
+FROM python:3.11-slim-bookworm
 WORKDIR /app
-RUN apt-get update && apt-get install -y --no-install-recommends ffmpeg libolm-dev && rm -rf /var/lib/apt/lists/*
+RUN apt-get update && apt-get install -y --no-install-recommends \
+    ffmpeg libolm-dev \
+    && rm -rf /var/lib/apt/lists/*
+
 COPY requirements.txt .
 RUN pip install --no-cache-dir -r requirements.txt
+
+# Overwrite installed FFI binary with patched version (HKDF + key_ring_size support)
+COPY --from=rust-build /build/livekit-rust-sdks/target/release/liblivekit_ffi.so /patched/
+ENV LIVEKIT_LIB_PATH=/patched/liblivekit_ffi.so
+
+# Overwrite installed proto bindings with patched versions (new fields: key_ring_size, key_derivation_function)
+# Locating the package and copying the .py bindings must succeed; only the .pyi stub copy is best-effort.
+COPY --from=proto-gen /gen/ /patched_proto/
+RUN PROTO_DIR=$(python -c "import livekit.rtc._proto; import os; print(os.path.dirname(livekit.rtc._proto.__file__))") && \
+    cp /patched_proto/*.py "$PROTO_DIR/" && \
+    { cp /patched_proto/*.pyi "$PROTO_DIR/" 2>/dev/null || true; }
+
+# Patch SDK Python code to pass new fields through to proto (e2ee.py + room.py)
+COPY patch_sdk.py /tmp/patch_sdk.py
+RUN python /tmp/patch_sdk.py && rm /tmp/patch_sdk.py
+
 COPY . .
diff --git a/Dockerfile.bot b/Dockerfile.bot
new file mode 100644
index 0000000..b8bc728
--- /dev/null
+++ b/Dockerfile.bot
@@ -0,0 +1,6 @@
+FROM python:3.11-slim-bookworm
+WORKDIR /app
+RUN apt-get update && apt-get install -y --no-install-recommends ffmpeg libolm-dev && rm -rf /var/lib/apt/lists/*
+COPY requirements.txt .
+RUN pip install --no-cache-dir -r requirements.txt
+COPY . .
def build_e2ee_options(shared_key: bytes) -> rtc.E2EEOptions:
    """Build E2EE options with HKDF key derivation (Element Call compatible).

    Uses the ``key_ring_size`` and ``key_derivation_function`` fields that
    patch_sdk.py adds to ``KeyProviderOptions`` during the Docker build.

    Args:
        shared_key: Raw key bytes handed to the key provider.

    Returns:
        An ``rtc.E2EEOptions`` wrapping the configured key provider options.
    """
    key_opts = rtc.KeyProviderOptions(
        shared_key=shared_key,
        ratchet_window_size=0,
        ratchet_salt=b"LKFrameEncryptionKey",
        failure_tolerance=-1,
        key_ring_size=16,
        key_derivation_function=KDF_HKDF,
    )
    return rtc.E2EEOptions(key_provider_options=key_opts)


def get_e2ee_key(ctx: JobContext) -> bytes | None:
    """Extract the E2EE shared key from dispatch metadata or the environment.

    Lookup order:

    1. ``ctx.job.metadata``: a JSON object whose ``"e2ee_key"`` entry is a
       standard-base64 string (set by bot.py). Malformed metadata is logged
       as a warning and the lookup falls through to step 2.
    2. ``E2EE_SHARED_KEY`` environment variable (for testing). Values longer
       than 32 characters are treated as base64; shorter values are used
       verbatim as UTF-8 bytes.

    Args:
        ctx: Job context whose ``job.metadata`` may carry the key.

    Returns:
        The key bytes, or ``None`` when no key is configured.

    Raises:
        binascii.Error: ``E2EE_SHARED_KEY`` is longer than 32 characters but
            is not valid base64. The error is logged before it propagates so
            a misconfigured key is not mistaken for "E2EE disabled".
    """
    # Try dispatch metadata first (set by bot.py)
    metadata_str = getattr(ctx.job, "metadata", None) or ""
    if metadata_str:
        try:
            meta = json.loads(metadata_str)
            if not isinstance(meta, dict):
                raise TypeError(
                    f"expected a JSON object, got {type(meta).__name__}"
                )
            key_b64 = meta.get("e2ee_key")
            if key_b64:
                key = base64.b64decode(key_b64)
                logger.info("E2EE key from dispatch metadata (%d bytes)", len(key))
                return key
        # json.JSONDecodeError and binascii.Error are both ValueError
        # subclasses; TypeError covers a non-object payload or non-string key.
        except (ValueError, TypeError) as e:
            logger.warning("Failed to parse dispatch metadata for E2EE key: %s", e)

    # Fallback: environment variable (for testing)
    env_key = os.environ.get("E2EE_SHARED_KEY")
    if env_key:
        if len(env_key) > 32:
            try:
                key = base64.b64decode(env_key)
            except ValueError:
                logger.error(
                    "E2EE_SHARED_KEY is longer than 32 characters but is not valid base64"
                )
                raise
        else:
            key = env_key.encode()
        logger.info("E2EE key from environment (%d bytes)", len(key))
        return key

    return None
@server.rtc_session(agent_name=os.environ.get("AGENT_NAME", "matrix-ai")) async def entrypoint(ctx: JobContext): logger.info("Job received for room %s", ctx.job.room.name) - # Standard framework connection (handles audio pipeline properly) - await ctx.connect() + # Check for E2EE key + e2ee_key = get_e2ee_key(ctx) + e2ee_opts = None + if e2ee_key: + e2ee_opts = build_e2ee_options(e2ee_key) + logger.info("E2EE enabled with HKDF key derivation") + else: + logger.info("E2EE disabled (no key provided)") + + # Connect to room with optional E2EE + await ctx.connect(e2ee=e2ee_opts) logger.info("Connected to room, local identity: %s", ctx.room.local_participant.identity) logger.info("Remote participants: %s", list(ctx.room.remote_participants.keys())) @@ -53,7 +109,6 @@ async def entrypoint(ctx: JobContext): vad=ctx.proc.userdata["vad"], ) - # Debug: log pipeline events @session.on("user_speech_committed") def on_speech(msg): logger.info("USER_SPEECH_COMMITTED: %s", msg.text_content) diff --git a/bot.py b/bot.py index c100163..b5852cf 100644 --- a/bot.py +++ b/bot.py @@ -38,6 +38,7 @@ from livekit import api BOT_DEVICE_ID = "AIBOT" CALL_MEMBER_TYPE = "org.matrix.msc3401.call.member" +ENCRYPTION_KEYS_TYPE = "io.element.call.encryption_keys" MODEL_STATE_TYPE = "ai.agiliton.model" RENAME_STATE_TYPE = "ai.agiliton.auto_rename" @@ -398,14 +399,27 @@ class Bot: if room_id not in self.dispatched_rooms: try: + # Collect E2EE encryption keys from room state + e2ee_key = await self._get_call_encryption_key(room_id, event.sender) + dispatch_metadata = "" + if e2ee_key: + # Generate agent's own key and publish it + agent_key = os.urandom(32) + await self._publish_encryption_key(room_id, agent_key) + dispatch_metadata = json.dumps({ + "e2ee_key": base64.b64encode(agent_key).decode(), + }) + logger.info("E2EE key prepared for agent dispatch") + await self.lkapi.agent_dispatch.create_dispatch( api.CreateAgentDispatchRequest( agent_name=AGENT_NAME, room=lk_room_name, + 
async def _get_call_encryption_key(self, room_id: str, sender: str) -> bytes | None:
    """Fetch the sender's key from their io.element.call.encryption_keys state event.

    Only the first entry of the event's ``keys`` list is used. Returns the
    decoded key bytes, or ``None`` when the event, the list or the key is
    missing or empty, or when any step raises (the error is logged at debug
    level).
    """
    try:
        resp = await self.client.room_get_state_event(
            room_id, ENCRYPTION_KEYS_TYPE, sender,
        )
        event_content = getattr(resp, "content", None)
        if not event_content:
            return None
        entries = event_content.get("keys", [])
        if not entries:
            return None
        encoded = entries[0].get("key", "")
        if not encoded:
            return None
        # Element Call uses base64url encoding; restore the stripped padding
        encoded += "=" * (-len(encoded) % 4)
        secret = base64.urlsafe_b64decode(encoded)
        logger.info("Got E2EE key from %s (%d bytes)", sender, len(secret))
        return secret
    except Exception as e:
        logger.debug("No encryption key from %s in %s: %s", sender, room_id, e)
        return None


async def _publish_encryption_key(self, room_id: str, key: bytes):
    """Publish the bot's key as an io.element.call.encryption_keys state event.

    The key is sent as unpadded base64url at index 0, under the state key
    ``<BOT_USER>:<BOT_DEVICE_ID>``. A failed send is logged with its
    traceback and not re-raised.
    """
    unpadded_key = base64.urlsafe_b64encode(key).decode().rstrip("=")
    state_key = f"{BOT_USER}:{BOT_DEVICE_ID}"
    content = {
        "call_id": "",
        "device_id": BOT_DEVICE_ID,
        "keys": [{"index": 0, "key": unpadded_key}],
    }
    try:
        await self.client.room_put_state(
            room_id, ENCRYPTION_KEYS_TYPE, content, state_key=state_key,
        )
    except Exception:
        logger.exception("Failed to publish E2EE key in %s", room_id)
    else:
        logger.info("Published E2EE key in %s", room_id)
+1,17 @@ services: agent: - build: . + build: + context: . + dockerfile: Dockerfile command: python agent.py start env_file: .env restart: unless-stopped network_mode: host bot: - build: . + build: + context: . + dockerfile: Dockerfile.bot command: python bot.py env_file: .env restart: unless-stopped diff --git a/e2ee_patch.py b/e2ee_patch.py new file mode 100644 index 0000000..2f0dd2c --- /dev/null +++ b/e2ee_patch.py @@ -0,0 +1,14 @@ +""" +E2EE HKDF constants for Element Call compatibility. + +The patched SDK (via patch_sdk.py + patched FFI binary) adds: + - key_ring_size (int, field 5 in proto) + - key_derivation_function (int, field 6: 0=PBKDF2, 1=HKDF) + +This module provides the key derivation function constants used when building +HKDF-compatible E2EE options (see build_e2ee_options in agent.py). +""" + +# Key derivation function constants matching proto enum KeyDerivationFunction +KDF_PBKDF2 = 0 +KDF_HKDF = 1 diff --git a/patch_sdk.py b/patch_sdk.py new file mode 100644 index 0000000..893a302 --- /dev/null +++ b/patch_sdk.py @@ -0,0 +1,112 @@ +""" +Patch the installed livekit-rtc SDK to support HKDF E2EE fields. + +Run after pip install in the Docker build. Adds key_ring_size and +key_derivation_function fields to: +1. KeyProviderOptions dataclass (e2ee.py) +2. Proto conversion in Room.connect() (room.py) + +These fields are added by the EC-compat Rust fork and are required +for Element Call E2EE compatibility. 
import os
import re
import sys

# Matches the KeyProviderOptions field the new fields are inserted after.
# The leading indentation is captured so the inserted lines reuse it instead
# of relying on a hard-coded indent.
_E2EE_FIELD_ANCHOR = re.compile(
    r"^(?P<indent>[ \t]*)failure_tolerance: int = DEFAULT_FAILURE_TOLERANCE[^\n]*$",
    re.MULTILINE,
)

# Fields added by the EC-compat fork that Room.connect() has to forward.
_NEW_FIELDS = ("key_ring_size", "key_derivation_function")

# Option attributes on the connect request: the deprecated "e2ee" path (used
# by livekit-agents) and the current "encryption" path.
_ROOM_OPTION_ATTRS = ("e2ee", "encryption")


def get_package_dir():
    """Find the installed livekit.rtc package directory."""
    import livekit.rtc as rtc

    return os.path.dirname(rtc.__file__)


def patch_e2ee(pkg_dir: str):
    """Add key_ring_size and key_derivation_function to KeyProviderOptions.

    The fields are inserted directly after the ``failure_tolerance`` field of
    ``<pkg_dir>/e2ee.py`` at that field's indentation. A file that already
    mentions ``key_ring_size`` is left untouched.

    Args:
        pkg_dir: Directory of the installed ``livekit.rtc`` package.

    Raises:
        RuntimeError: The ``failure_tolerance`` field was not found, so
            nothing could be patched. Raising makes the Docker build fail
            instead of reporting a patch that never happened.
    """
    e2ee_path = os.path.join(pkg_dir, "e2ee.py")
    with open(e2ee_path, encoding="utf-8") as f:
        content = f.read()

    if "key_ring_size" in content:
        print("e2ee.py already patched, skipping")
        return

    def add_fields(match):
        indent = match.group("indent")
        return (
            f"{match.group(0)}\n"
            f"{indent}key_ring_size: int = 16\n"
            f"{indent}key_derivation_function: int = 0  # 0=PBKDF2, 1=HKDF"
        )

    content, hits = _E2EE_FIELD_ANCHOR.subn(add_fields, content)
    if not hits:
        raise RuntimeError(
            f"Cannot patch {e2ee_path}: failure_tolerance field not found"
        )

    with open(e2ee_path, "w", encoding="utf-8") as f:
        f.write(content)
    print(f"Patched {e2ee_path}: added key_ring_size, key_derivation_function")


def _room_anchor(attr: str):
    """Return a regex matching the ratchet_window_size assignment for *attr*."""
    source = re.escape(f"options.{attr}.key_provider_options")
    # The closing parenthesis group is all-or-nothing so that trailing
    # whitespace is only consumed when a ")" really follows it.
    return re.compile(
        rf"^(?P<indent>[ \t]*)req\.connect\.{source}\.ratchet_window_size = "
        rf"(?:\(\s*)?{source}\.ratchet_window_size(?:\s*\))?",
        re.MULTILINE,
    )


def _forwarding_code(indent: str, attr: str) -> str:
    """Return statements copying the new fields to the request for *attr*."""
    source = f"options.{attr}.key_provider_options"
    target = f"req.connect.{source}"
    step = " " * 4
    lines = []
    for field in _NEW_FIELDS:
        lines += [
            f"{indent}if hasattr({source}, '{field}'):",
            f"{indent}{step}{target}.{field} = (",
            f"{indent}{step}{step}{source}.{field}",
            f"{indent}{step})",
        ]
    return "\n".join(lines)


def patch_room(pkg_dir: str):
    """Add key_ring_size and key_derivation_function to Room.connect() proto conversion.

    For each of the ``e2ee`` and ``encryption`` option paths present in
    ``<pkg_dir>/room.py``, forwarding statements for the new fields are
    appended after the ``ratchet_window_size`` assignment, at that
    assignment's indentation. A file that already mentions ``key_ring_size``
    is left untouched.

    Args:
        pkg_dir: Directory of the installed ``livekit.rtc`` package.

    Raises:
        RuntimeError: Neither option path was found, so nothing could be
            patched.
    """
    room_path = os.path.join(pkg_dir, "room.py")
    with open(room_path, encoding="utf-8") as f:
        content = f.read()

    if "key_ring_size" in content:
        print("room.py already patched, skipping")
        return

    total_hits = 0
    for attr in _ROOM_OPTION_ATTRS:

        def add_forwarding(match, attr=attr):
            return f"{match.group(0)}\n{_forwarding_code(match.group('indent'), attr)}"

        content, hits = _room_anchor(attr).subn(add_forwarding, content)
        total_hits += hits

    if not total_hits:
        raise RuntimeError(
            f"Cannot patch {room_path}: ratchet_window_size assignment not found"
        )

    with open(room_path, "w", encoding="utf-8") as f:
        f.write(content)
    print(f"Patched {room_path}: added key_ring_size, key_derivation_function to proto conversion")


def main():
    """Patch e2ee.py and room.py of the installed livekit-rtc package."""
    pkg_dir = get_package_dir()
    print(f"Patching livekit-rtc at {pkg_dir}")
    patch_e2ee(pkg_dir)
    patch_room(pkg_dir)
    print("SDK patching complete")


if __name__ == "__main__":
    main()