diff --git a/cron/browser_executor.py b/cron/browser_executor.py index 5dddf49..888bc77 100644 --- a/cron/browser_executor.py +++ b/cron/browser_executor.py @@ -1,43 +1,153 @@ -"""Browser scrape executor for cron jobs (placeholder for workflow-use integration).""" +"""Browser scrape executor — dispatches jobs to Skyvern API.""" +import asyncio +import json import logging +import os + +import httpx logger = logging.getLogger(__name__) +SKYVERN_BASE_URL = os.environ.get("SKYVERN_BASE_URL", "http://skyvern:8000") +SKYVERN_API_KEY = os.environ.get("SKYVERN_API_KEY", "") + +POLL_INTERVAL = 5 # seconds +MAX_POLL_TIME = 300 # 5 minutes + + +async def _create_task(url: str, goal: str, extraction_schema: dict | None = None, + credential_id: str | None = None, totp_identifier: str | None = None) -> str: + """Create a Skyvern task and return the run_id.""" + payload: dict = { + "prompt": goal, + "url": url, + "engine": "skyvern-v2", + } + if extraction_schema: + payload["data_extraction_schema"] = extraction_schema + if credential_id: + payload["credential_id"] = credential_id + if totp_identifier: + payload["totp_identifier"] = totp_identifier + + async with httpx.AsyncClient(timeout=30.0) as client: + resp = await client.post( + f"{SKYVERN_BASE_URL}/v1/run/tasks", + headers={ + "Content-Type": "application/json", + "x-api-key": SKYVERN_API_KEY, + }, + json=payload, + ) + resp.raise_for_status() + data = resp.json() + return data["run_id"] + + +async def _poll_task(run_id: str) -> dict: + """Poll Skyvern until task completes or times out.""" + elapsed = 0 + async with httpx.AsyncClient(timeout=15.0) as client: + while elapsed < MAX_POLL_TIME: + resp = await client.get( + f"{SKYVERN_BASE_URL}/v1/runs/{run_id}", + headers={"x-api-key": SKYVERN_API_KEY}, + ) + resp.raise_for_status() + data = resp.json() + status = data.get("status", "") + + if status in ("completed", "failed", "terminated", "timed_out"): + return data + + await asyncio.sleep(POLL_INTERVAL) + elapsed += POLL_INTERVAL + + return {"status": "timed_out", "error": f"Polling exceeded {MAX_POLL_TIME}s"} + + +def _format_extraction(data: dict) -> str: + """Format extracted data for Matrix message.""" + extracted = data.get("extracted_information") or data.get("extracted_data") + if not extracted: + return "No data extracted." + if isinstance(extracted, dict): + return json.dumps(extracted, indent=2, ensure_ascii=False) + return str(extracted) + async def execute_browser_scrape(job: dict, send_text, **_kwargs) -> dict: - """Execute a browser-based scraping job. - - This is a placeholder that will be fully implemented when workflow-use - or Skyvern is deployed. For now, it posts a message indicating the - feature is pending setup. - """ + """Execute a browser-based scraping job via Skyvern.""" target_room = job["targetRoom"] config = job.get("config", {}) url = config.get("url", "") + goal = config.get("goal", config.get("query", f"Scrape content from {url}")) + extraction_schema = config.get("extractionSchema") browser_profile = job.get("browserProfile") - if not browser_profile: + if not url: + await send_text(target_room, f"**{job['name']}**: No URL configured.") + return {"status": "error", "error": "No URL configured"} + + if not SKYVERN_API_KEY: await send_text( target_room, - f"**{job['name']}**: Browser scan requires a connected browser profile. " - f"Set one up at https://matrixhost.eu/settings/automations", + f"**{job['name']}**: Browser automation not configured (missing API key).", ) - return {"status": "error", "error": "No browser profile configured"} + return {"status": "error", "error": "SKYVERN_API_KEY not set"} - if browser_profile.get("status") == "expired": - await send_text( - target_room, - f"**{job['name']}**: Your browser session has expired. " - f"Please re-record at https://matrixhost.eu/settings/automations", + # Map browser profile fields to Skyvern credential + credential_id = None + totp_identifier = None + if browser_profile: + if browser_profile.get("status") == "expired": + await send_text( + target_room, + f"**{job['name']}**: Browser credential expired. " + f"Update at https://matrixhost.eu/settings/automations", + ) + return {"status": "error", "error": "Browser credential expired"} + credential_id = browser_profile.get("credentialId") + totp_identifier = browser_profile.get("totpIdentifier") + + try: + run_id = await _create_task( + url=url, + goal=goal, + extraction_schema=extraction_schema, + credential_id=credential_id, + totp_identifier=totp_identifier, ) - return {"status": "error", "error": "Browser session expired"} + logger.info("Skyvern task created: %s for job %s", run_id, job["name"]) - # TODO: Integrate workflow-use or Skyvern for actual browser replay - # For now, fall back to a simple notification - await send_text( - target_room, - f"**{job['name']}**: Browser scan for {url} is configured but browser " - f"replay is not yet available. Web Search automations work now.", - ) - return {"status": "error", "error": "Browser executor not yet implemented"} + result = await _poll_task(run_id) + status = result.get("status", "unknown") + + if status == "completed": + extracted = _format_extraction(result) + msg = f"**{job['name']}** — {url}\n\n{extracted}" + # Truncate if too long for Matrix + if len(msg) > 4000: + msg = msg[:3950] + "\n\n_(truncated)_" + await send_text(target_room, msg) + return {"status": "success"} + else: + error = result.get("error") or result.get("failure_reason") or status + await send_text( + target_room, + f"**{job['name']}**: Browser task {status} — {error}", + ) + return {"status": "error", "error": str(error)} + + except httpx.HTTPStatusError as exc: + error_msg = f"Skyvern API error: {exc.response.status_code}" + logger.error("Browser executor failed: %s", error_msg, exc_info=True) + await send_text(target_room, f"**{job['name']}**: {error_msg}") + return {"status": "error", "error": error_msg} + + except Exception as exc: + error_msg = str(exc) + logger.error("Browser executor failed: %s", error_msg, exc_info=True) + await send_text(target_room, f"**{job['name']}**: Browser task failed — {error_msg}") + return {"status": "error", "error": error_msg} diff --git a/docker-compose.yml b/docker-compose.yml index d1ac2ce..6f9d2f4 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -25,6 +25,8 @@ services: - MEMORY_SERVICE_TOKEN - PORTAL_URL - BOT_API_KEY + - SKYVERN_BASE_URL=http://skyvern:8000 + - SKYVERN_API_KEY volumes: - bot-data:/data depends_on: @@ -73,6 +75,45 @@ services: timeout: 5s retries: 3 + skyvern: + image: ghcr.io/skyvern-ai/skyvern:latest + restart: unless-stopped + environment: + DATABASE_STRING: postgresql://skyvern:${SKYVERN_DB_PASSWORD:-skyvern}@skyvern-db:5432/skyvern + OPENAI_API_KEY: placeholder + OPENAI_API_BASE: ${LITELLM_BASE_URL} + LLM_KEY: OPENAI_GPT4O + SECONDARY_LLM_KEY: OPENAI_GPT4O_MINI + BROWSER_TYPE: chromium-headless + ENV: local + PORT: "8000" + ALLOWED_ORIGINS: '["http://localhost:8000"]' + depends_on: + skyvern-db: + condition: service_healthy + healthcheck: + test: ["CMD", "curl", "-f", "http://127.0.0.1:8000/api/v1/health"] + interval: 30s + timeout: 10s + retries: 3 + start_period: 60s + + skyvern-db: + image: postgres:17 + restart: unless-stopped + environment: + POSTGRES_USER: skyvern + POSTGRES_PASSWORD: ${SKYVERN_DB_PASSWORD:-skyvern} + POSTGRES_DB: skyvern + volumes: + - skyvern-pgdata:/var/lib/postgresql/data + healthcheck: + test: ["CMD-SHELL", "pg_isready -U skyvern -d skyvern"] + interval: 5s + timeout: 3s + retries: 5 + volumes: bot-data: memory-pgdata: + skyvern-pgdata: diff --git a/pipelines/steps/skyvern.py b/pipelines/steps/skyvern.py index 6b2e2f0..232f65e 100644 --- a/pipelines/steps/skyvern.py +++ b/pipelines/steps/skyvern.py @@ -1,24 +1,110 @@ -"""Skyvern step — browser automation via Skyvern API (Phase 2 placeholder).""" +"""Skyvern step — browser automation via Skyvern API for pipeline execution.""" +import asyncio +import json import logging +import os + +import httpx logger = logging.getLogger(__name__) +SKYVERN_BASE_URL = os.environ.get("SKYVERN_BASE_URL", "http://skyvern:8000") +SKYVERN_API_KEY = os.environ.get("SKYVERN_API_KEY", "") + +POLL_INTERVAL = 5 +MAX_POLL_TIME = 300 + async def execute_skyvern(config: dict, send_text=None, target_room: str = "", **_kwargs) -> str: - """Dispatch a browser task to Skyvern. + """Dispatch a browser task to Skyvern and return extracted data. - Phase 2: Will integrate with self-hosted Skyvern on matrixhost. + Config fields: + url: target URL (required) + goal: navigation goal / prompt (required) + data_extraction_goal: what to extract (optional, added to prompt) + extraction_schema: JSON schema for structured extraction (optional) + credential_id: Skyvern credential ID for login (optional) + totp_identifier: email/phone for TOTP (optional) + timeout_s: max poll time in seconds (optional, default 300) """ - task = config.get("task", {}) - url = task.get("url", "") - goal = task.get("goal", "") + if not SKYVERN_API_KEY: + raise RuntimeError("SKYVERN_API_KEY not configured") + + url = config.get("url", "") + goal = config.get("goal", "") + data_extraction_goal = config.get("data_extraction_goal", "") + extraction_schema = config.get("extraction_schema") + credential_id = config.get("credential_id") + totp_identifier = config.get("totp_identifier") + max_poll = config.get("timeout_s", MAX_POLL_TIME) + + if not url or not goal: + raise ValueError("Skyvern step requires 'url' and 'goal' in config") + + # Build prompt combining goal and extraction goal + prompt = goal + if data_extraction_goal: + prompt += f"\n\nExtract the following: {data_extraction_goal}" + + payload: dict = { + "prompt": prompt, + "url": url, + "engine": "skyvern-v2", + } + if extraction_schema: + if isinstance(extraction_schema, str): + extraction_schema = json.loads(extraction_schema) + payload["data_extraction_schema"] = extraction_schema + if credential_id: + payload["credential_id"] = credential_id + if totp_identifier: + payload["totp_identifier"] = totp_identifier + + headers = { + "Content-Type": "application/json", + "x-api-key": SKYVERN_API_KEY, + } + + async with httpx.AsyncClient(timeout=30.0) as client: + resp = await client.post( + f"{SKYVERN_BASE_URL}/v1/run/tasks", + headers=headers, + json=payload, + ) + resp.raise_for_status() + run_id = resp.json()["run_id"] + + logger.info("Skyvern pipeline task created: %s", run_id) if send_text and target_room: - await send_text( - target_room, - f"**Browser Task**: Skyvern integration pending setup.\n" - f"URL: {url}\nGoal: {goal}", - ) + await send_text(target_room, f"Browser task started for {url}...") - raise NotImplementedError("Skyvern step not yet implemented (Phase 2)") + # Poll for completion + elapsed = 0 + async with httpx.AsyncClient(timeout=15.0) as client: + while elapsed < max_poll: + resp = await client.get( + f"{SKYVERN_BASE_URL}/v1/runs/{run_id}", + headers={"x-api-key": SKYVERN_API_KEY}, + ) + resp.raise_for_status() + data = resp.json() + status = data.get("status", "") + + if status == "completed": + extracted = data.get("extracted_information") or data.get("extracted_data") + if extracted is None: + return "Task completed, no data extracted." + if isinstance(extracted, (dict, list)): + return json.dumps(extracted, ensure_ascii=False) + return str(extracted) + + if status in ("failed", "terminated", "timed_out"): + error = data.get("error") or data.get("failure_reason") or status + raise RuntimeError(f"Skyvern task {status}: {error}") + + await asyncio.sleep(POLL_INTERVAL) + elapsed += POLL_INTERVAL + + raise TimeoutError(f"Skyvern task {run_id} did not complete within {max_poll}s")