fix(vad): remove competing AudioStream that caused intermittent VAD failures

The _count_frames coroutine created a second rtc.AudioStream on the caller's
audio track, competing with AgentSession's internal pipeline for event loop
time. Under load, this caused VAD to miss speech → user_state stuck on "away".

- Remove _count_frames AudioStream (debugging artifact)
- Add VAD state diagnostics (speaking count, away duration)
- Add VAD watchdog: warns if user_state=away >30s (MAT-40 detection)

Fixes: MAT-40

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Christian Gick
2026-02-22 19:02:39 +02:00
parent a8d4663f10
commit 6c1073e79d

View File

@@ -441,21 +441,9 @@ class VoiceSession:
@self.lk_room.on("track_subscribed") @self.lk_room.on("track_subscribed")
def on_ts(t, pub, p): def on_ts(t, pub, p):
logger.info("Track sub: %s %s kind=%s muted=%s", p.identity, pub.sid, t.kind, pub.muted) logger.info("Track sub: %s %s kind=%s muted=%s", p.identity, pub.sid, t.kind, pub.muted)
# Start audio frame counter for this track to confirm data flows # NOTE: Do NOT create rtc.AudioStream here — it competes with AgentSession's
if int(t.kind) == 1: # internal audio pipeline for event loop time, causing intermittent VAD failures
async def _count_frames(track=t, pid=p.identity): # (user_state stuck on "away"). See MAT-40. Use e2ee_state_changed for flow confirmation.
try:
audio_stream = rtc.AudioStream(track)
count = 0
async for _ in audio_stream:
count += 1
if count == 1:
logger.info("AUDIO_FLOW: first frame from %s (E2EE OK)", pid)
elif count % 1000 == 0:
logger.info("AUDIO_FLOW: %d frames from %s", count, pid)
except Exception as exc:
logger.warning("AUDIO_FLOW monitor error: %s", exc)
asyncio.ensure_future(_count_frames())
# *** KEY FIX: set_key() with KDF_HKDF only applies HKDF when the frame cryptor # *** KEY FIX: set_key() with KDF_HKDF only applies HKDF when the frame cryptor
# for this participant already exists. The frame cryptor is created at track # for this participant already exists. The frame cryptor is created at track
# subscription time. Calling set_key() BEFORE track subscription (at connect) # subscription time. Calling set_key() BEFORE track subscription (at connect)
@@ -604,9 +592,23 @@ class VoiceSession:
) )
# Pipeline event logging (livekit-agents 1.4.2 event names) # Pipeline event logging (livekit-agents 1.4.2 event names)
_vad_state_log: dict = {"last": None, "speaking_count": 0, "away_since": None}
@self.session.on("user_state_changed") @self.session.on("user_state_changed")
def _on_user_state(ev): def _on_user_state(ev):
logger.info("VAD: user_state=%s", ev.new_state) state = ev.new_state
prev = _vad_state_log["last"]
_vad_state_log["last"] = state
if str(state) == "speaking":
_vad_state_log["speaking_count"] = _vad_state_log.get("speaking_count", 0) + 1
_vad_state_log["away_since"] = None
logger.info("VAD: user_state=%s (speaking_count=%d)", state, _vad_state_log["speaking_count"])
elif str(state) == "away" and prev and str(prev) != "away":
import time
_vad_state_log["away_since"] = time.monotonic()
logger.info("VAD: user_state=%s (was %s)", state, prev)
else:
logger.info("VAD: user_state=%s", state)
_last_user_speech: list[str] = [] _last_user_speech: list[str] = []
@@ -666,8 +668,24 @@ class VoiceSession:
except asyncio.TimeoutError: except asyncio.TimeoutError:
logger.error("Greeting timed out") logger.error("Greeting timed out")
# VAD watchdog: log diagnostic if user_state stays "away" for >30s
import time as _time
while True: while True:
await asyncio.sleep(5) await asyncio.sleep(10)
away_since = _vad_state_log.get("away_since")
if away_since and (_time.monotonic() - away_since) > 30:
sc = _vad_state_log.get("speaking_count", 0)
e2ee_ok = any(
str(getattr(p, '_e2ee_state', '')) == 'OK'
for p in self.lk_room.remote_participants.values()
) if self.lk_room else False
n_remote = len(self.lk_room.remote_participants) if self.lk_room else 0
logger.warning(
"VAD_WATCHDOG: user_state=away for >30s (speaking_count=%d, "
"remote_participants=%d, e2ee_ok=%s) — possible MAT-40 occurrence",
sc, n_remote, e2ee_ok,
)
_vad_state_log["away_since"] = None # only warn once per stuck period
except asyncio.CancelledError: except asyncio.CancelledError:
logger.info("Session cancelled for %s", self.room_id) logger.info("Session cancelled for %s", self.room_id)