feat: Blinkist-style audio summary bot (MAT-74)

Add interactive article summary feature: user pastes URL → bot asks
language/duration/topics → generates audio summary via LLM + ElevenLabs
TTS → posts MP3 inline with transcript and follow-up Q&A.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Christian Gick
2026-03-04 17:39:09 +02:00
parent 1000891a97
commit 4ec4054db4
6 changed files with 789 additions and 0 deletions

333
article_summary/__init__.py Normal file
View File

@@ -0,0 +1,333 @@
"""Blinkist-style article audio summary handler for Matrix bot."""
from __future__ import annotations
import logging
import re
from typing import TYPE_CHECKING
from openai import AsyncOpenAI
from .state import ArticleState, SessionManager
from .extractor import extract_article, detect_topics, is_article_url
from .summarizer import summarize_article
from .tts import generate_audio
if TYPE_CHECKING:
pass # Bot type would cause circular import
logger = logging.getLogger("article-summary")

# URL regex — matches http/https URLs in message text. The character class
# stops at whitespace and at common closing punctuation (")", ">", "]", '"')
# so URLs pasted inside markdown links or quotes are captured cleanly.
URL_PATTERN = re.compile(r'https?://[^\s\)>\]"]+')

# Words (English and German) that abort an in-progress summary conversation
# from any active FSM state.
CANCEL_WORDS = {"cancel", "stop", "abbrechen", "abbruch", "nevermind"}

# Accepted replies to the language prompt -> (language code, display name).
# Both menu numbers and spelled-out names are accepted.
LANGUAGE_OPTIONS = {
    "1": ("en", "English"),
    "2": ("de", "German"),
    "en": ("en", "English"),
    "de": ("de", "German"),
    "english": ("en", "English"),
    "german": ("de", "German"),
    "deutsch": ("de", "German"),
}

# Accepted replies to the duration prompt -> target minutes.
# Menu numbers ("1"-"3") and literal minute values are both accepted.
DURATION_OPTIONS = {
    "1": 5,
    "2": 10,
    "3": 15,
    "5": 5,
    "10": 10,
    "15": 15,
}
class ArticleSummaryHandler:
    """Handles the interactive article summary conversation flow.

    Per-(user, room) finite state machine:

        IDLE -> URL_DETECTED -> LANGUAGE -> DURATION -> GENERATING -> COMPLETE

    Each state name records the step the user most recently completed; the
    handler prompts for the next missing piece (language, duration, topics)
    and then runs the summarize -> TTS -> upload pipeline.
    """

    def __init__(
        self,
        llm_client: AsyncOpenAI,
        model: str,
        elevenlabs_key: str,
        voice_id: str,
        firecrawl_url: str | None = None,
    ) -> None:
        """Store clients/credentials and create the session manager.

        Args:
            llm_client: AsyncOpenAI-compatible client used for summarization
                and follow-up Q&A.
            model: Model name passed to the LLM client.
            elevenlabs_key: ElevenLabs API key for TTS.
            voice_id: ElevenLabs voice ID for TTS.
            firecrawl_url: Optional Firecrawl base URL used by the extractor;
                None falls back to plain HTTP + BeautifulSoup extraction.
        """
        self.llm = llm_client
        self.model = model
        self.elevenlabs_key = elevenlabs_key
        self.voice_id = voice_id
        self.firecrawl_url = firecrawl_url
        self.sessions = SessionManager()

    async def handle_message(
        self, room_id: str, sender: str, body: str
    ) -> str | None:
        """Process a message through the article summary FSM.

        Returns:
            - None: Not handled (pass to normal AI handler).
            - str: Text response to send.
            - "__GENERATE__": Signal to run the full generation pipeline.
        """
        body_lower = body.strip().lower()
        session = self.sessions.get(sender, room_id)
        # Cancel from any active state
        if session.state != ArticleState.IDLE and body_lower in CANCEL_WORDS:
            self.sessions.reset(sender, room_id)
            return "Summary cancelled."
        # Route based on current state
        if session.state == ArticleState.IDLE:
            return await self._check_for_url(room_id, sender, body)
        elif session.state == ArticleState.URL_DETECTED:
            # Waiting for language selection
            return self._on_language(room_id, sender, body_lower)
        elif session.state == ArticleState.LANGUAGE:
            # Waiting for duration selection
            return self._on_duration(room_id, sender, body_lower)
        elif session.state == ArticleState.DURATION:
            # Waiting for topic selection
            return self._on_topics(room_id, sender, body)
        elif session.state == ArticleState.GENERATING:
            return "Still generating your summary, please wait..."
        elif session.state == ArticleState.COMPLETE:
            # Follow-up Q&A about the article
            return await self._on_followup(room_id, sender, body)
        return None

    async def _check_for_url(
        self, room_id: str, sender: str, body: str
    ) -> str | None:
        """Check if the message contains an article URL and start the flow.

        Extracts the first article-like URL, fetches its content, detects
        topics via the LLM, and advances the session to URL_DETECTED with a
        language prompt. Returns None when the message should fall through
        to the normal AI handler.
        """
        urls = URL_PATTERN.findall(body)
        # Filter to article-like URLs (skips social media, file hosts, ...)
        article_urls = [u for u in urls if is_article_url(u)]
        if not article_urls:
            return None
        url = article_urls[0]
        session = self.sessions.get(sender, room_id)
        # Extract article content
        logger.info("Extracting article from %s", url)
        article = await extract_article(url, self.firecrawl_url)
        if not article:
            return None  # Could not extract — let normal handler deal with it
        session.url = url
        session.title = article["title"]
        session.content = article["content"]
        word_count = article["word_count"]
        # ~200 words/minute reading speed, minimum 1 minute
        read_time = max(1, word_count // 200)
        # Detect topics via LLM
        session.detected_topics = await detect_topics(
            article["content"], self.llm, self.model
        )
        session.state = ArticleState.URL_DETECTED
        self.sessions.touch(sender, room_id)
        topics_hint = ""
        if session.detected_topics:
            topics_hint = f"\nTopics: {', '.join(session.detected_topics)}"
        return (
            f"**Found:** {session.title} (~{read_time} min read){topics_hint}\n\n"
            f"Want an audio summary? What language?\n"
            f"1⃣ English\n"
            f"2⃣ German\n\n"
            f"_(or say \"cancel\" to skip)_"
        )

    def _on_language(
        self, room_id: str, sender: str, choice: str
    ) -> str | None:
        """Handle language selection, then prompt for duration."""
        lang = LANGUAGE_OPTIONS.get(choice)
        if not lang:
            # Unrecognized input: re-prompt without changing state.
            return "Please pick a language: **1** for English, **2** for German."
        session = self.sessions.get(sender, room_id)
        session.language = lang[0]
        session.state = ArticleState.LANGUAGE
        self.sessions.touch(sender, room_id)
        return (
            f"Language: **{lang[1]}**. How long should the summary be?\n"
            f"1⃣ 5 min (short)\n"
            f"2⃣ 10 min (standard)\n"
            f"3⃣ 15 min (detailed)"
        )

    def _on_duration(
        self, room_id: str, sender: str, choice: str
    ) -> str | None:
        """Handle duration selection, then prompt for focus topics."""
        duration = DURATION_OPTIONS.get(choice)
        if not duration:
            # Unrecognized input: re-prompt without changing state.
            return "Please pick: **1** (5 min), **2** (10 min), or **3** (15 min)."
        session = self.sessions.get(sender, room_id)
        session.duration_minutes = duration
        session.state = ArticleState.DURATION
        self.sessions.touch(sender, room_id)
        if session.detected_topics:
            # Number the topics so "reply with topic numbers" below actually
            # shows numbers matching the 1-based indexes parsed in _on_topics.
            topic_list = "\n".join(
                f"{i}. {t}"
                for i, t in enumerate(session.detected_topics, start=1)
            )
            return (
                f"Duration: **{duration} min**. Focus on which topics?\n"
                f"{topic_list}\n\n"
                f"Reply with topic numbers (comma-separated), specific topics, or **all**."
            )
        else:
            return (
                f"Duration: **{duration} min**. Any specific topics to focus on?\n"
                f"Reply with topics (comma-separated) or **all** for a general summary."
            )

    def _on_topics(
        self, room_id: str, sender: str, body: str
    ) -> str | None:
        """Handle topic selection. Returns __GENERATE__ to trigger pipeline.

        Accepts "all" (plus German equivalents), 1-based topic numbers
        referencing the detected-topic list, or free-text topic labels.
        Out-of-range numbers are silently dropped; an empty selection falls
        back to all detected topics.
        """
        session = self.sessions.get(sender, room_id)
        body_lower = body.strip().lower()
        if body_lower in ("all", "alle", "everything", "alles"):
            session.topics = session.detected_topics or []
        else:
            # Split on commas/whitespace; digits select detected topics by
            # 1-based index, anything else is taken as a literal topic label.
            parts = re.split(r'[,\s]+', body.strip())
            selected = []
            for p in parts:
                p = p.strip()
                if p.isdigit():
                    idx = int(p) - 1
                    if 0 <= idx < len(session.detected_topics):
                        selected.append(session.detected_topics[idx])
                elif p:
                    selected.append(p)
            session.topics = selected or session.detected_topics or []
        session.state = ArticleState.GENERATING
        self.sessions.touch(sender, room_id)
        return "__GENERATE__"

    async def generate_and_post(self, bot, room_id: str, sender: str) -> None:
        """Run the full pipeline: summarize → TTS → upload MP3.

        Posts progress and result messages directly via *bot*. On any
        failure the session is reset so the user can start over.
        """
        session = self.sessions.get(sender, room_id)
        topics_str = ", ".join(session.topics) if session.topics else "all topics"
        await bot._send_text(
            room_id,
            f"Generating {session.duration_minutes}-min {session.language.upper()} "
            f"summary of **{session.title}** (focus: {topics_str})...",
        )
        try:
            # Step 1: Summarize
            summary = await summarize_article(
                content=session.content,
                language=session.language,
                duration_minutes=session.duration_minutes,
                topics=session.topics,
                llm_client=self.llm,
                model=self.model,
            )
            session.summary_text = summary
            # Step 2: TTS
            mp3_bytes, duration_secs = await generate_audio(
                text=summary,
                api_key=self.elevenlabs_key,
                voice_id=self.voice_id,
                language=session.language,
            )
            # Step 3: Upload and send audio. Derive a safe filename from the
            # sanitized article title; fall back to a generic name when the
            # title contains no filename-safe characters.
            filename = re.sub(r'[^\w\s-]', '', session.title)[:50].strip()
            filename = f"{filename}.mp3" if filename else "summary.mp3"
            await bot._send_audio(room_id, mp3_bytes, filename, duration_secs)
            # Step 4: Send transcript preview (first 500 chars)
            transcript_preview = summary[:500]
            if len(summary) > 500:
                transcript_preview += "..."
            await bot._send_text(
                room_id,
                f"**Summary of:** {session.title}\n\n{transcript_preview}\n\n"
                f"_You can ask follow-up questions about this article._",
            )
            session.state = ArticleState.COMPLETE
            self.sessions.touch(sender, room_id)
        except Exception:
            logger.exception("Article summary pipeline failed for %s", session.url)
            await bot._send_text(
                room_id, "Sorry, I couldn't generate the audio summary. Please try again."
            )
            self.sessions.reset(sender, room_id)

    async def _on_followup(
        self, room_id: str, sender: str, body: str
    ) -> str | None:
        """Answer follow-up questions about the summarized article.

        A new article URL restarts the flow from scratch. Messages that do
        not look like questions reset the session and return None so the
        normal AI handler takes over.
        """
        session = self.sessions.get(sender, room_id)
        # If user posts a new URL, start fresh
        urls = URL_PATTERN.findall(body)
        if any(is_article_url(u) for u in urls):
            self.sessions.reset(sender, room_id)
            return await self._check_for_url(room_id, sender, body)
        # Heuristic: substring markers (English + German) that suggest the
        # message is a question about the article.
        question_indicators = ["?", "what", "how", "why", "explain", "was", "wie", "warum", "erkläre"]
        is_question = any(q in body.lower() for q in question_indicators)
        if not is_question:
            # Not a question — reset and let normal handler take over
            self.sessions.reset(sender, room_id)
            return None
        try:
            resp = await self.llm.chat.completions.create(
                model=self.model,
                messages=[
                    {
                        "role": "system",
                        "content": (
                            "You are answering follow-up questions about an article. "
                            "Use the article content below to answer. Be concise. "
                            "Respond in the same language as the question."
                        ),
                    },
                    {
                        "role": "user",
                        "content": (
                            f"Article: {session.title}\n\n"
                            f"{session.content[:8000]}\n\n"
                            f"Summary: {session.summary_text[:3000]}\n\n"
                            f"Question: {body}"
                        ),
                    },
                ],
                max_tokens=500,
                temperature=0.5,
            )
            return resp.choices[0].message.content.strip()
        except Exception:
            # Best-effort Q&A: log and fall through to the normal handler.
            logger.warning("Follow-up Q&A failed", exc_info=True)
            self.sessions.reset(sender, room_id)
            return None

View File

@@ -0,0 +1,146 @@
"""Article content extraction via Firecrawl with BeautifulSoup fallback."""
from __future__ import annotations
import logging
import re
import httpx
from bs4 import BeautifulSoup
logger = logging.getLogger("article-summary.extractor")

MAX_CONTENT_CHARS = 15_000

# Domains that are not articles (social media, file hosts, etc.).
# Matched against the hostname including subdomains — see is_article_url.
NON_ARTICLE_DOMAINS = {
    "youtube.com", "youtu.be", "twitter.com", "x.com", "instagram.com",
    "facebook.com", "tiktok.com", "reddit.com", "discord.com",
    "drive.google.com", "docs.google.com", "github.com",
}


def is_article_url(url: str) -> bool:
    """Check if URL is likely an article (not social media, files, etc.).

    Blocked domains are matched including subdomains (e.g. m.youtube.com,
    mobile.twitter.com), so mobile/regional mirrors are also rejected.
    URLs without a parseable hostname are not considered articles.
    """
    try:
        from urllib.parse import urlparse
        host = (urlparse(url).hostname or "").removeprefix("www.")
        if not host:
            return False
        # Reject an exact match or any subdomain of a blocked domain.
        return not any(
            host == domain or host.endswith("." + domain)
            for domain in NON_ARTICLE_DOMAINS
        )
    except Exception:
        return False
async def extract_article(url: str, firecrawl_url: str | None = None) -> dict | None:
    """Extract article content from URL.

    Tries Firecrawl first (when a base URL is configured), then falls back
    to a plain HTTP fetch parsed with BeautifulSoup. Either strategy's
    failure is logged and tolerated; only both failing yields None.

    Args:
        url: Article URL to fetch.
        firecrawl_url: Optional Firecrawl base URL; Firecrawl is skipped
            when this is falsy.

    Returns:
        Dict with keys: title (falls back to the URL when empty), content
        (truncated to MAX_CONTENT_CHARS), word_count (computed on the
        truncated content). Returns None if extraction fails.
    """
    title = ""
    content = ""
    # Try Firecrawl first
    if firecrawl_url:
        try:
            result = await _firecrawl_extract(url, firecrawl_url)
            if result:
                title, content = result
        except Exception:
            logger.warning("Firecrawl extraction failed for %s", url, exc_info=True)
    # Fallback to BeautifulSoup
    if not content:
        try:
            result = await _bs4_extract(url)
            if result:
                title, content = result
        except Exception:
            logger.warning("BS4 extraction failed for %s", url, exc_info=True)
    if not content:
        return None
    # Cap the content size; the word count reflects the truncated text.
    content = content[:MAX_CONTENT_CHARS]
    word_count = len(content.split())
    return {
        "title": title or url,
        "content": content,
        "word_count": word_count,
    }
async def _firecrawl_extract(url: str, firecrawl_url: str) -> tuple[str, str] | None:
    """Scrape *url* through a Firecrawl instance.

    Returns (title, markdown) on success, or None when Firecrawl produced
    no markdown body. Raises on HTTP errors (handled by the caller).
    """
    payload = {"url": url, "formats": ["markdown"]}
    async with httpx.AsyncClient(timeout=30.0) as client:
        response = await client.post(f"{firecrawl_url}/v1/scrape", json=payload)
        response.raise_for_status()
        document = response.json().get("data", {})
    markdown = document.get("markdown", "")
    if not markdown:
        return None
    page_title = document.get("metadata", {}).get("title", "")
    return page_title, markdown
async def _bs4_extract(url: str) -> tuple[str, str] | None:
    """Fallback extraction: plain GET parsed with BeautifulSoup.

    Returns (title, text) or None when no usable container/text is found.
    Raises on HTTP errors (handled by the caller).
    """
    request_headers = {
        "User-Agent": "Mozilla/5.0 (compatible; ArticleSummaryBot/1.0)",
        "Accept": "text/html",
    }
    async with httpx.AsyncClient(timeout=20.0, follow_redirects=True) as client:
        response = await client.get(url, headers=request_headers)
        response.raise_for_status()
        html = response.text
    soup = BeautifulSoup(html, "html.parser")
    # Extract the page title before pruning the tree.
    page_title = soup.title.get_text(strip=True) if soup.title else ""
    # Drop boilerplate elements that would pollute the extracted text.
    for element in soup(["script", "style", "nav", "header", "footer", "aside", "form"]):
        element.decompose()
    # Prefer semantic containers, from most to least specific.
    container = soup.find("article") or soup.find("main") or soup.find("body")
    if not container:
        return None
    # Flatten to text and collapse runs of blank lines.
    body_text = container.get_text(separator="\n", strip=True)
    body_text = re.sub(r"\n{3,}", "\n\n", body_text)
    if len(body_text) < 100:
        return None
    return page_title, body_text
async def detect_topics(content: str, llm_client, model: str) -> list[str]:
    """Use the LLM to detect 3-5 key topics from article content.

    Only the first 2000 characters are sent to the model. Any failure
    (API error, unexpected response shape) is logged and yields [].
    """
    excerpt = content[:2000]
    try:
        response = await llm_client.chat.completions.create(
            model=model,
            messages=[
                {"role": "system", "content": "Extract 3-5 key topics from this article. Return ONLY a comma-separated list of short topic labels (2-4 words each). No numbering, no explanation."},
                {"role": "user", "content": excerpt},
            ],
            max_tokens=100,
            temperature=0.3,
        )
        answer = response.choices[0].message.content.strip()
        labels = [label.strip() for label in answer.split(",") if label.strip()]
        # Cap at five topics even if the model returned more.
        return labels[:5]
    except Exception:
        logger.warning("Topic detection failed", exc_info=True)
        return []

60
article_summary/state.py Normal file
View File

@@ -0,0 +1,60 @@
"""Per-user FSM state machine for article summary conversations."""
from __future__ import annotations
import time
from dataclasses import dataclass, field
from enum import Enum, auto
class ArticleState(Enum):
    """Phases of the summary conversation; each value names the step the
    user most recently completed."""
    IDLE = auto()
    URL_DETECTED = auto()
    LANGUAGE = auto()
    DURATION = auto()
    TOPICS = auto()
    GENERATING = auto()
    COMPLETE = auto()


@dataclass
class ArticleSession:
    """Everything collected from one user while building a summary."""
    state: ArticleState = ArticleState.IDLE
    url: str = ""
    title: str = ""
    content: str = ""
    language: str = ""
    duration_minutes: int = 10
    topics: list[str] = field(default_factory=list)
    detected_topics: list[str] = field(default_factory=list)
    summary_text: str = ""
    timestamp: float = field(default_factory=time.time)  # last-activity time


STATE_TIMEOUT = 300  # seconds of inactivity before a session expires (5 min)


class SessionManager:
    """Manage per-(user, room) article summary sessions."""

    def __init__(self) -> None:
        self._sessions: dict[tuple[str, str], ArticleSession] = {}

    def get(self, user_id: str, room_id: str) -> ArticleSession:
        """Return the live session for (user, room).

        A fresh session is created (and stored) when none exists or the
        previous one has been inactive longer than STATE_TIMEOUT.
        """
        key = (user_id, room_id)
        existing = self._sessions.get(key)
        if existing is not None and time.time() - existing.timestamp <= STATE_TIMEOUT:
            return existing
        fresh = ArticleSession()
        self._sessions[key] = fresh
        return fresh

    def reset(self, user_id: str, room_id: str) -> None:
        """Drop any stored session for (user, room)."""
        self._sessions.pop((user_id, room_id), None)

    def touch(self, user_id: str, room_id: str) -> None:
        """Refresh the inactivity timer on an existing session."""
        entry = self._sessions.get((user_id, room_id))
        if entry is not None:
            entry.timestamp = time.time()

View File

@@ -0,0 +1,68 @@
"""LLM-powered article summarization with personalization."""
from __future__ import annotations
import logging
from openai import AsyncOpenAI
logger = logging.getLogger("article-summary.summarizer")

WORDS_PER_MINUTE = 150  # Clear narration pace


async def summarize_article(
    content: str,
    language: str,
    duration_minutes: int,
    topics: list[str],
    llm_client: AsyncOpenAI,
    model: str,
) -> str:
    """Generate a narrative, TTS-ready summary of article content.

    Args:
        content: Article text (max ~15K chars; truncated to 12K below).
        language: Target language ("en" or "de").
        duration_minutes: Target audio duration (5, 10, or 15).
        topics: Focus topics selected by the user (empty = all topics).
        llm_client: AsyncOpenAI instance (LiteLLM).
        model: Model name to use.

    Returns:
        Summary text ready for TTS.
    """
    target_words = duration_minutes * WORDS_PER_MINUTE
    narration_language = "German" if language == "de" else "English"
    focus = ", ".join(topics) if topics else "all topics"
    system_prompt = f"""You are a professional audio narrator creating a Blinkist-style summary.
RULES:
- Write in {narration_language}.
- Target approximately {target_words} words (for a {duration_minutes}-minute audio).
- Focus on: {focus}.
- Use a conversational, engaging narrator tone — as if explaining to a curious friend.
- Structure: brief intro → key insights → practical takeaways → brief conclusion.
- Use flowing prose, NOT bullet points or lists.
- Do NOT include any formatting markers, headers, or markdown.
- Do NOT say "In this article..." — jump straight into the content.
- Make it sound natural when read aloud."""
    # Truncate very long content before sending it to the model.
    if len(content) > 12_000:
        content = content[:12_000] + "\n\n[Article continues...]"
    completion = await llm_client.chat.completions.create(
        model=model,
        messages=[
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": f"Summarize this article:\n\n{content}"},
        ],
        max_tokens=target_words * 2,  # tokens ≈ 1.3x words, with headroom
        temperature=0.7,
    )
    summary = completion.choices[0].message.content.strip()
    logger.info("Generated summary: %d words (target: %d)", len(summary.split()), target_words)
    return summary

108
article_summary/tts.py Normal file
View File

@@ -0,0 +1,108 @@
"""ElevenLabs TTS — direct API calls to generate MP3 audio."""
from __future__ import annotations
import io
import logging
import httpx
logger = logging.getLogger("article-summary.tts")

ELEVENLABS_API = "https://api.elevenlabs.io/v1"
CHUNK_SIZE = 5000  # Max chars per TTS request


async def generate_audio(
    text: str,
    api_key: str,
    voice_id: str,
    language: str = "en",
) -> tuple[bytes, float]:
    """Generate MP3 audio from text via the ElevenLabs API.

    Long texts are chunked at sentence boundaries and the resulting MP3
    fragments are concatenated in order.

    Args:
        text: Text to convert to speech.
        api_key: ElevenLabs API key.
        voice_id: ElevenLabs voice ID.
        language: Language code ("en" or "de").

    Returns:
        Tuple of (mp3_bytes, estimated_duration_seconds). The duration is
        estimated from the word count (~150 wpm), not decoded from audio.
    """
    pieces = _split_text(text, CHUNK_SIZE)
    total = len(pieces)
    audio_parts: list[bytes] = []
    for index, piece in enumerate(pieces, start=1):
        logger.info("Generating TTS chunk %d/%d (%d chars)", index, total, len(piece))
        audio_parts.append(await _tts_request(piece, api_key, voice_id, language))
    combined = b"".join(audio_parts)
    # Estimate duration at ~150 words per minute narration pace.
    estimated_seconds = len(text.split()) / 150 * 60
    logger.info("TTS complete: %d bytes, ~%.0fs estimated", len(combined), estimated_seconds)
    return combined, estimated_seconds
async def _tts_request(
    text: str,
    api_key: str,
    voice_id: str,
    language: str,
) -> bytes:
    """Perform one ElevenLabs text-to-speech call; return raw MP3 bytes.

    Raises on HTTP errors (propagated to the pipeline's error handler).
    """
    endpoint = f"{ELEVENLABS_API}/text-to-speech/{voice_id}"
    request_headers = {
        "xi-api-key": api_key,
        "Content-Type": "application/json",
        "Accept": "audio/mpeg",
    }
    body = {
        "text": text,
        "model_id": "eleven_multilingual_v2",
        "voice_settings": {
            "stability": 0.5,
            "similarity_boost": 0.75,
        },
    }
    if language == "de":
        # NOTE(review): verify that eleven_multilingual_v2 accepts a
        # language_code hint — some ElevenLabs models reject this field.
        body["language_code"] = "de"
    async with httpx.AsyncClient(timeout=120.0) as client:
        response = await client.post(endpoint, json=body, headers=request_headers)
        response.raise_for_status()
        return response.content
def _split_text(text: str, max_chars: int) -> list[str]:
"""Split text at sentence boundaries for TTS chunking."""
if len(text) <= max_chars:
return [text]
chunks: list[str] = []
current = ""
for sentence in _sentence_split(text):
if len(current) + len(sentence) > max_chars and current:
chunks.append(current.strip())
current = sentence
else:
current += sentence
if current.strip():
chunks.append(current.strip())
return chunks or [text[:max_chars]]
def _sentence_split(text: str) -> list[str]:
"""Split text into sentences, keeping delimiters attached."""
import re
parts = re.split(r'(?<=[.!?])\s+', text)
# Re-add trailing space for joining
return [p + " " for p in parts]

74
bot.py
View File

@@ -40,6 +40,7 @@ from nio import (
from nio.crypto.attachments import decrypt_attachment
from livekit import api
from voice import VoiceSession
from article_summary import ArticleSummaryHandler
BOT_DEVICE_ID = "AIBOT"
CALL_MEMBER_TYPE = "org.matrix.msc3401.call.member"
@@ -77,6 +78,9 @@ BOT_API_KEY = os.environ.get("BOT_API_KEY", "")
RAG_ENDPOINT = os.environ.get("RAG_ENDPOINT", "") # Customer-VM RAG service (e.g. http://127.0.0.1:8765)
RAG_AUTH_TOKEN = os.environ.get("RAG_AUTH_TOKEN", "") # Bearer token for local RAG
BRAVE_API_KEY = os.environ.get("BRAVE_API_KEY", "")
ELEVENLABS_API_KEY = os.environ.get("ELEVENLABS_API_KEY", "")
ELEVENLABS_VOICE_ID = os.environ.get("ELEVENLABS_VOICE_ID", "ML23UVoFL5mI6APbRAeR")
FIRECRAWL_URL = os.environ.get("FIRECRAWL_URL", "")
MAX_TOOL_ITERATIONS = 5
SYSTEM_PROMPT = """You are a helpful AI assistant in a Matrix chat room.
@@ -962,6 +966,17 @@ class Bot:
self._sync_token_received = False
self._verifications: dict[str, dict] = {} # txn_id -> verification state
self._room_document_context: dict[str, list[dict]] = {} # room_id -> [{type, filename, text, timestamp}, ...]
# Article summary handler (Blinkist-style audio summaries)
if self.llm and ELEVENLABS_API_KEY:
self.article_handler = ArticleSummaryHandler(
llm_client=self.llm,
model=DEFAULT_MODEL,
elevenlabs_key=ELEVENLABS_API_KEY,
voice_id=ELEVENLABS_VOICE_ID,
firecrawl_url=FIRECRAWL_URL or None,
)
else:
self.article_handler = None
async def _has_documents(self, matrix_user_id: str) -> bool:
"""Check if user has documents via local RAG or MatrixHost portal API.
@@ -1530,6 +1545,24 @@ class Bot:
logger.info("Confluence page %s detected in room %s",
confluence_page_id, room.room_id)
# Check article summary FSM (Blinkist-style audio summaries)
if self.article_handler:
summary_response = await self.article_handler.handle_message(
room.room_id, sender, body
)
if summary_response is not None:
if summary_response == "__GENERATE__":
await self.client.room_typing(room.room_id, typing_state=True)
try:
await self.article_handler.generate_and_post(
self, room.room_id, sender
)
finally:
await self.client.room_typing(room.room_id, typing_state=False)
elif summary_response:
await self._send_text(room.room_id, summary_response)
return
await self.client.room_typing(room.room_id, typing_state=True)
try:
await self._respond_with_ai(room, body, sender=sender, image_data=image_data)
@@ -2331,6 +2364,47 @@ class Bot:
content=content,
)
    async def _send_audio(self, room_id: str, audio_bytes: bytes, filename: str, duration_seconds: float):
        """Upload audio to Matrix homeserver and send as m.audio event.

        Args:
            room_id: Target Matrix room ID.
            audio_bytes: Raw MP3 payload to upload.
            filename: Display name used as the event body and upload name.
            duration_seconds: Playback length; converted to ms for Matrix.

        On upload failure a text apology is sent instead and no audio
        event is emitted.
        """
        from nio import UploadResponse
        # encrypt=True asks nio to encrypt the upload; maybe_keys then
        # carries the decryption material needed by receiving clients.
        upload_resp, maybe_keys = await self.client.upload(
            data_provider=io.BytesIO(audio_bytes),
            content_type="audio/mpeg",
            filename=filename,
            filesize=len(audio_bytes),
            encrypt=True,
        )
        if not isinstance(upload_resp, UploadResponse):
            logger.error("Audio upload failed: %s", upload_resp)
            await self._send_text(room_id, "Sorry, I couldn't upload the audio file.")
            return
        content = {
            "msgtype": "m.audio",
            "body": filename,
            "info": {
                "mimetype": "audio/mpeg",
                "size": len(audio_bytes),
                "duration": int(duration_seconds * 1000),  # Matrix uses milliseconds
            },
        }
        if maybe_keys:
            # Encrypted attachment: reference the mxc URL plus key material
            # under "file", per the Matrix encrypted-attachment format.
            content["file"] = {
                "url": upload_resp.content_uri,
                "key": maybe_keys["key"],
                "iv": maybe_keys["iv"],
                "hashes": maybe_keys["hashes"],
                "v": maybe_keys["v"],
            }
        else:
            # Unencrypted upload: plain mxc URL at the top level.
            content["url"] = upload_resp.content_uri
        await self.client.room_send(
            room_id,
            message_type="m.room.message",
            content=content,
        )
async def _summarize_call(self, transcript: list[dict], room_id: str) -> str:
"""Generate a concise summary of a voice call transcript via LLM."""
# Format transcript for the LLM