feat(MAT-174): Add cron job scheduler and executors

Cron package that syncs jobs from matrixhost portal API, schedules execution
with timezone-aware timing, and posts results to Matrix rooms. Includes
Brave Search, reminder, and browser scrape (placeholder) executors with
formatter. 31 pytest tests.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Christian Gick
2026-03-16 09:31:19 +02:00
parent 21b8a4efb1
commit 4d8ea44b3d
15 changed files with 1009 additions and 0 deletions

3
cron/__init__.py Normal file
View File

@@ -0,0 +1,3 @@
# Public package surface: re-export the scheduler so callers can do
# `from cron import CronScheduler` without reaching into submodules.
from .scheduler import CronScheduler
__all__ = ["CronScheduler"]

63
cron/brave_search.py Normal file
View File

@@ -0,0 +1,63 @@
"""Brave Search executor for cron jobs."""
import logging
import os
import httpx
from .formatter import format_search_results
logger = logging.getLogger(__name__)
# Read once at import time; the executor below refuses to run when it is empty.
BRAVE_API_KEY = os.environ.get("BRAVE_API_KEY", "")
async def execute_brave_search(job: dict, send_text, **_kwargs) -> dict:
    """Run a Brave Search query, dedup against known keys, post new results to Matrix.

    Args:
        job: Portal job payload. Uses ``config.query``, ``config.maxResults``,
            ``targetRoom``, ``dedupKeys``, and ``name``.
        send_text: Async callable ``(room_id, text)`` that posts to Matrix.

    Returns:
        Result dict for the portal: ``{"status": "success", "newDedupKeys": [...]}``,
        ``{"status": "no_results"}``, or ``{"status": "error", "error": ...}``.
    """
    if not BRAVE_API_KEY:
        return {"status": "error", "error": "BRAVE_API_KEY not configured"}
    config = job.get("config", {})
    query = config.get("query", "")
    max_results = config.get("maxResults", 10)
    if isinstance(max_results, int):
        # Brave's web-search endpoint documents `count` as 1..20; clamp so an
        # over-large maxResults doesn't turn into a 4xx from the API.
        max_results = min(max(max_results, 1), 20)
    target_room = job["targetRoom"]
    dedup_keys = set(job.get("dedupKeys", []))
    if not query:
        return {"status": "error", "error": "No search query configured"}
    try:
        async with httpx.AsyncClient(timeout=15.0) as client:
            resp = await client.get(
                "https://api.search.brave.com/res/v1/web/search",
                headers={
                    "Accept": "application/json",
                    "X-Subscription-Token": BRAVE_API_KEY,
                },
                params={"q": query, "count": max_results, "text_decorations": False},
            )
            resp.raise_for_status()
            data = resp.json()
        results = data.get("web", {}).get("results", [])
        if not results:
            return {"status": "no_results"}
        # Dedup by URL against keys the portal already knows about.
        new_results = [r for r in results if r.get("url") not in dedup_keys]
        if not new_results:
            return {"status": "no_results"}
        msg = format_search_results(job["name"], new_results)
        await send_text(target_room, msg)
        # Also de-duplicate within this batch (order-preserving) so the portal
        # never receives the same key twice in one report.
        new_keys = list(dict.fromkeys(r["url"] for r in new_results if r.get("url")))
        return {
            "status": "success",
            "newDedupKeys": new_keys,
        }
    except Exception as exc:
        # Boundary handler: any failure becomes an error result for the portal.
        logger.error("Brave search cron failed: %s", exc, exc_info=True)
        return {"status": "error", "error": str(exc)}

43
cron/browser_executor.py Normal file
View File

@@ -0,0 +1,43 @@
"""Browser scrape executor for cron jobs (placeholder for workflow-use integration)."""
import logging
logger = logging.getLogger(__name__)
async def execute_browser_scrape(job: dict, send_text, **_kwargs) -> dict:
    """Handle a browser-based scraping job (stub until workflow-use/Skyvern lands).

    Picks a human-readable status message based on the job's browser profile,
    posts it to the target room, and always returns an error result — real
    browser replay is not wired up yet.
    """
    room = job["targetRoom"]
    name = job["name"]
    scrape_url = job.get("config", {}).get("url", "")
    profile = job.get("browserProfile")

    if not profile:
        notice = (
            f"**{name}**: Browser scan requires a connected browser profile. "
            f"Set one up at https://matrixhost.eu/settings/automations"
        )
        failure = "No browser profile configured"
    elif profile.get("status") == "expired":
        notice = (
            f"**{name}**: Your browser session has expired. "
            f"Please re-record at https://matrixhost.eu/settings/automations"
        )
        failure = "Browser session expired"
    else:
        # TODO: Integrate workflow-use or Skyvern for actual browser replay.
        notice = (
            f"**{name}**: Browser scan for {scrape_url} is configured but browser "
            f"replay is not yet available. Web Search automations work now."
        )
        failure = "Browser executor not yet implemented"

    await send_text(room, notice)
    return {"status": "error", "error": failure}

28
cron/executor.py Normal file
View File

@@ -0,0 +1,28 @@
"""Dispatch cron jobs to the correct executor by job_type."""
import logging
from .brave_search import execute_brave_search
from .browser_executor import execute_browser_scrape
from .reminder import execute_reminder
logger = logging.getLogger(__name__)
# Registry mapping the portal's jobType string to its executor coroutine.
# Every executor shares the signature (job, send_text, **kwargs) -> result dict.
EXECUTORS = {
    "brave_search": execute_brave_search,
    "browser_scrape": execute_browser_scrape,
    "reminder": execute_reminder,
}
async def execute_job(job: dict, send_text, matrix_client) -> dict:
    """Dispatch a cron job to the executor registered for its jobType.

    Returns the executor's result dict, or an error result when the portal
    sends a jobType this bot does not recognize.
    """
    job_type = job["jobType"]
    try:
        run = EXECUTORS[job_type]
    except KeyError:
        msg = f"Unknown job type: {job_type}"
        logger.error(msg)
        return {"status": "error", "error": msg}
    return await run(job=job, send_text=send_text, matrix_client=matrix_client)

55
cron/formatter.py Normal file
View File

@@ -0,0 +1,55 @@
"""Format cron job results as Matrix messages (markdown)."""
def format_search_results(job_name: str, results: list[dict]) -> str:
    """Render Brave Search hits as a markdown message for Matrix.

    Produces a pluralized header, a numbered list of linked titles (with an
    optional description line), and a footer linking to automation settings.
    """
    total = len(results)
    plural = "s" if total != 1 else ""
    parts = [f"**{job_name}** \u2014 {total} new result{plural}:\n"]
    for idx, hit in enumerate(results, start=1):
        parts.append(f"{idx}. **[{hit.get('title', 'Untitled')}]({hit.get('url', '')})**")
        description = hit.get("description", "")
        if description:
            parts.append(f" {description}")
        parts.append("")
    parts.append(
        "_Manage automations: https://matrixhost.eu/settings/automations_"
    )
    return "\n".join(parts)
def format_listings(job_name: str, listings: list[dict]) -> str:
"""Format browser-scraped listings as a markdown message for Matrix."""
count = len(listings)
lines = [f"**{job_name}** \u2014 {count} new listing{'s' if count != 1 else ''}:\n"]
for i, item in enumerate(listings, 1):
title = item.get("title", "Unknown")
price = item.get("price", "")
location = item.get("location", "")
url = item.get("url", "")
age = item.get("age", "")
line = f"{i}. **{title}**"
if price:
line += f" \u2014 {price}"
lines.append(line)
details = []
if location:
details.append(f"\U0001f4cd {location}")
if age:
details.append(f"\U0001f4c5 {age}")
if url:
details.append(f"[View listing]({url})")
if details:
lines.append(f" {' \u00b7 '.join(details)}")
lines.append("")
lines.append(
"_Manage automations: https://matrixhost.eu/settings/automations_"
)
return "\n".join(lines)

20
cron/reminder.py Normal file
View File

@@ -0,0 +1,20 @@
"""Simple reminder executor for cron jobs."""
import logging
logger = logging.getLogger(__name__)
async def execute_reminder(job: dict, send_text, **_kwargs) -> dict:
    """Post the job's configured reminder text to its target Matrix room.

    Returns {"status": "success"} after posting, or an error result when
    the job config carries no message.
    """
    room = job["targetRoom"]
    reminder = job.get("config", {}).get("message", "")
    if not reminder:
        return {"status": "error", "error": "No reminder message configured"}
    await send_text(room, f"\u23f0 **{job['name']}:** {reminder}")
    return {"status": "success"}

182
cron/scheduler.py Normal file
View File

@@ -0,0 +1,182 @@
"""Cron job scheduler that syncs with matrixhost-web API and executes jobs."""
import asyncio
import logging
from datetime import datetime, timezone
import httpx
from .executor import execute_job
logger = logging.getLogger(__name__)
# How often (seconds) the scheduler re-fetches the active job list from the portal.
SYNC_INTERVAL = 300 # 5 minutes
class CronScheduler:
    """Fetches enabled cron jobs from the matrixhost portal and runs them on schedule.

    Every SYNC_INTERVAL seconds the scheduler pulls the active job list from the
    portal API, reconciles it with the per-job asyncio tasks it is running, and
    reports each execution's result back to the portal.
    """

    def __init__(self, portal_url: str, api_key: str, matrix_client, send_text_fn):
        """Store portal credentials and Matrix callbacks.

        Args:
            portal_url: Base URL of the matrixhost portal (trailing slash stripped).
            api_key: Sent as the x-api-key header on every portal request.
            matrix_client: Matrix client handed through to job executors.
            send_text_fn: Async callable (room_id, text) used to post messages.
        """
        self.portal_url = portal_url.rstrip("/")
        self.api_key = api_key
        self.matrix_client = matrix_client
        self.send_text = send_text_fn
        self._jobs: dict[str, dict] = {}  # job id -> last-seen job payload
        self._tasks: dict[str, asyncio.Task] = {}  # job id -> per-job scheduler task
        self._running = False

    async def start(self):
        """Run the portal sync loop until stop() is called."""
        self._running = True
        logger.info("Cron scheduler starting")
        await asyncio.sleep(15)  # wait for the bot to stabilize before first sync
        while self._running:
            try:
                await self._sync_jobs()
            except Exception:
                # Sync failures are treated as transient (network, portal
                # redeploy); log and retry on the next tick.
                logger.warning("Cron job sync failed", exc_info=True)
            await asyncio.sleep(SYNC_INTERVAL)

    async def stop(self):
        """Stop the sync loop and cancel every per-job task."""
        self._running = False
        for task in self._tasks.values():
            task.cancel()
        self._tasks.clear()

    async def _sync_jobs(self):
        """Fetch active jobs from the portal and reconcile with running tasks."""
        async with httpx.AsyncClient(timeout=15.0) as client:
            resp = await client.get(
                f"{self.portal_url}/api/cron/jobs/active",
                headers={"x-api-key": self.api_key},
            )
            resp.raise_for_status()
            data = resp.json()
        remote_jobs = {j["id"]: j for j in data.get("jobs", [])}
        # Drop jobs the portal no longer reports as active.
        for job_id in list(self._tasks):
            if job_id not in remote_jobs:
                logger.info("Removing cron job %s (no longer active)", job_id)
                self._tasks[job_id].cancel()
                del self._tasks[job_id]
                self._jobs.pop(job_id, None)
        # Schedule new jobs; reschedule ones whose updatedAt changed.
        for job_id, job in remote_jobs.items():
            existing = self._jobs.get(job_id)
            if existing and existing.get("updatedAt") == job.get("updatedAt"):
                continue  # unchanged since last sync
            if job_id in self._tasks:
                self._tasks[job_id].cancel()  # replace the stale schedule
            self._jobs[job_id] = job
            self._tasks[job_id] = asyncio.create_task(
                self._job_loop(job), name=f"cron-{job_id}"
            )
            logger.info("Scheduled cron job: %s (%s @ %s %s)",
                job["name"], job["schedule"], job.get("scheduleAt", ""), job["timezone"])
        # Manual triggers: the portal marks a job's lastStatus "pending" to
        # request an immediate run. NOTE(review): if a run outlives one sync
        # interval the job can be triggered again before the portal records a
        # result — confirm the portal clears "pending" promptly.
        for job_id, job in remote_jobs.items():
            if job.get("lastStatus") == "pending":
                logger.info("Manual trigger detected for %s", job["name"])
                asyncio.create_task(self._run_once(job))

    async def _job_loop(self, job: dict):
        """Sleep-until-due loop that runs one job forever (until cancelled)."""
        try:
            while True:
                sleep_secs = self._seconds_until_next_run(job)
                if sleep_secs > 0:
                    await asyncio.sleep(sleep_secs)
                await self._run_once(job)
        except asyncio.CancelledError:
            pass  # normal shutdown / reschedule path

    async def _run_once(self, job: dict):
        """Execute a single job run and report the outcome back to the portal."""
        job_id = job["id"]
        logger.info("Running cron job: %s (%s)", job["name"], job["jobType"])
        try:
            result = await execute_job(
                job=job,
                send_text=self.send_text,
                matrix_client=self.matrix_client,
            )
            await self._report_result(job_id, result)
        except Exception as exc:
            logger.error("Cron job %s failed: %s", job["name"], exc, exc_info=True)
            await self._report_result(job_id, {
                "status": "error",
                "error": str(exc),
            })

    async def _report_result(self, job_id: str, result: dict):
        """POST a run result to the portal; failures are logged, never raised."""
        try:
            async with httpx.AsyncClient(timeout=10.0) as client:
                await client.post(
                    f"{self.portal_url}/api/cron/jobs/{job_id}/result",
                    headers={"x-api-key": self.api_key},
                    json=result,
                )
        except Exception:
            logger.warning("Failed to report cron result for %s", job_id, exc_info=True)

    def _seconds_until_next_run(self, job: dict) -> float:
        """Return seconds until the job's next run, in the job's own timezone.

        Supported schedules: "hourly" (top of the next hour), "weekly"
        (Mondays), "weekdays" (Mon-Fri), and "daily", which is also the
        fallback for unknown values. scheduleAt ("HH:MM") sets the time of
        day for all non-hourly schedules.
        """
        import zoneinfo
        from datetime import timedelta

        schedule = job["schedule"]
        schedule_at = job.get("scheduleAt", "09:00") or "09:00"
        tz = zoneinfo.ZoneInfo(job.get("timezone", "Europe/Berlin"))
        now = datetime.now(tz)
        hour, minute = (int(x) for x in schedule_at.split(":"))
        if schedule == "hourly":
            # Top of the next hour. timedelta arithmetic fixes the original
            # replace(hour=now.hour + 1), which raised ValueError at 23:xx
            # (hour 24 does not exist).
            next_run = now.replace(minute=0, second=0, microsecond=0)
            if next_run <= now:
                next_run += timedelta(hours=1)
            return (next_run - now).total_seconds()
        next_run = now.replace(hour=hour, minute=minute, second=0, microsecond=0)
        if schedule == "weekly":
            # Weekly jobs run on Mondays (weekday 0).
            days_ahead = (0 - now.weekday()) % 7
            if days_ahead == 0 and next_run <= now:
                days_ahead = 7  # today's slot already passed; next Monday
            next_run += timedelta(days=days_ahead)
            return (next_run - now).total_seconds()
        if schedule == "weekdays":
            if next_run <= now:
                next_run += timedelta(days=1)
            while next_run.weekday() >= 5:  # 5/6 = Saturday/Sunday
                next_run += timedelta(days=1)
            return (next_run - now).total_seconds()
        # "daily" and any unrecognized schedule: once per day at HH:MM.
        if next_run <= now:
            next_run += timedelta(days=1)
        return (next_run - now).total_seconds()