"""LiveKit voice agent that joins a call under a Matrix bot identity.

For each dispatched job the agent mints its own access token for
``BOT_IDENTITY``, connects a dedicated ``rtc.Room`` with it, waits for a
non-agent participant, runs an ``AgentSession`` (ElevenLabs STT/TTS, an
OpenAI-compatible LLM endpoint at ``LITELLM_BASE_URL``, Silero VAD) and
disconnects once every non-agent participant has left.
"""

import asyncio
import logging
import os

from livekit import rtc
from livekit.api import AccessToken, VideoGrants
from livekit.agents import (
    Agent,
    AgentSession,
    AgentServer,
    JobContext,
    JobProcess,
    cli,
    room_io,
)
from livekit.plugins import openai as lk_openai, elevenlabs, silero

logger = logging.getLogger("matrix-ai-agent")
logging.basicConfig(level=logging.INFO)

# Required settings raise KeyError at import time when missing.
LITELLM_URL = os.environ["LITELLM_BASE_URL"]
LITELLM_KEY = os.environ.get("LITELLM_API_KEY", "not-needed")
LK_API_KEY = os.environ["LIVEKIT_API_KEY"]
LK_API_SECRET = os.environ["LIVEKIT_API_SECRET"]
LK_URL = os.environ["LIVEKIT_URL"]
BOT_IDENTITY = os.environ.get("BOT_IDENTITY", "@ai:agiliton.eu:AIBOT")

# Participants whose identity starts with this prefix are treated as agents,
# everyone else counts as a "real" participant.
AGENT_IDENTITY_PREFIX = "agent-"

# NOTE(review): the prompt is kept verbatim as a single line; if it was meant
# to be a multi-line bullet list, restore the line breaks deliberately.
SYSTEM_PROMPT = """You are a helpful voice assistant in a Matrix call. Rules: - Keep answers SHORT — 1-3 sentences max - Be direct, no filler words - If the user wants more detail, they will ask - Speak naturally as in a conversation"""

server = AgentServer()


def prewarm(proc: JobProcess):
    """Load the Silero VAD and store it in ``proc.userdata["vad"]``.

    ``entrypoint`` reads the model back from ``ctx.proc.userdata["vad"]``.
    """
    proc.userdata["vad"] = silero.VAD.load()


server.setup_fnc = prewarm


def _is_real_participant(identity: str) -> bool:
    """Return True when *identity* does not carry the agent prefix."""
    return not identity.startswith(AGENT_IDENTITY_PREFIX)


def _real_identities(room: rtc.Room) -> list[str]:
    """Return the identities of all non-agent remote participants in *room*."""
    return [
        participant.identity
        for participant in room.remote_participants.values()
        if _is_real_participant(participant.identity)
    ]


def _build_token(room_name: str) -> str:
    """Mint a JWT that lets ``BOT_IDENTITY`` join, publish and subscribe in *room_name*."""
    grants = VideoGrants(
        room=room_name,
        room_join=True,
        can_publish=True,
        can_subscribe=True,
    )
    return (
        AccessToken(LK_API_KEY, LK_API_SECRET)
        .with_identity(BOT_IDENTITY)
        .with_grants(grants)
        .to_jwt()
    )


async def _wait_for_real_participant(room: rtc.Room) -> None:
    """Return once at least one non-agent participant is present in *room*."""
    if _real_identities(room):
        return

    logger.info("Waiting for real participant...")
    joined = asyncio.get_running_loop().create_future()

    def on_participant(p: rtc.RemoteParticipant) -> None:
        # The handler stays registered after the first hit, so guard
        # against resolving the future twice.
        if _is_real_participant(p.identity) and not joined.done():
            joined.set_result(p)

    # No await happens between the presence check above and this
    # registration, so a join cannot slip through unnoticed.
    room.on("participant_connected", on_participant)
    await joined


async def _wait_until_real_participants_leave(room: rtc.Room) -> None:
    """Return once no non-agent participant remains in *room*."""
    all_left = asyncio.get_running_loop().create_future()

    def on_participant_left(p: rtc.RemoteParticipant) -> None:
        # Check if any real (non-agent) participants remain
        if not _real_identities(room) and not all_left.done():
            all_left.set_result(True)

    room.on("participant_disconnected", on_participant_left)

    # The caller awaited other work before getting here; if the last real
    # participant left during that time the event was already missed and the
    # future would otherwise never resolve.
    if not _real_identities(room):
        all_left.set_result(True)

    await all_left


@server.rtc_session()
async def entrypoint(ctx: JobContext):
    """Run one voice-assistant session for the room of the dispatched job.

    Args:
        ctx: Job context; supplies the room name (``ctx.job.room.name``) and
            the prewarmed VAD (``ctx.proc.userdata["vad"]``).

    The room is always disconnected before returning, including when the
    session setup or the greeting raises; such exceptions propagate to the
    caller after cleanup.
    """
    room_name = ctx.job.room.name
    logger.info("Job received for room %s, connecting with Matrix identity...", room_name)

    # Generate a token with the correct Matrix user identity
    token = _build_token(room_name)

    # Connect our own room with the Matrix identity (NOT ctx.connect())
    custom_room = rtc.Room()
    await custom_room.connect(LK_URL, token)
    logger.info("Connected to room as %s", BOT_IDENTITY)

    try:
        # Wait for a real (non-agent) participant
        await _wait_for_real_participant(custom_room)
        logger.info("Participants: %s", list(custom_room.remote_participants.keys()))

        model = os.environ.get("LITELLM_MODEL", "claude-sonnet")
        voice_id = os.environ.get("ELEVENLABS_VOICE_ID", "21m00Tcm4TlvDq8ikWAM")

        session = AgentSession(
            stt=elevenlabs.STT(),
            llm=lk_openai.LLM(
                base_url=LITELLM_URL,
                api_key=LITELLM_KEY,
                model=model,
            ),
            tts=elevenlabs.TTS(
                voice_id=voice_id,
                model="eleven_turbo_v2_5",
            ),
            vad=ctx.proc.userdata["vad"],
        )

        # Find the real user's identity for targeted audio input
        real_identity = next(iter(_real_identities(custom_room)), None)
        logger.info("Starting agent session, targeting participant: %s", real_identity)

        agent = Agent(instructions=SYSTEM_PROMPT)
        input_opts = room_io.RoomInputOptions(
            participant_identity=real_identity,
        )
        await session.start(
            agent=agent,
            room=custom_room,
            room_input_options=input_opts,
        )

        logger.info("Session started, generating greeting...")
        await session.generate_reply(instructions="Greet the user briefly.")
        logger.info("Greeting generated.")

        # Wait for all real participants to leave, then disconnect cleanly
        # so the LiveKit room gets deleted and auto-dispatch fires on next call
        await _wait_until_real_participants_leave(custom_room)
        logger.info("All participants left, disconnecting custom room...")
    finally:
        # Runs on the error path as well, so the bot never lingers in the room.
        await custom_room.disconnect()
        logger.info("Room disconnected, agent exiting.")


if __name__ == "__main__":
    cli.run_app(server)