"""Brave Search executor for cron jobs with optional LLM filtering."""

import json
import logging
import os
import re

import httpx

from .formatter import format_search_results

logger = logging.getLogger(__name__)

BRAVE_API_KEY = os.environ.get("BRAVE_API_KEY", "")
LITELLM_URL = os.environ.get("LITELLM_BASE_URL", "")
LITELLM_KEY = os.environ.get("LITELLM_API_KEY", "")
FILTER_MODEL = os.environ.get("BASE_MODEL", "claude-haiku")

FILTER_SYSTEM_PROMPT = (
    "You are a search result filter. Given a list of search results and "
    "filtering criteria, evaluate each result and return ONLY the ones that "
    "match the criteria. "
    "Return a JSON array of indices (0-based) of results that match. "
    "If none match, return an empty array []. "
    "Only return the JSON array, nothing else."
)

# Matches the first bracketed run of digits, commas and whitespace, so the
# index array can be pulled out even when the model wraps it in extra text.
_INDEX_ARRAY_RE = re.compile(r"\[[\d,\s]*\]")


def _describe_result(index: int, result: dict) -> str:
    """Render one search result as a single numbered line for the LLM prompt."""
    title = result.get("title", "")
    desc = result.get("description", "")
    url = result.get("url", "")
    return f"[{index}] {title} — {desc} ({url})"


async def _llm_filter(results: list[dict], criteria: str) -> list[dict]:
    """Use LLM to filter search results against user-defined criteria.

    The results are numbered and sent, together with ``criteria``, to the
    chat-completions endpoint under ``LITELLM_URL``. The reply is expected to
    contain a JSON array of 0-based indices of the results to keep.

    Filtering is best-effort: if the LLM is not configured, the request fails,
    or the reply cannot be parsed, the unfiltered ``results`` are returned.

    Args:
        results: Search result dicts; ``title``, ``description`` and ``url``
            are read when present.
        criteria: Free-text description of which results to keep.

    Returns:
        The results selected by the LLM, in the order the LLM listed them,
        each at most once; or ``results`` unchanged on any failure.
    """
    if not LITELLM_URL or not LITELLM_KEY:
        logger.warning("LLM not configured, skipping filter")
        return results

    # Nothing to evaluate: avoid a pointless round-trip to the model.
    if not results:
        return results

    listing = "\n".join(
        _describe_result(i, r) for i, r in enumerate(results)
    )
    user_msg = f"**Criteria:** {criteria}\n\n**Results:**\n{listing}"

    try:
        async with httpx.AsyncClient(timeout=30.0) as client:
            resp = await client.post(
                # Tolerate a base URL configured with a trailing slash.
                f"{LITELLM_URL.rstrip('/')}/chat/completions",
                headers={"Authorization": f"Bearer {LITELLM_KEY}"},
                json={
                    "model": FILTER_MODEL,
                    "messages": [
                        {"role": "system", "content": FILTER_SYSTEM_PROMPT},
                        {"role": "user", "content": user_msg},
                    ],
                    "temperature": 0,
                    "max_tokens": 200,
                },
            )
            resp.raise_for_status()
            data = resp.json()

        reply = data["choices"][0]["message"]["content"].strip()

        # Extract JSON array from response (LLM may include extra text)
        match = _INDEX_ARRAY_RE.search(reply)
        if not match:
            logger.warning("LLM filter returned no array: %s", reply)
            return results

        indices = json.loads(match.group())
        if not isinstance(indices, list):
            logger.warning("LLM filter returned non-list: %s", reply)
            return results

        # dict.fromkeys drops repeated indices while keeping first-seen
        # order, so a result the model lists twice is not posted twice.
        filtered = [
            results[i]
            for i in dict.fromkeys(indices)
            if 0 <= i < len(results)
        ]
        logger.info(
            "LLM filter: %d/%d results matched criteria",
            len(filtered),
            len(results),
        )
        return filtered
    except Exception as exc:
        # Deliberately broad: the filter is optional, so any failure (HTTP,
        # malformed payload, bad JSON) falls back to the unfiltered results.
        logger.warning("LLM filter failed, returning all results: %s", exc)
        return results


async def execute_brave_search(job: dict, send_text, **_kwargs) -> dict:
    """Run a Brave Search query, dedup, optionally LLM-filter, post to Matrix.

    Args:
        job: Job definition. Reads ``config`` (``query``, optional
            ``criteria``, optional ``maxResults`` defaulting to 10),
            ``targetRoom``, ``name`` and optional ``dedupKeys`` (URLs that
            were already reported).
        send_text: Awaitable callable invoked as
            ``send_text(target_room, message)`` to deliver the formatted
            results.
        **_kwargs: Accepted and ignored.

    Returns:
        A status dict, one of:

        * ``{"status": "error", "error": <message>}`` when the API key or
          query is missing, or when the search/filter/post step raises;
        * ``{"status": "no_results"}`` when the search is empty, every URL is
          already in ``dedupKeys``, or the LLM filter keeps nothing;
        * ``{"status": "success", "newDedupKeys": [<url>, ...]}`` after the
          message has been sent.

    Raises:
        KeyError: If ``job`` has no ``targetRoom`` key (it is read before the
            error-handling ``try`` block).
    """
    if not BRAVE_API_KEY:
        return {"status": "error", "error": "BRAVE_API_KEY not configured"}

    config = job.get("config", {})
    query = config.get("query", "")
    criteria = config.get("criteria", "")
    max_results = config.get("maxResults", 10)
    target_room = job["targetRoom"]
    dedup_keys = set(job.get("dedupKeys", []))

    if not query:
        return {"status": "error", "error": "No search query configured"}

    try:
        async with httpx.AsyncClient(timeout=15.0) as client:
            resp = await client.get(
                "https://api.search.brave.com/res/v1/web/search",
                headers={
                    "Accept": "application/json",
                    "X-Subscription-Token": BRAVE_API_KEY,
                },
                params={
                    "q": query,
                    "count": max_results,
                    "text_decorations": False,
                },
            )
            resp.raise_for_status()
            data = resp.json()

        results = data.get("web", {}).get("results", [])
        if not results:
            return {"status": "no_results"}

        # Dedup by URL
        new_results = [r for r in results if r.get("url") not in dedup_keys]
        if not new_results:
            return {"status": "no_results"}

        # LLM filter if criteria provided
        if criteria:
            new_results = await _llm_filter(new_results, criteria)
            if not new_results:
                return {"status": "no_results"}

        msg = format_search_results(job["name"], new_results)
        await send_text(target_room, msg)

        # Only results that actually carry a URL can be used as dedup keys.
        new_keys = [r["url"] for r in new_results if r.get("url")]
        return {
            "status": "success",
            "newDedupKeys": new_keys,
        }
    except Exception as exc:
        # Top-level boundary for the cron job: report the failure to the
        # caller as a status dict instead of propagating.
        logger.exception("Brave search cron failed: %s", exc)
        return {"status": "error", "error": str(exc)}