"""Browser scrape executor — dispatches jobs to Skyvern API.""" import asyncio import json import logging import os import httpx logger = logging.getLogger(__name__) SKYVERN_BASE_URL = os.environ.get("SKYVERN_BASE_URL", "http://skyvern:8000") SKYVERN_API_KEY = os.environ.get("SKYVERN_API_KEY", "") POLL_INTERVAL = 5 # seconds MAX_POLL_TIME = 300 # 5 minutes async def _create_task(url: str, goal: str, extraction_schema: dict | None = None, credential_id: str | None = None, totp_identifier: str | None = None) -> str: """Create a Skyvern task and return the run_id.""" payload: dict = { "prompt": goal, "url": url, "engine": "skyvern-v2", } if extraction_schema: payload["data_extraction_schema"] = extraction_schema if credential_id: payload["credential_id"] = credential_id if totp_identifier: payload["totp_identifier"] = totp_identifier async with httpx.AsyncClient(timeout=30.0) as client: resp = await client.post( f"{SKYVERN_BASE_URL}/api/v1/tasks", headers={ "Content-Type": "application/json", "x-api-key": SKYVERN_API_KEY, }, json=payload, ) resp.raise_for_status() data = resp.json() return data["task_id"] async def _poll_task(run_id: str) -> dict: """Poll Skyvern until task completes or times out.""" elapsed = 0 async with httpx.AsyncClient(timeout=15.0) as client: while elapsed < MAX_POLL_TIME: resp = await client.get( f"{SKYVERN_BASE_URL}/api/v1/tasks/{run_id}", headers={"x-api-key": SKYVERN_API_KEY}, ) resp.raise_for_status() data = resp.json() status = data.get("status", "") if status in ("completed", "failed", "terminated", "timed_out"): return data await asyncio.sleep(POLL_INTERVAL) elapsed += POLL_INTERVAL return {"status": "timed_out", "error": f"Polling exceeded {MAX_POLL_TIME}s"} def _format_extraction(data: dict) -> str: """Format extracted data for Matrix message.""" extracted = data.get("extracted_information") or data.get("extracted_data") if not extracted: return "No data extracted." if isinstance(extracted, dict): return json.dumps(extracted, indent=2, ensure_ascii=False) return str(extracted) async def execute_browser_scrape(job: dict, send_text, **_kwargs) -> dict: """Execute a browser-based scraping job via Skyvern.""" target_room = job["targetRoom"] config = job.get("config", {}) url = config.get("url", "") extraction_goal = config.get("extractionGoal", "") goal = config.get("goal", config.get("query", f"Scrape content from {url}")) if extraction_goal: goal += f"\n\nExtract the following: {extraction_goal}" extraction_schema = config.get("extractionSchema") browser_profile = job.get("browserProfile") if not url: await send_text(target_room, f"**{job['name']}**: No URL configured.") return {"status": "error", "error": "No URL configured"} if not SKYVERN_API_KEY: await send_text( target_room, f"**{job['name']}**: Browser automation not configured (missing API key).", ) return {"status": "error", "error": "SKYVERN_API_KEY not set"} # Map browser profile fields to Skyvern credential credential_id = None totp_identifier = None if browser_profile: if browser_profile.get("status") == "expired": await send_text( target_room, f"**{job['name']}**: Browser credential expired. " f"Update at https://matrixhost.eu/settings/automations", ) return {"status": "error", "error": "Browser credential expired"} credential_id = browser_profile.get("credentialId") totp_identifier = browser_profile.get("totpIdentifier") try: run_id = await _create_task( url=url, goal=goal, extraction_schema=extraction_schema, credential_id=credential_id, totp_identifier=totp_identifier, ) logger.info("Skyvern task created: %s for job %s", run_id, job["name"]) result = await _poll_task(run_id) status = result.get("status", "unknown") if status == "completed": extracted = _format_extraction(result) msg = f"**{job['name']}** — {url}\n\n{extracted}" # Truncate if too long for Matrix if len(msg) > 4000: msg = msg[:3950] + "\n\n_(truncated)_" await send_text(target_room, msg) return {"status": "success"} else: error = result.get("error") or result.get("failure_reason") or status await send_text( target_room, f"**{job['name']}**: Browser task {status} — {error}", ) return {"status": "error", "error": str(error)} except httpx.HTTPStatusError as exc: error_msg = f"Skyvern API error: {exc.response.status_code}" logger.error("Browser executor failed: %s", error_msg, exc_info=True) await send_text(target_room, f"**{job['name']}**: {error_msg}") return {"status": "error", "error": error_msg} except Exception as exc: error_msg = str(exc) logger.error("Browser executor failed: %s", error_msg, exc_info=True) await send_text(target_room, f"**{job['name']}**: Browser task failed — {error_msg}") return {"status": "error", "error": error_msg}