import base64
import json
import logging
import os

import livekit.rtc as rtc
from livekit.agents import (
    Agent,
    AgentServer,
    AgentSession,
    JobContext,
    JobProcess,
    cli,
)
from livekit.plugins import elevenlabs, openai as lk_openai, silero

from e2ee_patch import KDF_HKDF

logger = logging.getLogger("matrix-ai-agent")
logging.basicConfig(level=logging.DEBUG)

# Fail fast at import time if the LiteLLM proxy URL is not configured.
LITELLM_URL = os.environ["LITELLM_BASE_URL"]
LITELLM_KEY = os.environ.get("LITELLM_API_KEY", "not-needed")

SYSTEM_PROMPT = """You are a helpful voice assistant in a Matrix call. Rules:
- Keep answers SHORT — 1-3 sentences max
- Be direct, no filler words
- If the user wants more detail, they will ask
- Speak naturally as in a conversation"""

server = AgentServer()


def prewarm(proc: JobProcess) -> None:
    """Load the Silero VAD model once per worker process.

    Runs in the worker before any job is accepted, so sessions can reuse
    the model via ``ctx.proc.userdata["vad"]`` without a per-job load.
    """
    proc.userdata["vad"] = silero.VAD.load()


server.setup_fnc = prewarm


def build_e2ee_options(shared_key: bytes) -> rtc.E2EEOptions:
    """Build E2EE options with HKDF key derivation (Element Call compatible).

    Uses patched KeyProviderOptions with key_ring_size and
    key_derivation_function fields added by patch_sdk.py during Docker build.

    Args:
        shared_key: Raw shared secret used to derive per-frame keys.

    Returns:
        An ``rtc.E2EEOptions`` ready to pass to ``ctx.connect(e2ee=...)``.
    """
    key_opts = rtc.KeyProviderOptions(
        shared_key=shared_key,
        # Ratcheting disabled; key rotation is handled via the key ring.
        ratchet_window_size=0,
        ratchet_salt=b"LKFrameEncryptionKey",
        # -1: never drop a track because of decryption failures.
        failure_tolerance=-1,
        key_ring_size=16,
        key_derivation_function=KDF_HKDF,
    )
    return rtc.E2EEOptions(key_provider_options=key_opts)


def get_e2ee_key(ctx: JobContext) -> bytes | None:
    """Extract the E2EE shared key from dispatch metadata or the environment.

    Args:
        ctx: Job context whose ``job.metadata`` may carry a JSON payload with
            a base64-encoded ``e2ee_key`` field (set by bot.py).

    Returns:
        The raw key bytes, or ``None`` when no key is configured (E2EE off).
    """
    # Try dispatch metadata first (set by bot.py).
    metadata_str = getattr(ctx.job, "metadata", None) or ""
    if metadata_str:
        try:
            meta = json.loads(metadata_str)
            key_b64 = meta.get("e2ee_key")
            if key_b64:
                key = base64.b64decode(key_b64)
                logger.info("E2EE key from dispatch metadata (%d bytes)", len(key))
                return key
        # Deliberately broad: malformed metadata (bad JSON, non-dict payload,
        # invalid base64) must fall through to the env-var fallback, not crash
        # the job. The original `(json.JSONDecodeError, Exception)` tuple was
        # redundant — `Exception` already covers it.
        except Exception as e:
            logger.warning("Failed to parse dispatch metadata for E2EE key: %s", e)

    # Fallback: environment variable (for testing).
    env_key = os.environ.get("E2EE_SHARED_KEY")
    if env_key:
        # Heuristic: values longer than 32 chars are assumed base64-encoded
        # (a base64 32-byte key is 44 chars); shorter values are used verbatim.
        key = base64.b64decode(env_key) if len(env_key) > 32 else env_key.encode()
        logger.info("E2EE key from environment (%d bytes)", len(key))
        return key

    return None


@server.rtc_session(agent_name=os.environ.get("AGENT_NAME", "matrix-ai"))
async def entrypoint(ctx: JobContext) -> None:
    """Handle one dispatched job: join the room and run the voice pipeline.

    Connects with E2EE when a shared key is available, wires up an
    STT -> LLM -> TTS session, then greets the caller.
    """
    logger.info("Job received for room %s", ctx.job.room.name)

    # Check for an E2EE key; without one the agent joins unencrypted.
    e2ee_key = get_e2ee_key(ctx)
    e2ee_opts = None
    if e2ee_key:
        e2ee_opts = build_e2ee_options(e2ee_key)
        logger.info("E2EE enabled with HKDF key derivation")
    else:
        logger.info("E2EE disabled (no key provided)")

    # Connect to room with optional E2EE.
    await ctx.connect(e2ee=e2ee_opts)
    logger.info(
        "Connected to room, local identity: %s", ctx.room.local_participant.identity
    )
    logger.info("Remote participants: %s", list(ctx.room.remote_participants.keys()))

    model = os.environ.get("LITELLM_MODEL", "claude-sonnet")
    voice_id = os.environ.get("ELEVENLABS_VOICE_ID", "21m00Tcm4TlvDq8ikWAM")

    session = AgentSession(
        stt=elevenlabs.STT(),
        llm=lk_openai.LLM(
            base_url=LITELLM_URL,
            api_key=LITELLM_KEY,
            model=model,
        ),
        tts=elevenlabs.TTS(
            voice_id=voice_id,
            model="eleven_turbo_v2_5",
        ),
        # VAD model preloaded by prewarm() in this worker process.
        vad=ctx.proc.userdata["vad"],
    )

    # Debug taps on committed transcripts for both sides of the conversation.
    @session.on("user_speech_committed")
    def on_speech(msg):
        logger.info("USER_SPEECH_COMMITTED: %s", msg.text_content)

    @session.on("agent_speech_committed")
    def on_agent_speech(msg):
        logger.info("AGENT_SPEECH_COMMITTED: %s", msg.text_content)

    agent = Agent(instructions=SYSTEM_PROMPT)
    await session.start(
        agent=agent,
        room=ctx.room,
    )

    logger.info("Session started, generating greeting...")
    await session.generate_reply(instructions="Greet the user briefly.")
    logger.info("Greeting generated.")


if __name__ == "__main__":
    cli.run_app(server)