"""Activity video track — pulsing orb (lightweight). Small 160x120 canvas, only renders pixels near the orb. LiveKit/browser upscales. Minimal CPU on both server and client. """ import asyncio import math import random import logging import time import struct from livekit.rtc import VideoSource, VideoFrame, VideoBufferType logger = logging.getLogger("activity-video") WIDTH = 160 HEIGHT = 120 FPS = 15 BPP = 4 CX, CY = WIDTH // 2, HEIGHT // 2 BG = (12, 12, 28) STATE_COLORS = { "listening": (40, 120, 255), "thinking": (100, 60, 255), "speaking": (30, 200, 255), "initializing": (40, 60, 120), } _BG_PIXEL = struct.pack('BBBB', *BG, 255) _BG_FRAME = _BG_PIXEL * (WIDTH * HEIGHT) # Pre-compute distance from center — only within max possible glow radius MAX_ORB = 45 # max orb radius at full energy MAX_GLOW = int(MAX_ORB * 2.5) + 5 # Store sparse: list of (pixel_index, distance) for pixels within MAX_GLOW of center _PIXELS = [] for _y in range(max(0, CY - MAX_GLOW), min(HEIGHT, CY + MAX_GLOW + 1)): dy = _y - CY for _x in range(max(0, CX - MAX_GLOW), min(WIDTH, CX + MAX_GLOW + 1)): dx = _x - CX d = math.sqrt(dx * dx + dy * dy) if d <= MAX_GLOW: _PIXELS.append((_y * WIDTH + _x, d)) class ActivityVideoPublisher: def __init__(self): self.source = VideoSource(WIDTH, HEIGHT) self._state = "initializing" self._stopped = False self._pulse = 0.0 self._energy = 0.0 self._target_energy = 0.0 self._color = list(STATE_COLORS["initializing"]) self._target_color = list(STATE_COLORS["initializing"]) self._ring_phase = 0.0 def set_state(self, state: str): if self._state != state: logger.info("Activity video state: %s -> %s", self._state, state) self._state = state self._target_color = list(STATE_COLORS.get(state, STATE_COLORS["initializing"])) def stop(self): self._stopped = True def _update(self, t: float): state = self._state for i in range(3): self._color[i] += (self._target_color[i] - self._color[i]) * 0.08 if state == "listening": self._target_energy = 0.3 self._pulse = 0.5 * math.sin(t * 1.5) + 0.5 elif state == "thinking": self._target_energy = 0.6 self._pulse = 0.5 * math.sin(t * 3.0) + 0.5 elif state == "speaking": self._target_energy = 0.9 + random.uniform(-0.1, 0.1) self._pulse = 0.5 * math.sin(t * 6.0) + 0.5 + random.uniform(-0.15, 0.15) else: self._target_energy = 0.15 self._pulse = 0.3 self._energy += (self._target_energy - self._energy) * 0.12 self._ring_phase = t def _render_frame(self) -> bytearray: buf = bytearray(_BG_FRAME) r, g, b = self._color energy = self._energy pulse = self._pulse bg_r, bg_g, bg_b = BG base_radius = 15 + 8 * energy orb_radius = base_radius + 4 * pulse * energy glow_radius = orb_radius * 2.5 inv_orb = 1.0 / max(orb_radius, 1) glow_span = glow_radius - orb_radius inv_glow = 1.0 / max(glow_span, 1) ring_active = self._state == "speaking" if ring_active: ring1_r = orb_radius + ((self._ring_phase * 30) % glow_span) ring2_r = orb_radius + ((self._ring_phase * 30 + glow_span * 0.5) % glow_span) for idx, dist in _PIXELS: if dist > glow_radius: continue if dist <= orb_radius: f = dist * inv_orb brightness = 1.0 - 0.3 * f * f white = max(0.0, 1.0 - f * 2.5) * 0.6 * energy pr = min(255, int(r * brightness + 255 * white)) pg = min(255, int(g * brightness + 255 * white)) pb = min(255, int(b * brightness + 255 * white)) else: f = (dist - orb_radius) * inv_glow t3 = 1.0 - f glow = t3 * t3 * t3 * energy * 0.5 if ring_active: for rr in (ring1_r, ring2_r): rd = abs(dist - rr) if rd < 4: glow += (1.0 - rd * 0.25) * 0.3 * (1.0 - f) pr = min(255, int(bg_r + r * glow)) pg = min(255, int(bg_g + g * glow)) pb = min(255, int(bg_b + b * glow)) off = idx * BPP buf[off] = pr buf[off + 1] = pg buf[off + 2] = pb return buf async def run(self): logger.info("Activity video loop started (%dx%d @ %d FPS, orb mode, %d active pixels)", WIDTH, HEIGHT, FPS, len(_PIXELS)) interval = 1.0 / FPS t0 = time.monotonic() rgba_type = VideoBufferType.Value('RGBA') while not self._stopped: t = time.monotonic() - t0 self._update(t) buf = self._render_frame() frame = VideoFrame(WIDTH, HEIGHT, rgba_type, buf) self.source.capture_frame(frame) render_time = time.monotonic() - t0 - t await asyncio.sleep(max(0.001, interval - render_time)) logger.info("Activity video loop stopped")