Files
matrix-ai-agent/article_summary/__init__.py
Christian Gick 964a3f6075 feat: scheduled reminders + less aggressive article summary
Add scheduled messages/reminders system:
- New scheduled_messages table in memory-service with CRUD endpoints
- schedule_message, list_reminders, cancel_reminder tools for the bot
- Background scheduler loop (30s) sends due reminders automatically
- Supports one-time, daily, weekly, weekdays, monthly repeat patterns

Make article URL handling non-blocking:
- Show 3 options (discuss, text summary, audio) instead of forcing audio wizard
- Default to passing article context to AI if user just keeps chatting
- New AWAITING_LANGUAGE state for cleaner audio flow FSM

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-09 08:32:40 +02:00

412 lines
15 KiB
Python
Raw Blame History

This file contains invisible Unicode characters
This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""Blinkist-style article audio summary handler for Matrix bot."""
from __future__ import annotations
import logging
import re
from typing import TYPE_CHECKING
from openai import AsyncOpenAI
from .state import ArticleState, SessionManager
from .extractor import extract_article, detect_topics, is_article_url
from .summarizer import summarize_article
from .tts import generate_audio
if TYPE_CHECKING:
pass # Bot type would cause circular import
logger = logging.getLogger("article-summary")

# Matches http/https URLs embedded in message text. A match stops at
# whitespace or at a closing ), >, ] or ", and it never *ends* on sentence
# punctuation, so "see https://example.com/post." yields the URL without the
# trailing full stop (punctuation inside the URL is still allowed).
URL_PATTERN = re.compile(r'https?://[^\s)>\]"]*[^\s)>\]".,;:!?]')

# Replies that abort an in-progress summary flow (compared against the
# stripped, lower-cased message body).
CANCEL_WORDS = {"cancel", "stop", "abbrechen", "abbruch", "nevermind"}

# Accepted language replies -> (language code, display name). Keys cover the
# menu position, the language code and the language name; all are lower-case
# because the reply is lower-cased before the lookup.
LANGUAGE_OPTIONS = {
    "1": ("en", "English"),
    "2": ("de", "German"),
    "en": ("en", "English"),
    "de": ("de", "German"),
    "english": ("en", "English"),
    "german": ("de", "German"),
    "deutsch": ("de", "German"),
}

# Accepted duration replies -> summary length in minutes. "1"/"2"/"3" are
# menu positions; "5"/"10"/"15" are the minute counts typed literally.
DURATION_OPTIONS = {
    "1": 5,
    "2": 10,
    "3": 15,
    "5": 5,
    "10": 10,
    "15": 15,
}
class ArticleSummaryHandler:
    """Conversation driver for the interactive article summary flow.

    Keeps one session per (sender, room) pair in a ``SessionManager`` and
    moves it through the ``ArticleState`` steps as messages arrive.
    """

    def __init__(
        self,
        llm_client: AsyncOpenAI,
        model: str,
        elevenlabs_key: str,
        voice_id: str,
        firecrawl_url: str | None = None,
    ) -> None:
        """Store the service configuration and create an empty session store.

        Args:
            llm_client: Async OpenAI-compatible client used for all LLM calls.
            model: Model name sent with every chat completion request.
            elevenlabs_key: API key forwarded to the TTS step.
            voice_id: Voice identifier forwarded to the TTS step.
            firecrawl_url: Optional endpoint forwarded to article extraction.
        """
        # LLM settings
        self.llm, self.model = llm_client, model
        # Text-to-speech settings
        self.elevenlabs_key, self.voice_id = elevenlabs_key, voice_id
        # Article extraction settings
        self.firecrawl_url = firecrawl_url
        # Per-(sender, room) conversation state
        self.sessions = SessionManager()
async def handle_message(
    self, room_id: str, sender: str, body: str
) -> str | None:
    """Route one incoming message through the article summary state machine.

    Args:
        room_id: Room the message was posted in.
        sender: User who posted the message.
        body: Raw message text.

    Returns:
        - None: the message was not handled; pass it to the normal AI handler.
        - "__GENERATE__": signal to run the full audio generation pipeline.
        - A string starting with "__DISCUSS__": article title and content to
          hand to the normal AI handler as context.
        - Any other str: text response to send as-is.
    """
    normalized = body.strip().lower()
    session = self.sessions.get(sender, room_id)
    state = session.state

    # A cancel word aborts the flow from every state except IDLE.
    if state != ArticleState.IDLE and normalized in CANCEL_WORDS:
        self.sessions.reset(sender, room_id)
        return "Summary cancelled."

    if state == ArticleState.IDLE:
        return await self._check_for_url(room_id, sender, body)

    if state == ArticleState.URL_DETECTED:
        # User is choosing between discuss / text summary / audio.
        return await self._on_action_choice(room_id, sender, body, normalized)

    if state == ArticleState.AWAITING_LANGUAGE:
        # Audio flow: the reply is the language selection.
        return self._on_language(room_id, sender, normalized)

    if state == ArticleState.LANGUAGE:
        # Language is set; the reply is the duration selection.
        return self._on_duration(room_id, sender, normalized)

    if state == ArticleState.DURATION:
        # Duration is set; the reply is the topic selection.
        return self._on_topics(room_id, sender, body)

    if state == ArticleState.GENERATING:
        return "Still generating your summary, please wait..."

    if state == ArticleState.COMPLETE:
        # A summary exists; treat the message as a possible follow-up.
        return await self._on_followup(room_id, sender, body)

    return None
async def _check_for_url(
    self, room_id: str, sender: str, body: str
) -> str | None:
    """Look for an article URL in *body* and, if found, open a session for it.

    Only the first article-like URL is used. Returns the action menu text
    when an article was extracted, or None when the message has no article
    URL or extraction produced nothing (the normal handler then takes over).
    """
    candidates = list(filter(is_article_url, URL_PATTERN.findall(body)))
    if not candidates:
        return None

    url = candidates[0]
    session = self.sessions.get(sender, room_id)

    logger.info("Extracting article from %s", url)
    article = await extract_article(url, self.firecrawl_url)
    if not article:
        # Nothing extracted: stay idle and let the normal handler respond.
        return None

    session.url = url
    session.title = article["title"]
    session.content = article["content"]
    # One minute per 200 words, never less than one minute.
    read_time = max(1, article["word_count"] // 200)

    # Ask the LLM for the article's topics; they are offered later in the flow.
    session.detected_topics = await detect_topics(
        article["content"], self.llm, self.model
    )
    session.state = ArticleState.URL_DETECTED
    self.sessions.touch(sender, room_id)

    topics_hint = (
        f"\nTopics: {', '.join(session.detected_topics)}"
        if session.detected_topics
        else ""
    )
    return (
        f"**Found:** {session.title} (~{read_time} min read){topics_hint}\n\n"
        f"What would you like to do?\n"
        f"1\ufe0f\u20e3 **Discuss** \u2014 I'll read the article and we can talk about it\n"
        f"2\ufe0f\u20e3 **Text summary** \u2014 Quick written summary\n"
        f"3\ufe0f\u20e3 **Audio summary** \u2014 Blinkist-style MP3\n\n"
        f"_(or just keep chatting \u2014 I won't interrupt)_"
    )
def _on_language(
    self, room_id: str, sender: str, choice: str
) -> str | None:
    """Record the chosen audio language and ask for the summary length.

    Args:
        room_id: Room the conversation takes place in.
        sender: User who is making the choice.
        choice: Stripped, lower-cased reply; must be a LANGUAGE_OPTIONS key.

    Returns:
        The duration menu on success, or a re-prompt when *choice* is not a
        known language (the session state is left unchanged in that case).
    """
    lang = LANGUAGE_OPTIONS.get(choice)
    if not lang:
        return "Please pick a language: **1** for English, **2** for German."

    code, label = lang
    session = self.sessions.get(sender, room_id)
    session.language = code
    session.state = ArticleState.LANGUAGE
    self.sessions.touch(sender, room_id)

    # Keycap emoji are written as explicit escapes (digit + U+FE0F + U+20E3),
    # matching the other menus in this handler; a bare digit + U+20E3 is not
    # a fully-qualified emoji sequence and is invisible in the source.
    return (
        f"Language: **{label}**. How long should the summary be?\n"
        "1\ufe0f\u20e3 5 min (short)\n"
        "2\ufe0f\u20e3 10 min (standard)\n"
        "3\ufe0f\u20e3 15 min (detailed)"
    )
def _on_duration(
    self, room_id: str, sender: str, choice: str
) -> str | None:
    """Record the chosen summary length and ask which topics to focus on.

    Args:
        room_id: Room the conversation takes place in.
        sender: User who is making the choice.
        choice: Stripped, lower-cased reply; must be a DURATION_OPTIONS key.

    Returns:
        The topic prompt on success, or a re-prompt when *choice* is not a
        known duration (the session state is left unchanged in that case).
    """
    duration = DURATION_OPTIONS.get(choice)
    if not duration:
        return "Please pick: **1** (5 min), **2** (10 min), or **3** (15 min)."

    session = self.sessions.get(sender, room_id)
    session.duration_minutes = duration
    session.state = ArticleState.DURATION
    self.sessions.touch(sender, room_id)

    if session.detected_topics:
        # Number the topics from 1: the prompt asks for "topic numbers" and
        # the topic-selection step resolves replies as 1-based positions.
        topic_list = "\n".join(
            f"{number}. {topic}"
            for number, topic in enumerate(session.detected_topics, start=1)
        )
        return (
            f"Duration: **{duration} min**. Focus on which topics?\n"
            f"{topic_list}\n\n"
            f"Reply with topic numbers (comma-separated), specific topics, or **all**."
        )

    return (
        f"Duration: **{duration} min**. Any specific topics to focus on?\n"
        f"Reply with topics (comma-separated) or **all** for a general summary."
    )
def _on_topics(
    self, room_id: str, sender: str, body: str
) -> str | None:
    """Resolve the topic selection and signal that generation should start.

    The reply may be an "all" keyword, 1-based positions in the detected
    topic list, free-text topics, or a comma-separated mix of those.
    Positions outside the detected list are ignored. If nothing usable
    remains, all detected topics are selected.

    Returns:
        Always "__GENERATE__"; the session is moved to GENERATING first.
    """
    session = self.sessions.get(sender, room_id)
    # The detected list may be empty or unset; normalise once so the index
    # bounds check below cannot fail on a missing list.
    detected = list(session.detected_topics or [])
    reply = body.strip()

    if reply.lower() in ("all", "alle", "everything", "alles"):
        session.topics = detected
    else:
        selected = []
        # Commas separate entries; whitespace only separates *numbers*, so a
        # multi-word free-text topic ("climate change") stays in one piece.
        for chunk in reply.split(","):
            tokens = chunk.split()
            if not tokens:
                continue
            # isdecimal() (unlike isdigit()) only accepts characters that
            # int() can parse, so inputs such as "²" are treated as text.
            if all(token.isdecimal() for token in tokens):
                for token in tokens:
                    idx = int(token) - 1
                    if 0 <= idx < len(detected):
                        selected.append(detected[idx])
            else:
                selected.append(" ".join(tokens))
        session.topics = selected or detected

    session.state = ArticleState.GENERATING
    self.sessions.touch(sender, room_id)
    return "__GENERATE__"
async def _on_action_choice(
self, room_id: str, sender: str, body: str, body_lower: str
) -> str | None:
"""Handle user's choice after URL detection: discuss, text summary, or audio."""
session = self.sessions.get(sender, room_id)
# Option 1: Discuss — reset FSM, return article context for AI handler
if body_lower in ("1", "discuss", "diskutieren", "besprechen"):
article_context = session.content[:8000]
title = session.title
self.sessions.reset(sender, room_id)
return f"__DISCUSS__{title}\n{article_context}"
# Option 2: Text summary — generate and return text, no TTS
if body_lower in ("2", "text", "text summary", "zusammenfassung"):
return await self._generate_text_summary(room_id, sender)
# Option 3: Audio summary — enter language selection (existing flow)
if body_lower in ("3", "audio", "audio summary"):
return self._prompt_language(room_id, sender)
# Anything else — user is just chatting, reset and pass through with article context
article_context = session.content[:8000]
title = session.title
self.sessions.reset(sender, room_id)
return f"__DISCUSS__{title}\n{article_context}"
def _prompt_language(self, room_id: str, sender: str) -> str:
    """Move the session to AWAITING_LANGUAGE and return the language menu."""
    self.sessions.get(sender, room_id).state = ArticleState.AWAITING_LANGUAGE
    self.sessions.touch(sender, room_id)

    menu = (
        "What language for the audio summary?\n"
        "1\ufe0f\u20e3 English\n"
        "2\ufe0f\u20e3 German"
    )
    return menu
async def _generate_text_summary(self, room_id: str, sender: str) -> str | None:
    """Produce a written summary of the session's article via the LLM.

    On success the summary is stored on the session, the session moves to
    COMPLETE and the formatted summary is returned. On any failure the
    session is reset and None is returned so the normal handler takes over.
    """
    session = self.sessions.get(sender, room_id)
    try:
        system_message = {
            "role": "system",
            "content": (
                "Summarize this article concisely in 3-5 paragraphs. "
                "Respond in the same language as the article."
            ),
        }
        # Only the first 12000 characters of the article are sent.
        user_message = {
            "role": "user",
            "content": f"Article: {session.title}\n\n{session.content[:12000]}",
        }
        resp = await self.llm.chat.completions.create(
            model=self.model,
            messages=[system_message, user_message],
            max_tokens=1000,
            temperature=0.3,
        )
        summary = resp.choices[0].message.content.strip()

        session.summary_text = summary
        session.state = ArticleState.COMPLETE
        self.sessions.touch(sender, room_id)
        return (
            f"**Summary: {session.title}**\n\n{summary}\n\n"
            f"_Ask follow-up questions or share a new link._"
        )
    except Exception:
        logger.warning("Text summary failed", exc_info=True)
        self.sessions.reset(sender, room_id)
        return None
async def generate_and_post(self, bot, room_id: str, sender: str) -> None:
    """Run the full audio pipeline: summarize, synthesize speech, post results.

    Posts a progress notice, generates the summary text, converts it to
    audio, sends the MP3 followed by a transcript preview (first 500
    characters), and marks the session COMPLETE.

    If any step fails, including the initial progress notice, the failure
    is logged and the session is reset *before* the apology is sent, so a
    failing send can never leave the session stuck in GENERATING.

    Args:
        bot: Object providing the ``_send_text`` and ``_send_audio`` coroutines.
        room_id: Room to post into.
        sender: User whose session is being processed.
    """
    session = self.sessions.get(sender, room_id)

    try:
        topics_str = ", ".join(session.topics) if session.topics else "all topics"
        await bot._send_text(
            room_id,
            f"Generating {session.duration_minutes}-min {session.language.upper()} "
            f"summary of **{session.title}** (focus: {topics_str})...",
        )

        # Step 1: summarize
        summary = await summarize_article(
            content=session.content,
            language=session.language,
            duration_minutes=session.duration_minutes,
            topics=session.topics,
            llm_client=self.llm,
            model=self.model,
        )
        session.summary_text = summary

        # Step 2: text-to-speech
        mp3_bytes, duration_secs = await generate_audio(
            text=summary,
            api_key=self.elevenlabs_key,
            voice_id=self.voice_id,
            language=session.language,
        )

        # Step 3: upload and send the audio. The filename is derived from the
        # title: punctuation is dropped and whitespace runs (including
        # newlines and tabs) collapse to single spaces.
        cleaned = re.sub(r'[^\w\s-]', '', session.title)
        filename = " ".join(cleaned.split())[:50].strip()
        filename = f"{filename}.mp3" if filename else "summary.mp3"
        await bot._send_audio(room_id, mp3_bytes, filename, duration_secs)

        # Step 4: send a transcript preview
        transcript_preview = summary[:500]
        if len(summary) > 500:
            transcript_preview += "..."
        await bot._send_text(
            room_id,
            f"**Summary of:** {session.title}\n\n{transcript_preview}\n\n"
            f"_You can ask follow-up questions about this article._",
        )

        session.state = ArticleState.COMPLETE
        self.sessions.touch(sender, room_id)
    except Exception:
        logger.exception("Article summary pipeline failed for %s", session.url)
        # Reset first: if the apology below cannot be delivered either, the
        # user must still be able to start over.
        self.sessions.reset(sender, room_id)
        await bot._send_text(
            room_id, "Sorry, I couldn't generate the audio summary. Please try again."
        )
async def _on_followup(
    self, room_id: str, sender: str, body: str
) -> str | None:
    """Handle a message that arrives after a summary has been delivered.

    A message with a new article URL restarts the flow. A message that looks
    like a question is answered by the LLM using the stored article and
    summary. Anything else, and any LLM failure, resets the session and
    returns None so the normal handler takes over.
    """
    session = self.sessions.get(sender, room_id)

    # A fresh article link replaces the current session.
    if any(is_article_url(u) for u in URL_PATTERN.findall(body)):
        self.sessions.reset(sender, room_id)
        return await self._check_for_url(room_id, sender, body)

    # Heuristic: the message counts as a question if it contains any of these
    # fragments anywhere in the text.
    # NOTE(review): this is a substring test, so e.g. "show" matches "how";
    # confirm that this leniency is intended.
    question_indicators = ["?", "what", "how", "why", "explain", "was", "wie", "warum", "erkläre"]
    lowered = body.lower()
    if not any(marker in lowered for marker in question_indicators):
        # Not a question: end the session and let the normal handler respond.
        self.sessions.reset(sender, room_id)
        return None

    try:
        system_message = {
            "role": "system",
            "content": (
                "You are answering follow-up questions about an article. "
                "Use the article content below to answer. Be concise. "
                "Respond in the same language as the question."
            ),
        }
        # Article and summary are truncated to 8000 and 3000 characters.
        user_message = {
            "role": "user",
            "content": (
                f"Article: {session.title}\n\n"
                f"{session.content[:8000]}\n\n"
                f"Summary: {session.summary_text[:3000]}\n\n"
                f"Question: {body}"
            ),
        }
        resp = await self.llm.chat.completions.create(
            model=self.model,
            messages=[system_message, user_message],
            max_tokens=500,
            temperature=0.5,
        )
        return resp.choices[0].message.content.strip()
    except Exception:
        logger.warning("Follow-up Q&A failed", exc_info=True)
        self.sessions.reset(sender, room_id)
        return None