"""ElevenLabs TTS — direct API calls to generate MP3 audio.""" from __future__ import annotations import io import logging import httpx logger = logging.getLogger("article-summary.tts") ELEVENLABS_API = "https://api.elevenlabs.io/v1" CHUNK_SIZE = 5000 # Max chars per TTS request async def generate_audio( text: str, api_key: str, voice_id: str, language: str = "en", ) -> tuple[bytes, float]: """Generate MP3 audio from text via ElevenLabs API. Args: text: Text to convert to speech. api_key: ElevenLabs API key. voice_id: ElevenLabs voice ID. language: Language code ("en" or "de"). Returns: Tuple of (mp3_bytes, estimated_duration_seconds). """ chunks = _split_text(text, CHUNK_SIZE) mp3_parts: list[bytes] = [] for i, chunk in enumerate(chunks): logger.info("Generating TTS chunk %d/%d (%d chars)", i + 1, len(chunks), len(chunk)) mp3_data = await _tts_request(chunk, api_key, voice_id, language) mp3_parts.append(mp3_data) combined = b"".join(mp3_parts) # Estimate duration: ~150 words per minute word_count = len(text.split()) est_duration = (word_count / 150) * 60 logger.info("TTS complete: %d bytes, ~%.0fs estimated", len(combined), est_duration) return combined, est_duration async def _tts_request( text: str, api_key: str, voice_id: str, language: str, ) -> bytes: """Single TTS API call.""" url = f"{ELEVENLABS_API}/text-to-speech/{voice_id}" headers = { "xi-api-key": api_key, "Content-Type": "application/json", "Accept": "audio/mpeg", } payload = { "text": text, "model_id": "eleven_multilingual_v2", "voice_settings": { "stability": 0.5, "similarity_boost": 0.75, }, } # Add language hint for non-English if language == "de": payload["language_code"] = "de" async with httpx.AsyncClient(timeout=120.0) as client: resp = await client.post(url, json=payload, headers=headers) resp.raise_for_status() return resp.content def _split_text(text: str, max_chars: int) -> list[str]: """Split text at sentence boundaries for TTS chunking.""" if len(text) <= max_chars: return [text] chunks: list[str] = [] current = "" for sentence in _sentence_split(text): if len(current) + len(sentence) > max_chars and current: chunks.append(current.strip()) current = sentence else: current += sentence if current.strip(): chunks.append(current.strip()) return chunks or [text[:max_chars]] def _sentence_split(text: str) -> list[str]: """Split text into sentences, keeping delimiters attached.""" import re parts = re.split(r'(?<=[.!?])\s+', text) # Re-add trailing space for joining return [p + " " for p in parts]