"""Blinkist-style article audio summary handler for Matrix bot.""" from __future__ import annotations import logging import re from typing import TYPE_CHECKING from openai import AsyncOpenAI from .state import ArticleState, SessionManager from .extractor import extract_article, detect_topics, is_article_url from .summarizer import summarize_article from .tts import generate_audio if TYPE_CHECKING: pass # Bot type would cause circular import logger = logging.getLogger("article-summary") # URL regex — matches http/https URLs in message text URL_PATTERN = re.compile(r'https?://[^\s\)>\]"]+') CANCEL_WORDS = {"cancel", "stop", "abbrechen", "abbruch", "nevermind"} # Keyword sets for robust option matching (substring search, not exact match) _DISCUSS_KW = {"discuss", "diskutieren", "besprechen", "reden", "talk", "chat"} _TEXT_KW = {"text", "zusammenfassung", "summary", "lesen", "read", "schriftlich", "written"} _AUDIO_KW = {"audio", "mp3", "anhören", "vorlesen", "hören", "listen", "blinkist", "abspielen", "podcast"} # Simple German detection: common words that appear frequently in German text _DE_INDICATORS = {"der", "die", "das", "und", "ist", "ein", "eine", "für", "mit", "auf", "den", "dem", "sich", "nicht", "von", "wird", "auch", "nach", "wie", "aber"} LANGUAGE_OPTIONS = { "1": ("en", "English"), "2": ("de", "German"), "en": ("en", "English"), "de": ("de", "German"), "english": ("en", "English"), "german": ("de", "German"), "deutsch": ("de", "German"), } DURATION_OPTIONS = { "1": 5, "2": 10, "3": 15, "5": 5, "10": 10, "15": 15, } def _detect_content_lang(text: str) -> str: """Detect language from text content. Returns 'de' or 'en'.""" words = set(re.findall(r'\b\w+\b', text.lower())) de_hits = len(words & _DE_INDICATORS) return "de" if de_hits >= 4 else "en" def _classify_choice(body: str) -> str | None: """Classify user's action choice from free-form text. Returns 'discuss', 'text', 'audio', or None (unrecognized). """ # Normalize: lowercase, strip punctuation around digits raw = body.strip().lower() # Extract bare number if message is just "3." or "3!" or "nummer 3" etc. num_match = re.search(r'\b([123])\b', raw) bare_num = num_match.group(1) if num_match else None # Number-only messages (highest priority — unambiguous) stripped = re.sub(r'[^\w\s]', '', raw).strip() if stripped in ("1", "2", "3"): return {"1": "discuss", "2": "text", "3": "audio"}[stripped] # Keyword search (substring matching) if any(kw in raw for kw in _AUDIO_KW): return "audio" if any(kw in raw for kw in _TEXT_KW): return "text" if any(kw in raw for kw in _DISCUSS_KW): return "discuss" # "nummer 3" / "option 3" / "3. bitte" — number in context if bare_num: return {"1": "discuss", "2": "text", "3": "audio"}[bare_num] return None class ArticleSummaryHandler: """Handles the interactive article summary conversation flow.""" def __init__( self, llm_client: AsyncOpenAI, model: str, elevenlabs_key: str, voice_id: str, firecrawl_url: str | None = None, ) -> None: self.llm = llm_client self.model = model self.elevenlabs_key = elevenlabs_key self.voice_id = voice_id self.firecrawl_url = firecrawl_url self.sessions = SessionManager() async def handle_message( self, room_id: str, sender: str, body: str ) -> str | None: """Process a message through the article summary FSM. Returns: - None: Not handled (pass to normal AI handler). - str: Text response to send. - "__GENERATE__": Signal to run the full generation pipeline. """ body_lower = body.strip().lower() session = self.sessions.get(sender, room_id) # Cancel from any active state if session.state != ArticleState.IDLE and body_lower in CANCEL_WORDS: ui_de = session.ui_language == "de" self.sessions.reset(sender, room_id) return "Zusammenfassung abgebrochen." if ui_de else "Summary cancelled." # Route based on current state if session.state == ArticleState.IDLE: return await self._check_for_url(room_id, sender, body) elif session.state == ArticleState.URL_DETECTED: # Waiting for user to pick action (discuss, text summary, audio) return await self._on_action_choice(room_id, sender, body, body_lower) elif session.state == ArticleState.AWAITING_LANGUAGE: # Audio flow: waiting for language selection return self._on_language(room_id, sender, body_lower) elif session.state == ArticleState.LANGUAGE: # Waiting for duration selection return self._on_duration(room_id, sender, body_lower) elif session.state == ArticleState.DURATION: # Waiting for topic selection return self._on_topics(room_id, sender, body) elif session.state == ArticleState.GENERATING: if session.ui_language == "de": return "Zusammenfassung wird noch erstellt, bitte warten..." return "Still generating your summary, please wait..." elif session.state == ArticleState.COMPLETE: # Follow-up Q&A about the article return await self._on_followup(room_id, sender, body) return None async def _check_for_url( self, room_id: str, sender: str, body: str ) -> str | None: """Check if message contains an article URL.""" urls = URL_PATTERN.findall(body) # Filter to article-like URLs article_urls = [u for u in urls if is_article_url(u)] if not article_urls: return None url = article_urls[0] session = self.sessions.get(sender, room_id) # Extract article content logger.info("Extracting article from %s", url) article = await extract_article(url, self.firecrawl_url) if not article: return None # Could not extract — let normal handler deal with it session.url = url session.title = article["title"] session.content = article["content"] word_count = article["word_count"] read_time = max(1, word_count // 200) # Detect topics via LLM session.detected_topics = await detect_topics( article["content"], self.llm, self.model ) session.state = ArticleState.URL_DETECTED self.sessions.touch(sender, room_id) topics_hint = "" if session.detected_topics: topics_hint = f"\nTopics: {', '.join(session.detected_topics)}" # Detect article language for localized UI lang = _detect_content_lang(session.content[:2000]) session.ui_language = lang if lang == "de": return ( f"**Gefunden:** {session.title} (~{read_time} min Lesezeit){topics_hint}\n\n" f"Was möchtest du damit machen?\n" f"1\ufe0f\u20e3 **Diskutieren** \u2014 Ich lese den Artikel und wir reden darüber\n" f"2\ufe0f\u20e3 **Textzusammenfassung** \u2014 Kurze schriftliche Zusammenfassung\n" f"3\ufe0f\u20e3 **Audiozusammenfassung** \u2014 Blinkist-Style MP3\n\n" f"_(oder schreib einfach weiter \u2014 ich unterbreche nicht)_" ) return ( f"**Found:** {session.title} (~{read_time} min read){topics_hint}\n\n" f"What would you like to do?\n" f"1\ufe0f\u20e3 **Discuss** \u2014 I'll read the article and we can talk about it\n" f"2\ufe0f\u20e3 **Text summary** \u2014 Quick written summary\n" f"3\ufe0f\u20e3 **Audio summary** \u2014 Blinkist-style MP3\n\n" f"_(or just keep chatting \u2014 I won't interrupt)_" ) def _on_language( self, room_id: str, sender: str, choice: str ) -> str | None: """Handle language selection.""" lang = LANGUAGE_OPTIONS.get(choice) session = self.sessions.get(sender, room_id) ui_de = session.ui_language == "de" if not lang: if ui_de: return "Bitte wähle eine Sprache: **1** für Englisch, **2** für Deutsch." return "Please pick a language: **1** for English, **2** for German." session.language = lang[0] session.state = ArticleState.LANGUAGE self.sessions.touch(sender, room_id) if ui_de: return ( f"Sprache: **{lang[1]}**. Wie lang soll die Zusammenfassung sein?\n" f"1️⃣ 5 Min (kurz)\n" f"2️⃣ 10 Min (standard)\n" f"3️⃣ 15 Min (ausführlich)" ) return ( f"Language: **{lang[1]}**. How long should the summary be?\n" f"1️⃣ 5 min (short)\n" f"2️⃣ 10 min (standard)\n" f"3️⃣ 15 min (detailed)" ) def _on_duration( self, room_id: str, sender: str, choice: str ) -> str | None: """Handle duration selection.""" duration = DURATION_OPTIONS.get(choice) session = self.sessions.get(sender, room_id) ui_de = session.ui_language == "de" if not duration: if ui_de: return "Bitte wähle: **1** (5 Min), **2** (10 Min) oder **3** (15 Min)." return "Please pick: **1** (5 min), **2** (10 min), or **3** (15 min)." session.duration_minutes = duration session.state = ArticleState.DURATION self.sessions.touch(sender, room_id) if session.detected_topics: topic_list = "\n".join( f" • {t}" for t in session.detected_topics ) if ui_de: return ( f"Dauer: **{duration} Min**. Auf welche Themen fokussieren?\n" f"{topic_list}\n\n" f"Antworte mit Themennummern (kommagetrennt), bestimmten Themen oder **alle**." ) return ( f"Duration: **{duration} min**. Focus on which topics?\n" f"{topic_list}\n\n" f"Reply with topic numbers (comma-separated), specific topics, or **all**." ) else: if ui_de: return ( f"Dauer: **{duration} Min**. Bestimmte Themen im Fokus?\n" f"Antworte mit Themen (kommagetrennt) oder **alle** für eine allgemeine Zusammenfassung." ) return ( f"Duration: **{duration} min**. Any specific topics to focus on?\n" f"Reply with topics (comma-separated) or **all** for a general summary." ) def _on_topics( self, room_id: str, sender: str, body: str ) -> str | None: """Handle topic selection. Returns __GENERATE__ to trigger pipeline.""" session = self.sessions.get(sender, room_id) body_lower = body.strip().lower() if body_lower in ("all", "alle", "everything", "alles"): session.topics = session.detected_topics or [] else: # Try to match by number parts = re.split(r'[,\s]+', body.strip()) selected = [] for p in parts: p = p.strip() if p.isdigit(): idx = int(p) - 1 if 0 <= idx < len(session.detected_topics): selected.append(session.detected_topics[idx]) elif p: selected.append(p) session.topics = selected or session.detected_topics or [] session.state = ArticleState.GENERATING self.sessions.touch(sender, room_id) return "__GENERATE__" async def _on_action_choice( self, room_id: str, sender: str, body: str, body_lower: str ) -> str | None: """Handle user's choice after URL detection: discuss, text summary, or audio.""" session = self.sessions.get(sender, room_id) choice = _classify_choice(body) if choice == "discuss": article_context = session.content[:8000] title = session.title self.sessions.reset(sender, room_id) return f"__DISCUSS__{title}\n{article_context}" if choice == "text": return await self._generate_text_summary(room_id, sender) if choice == "audio": return self._prompt_language(room_id, sender) # Unrecognized — user is just chatting, pass through with article context article_context = session.content[:8000] title = session.title self.sessions.reset(sender, room_id) return f"__DISCUSS__{title}\n{article_context}" def _prompt_language(self, room_id: str, sender: str) -> str: """Present language selection for audio summary.""" session = self.sessions.get(sender, room_id) session.state = ArticleState.AWAITING_LANGUAGE self.sessions.touch(sender, room_id) if session.ui_language == "de": return ( "In welcher Sprache soll die Audiozusammenfassung sein?\n" "1\ufe0f\u20e3 Englisch\n" "2\ufe0f\u20e3 Deutsch" ) return ( "What language for the audio summary?\n" "1\ufe0f\u20e3 English\n" "2\ufe0f\u20e3 German" ) async def _generate_text_summary(self, room_id: str, sender: str) -> str | None: """Generate a text-only summary of the article.""" session = self.sessions.get(sender, room_id) try: resp = await self.llm.chat.completions.create( model=self.model, messages=[ { "role": "system", "content": ( "Summarize this article concisely in 3-5 paragraphs. " "Respond in the same language as the article." ), }, { "role": "user", "content": f"Article: {session.title}\n\n{session.content[:12000]}", }, ], max_tokens=1000, temperature=0.3, ) summary = resp.choices[0].message.content.strip() session.summary_text = summary session.state = ArticleState.COMPLETE self.sessions.touch(sender, room_id) if session.ui_language == "de": return ( f"**Zusammenfassung: {session.title}**\n\n{summary}\n\n" f"_Stelle Folgefragen oder teile einen neuen Link._" ) return ( f"**Summary: {session.title}**\n\n{summary}\n\n" f"_Ask follow-up questions or share a new link._" ) except Exception: logger.warning("Text summary failed", exc_info=True) self.sessions.reset(sender, room_id) return None async def generate_and_post(self, bot, room_id: str, sender: str) -> None: """Run the full pipeline: summarize → TTS → upload MP3.""" session = self.sessions.get(sender, room_id) ui_de = session.ui_language == "de" topics_str = ", ".join(session.topics) if session.topics else ("alle Themen" if ui_de else "all topics") if ui_de: await bot._send_text( room_id, f"Erstelle {session.duration_minutes}-Min {session.language.upper()} " f"Zusammenfassung von **{session.title}** (Fokus: {topics_str})...", ) else: await bot._send_text( room_id, f"Generating {session.duration_minutes}-min {session.language.upper()} " f"summary of **{session.title}** (focus: {topics_str})...", ) try: # Step 1: Summarize summary = await summarize_article( content=session.content, language=session.language, duration_minutes=session.duration_minutes, topics=session.topics, llm_client=self.llm, model=self.model, ) session.summary_text = summary # Step 2: TTS mp3_bytes, duration_secs = await generate_audio( text=summary, api_key=self.elevenlabs_key, voice_id=self.voice_id, language=session.language, ) # Step 3: Upload and send audio filename = re.sub(r'[^\w\s-]', '', session.title)[:50].strip() filename = f"{filename}.mp3" if filename else "summary.mp3" await bot._send_audio(room_id, mp3_bytes, filename, duration_secs) # Step 4: Send transcript transcript_preview = summary[:500] if len(summary) > 500: transcript_preview += "..." if ui_de: await bot._send_text( room_id, f"**Zusammenfassung von:** {session.title}\n\n{transcript_preview}\n\n" f"_Du kannst Folgefragen zu diesem Artikel stellen._", ) else: await bot._send_text( room_id, f"**Summary of:** {session.title}\n\n{transcript_preview}\n\n" f"_You can ask follow-up questions about this article._", ) session.state = ArticleState.COMPLETE self.sessions.touch(sender, room_id) except Exception: logger.exception("Article summary pipeline failed for %s", session.url) if ui_de: await bot._send_text( room_id, "Entschuldigung, die Audiozusammenfassung konnte nicht erstellt werden. Bitte versuche es erneut." ) else: await bot._send_text( room_id, "Sorry, I couldn't generate the audio summary. Please try again." ) self.sessions.reset(sender, room_id) async def _on_followup( self, room_id: str, sender: str, body: str ) -> str | None: """Answer follow-up questions about the summarized article.""" session = self.sessions.get(sender, room_id) # If user posts a new URL, start fresh urls = URL_PATTERN.findall(body) if any(is_article_url(u) for u in urls): self.sessions.reset(sender, room_id) return await self._check_for_url(room_id, sender, body) # Check if it looks like a question about the article question_indicators = ["?", "what", "how", "why", "explain", "was", "wie", "warum", "erkläre"] is_question = any(q in body.lower() for q in question_indicators) if not is_question: # Not a question — reset and let normal handler take over self.sessions.reset(sender, room_id) return None try: resp = await self.llm.chat.completions.create( model=self.model, messages=[ { "role": "system", "content": ( "You are answering follow-up questions about an article. " "Use the article content below to answer. Be concise. " "Respond in the same language as the question." ), }, { "role": "user", "content": ( f"Article: {session.title}\n\n" f"{session.content[:8000]}\n\n" f"Summary: {session.summary_text[:3000]}\n\n" f"Question: {body}" ), }, ], max_tokens=500, temperature=0.5, ) return resp.choices[0].message.content.strip() except Exception: logger.warning("Follow-up Q&A failed", exc_info=True) self.sessions.reset(sender, room_id) return None