"""Skyvern step — browser automation via Skyvern API for pipeline execution.""" import asyncio import json import logging import os import httpx logger = logging.getLogger(__name__) SKYVERN_BASE_URL = os.environ.get("SKYVERN_BASE_URL", "http://skyvern:8000") SKYVERN_API_KEY = os.environ.get("SKYVERN_API_KEY", "") POLL_INTERVAL = 5 MAX_POLL_TIME = 300 async def execute_skyvern(config: dict, send_text=None, target_room: str = "", **_kwargs) -> str: """Dispatch a browser task to Skyvern and return extracted data. Config fields: url: target URL (required) goal: navigation goal / prompt (required) data_extraction_goal: what to extract (optional, added to prompt) extraction_schema: JSON schema for structured extraction (optional) credential_id: Skyvern credential ID for login (optional) totp_identifier: email/phone for TOTP (optional) timeout_s: max poll time in seconds (optional, default 300) """ if not SKYVERN_API_KEY: raise RuntimeError("SKYVERN_API_KEY not configured") url = config.get("url", "") goal = config.get("goal", "") data_extraction_goal = config.get("data_extraction_goal", "") extraction_schema = config.get("extraction_schema") credential_id = config.get("credential_id") totp_identifier = config.get("totp_identifier") max_poll = config.get("timeout_s", MAX_POLL_TIME) if not url or not goal: raise ValueError("Skyvern step requires 'url' and 'goal' in config") # Build prompt combining goal and extraction goal prompt = goal if data_extraction_goal: prompt += f"\n\nExtract the following: {data_extraction_goal}" payload: dict = { "prompt": prompt, "url": url, "engine": "skyvern-v2", } if extraction_schema: if isinstance(extraction_schema, str): extraction_schema = json.loads(extraction_schema) payload["data_extraction_schema"] = extraction_schema if credential_id: payload["credential_id"] = credential_id if totp_identifier: payload["totp_identifier"] = totp_identifier headers = { "Content-Type": "application/json", "x-api-key": SKYVERN_API_KEY, } async with httpx.AsyncClient(timeout=30.0) as client: resp = await client.post( f"{SKYVERN_BASE_URL}/v1/run/tasks", headers=headers, json=payload, ) resp.raise_for_status() run_id = resp.json()["run_id"] logger.info("Skyvern pipeline task created: %s", run_id) if send_text and target_room: await send_text(target_room, f"Browser task started for {url}...") # Poll for completion elapsed = 0 async with httpx.AsyncClient(timeout=15.0) as client: while elapsed < max_poll: resp = await client.get( f"{SKYVERN_BASE_URL}/v1/runs/{run_id}", headers={"x-api-key": SKYVERN_API_KEY}, ) resp.raise_for_status() data = resp.json() status = data.get("status", "") if status == "completed": extracted = data.get("extracted_information") or data.get("extracted_data") if extracted is None: return "Task completed, no data extracted." if isinstance(extracted, (dict, list)): return json.dumps(extracted, ensure_ascii=False) return str(extracted) if status in ("failed", "terminated", "timed_out"): error = data.get("error") or data.get("failure_reason") or status raise RuntimeError(f"Skyvern task {status}: {error}") await asyncio.sleep(POLL_INTERVAL) elapsed += POLL_INTERVAL raise TimeoutError(f"Skyvern task {run_id} did not complete within {max_poll}s")