From ee4efd01efe524a280603a688d15575d903f5e68 Mon Sep 17 00:00:00 2001 From: Christian Gick Date: Sun, 15 Feb 2026 13:41:53 +0200 Subject: [PATCH] fix: agent cleanup on disconnect + targeted audio input - Agent disconnects custom room when all real participants leave (prevents zombie participants blocking auto-dispatch) - Bot sends m.call.member state event on call detection (Element Call shows bot as joined) - Use RoomInputOptions(participant_identity=...) to target real user audio input (framework agent-AJ_xxx participant was confusing RoomIO) - Removed incorrect bot dispatch (Matrix room ID != LiveKit room name) Co-Authored-By: Claude Opus 4.6 --- agent.py | 95 +++++++++++++++++++++++++++++++++++++++++++++++++++++--- bot.py | 69 ++++++++++++++++++++++++++++++++++++++++ 2 files changed, 159 insertions(+), 5 deletions(-) diff --git a/agent.py b/agent.py index b894b27..684cee1 100644 --- a/agent.py +++ b/agent.py @@ -1,13 +1,21 @@ import os +import asyncio import logging -from livekit.agents import Agent, AgentSession, AgentServer, JobContext, JobProcess, cli +from livekit import rtc +from livekit.api import AccessToken, VideoGrants +from livekit.agents import Agent, AgentSession, AgentServer, JobContext, JobProcess, cli, room_io from livekit.plugins import openai as lk_openai, elevenlabs, silero logger = logging.getLogger("matrix-ai-agent") +logging.basicConfig(level=logging.INFO) LITELLM_URL = os.environ["LITELLM_BASE_URL"] LITELLM_KEY = os.environ.get("LITELLM_API_KEY", "not-needed") +LK_API_KEY = os.environ["LIVEKIT_API_KEY"] +LK_API_SECRET = os.environ["LIVEKIT_API_SECRET"] +LK_URL = os.environ["LIVEKIT_URL"] +BOT_IDENTITY = os.environ.get("BOT_IDENTITY", "@ai:agiliton.eu:AIBOT") SYSTEM_PROMPT = """You are a helpful voice assistant in a Matrix call. Rules: @@ -26,8 +34,49 @@ def prewarm(proc: JobProcess): server.setup_fnc = prewarm -@server.rtc_session(agent_name="matrix-ai") +@server.rtc_session() async def entrypoint(ctx: JobContext): + room_name = ctx.job.room.name + logger.info("Job received for room %s, connecting with Matrix identity...", room_name) + + # Generate a token with the correct Matrix user identity + token = ( + AccessToken(LK_API_KEY, LK_API_SECRET) + .with_identity(BOT_IDENTITY) + .with_grants(VideoGrants( + room=room_name, + room_join=True, + can_publish=True, + can_subscribe=True, + )) + .to_jwt() + ) + + # Connect our own room with the Matrix identity (NOT ctx.connect()) + custom_room = rtc.Room() + await custom_room.connect(LK_URL, token) + logger.info("Connected to room as %s", BOT_IDENTITY) + + # Wait for a real (non-agent) participant + def has_real_participant(): + return any( + not p.identity.startswith("agent-") + for p in custom_room.remote_participants.values() + ) + + if not has_real_participant(): + logger.info("Waiting for real participant...") + fut = asyncio.get_event_loop().create_future() + + def on_participant(p: rtc.RemoteParticipant): + if not p.identity.startswith("agent-") and not fut.done(): + fut.set_result(p) + + custom_room.on("participant_connected", on_participant) + await fut + + logger.info("Participants: %s", list(custom_room.remote_participants.keys())) + model = os.environ.get("LITELLM_MODEL", "claude-sonnet") voice_id = os.environ.get("ELEVENLABS_VOICE_ID", "21m00Tcm4TlvDq8ikWAM") @@ -39,15 +88,51 @@ async def entrypoint(ctx: JobContext): model=model, ), tts=elevenlabs.TTS( - voice=voice_id, - model="eleven_multilingual_v2", + voice_id=voice_id, + model="eleven_turbo_v2_5", ), vad=ctx.proc.userdata["vad"], ) + # Find the real user's identity for targeted audio input + real_identity = next( + (p.identity for p in custom_room.remote_participants.values() + if not p.identity.startswith("agent-")), + None, + ) + logger.info("Starting agent session, targeting participant: %s", real_identity) + agent = Agent(instructions=SYSTEM_PROMPT) - await session.start(agent=agent, room=ctx.room) + input_opts = room_io.RoomInputOptions( + participant_identity=real_identity, + ) + await session.start( + agent=agent, + room=custom_room, + room_input_options=input_opts, + ) + logger.info("Session started, generating greeting...") await session.generate_reply(instructions="Greet the user briefly.") + logger.info("Greeting generated.") + + # Wait for all real participants to leave, then disconnect cleanly + # so the LiveKit room gets deleted and auto-dispatch fires on next call + left_fut = asyncio.get_event_loop().create_future() + + def on_participant_left(p: rtc.RemoteParticipant): + # Check if any real (non-agent) participants remain + remaining = [ + pid for pid in custom_room.remote_participants + if not pid.startswith("agent-") + ] + if not remaining and not left_fut.done(): + left_fut.set_result(True) + + custom_room.on("participant_disconnected", on_participant_left) + await left_fut + logger.info("All participants left, disconnecting custom room...") + await custom_room.disconnect() + logger.info("Room disconnected, agent exiting.") if __name__ == "__main__": diff --git a/bot.py b/bot.py index bc43562..dd01766 100644 --- a/bot.py +++ b/bot.py @@ -10,6 +10,7 @@ from nio import ( InviteMemberEvent, MegolmEvent, SyncResponse, + UnknownEvent, KeyVerificationStart, KeyVerificationCancel, KeyVerificationKey, @@ -18,6 +19,9 @@ from nio import ( ) from livekit import api +BOT_DEVICE_ID = "AIBOT" +CALL_MEMBER_TYPE = "org.matrix.msc3401.call.member" + logger = logging.getLogger("matrix-ai-bot") HOMESERVER = os.environ["MATRIX_HOMESERVER"] @@ -47,6 +51,7 @@ class Bot: ) self.lkapi = None self.dispatched_rooms = set() + self.active_calls = set() # rooms where we've sent call member event async def start(self): # Restore existing session or create new one @@ -80,6 +85,7 @@ class Bot: self.lkapi = api.LiveKitAPI(LK_URL, LK_KEY, LK_SECRET) self.client.add_event_callback(self.on_invite, InviteMemberEvent) self.client.add_event_callback(self.on_megolm, MegolmEvent) + self.client.add_event_callback(self.on_unknown, UnknownEvent) self.client.add_response_callback(self.on_sync, SyncResponse) self.client.add_to_device_callback(self.on_key_verification, KeyVerificationStart) self.client.add_to_device_callback(self.on_key_verification, KeyVerificationKey) @@ -115,6 +121,69 @@ class Bot: self.client.verify_device(device) logger.info("Auto-trusted device %s of %s", device.device_id, user_id) + async def on_unknown(self, room, event: UnknownEvent): + """Handle call member state events to join calls.""" + if event.type != CALL_MEMBER_TYPE: + return + if event.sender == BOT_USER: + return # ignore our own events + + # Non-empty content means someone started/is in a call + if event.source.get("content", {}): + room_id = room.room_id + if room_id in self.active_calls: + return + + logger.info("Call detected in %s from %s, joining...", room_id, event.sender) + self.active_calls.add(room_id) + + # Get the foci_preferred from the caller's event + content = event.source["content"] + foci = content.get("foci_preferred", [{ + "type": "livekit", + "livekit_service_url": f"{HOMESERVER}/livekit-jwt-service", + "livekit_alias": room_id, + }]) + + # Send our own call member state event + call_content = { + "application": "m.call", + "call_id": "", + "scope": "m.room", + "device_id": BOT_DEVICE_ID, + "expires": 7200000, + "focus_active": { + "type": "livekit", + "focus_selection": "oldest_membership", + }, + "foci_preferred": foci, + "m.call.intent": "audio", + } + + state_key = f"_{BOT_USER}_{BOT_DEVICE_ID}_m.call" + try: + resp = await self.client.room_put_state( + room_id, CALL_MEMBER_TYPE, call_content, state_key=state_key, + ) + logger.info("Sent call member event in %s: %s", room_id, resp) + except Exception: + logger.exception("Failed to send call member event in %s", room_id) + + else: + # Empty content = someone left the call, check if anyone is still calling + room_id = room.room_id + if room_id in self.active_calls: + # Leave the call too + self.active_calls.discard(room_id) + state_key = f"_{BOT_USER}_{BOT_DEVICE_ID}_m.call" + try: + await self.client.room_put_state( + room_id, CALL_MEMBER_TYPE, {}, state_key=state_key, + ) + logger.info("Left call in %s", room_id) + except Exception: + logger.exception("Failed to leave call in %s", room_id) + async def on_megolm(self, room, event: MegolmEvent): """Log undecryptable messages.""" logger.warning(