fix: use standard ctx.connect() for audio pipeline (CF-1170)
Custom rtc.Room skipped ctx.connect(), leaving framework audio input pipeline uninitialized. STT/VAD never received audio frames. Switch to standard approach: ctx.connect() + ctx.room. Added debug event logging for speech pipeline. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
95
agent.py
95
agent.py
@@ -1,21 +1,14 @@
|
|||||||
import os
|
import os
|
||||||
import asyncio
|
|
||||||
import logging
|
import logging
|
||||||
|
|
||||||
from livekit import rtc
|
from livekit.agents import Agent, AgentSession, AgentServer, JobContext, JobProcess, cli
|
||||||
from livekit.api import AccessToken, VideoGrants
|
|
||||||
from livekit.agents import Agent, AgentSession, AgentServer, JobContext, JobProcess, cli, room_io
|
|
||||||
from livekit.plugins import openai as lk_openai, elevenlabs, silero
|
from livekit.plugins import openai as lk_openai, elevenlabs, silero
|
||||||
|
|
||||||
logger = logging.getLogger("matrix-ai-agent")
|
logger = logging.getLogger("matrix-ai-agent")
|
||||||
logging.basicConfig(level=logging.INFO)
|
logging.basicConfig(level=logging.DEBUG)
|
||||||
|
|
||||||
LITELLM_URL = os.environ["LITELLM_BASE_URL"]
|
LITELLM_URL = os.environ["LITELLM_BASE_URL"]
|
||||||
LITELLM_KEY = os.environ.get("LITELLM_API_KEY", "not-needed")
|
LITELLM_KEY = os.environ.get("LITELLM_API_KEY", "not-needed")
|
||||||
LK_API_KEY = os.environ["LIVEKIT_API_KEY"]
|
|
||||||
LK_API_SECRET = os.environ["LIVEKIT_API_SECRET"]
|
|
||||||
LK_URL = os.environ["LIVEKIT_URL"]
|
|
||||||
BOT_IDENTITY = os.environ.get("BOT_IDENTITY", "@ai:agiliton.eu:AIBOT")
|
|
||||||
|
|
||||||
SYSTEM_PROMPT = """You are a helpful voice assistant in a Matrix call.
|
SYSTEM_PROMPT = """You are a helpful voice assistant in a Matrix call.
|
||||||
Rules:
|
Rules:
|
||||||
@@ -36,46 +29,12 @@ server.setup_fnc = prewarm
|
|||||||
|
|
||||||
@server.rtc_session()
|
@server.rtc_session()
|
||||||
async def entrypoint(ctx: JobContext):
|
async def entrypoint(ctx: JobContext):
|
||||||
room_name = ctx.job.room.name
|
logger.info("Job received for room %s", ctx.job.room.name)
|
||||||
logger.info("Job received for room %s, connecting with Matrix identity...", room_name)
|
|
||||||
|
|
||||||
# Generate a token with the correct Matrix user identity
|
# Standard framework connection (handles audio pipeline properly)
|
||||||
token = (
|
await ctx.connect()
|
||||||
AccessToken(LK_API_KEY, LK_API_SECRET)
|
logger.info("Connected to room, local identity: %s", ctx.room.local_participant.identity)
|
||||||
.with_identity(BOT_IDENTITY)
|
logger.info("Remote participants: %s", list(ctx.room.remote_participants.keys()))
|
||||||
.with_grants(VideoGrants(
|
|
||||||
room=room_name,
|
|
||||||
room_join=True,
|
|
||||||
can_publish=True,
|
|
||||||
can_subscribe=True,
|
|
||||||
))
|
|
||||||
.to_jwt()
|
|
||||||
)
|
|
||||||
|
|
||||||
# Connect our own room with the Matrix identity (NOT ctx.connect())
|
|
||||||
custom_room = rtc.Room()
|
|
||||||
await custom_room.connect(LK_URL, token)
|
|
||||||
logger.info("Connected to room as %s", BOT_IDENTITY)
|
|
||||||
|
|
||||||
# Wait for a real (non-agent) participant
|
|
||||||
def has_real_participant():
|
|
||||||
return any(
|
|
||||||
not p.identity.startswith("agent-")
|
|
||||||
for p in custom_room.remote_participants.values()
|
|
||||||
)
|
|
||||||
|
|
||||||
if not has_real_participant():
|
|
||||||
logger.info("Waiting for real participant...")
|
|
||||||
fut = asyncio.get_event_loop().create_future()
|
|
||||||
|
|
||||||
def on_participant(p: rtc.RemoteParticipant):
|
|
||||||
if not p.identity.startswith("agent-") and not fut.done():
|
|
||||||
fut.set_result(p)
|
|
||||||
|
|
||||||
custom_room.on("participant_connected", on_participant)
|
|
||||||
await fut
|
|
||||||
|
|
||||||
logger.info("Participants: %s", list(custom_room.remote_participants.keys()))
|
|
||||||
|
|
||||||
model = os.environ.get("LITELLM_MODEL", "claude-sonnet")
|
model = os.environ.get("LITELLM_MODEL", "claude-sonnet")
|
||||||
voice_id = os.environ.get("ELEVENLABS_VOICE_ID", "21m00Tcm4TlvDq8ikWAM")
|
voice_id = os.environ.get("ELEVENLABS_VOICE_ID", "21m00Tcm4TlvDq8ikWAM")
|
||||||
@@ -94,46 +53,24 @@ async def entrypoint(ctx: JobContext):
|
|||||||
vad=ctx.proc.userdata["vad"],
|
vad=ctx.proc.userdata["vad"],
|
||||||
)
|
)
|
||||||
|
|
||||||
# Find the real user's identity for targeted audio input
|
# Debug: log pipeline events
|
||||||
real_identity = next(
|
@session.on("user_speech_committed")
|
||||||
(p.identity for p in custom_room.remote_participants.values()
|
def on_speech(msg):
|
||||||
if not p.identity.startswith("agent-")),
|
logger.info("USER_SPEECH_COMMITTED: %s", msg.text_content)
|
||||||
None,
|
|
||||||
)
|
@session.on("agent_speech_committed")
|
||||||
logger.info("Starting agent session, targeting participant: %s", real_identity)
|
def on_agent_speech(msg):
|
||||||
|
logger.info("AGENT_SPEECH_COMMITTED: %s", msg.text_content)
|
||||||
|
|
||||||
agent = Agent(instructions=SYSTEM_PROMPT)
|
agent = Agent(instructions=SYSTEM_PROMPT)
|
||||||
input_opts = room_io.RoomInputOptions(
|
|
||||||
participant_identity=real_identity,
|
|
||||||
)
|
|
||||||
await session.start(
|
await session.start(
|
||||||
agent=agent,
|
agent=agent,
|
||||||
room=custom_room,
|
room=ctx.room,
|
||||||
room_input_options=input_opts,
|
|
||||||
)
|
)
|
||||||
logger.info("Session started, generating greeting...")
|
logger.info("Session started, generating greeting...")
|
||||||
await session.generate_reply(instructions="Greet the user briefly.")
|
await session.generate_reply(instructions="Greet the user briefly.")
|
||||||
logger.info("Greeting generated.")
|
logger.info("Greeting generated.")
|
||||||
|
|
||||||
# Wait for all real participants to leave, then disconnect cleanly
|
|
||||||
# so the LiveKit room gets deleted and auto-dispatch fires on next call
|
|
||||||
left_fut = asyncio.get_event_loop().create_future()
|
|
||||||
|
|
||||||
def on_participant_left(p: rtc.RemoteParticipant):
|
|
||||||
# Check if any real (non-agent) participants remain
|
|
||||||
remaining = [
|
|
||||||
pid for pid in custom_room.remote_participants
|
|
||||||
if not pid.startswith("agent-")
|
|
||||||
]
|
|
||||||
if not remaining and not left_fut.done():
|
|
||||||
left_fut.set_result(True)
|
|
||||||
|
|
||||||
custom_room.on("participant_disconnected", on_participant_left)
|
|
||||||
await left_fut
|
|
||||||
logger.info("All participants left, disconnecting custom room...")
|
|
||||||
await custom_room.disconnect()
|
|
||||||
logger.info("Room disconnected, agent exiting.")
|
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
cli.run_app(server)
|
cli.run_app(server)
|
||||||
|
|||||||
Reference in New Issue
Block a user