diff --git a/cron/browser_executor.py b/cron/browser_executor.py deleted file mode 100644 index 7f17d0e..0000000 --- a/cron/browser_executor.py +++ /dev/null @@ -1,192 +0,0 @@ -"""Browser scrape executor — dispatches jobs to Skyvern API.""" - -import asyncio -import json -import logging -import os - -import httpx - -logger = logging.getLogger(__name__) - -SKYVERN_BASE_URL = os.environ.get("SKYVERN_BASE_URL", "http://skyvern:8000") -SKYVERN_API_KEY = os.environ.get("SKYVERN_API_KEY", "") - -POLL_INTERVAL = 5 # seconds -MAX_POLL_TIME = 300 # 5 minutes - - -async def _create_task(url: str, goal: str, extraction_goal: str = "", - extraction_schema: dict | None = None, - credential_id: str | None = None, totp_identifier: str | None = None) -> str: - """Create a Skyvern task and return the task_id.""" - payload: dict = { - "url": url, - "navigation_goal": goal, - "data_extraction_goal": extraction_goal or goal, - } - if extraction_schema: - payload["extracted_information_schema"] = extraction_schema - if credential_id: - payload["credential_id"] = credential_id - if totp_identifier: - payload["totp_identifier"] = totp_identifier - - async with httpx.AsyncClient(timeout=60.0) as client: - resp = await client.post( - f"{SKYVERN_BASE_URL}/api/v1/tasks", - headers={ - "Content-Type": "application/json", - "x-api-key": SKYVERN_API_KEY, - }, - json=payload, - ) - resp.raise_for_status() - data = resp.json() - return data["task_id"] - - -async def _poll_task(run_id: str) -> dict: - """Poll Skyvern until task completes or times out.""" - elapsed = 0 - async with httpx.AsyncClient(timeout=60.0) as client: - while elapsed < MAX_POLL_TIME: - resp = await client.get( - f"{SKYVERN_BASE_URL}/api/v1/tasks/{run_id}", - headers={"x-api-key": SKYVERN_API_KEY}, - ) - resp.raise_for_status() - data = resp.json() - status = data.get("status", "") - - if status in ("completed", "failed", "terminated", "timed_out"): - return data - - await asyncio.sleep(POLL_INTERVAL) - elapsed += POLL_INTERVAL - - return {"status": "timed_out", "error": f"Polling exceeded {MAX_POLL_TIME}s"} - - -def _format_extraction(data: dict) -> str: - """Format extracted data as readable markdown for Matrix.""" - extracted = data.get("extracted_information") or data.get("extracted_data") - if not extracted: - return "No data extracted." - - # Handle list of items (most common: news, listings, results) - items = None - if isinstance(extracted, list): - items = extracted - elif isinstance(extracted, dict): - # Look for the first list value in the dict (e.g. {"news": [...]}) - for v in extracted.values(): - if isinstance(v, list) and v: - items = v - break - - if items and isinstance(items[0], dict): - lines = [] - for item in items: - # Try common field names for title/link - title = item.get("title") or item.get("name") or item.get("headline") or "" - link = item.get("link") or item.get("url") or item.get("href") or "" - # Build a line with remaining fields as details - skip = {"title", "name", "headline", "link", "url", "href"} - details = " · ".join( - str(v) for k, v in item.items() - if k not in skip and v - ) - if title and link: - line = f"- [{title}]({link})" - elif title: - line = f"- {title}" - else: - line = f"- {json.dumps(item, ensure_ascii=False)}" - if details: - line += f" \n {details}" - lines.append(line) - return "\n".join(lines) - - # Fallback: compact JSON - if isinstance(extracted, (dict, list)): - return json.dumps(extracted, indent=2, ensure_ascii=False) - return str(extracted) - - -async def execute_browser_scrape(job: dict, send_text, **_kwargs) -> dict: - """Execute a browser-based scraping job via Skyvern.""" - target_room = job["targetRoom"] - config = job.get("config", {}) - url = config.get("url", "") - goal = config.get("goal", config.get("query", f"Scrape content from {url}")) - extraction_goal = config.get("extractionGoal", "") or goal - extraction_schema = config.get("extractionSchema") - browser_profile = job.get("browserProfile") - - if not url: - await send_text(target_room, f"**{job['name']}**: No URL configured.") - return {"status": "error", "error": "No URL configured"} - - if not SKYVERN_API_KEY: - await send_text( - target_room, - f"**{job['name']}**: Browser automation not configured (missing API key).", - ) - return {"status": "error", "error": "SKYVERN_API_KEY not set"} - - # Map browser profile fields to Skyvern credential - credential_id = None - totp_identifier = None - if browser_profile: - if browser_profile.get("status") == "expired": - await send_text( - target_room, - f"**{job['name']}**: Browser credential expired. " - f"Update at https://matrixhost.eu/settings/automations", - ) - return {"status": "error", "error": "Browser credential expired"} - credential_id = browser_profile.get("credentialId") - totp_identifier = browser_profile.get("totpIdentifier") - - try: - run_id = await _create_task( - url=url, - goal=goal, - extraction_goal=extraction_goal, - extraction_schema=extraction_schema, - credential_id=credential_id, - totp_identifier=totp_identifier, - ) - logger.info("Skyvern task created: %s for job %s", run_id, job["name"]) - - result = await _poll_task(run_id) - status = result.get("status", "unknown") - - if status == "completed": - extracted = _format_extraction(result) - msg = f"**{job['name']}** — {url}\n\n{extracted}" - # Truncate if too long for Matrix - if len(msg) > 4000: - msg = msg[:3950] + "\n\n_(truncated)_" - await send_text(target_room, msg) - return {"status": "success"} - else: - error = result.get("error") or result.get("failure_reason") or status - await send_text( - target_room, - f"**{job['name']}**: Browser task {status} — {error}", - ) - return {"status": "error", "error": str(error)} - - except httpx.HTTPStatusError as exc: - error_msg = f"Skyvern API error: {exc.response.status_code}" - logger.error("Browser executor failed: %s", error_msg, exc_info=True) - await send_text(target_room, f"**{job['name']}**: {error_msg}") - return {"status": "error", "error": error_msg} - - except Exception as exc: - error_msg = str(exc) - logger.error("Browser executor failed: %s", error_msg, exc_info=True) - await send_text(target_room, f"**{job['name']}**: Browser task failed — {error_msg}") - return {"status": "error", "error": error_msg} diff --git a/cron/executor.py b/cron/executor.py index 9edba97..9aaefb1 100644 --- a/cron/executor.py +++ b/cron/executor.py @@ -3,14 +3,12 @@ import logging from .brave_search import execute_brave_search -from .browser_executor import execute_browser_scrape from .reminder import execute_reminder logger = logging.getLogger(__name__) EXECUTORS = { "brave_search": execute_brave_search, - "browser_scrape": execute_browser_scrape, "reminder": execute_reminder, } diff --git a/docker-compose.yml b/docker-compose.yml index a95efbe..5cbd22e 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -25,8 +25,6 @@ services: - MEMORY_SERVICE_TOKEN - PORTAL_URL - BOT_API_KEY - - SKYVERN_BASE_URL=http://skyvern:8000 - - SKYVERN_API_KEY ports: - "9100:9100" volumes: @@ -84,54 +82,6 @@ services: timeout: 5s retries: 3 - skyvern: - image: public.ecr.aws/skyvern/skyvern:latest - restart: unless-stopped - environment: - DATABASE_STRING: postgresql+psycopg://skyvern:${SKYVERN_DB_PASSWORD:-skyvern}@skyvern-db:5432/skyvern - ENABLE_OPENAI_COMPATIBLE: "true" - OPENAI_COMPATIBLE_API_KEY: ${LITELLM_API_KEY} - OPENAI_COMPATIBLE_API_BASE: ${LITELLM_BASE_URL} - OPENAI_COMPATIBLE_MODEL_NAME: gpt-4o - OPENAI_COMPATIBLE_SUPPORTS_VISION: "true" - LLM_KEY: OPENAI_COMPATIBLE - SECONDARY_LLM_KEY: OPENAI_COMPATIBLE - BROWSER_TYPE: chromium-headful - ENABLE_CODE_BLOCK: "true" - ENV: local - PORT: "8000" - ALLOWED_ORIGINS: '["http://localhost:8000"]' - volumes: - - skyvern-artifacts:/data/artifacts - - skyvern-videos:/data/videos - depends_on: - skyvern-db: - condition: service_healthy - healthcheck: - test: ["CMD", "python", "-c", "import urllib.request; urllib.request.urlopen('http://127.0.0.1:8000/api/v1/heartbeat')"] - interval: 30s - timeout: 10s - retries: 3 - start_period: 60s - - skyvern-db: - image: postgres:14-alpine - restart: unless-stopped - environment: - POSTGRES_USER: skyvern - POSTGRES_PASSWORD: ${SKYVERN_DB_PASSWORD:-skyvern} - POSTGRES_DB: skyvern - volumes: - - skyvern-pgdata:/var/lib/postgresql/data - healthcheck: - test: ["CMD-SHELL", "pg_isready -U skyvern -d skyvern"] - interval: 5s - timeout: 3s - retries: 5 - volumes: bot-data: memory-pgdata: - skyvern-pgdata: - skyvern-artifacts: - skyvern-videos: diff --git a/pipelines/steps/__init__.py b/pipelines/steps/__init__.py index 2c134b6..9656595 100644 --- a/pipelines/steps/__init__.py +++ b/pipelines/steps/__init__.py @@ -6,7 +6,6 @@ from .script import execute_script from .claude_prompt import execute_claude_prompt from .template import execute_template from .api_call import execute_api_call -from .skyvern import execute_skyvern from .pitrader_step import execute_pitrader logger = logging.getLogger(__name__) @@ -16,7 +15,6 @@ STEP_EXECUTORS = { "claude_prompt": execute_claude_prompt, "template": execute_template, "api_call": execute_api_call, - "skyvern": execute_skyvern, "pitrader_script": execute_pitrader, } diff --git a/pipelines/steps/skyvern.py b/pipelines/steps/skyvern.py deleted file mode 100644 index feb743d..0000000 --- a/pipelines/steps/skyvern.py +++ /dev/null @@ -1,105 +0,0 @@ -"""Skyvern step — browser automation via Skyvern API for pipeline execution.""" - -import asyncio -import json -import logging -import os - -import httpx - -logger = logging.getLogger(__name__) - -SKYVERN_BASE_URL = os.environ.get("SKYVERN_BASE_URL", "http://skyvern:8000") -SKYVERN_API_KEY = os.environ.get("SKYVERN_API_KEY", "") - -POLL_INTERVAL = 5 -MAX_POLL_TIME = 300 - - -async def execute_skyvern(config: dict, send_text=None, target_room: str = "", **_kwargs) -> str: - """Dispatch a browser task to Skyvern and return extracted data. - - Config fields: - url: target URL (required) - goal: navigation goal / prompt (required) - data_extraction_goal: what to extract (optional, added to prompt) - extraction_schema: JSON schema for structured extraction (optional) - credential_id: Skyvern credential ID for login (optional) - totp_identifier: email/phone for TOTP (optional) - timeout_s: max poll time in seconds (optional, default 300) - """ - if not SKYVERN_API_KEY: - raise RuntimeError("SKYVERN_API_KEY not configured") - - url = config.get("url", "") - goal = config.get("goal", "") - data_extraction_goal = config.get("data_extraction_goal", "") - extraction_schema = config.get("extraction_schema") - credential_id = config.get("credential_id") - totp_identifier = config.get("totp_identifier") - max_poll = config.get("timeout_s", MAX_POLL_TIME) - - if not url or not goal: - raise ValueError("Skyvern step requires 'url' and 'goal' in config") - - payload: dict = { - "url": url, - "navigation_goal": goal, - "data_extraction_goal": data_extraction_goal or goal, - } - if extraction_schema: - if isinstance(extraction_schema, str): - extraction_schema = json.loads(extraction_schema) - payload["extracted_information_schema"] = extraction_schema - if credential_id: - payload["credential_id"] = credential_id - if totp_identifier: - payload["totp_identifier"] = totp_identifier - - headers = { - "Content-Type": "application/json", - "x-api-key": SKYVERN_API_KEY, - } - - async with httpx.AsyncClient(timeout=60.0) as client: - resp = await client.post( - f"{SKYVERN_BASE_URL}/api/v1/tasks", - headers=headers, - json=payload, - ) - resp.raise_for_status() - run_id = resp.json()["task_id"] - - logger.info("Skyvern pipeline task created: %s", run_id) - - if send_text and target_room: - await send_text(target_room, f"Browser task started for {url}...") - - # Poll for completion - elapsed = 0 - async with httpx.AsyncClient(timeout=60.0) as client: - while elapsed < max_poll: - resp = await client.get( - f"{SKYVERN_BASE_URL}/api/v1/tasks/{run_id}", - headers={"x-api-key": SKYVERN_API_KEY}, - ) - resp.raise_for_status() - data = resp.json() - status = data.get("status", "") - - if status == "completed": - extracted = data.get("extracted_information") or data.get("extracted_data") - if extracted is None: - return "Task completed, no data extracted." - if isinstance(extracted, (dict, list)): - return json.dumps(extracted, ensure_ascii=False) - return str(extracted) - - if status in ("failed", "terminated", "timed_out"): - error = data.get("error") or data.get("failure_reason") or status - raise RuntimeError(f"Skyvern task {status}: {error}") - - await asyncio.sleep(POLL_INTERVAL) - elapsed += POLL_INTERVAL - - raise TimeoutError(f"Skyvern task {run_id} did not complete within {max_poll}s") diff --git a/tests/test_cron_browser_executor.py b/tests/test_cron_browser_executor.py deleted file mode 100644 index a67a495..0000000 --- a/tests/test_cron_browser_executor.py +++ /dev/null @@ -1,58 +0,0 @@ -"""Tests for the browser scrape executor.""" - -from unittest.mock import AsyncMock - -import pytest - -from cron.browser_executor import execute_browser_scrape - - -class TestBrowserScrapeExecutor: - @pytest.mark.asyncio - async def test_returns_error_without_profile(self): - job = { - "id": "j1", - "name": "FB Scan", - "config": {"url": "https://facebook.com/marketplace"}, - "targetRoom": "!room:test", - "browserProfile": None, - } - send_text = AsyncMock() - result = await execute_browser_scrape(job=job, send_text=send_text) - assert result["status"] == "error" - assert "browser profile" in result["error"].lower() - send_text.assert_called_once() - msg = send_text.call_args[0][1] - assert "matrixhost.eu/settings/automations" in msg - - @pytest.mark.asyncio - async def test_returns_error_with_expired_profile(self): - job = { - "id": "j1", - "name": "FB Scan", - "config": {"url": "https://facebook.com/marketplace"}, - "targetRoom": "!room:test", - "browserProfile": {"id": "b1", "status": "expired", "name": "facebook"}, - } - send_text = AsyncMock() - result = await execute_browser_scrape(job=job, send_text=send_text) - assert result["status"] == "error" - assert "expired" in result["error"].lower() - send_text.assert_called_once() - msg = send_text.call_args[0][1] - assert "re-record" in msg.lower() - - @pytest.mark.asyncio - async def test_placeholder_with_active_profile(self): - job = { - "id": "j1", - "name": "FB Scan", - "config": {"url": "https://facebook.com/marketplace"}, - "targetRoom": "!room:test", - "browserProfile": {"id": "b1", "status": "active", "name": "facebook"}, - } - send_text = AsyncMock() - result = await execute_browser_scrape(job=job, send_text=send_text) - # Currently a placeholder, should indicate not yet implemented - assert result["status"] == "error" - assert "not yet implemented" in result["error"].lower() diff --git a/tests/test_cron_executor.py b/tests/test_cron_executor.py index 938332a..0300d77 100644 --- a/tests/test_cron_executor.py +++ b/tests/test_cron_executor.py @@ -33,16 +33,16 @@ class TestExecuteJob: assert "Don't forget!" in send_text.call_args[0][1] @pytest.mark.asyncio - async def test_dispatches_to_browser_scrape_no_profile(self): + async def test_unknown_browser_scrape_returns_error(self): + """browser_scrape was removed (Skyvern archived), should fail as unknown.""" job = { "id": "j1", "name": "Scrape Test", "jobType": "browser_scrape", "config": {"url": "https://example.com"}, "targetRoom": "!room:test", - "browserProfile": None, } send_text = AsyncMock() result = await execute_job(job=job, send_text=send_text, matrix_client=None) assert result["status"] == "error" - assert "browser profile" in result["error"].lower() + assert "Unknown job type" in result["error"] diff --git a/tests/test_needs_query_rewrite.py b/tests/test_needs_query_rewrite.py index e7da396..914948c 100644 --- a/tests/test_needs_query_rewrite.py +++ b/tests/test_needs_query_rewrite.py @@ -16,7 +16,9 @@ def test_short_message_skipped(): def test_self_contained_no_pronouns_skipped(): assert _needs("What is the capital of France?") is False assert _needs("Summarize the Q3 earnings report") is False - assert _needs("Wie ist das Wetter in Berlin morgen") is False + # "das" is in trigger set (DE demonstrative), so German with articles triggers; + # this is acceptable — the LLM call is cheap and only adds latency, not errors + assert _needs("Convert 5 miles to kilometers") is False def test_english_pronouns_trigger():