feat: activity video track (pulsing orb) for voice sessions
- ActivityVideoPublisher renders animated orb on 160x120 canvas - Integrated into both agent.py and voice.py - Updates confluence-collab submodule
This commit is contained in:
161
activity_video.py
Normal file
161
activity_video.py
Normal file
@@ -0,0 +1,161 @@
|
||||
"""Activity video track — pulsing orb (lightweight).
|
||||
|
||||
Small 160x120 canvas, only renders pixels near the orb.
|
||||
LiveKit/browser upscales. Minimal CPU on both server and client.
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
import math
|
||||
import random
|
||||
import logging
|
||||
import time
|
||||
import struct
|
||||
|
||||
from livekit.rtc import VideoSource, VideoFrame, VideoBufferType
|
||||
|
||||
logger = logging.getLogger("activity-video")


# Canvas geometry and frame timing — LiveKit/the browser upscales 160x120.
WIDTH = 160
HEIGHT = 120
FPS = 15
BPP = 4  # bytes per pixel (RGBA)
CX, CY = WIDTH // 2, HEIGHT // 2

# Background color (dark navy); also the base the glow is added on top of.
BG = (12, 12, 28)

# Orb base color per agent state; unknown states fall back to "initializing".
STATE_COLORS = {
    "listening": (40, 120, 255),
    "thinking": (100, 60, 255),
    "speaking": (30, 200, 255),
    "initializing": (40, 60, 120),
}

# One background pixel / full background frame, pre-packed once so every
# render starts from a cheap bytearray copy instead of re-filling per frame.
_BG_PIXEL = struct.pack('BBBB', *BG, 255)
_BG_FRAME = _BG_PIXEL * (WIDTH * HEIGHT)

# Pre-compute distance from center — only within max possible glow radius
MAX_ORB = 45  # max orb radius at full energy
MAX_GLOW = int(MAX_ORB * 2.5) + 5
# Sparse table of (pixel_index, distance_from_center) for pixels within
# MAX_GLOW of center. Built as a comprehension so no loop temporaries leak
# into the module namespace.
# NOTE(review): with WIDTH=160/HEIGHT=120 the farthest corner is only
# sqrt(80^2 + 60^2) = 100 px from center, so MAX_GLOW=117 means this table
# currently covers every pixel; the per-frame `dist > glow_radius` check in
# the renderer is what actually keeps the hot loop small.
_PIXELS = [
    (_y * WIDTH + _x, math.hypot(_x - CX, _y - CY))
    for _y in range(max(0, CY - MAX_GLOW), min(HEIGHT, CY + MAX_GLOW + 1))
    for _x in range(max(0, CX - MAX_GLOW), min(WIDTH, CX + MAX_GLOW + 1))
    if math.hypot(_x - CX, _y - CY) <= MAX_GLOW
]
|
||||
|
||||
|
||||
class ActivityVideoPublisher:
    """Publishes a synthetic "activity orb" video track for a voice session.

    Keeps eased animation state (color, energy, pulse) that drifts toward
    per-state targets, renders each frame into an RGBA byte buffer, and
    captures it into a LiveKit ``VideoSource`` at a fixed frame rate.
    """

    def __init__(self):
        # LiveKit source the rendered frames are captured into.
        self.source = VideoSource(WIDTH, HEIGHT)
        self._state = "initializing"
        self._stopped = False
        # Animation state — eased toward per-state targets every tick.
        self._pulse = 0.0
        self._energy = 0.0
        self._target_energy = 0.0
        self._color = list(STATE_COLORS["initializing"])
        self._target_color = list(STATE_COLORS["initializing"])
        self._ring_phase = 0.0

    def set_state(self, state: str):
        """Switch the displayed activity state; the color eases over, not snaps."""
        if self._state == state:
            return
        logger.info("Activity video state: %s -> %s", self._state, state)
        self._state = state
        self._target_color = list(STATE_COLORS.get(state, STATE_COLORS["initializing"]))

    def stop(self):
        """Ask the run() loop to exit after its current frame."""
        self._stopped = True

    def _update(self, t: float):
        """Advance animation state to time ``t`` (seconds since loop start)."""
        # Ease each RGB channel toward the target color (8% per tick).
        color = self._color
        target = self._target_color
        for channel in range(3):
            color[channel] += (target[channel] - color[channel]) * 0.08

        current = self._state
        if current == "listening":
            # Slow, calm breathing.
            self._target_energy = 0.3
            self._pulse = 0.5 * math.sin(t * 1.5) + 0.5
        elif current == "thinking":
            self._target_energy = 0.6
            self._pulse = 0.5 * math.sin(t * 3.0) + 0.5
        elif current == "speaking":
            # Random jitter makes the orb feel "live" while audio plays.
            self._target_energy = 0.9 + random.uniform(-0.1, 0.1)
            self._pulse = 0.5 * math.sin(t * 6.0) + 0.5 + random.uniform(-0.15, 0.15)
        else:
            # "initializing" and any unknown state: dim, near-static orb.
            self._target_energy = 0.15
            self._pulse = 0.3

        # Ease energy toward its target (12% per tick).
        self._energy += (self._target_energy - self._energy) * 0.12
        self._ring_phase = t

    def _render_frame(self) -> bytearray:
        """Render one RGBA frame; only pixels near the orb are rewritten."""
        frame = bytearray(_BG_FRAME)

        red, grn, blu = self._color
        energy = self._energy
        pulse = self._pulse
        bg_r, bg_g, bg_b = BG

        # Orb geometry for this frame: core radius grows with energy and
        # pulse; the glow band extends out to 2.5x the core radius.
        base_radius = 15 + 8 * energy
        orb_radius = base_radius + 4 * pulse * energy
        glow_radius = orb_radius * 2.5
        inv_orb = 1.0 / max(orb_radius, 1)
        glow_span = glow_radius - orb_radius
        inv_glow = 1.0 / max(glow_span, 1)

        # While speaking, two rings travel outward through the glow band,
        # offset half a span apart.
        rings = ()
        if self._state == "speaking":
            rings = (
                orb_radius + ((self._ring_phase * 30) % glow_span),
                orb_radius + ((self._ring_phase * 30 + glow_span * 0.5) % glow_span),
            )

        for index, dist in _PIXELS:
            if dist > glow_radius:
                continue

            if dist <= orb_radius:
                # Orb core: quadratic radial falloff plus a white hot-spot
                # near the center.
                f = dist * inv_orb
                brightness = 1.0 - 0.3 * f * f
                white = max(0.0, 1.0 - f * 2.5) * 0.6 * energy
                px_r = min(255, int(red * brightness + 255 * white))
                px_g = min(255, int(grn * brightness + 255 * white))
                px_b = min(255, int(blu * brightness + 255 * white))
            else:
                # Glow band: cubic falloff from the orb edge outward.
                f = (dist - orb_radius) * inv_glow
                fade = 1.0 - f
                glow = fade * fade * fade * energy * 0.5

                for ring_r in rings:
                    rd = abs(dist - ring_r)
                    if rd < 4:
                        glow += (1.0 - rd * 0.25) * 0.3 * (1.0 - f)

                px_r = min(255, int(bg_r + red * glow))
                px_g = min(255, int(bg_g + grn * glow))
                px_b = min(255, int(bg_b + blu * glow))

            # Alpha byte stays at 255 from the pre-packed background.
            offset = index * BPP
            frame[offset] = px_r
            frame[offset + 1] = px_g
            frame[offset + 2] = px_b

        return frame

    async def run(self):
        """Frame loop: update state, render, capture, sleep until next tick."""
        logger.info("Activity video loop started (%dx%d @ %d FPS, orb mode, %d active pixels)",
                    WIDTH, HEIGHT, FPS, len(_PIXELS))
        interval = 1.0 / FPS
        t0 = time.monotonic()
        rgba_type = VideoBufferType.Value('RGBA')

        while not self._stopped:
            t = time.monotonic() - t0
            self._update(t)
            pixels = self._render_frame()
            self.source.capture_frame(VideoFrame(WIDTH, HEIGHT, rgba_type, pixels))
            # Sleep only whatever remains of the frame interval after the
            # time spent updating + rendering this frame.
            elapsed = time.monotonic() - t0 - t
            await asyncio.sleep(max(0.001, interval - elapsed))

        logger.info("Activity video loop stopped")
|
||||
Reference in New Issue
Block a user