Cron package that syncs jobs from the matrixhost portal API, schedules their execution with timezone-aware timing, and posts results to Matrix rooms. Includes Brave Search, reminder, and browser-scrape (placeholder) executors with a formatter. 31 pytest tests. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
218 lines
7.3 KiB
Python
218 lines
7.3 KiB
Python
"""Tests for the cron scheduler module."""
|
|
|
|
import asyncio
|
|
from datetime import datetime, timedelta
|
|
from unittest.mock import AsyncMock, MagicMock, patch
|
|
import zoneinfo
|
|
|
|
import pytest
|
|
|
|
from cron.scheduler import CronScheduler
|
|
|
|
|
|
@pytest.fixture
def scheduler():
    """Return a ``CronScheduler`` built with mock collaborators.

    The Matrix client is a ``MagicMock`` and the send-text callback is an
    ``AsyncMock``, so tests can await the callback and inspect its calls.
    """
    return CronScheduler(
        portal_url="https://matrixhost.eu",
        api_key="test-key",
        matrix_client=MagicMock(),
        send_text_fn=AsyncMock(),
    )
|
|
|
|
|
|
class TestSecondsUntilNextRun:
    """Checks of ``CronScheduler._seconds_until_next_run`` per schedule kind.

    The tests read the real wall clock, so every assertion is a range rather
    than an exact value.
    """

    def test_daily_schedule_future_today(self, scheduler):
        """A daily job set two hours ahead should be due in about two hours."""
        tz = zoneinfo.ZoneInfo("Europe/Berlin")
        now = datetime.now(tz)
        # Set scheduleAt to 2 hours from now
        future_time = now + timedelta(hours=2)
        job = {
            "schedule": "daily",
            "scheduleAt": f"{future_time.hour:02d}:{future_time.minute:02d}",
            "timezone": "Europe/Berlin",
        }
        secs = scheduler._seconds_until_next_run(job)
        # scheduleAt only carries HH:MM, so the delay may be up to a minute
        # short of 7200 seconds.
        assert 7000 < secs < 7300  # roughly 2 hours

    def test_daily_schedule_past_today_goes_tomorrow(self, scheduler):
        """A daily job whose time already passed today should wait ~22 hours."""
        tz = zoneinfo.ZoneInfo("Europe/Berlin")
        now = datetime.now(tz)
        # Set scheduleAt to 2 hours ago
        past_time = now - timedelta(hours=2)
        job = {
            "schedule": "daily",
            "scheduleAt": f"{past_time.hour:02d}:{past_time.minute:02d}",
            "timezone": "Europe/Berlin",
        }
        secs = scheduler._seconds_until_next_run(job)
        # Should be ~22 hours from now
        assert 78000 < secs < 80000

    def test_hourly_schedule(self, scheduler):
        """An hourly job without scheduleAt should be due within one hour."""
        job = {
            "schedule": "hourly",
            "scheduleAt": None,
            "timezone": "Europe/Berlin",
        }
        secs = scheduler._seconds_until_next_run(job)
        # Should be between 0 and 3600
        assert 0 <= secs <= 3600

    def test_weekdays_skips_weekend(self, scheduler):
        """A weekdays job should never be scheduled onto Saturday or Sunday."""
        utc = zoneinfo.ZoneInfo("UTC")
        job = {
            "schedule": "weekdays",
            "scheduleAt": "09:00",
            "timezone": "UTC",
        }
        before = datetime.now(utc)
        secs = scheduler._seconds_until_next_run(job)
        # Project the returned delay onto the calendar; UTC has no DST, so
        # plain timedelta arithmetic gives the real target instant.
        next_run = before + timedelta(seconds=secs)

        assert secs > 0
        # Longest possible wait: just after Friday 09:00 until Monday 09:00.
        assert secs <= 3 * 86400
        # Monday..Friday are weekday() 0..4.
        assert next_run.weekday() < 5

    def test_weekly_schedule(self, scheduler):
        """A weekly job should be due within the coming week."""
        job = {
            "schedule": "weekly",
            "scheduleAt": "09:00",
            "timezone": "Europe/Berlin",
        }
        secs = scheduler._seconds_until_next_run(job)
        # Should be between 0 and 7 days; one extra hour of slack covers a
        # week that contains the autumn DST change in Europe/Berlin.
        assert 0 < secs <= 7 * 86400 + 3600

    def test_default_timezone(self, scheduler):
        """A daily job at 23:59 Europe/Berlin should be due within a day."""
        # NOTE(review): despite the test name, the timezone is passed
        # explicitly, so no fallback timezone is exercised here. Confirm
        # whether the scheduler supports a job without a "timezone" key
        # before changing this.
        job = {
            "schedule": "daily",
            "scheduleAt": "23:59",
            "timezone": "Europe/Berlin",
        }
        secs = scheduler._seconds_until_next_run(job)
        # A daily job can never be more than a (DST-lengthened) day away.
        assert 0 < secs <= 25 * 3600

    def test_different_timezone(self, scheduler):
        """A daily job in America/New_York should be due within a day."""
        job = {
            "schedule": "daily",
            "scheduleAt": "09:00",
            "timezone": "America/New_York",
        }
        secs = scheduler._seconds_until_next_run(job)
        # A daily job can never be more than a (DST-lengthened) day away.
        assert 0 < secs <= 25 * 3600
|
|
|
|
|
|
class TestSyncJobs:
    """Tests for ``CronScheduler._sync_jobs`` with ``httpx.AsyncClient`` patched."""

    @pytest.mark.asyncio
    async def test_sync_adds_new_jobs(self, scheduler):
        """New jobs from the API should be registered as tasks."""
        portal_job = {
            "id": "j1",
            "name": "Test Job",
            "jobType": "brave_search",
            "schedule": "daily",
            "scheduleAt": "09:00",
            "timezone": "Europe/Berlin",
            "config": {"query": "test"},
            "targetRoom": "!room:test",
            "enabled": True,
            "updatedAt": "2026-03-16T00:00:00Z",
        }
        payload = {"jobs": [portal_job]}

        with patch("httpx.AsyncClient") as client_factory:
            # Fake HTTP response carrying the job list.
            response = MagicMock()
            response.raise_for_status = MagicMock()
            response.json.return_value = payload

            # Fake client whose GET resolves to that response.
            http = AsyncMock()
            http.get = AsyncMock(return_value=response)

            # The patched class's instance acts as an async context manager
            # that hands out the fake client.
            client_ctx = client_factory.return_value
            client_ctx.__aenter__ = AsyncMock(return_value=http)
            client_ctx.__aexit__ = AsyncMock(return_value=False)

            await scheduler._sync_jobs()

        assert "j1" in scheduler._jobs
        assert "j1" in scheduler._tasks

        # Cancel the registered task so it does not outlive the test.
        scheduler._tasks["j1"].cancel()
        try:
            await scheduler._tasks["j1"]
        except asyncio.CancelledError:
            pass

    @pytest.mark.asyncio
    async def test_sync_removes_deleted_jobs(self, scheduler):
        """Jobs removed from the API should have their tasks cancelled."""
        # Seed the scheduler with a job the portal no longer returns.
        stale_task = AsyncMock()
        stale_task.cancel = MagicMock()
        scheduler._tasks["old_job"] = stale_task
        scheduler._jobs["old_job"] = {"id": "old_job"}

        with patch("httpx.AsyncClient") as client_factory:
            response = MagicMock()
            response.raise_for_status = MagicMock()
            response.json.return_value = {"jobs": []}  # No jobs

            http = AsyncMock()
            http.get = AsyncMock(return_value=response)

            client_ctx = client_factory.return_value
            client_ctx.__aenter__ = AsyncMock(return_value=http)
            client_ctx.__aexit__ = AsyncMock(return_value=False)

            await scheduler._sync_jobs()

        assert "old_job" not in scheduler._tasks
        assert "old_job" not in scheduler._jobs
        stale_task.cancel.assert_called_once()
|
|
|
|
|
|
class TestRunOnce:
    """Tests for ``CronScheduler._run_once`` with ``httpx.AsyncClient`` patched.

    Each test swaps ``httpx.AsyncClient`` for a mock whose async context
    manager yields a fake client, so the portal report can be observed through
    that client's ``post`` mock.
    """

    @pytest.mark.asyncio
    async def test_run_once_reports_success(self, scheduler):
        """Successful execution should report back to portal."""
        job = {
            "id": "j1",
            "name": "Test",
            "jobType": "reminder",
            "config": {"message": "Hello"},
            "targetRoom": "!room:test",
        }

        with patch("httpx.AsyncClient") as mock_client_cls:
            mock_client = AsyncMock()
            mock_client_cls.return_value.__aenter__ = AsyncMock(return_value=mock_client)
            mock_client_cls.return_value.__aexit__ = AsyncMock(return_value=False)
            mock_client.post = AsyncMock()

            await scheduler._run_once(job)

        # send_text should have been called with the reminder
        scheduler.send_text.assert_called_once()
        call_args = scheduler.send_text.call_args
        assert call_args[0][0] == "!room:test"
        assert "Hello" in call_args[0][1]

        # The test name promises a report to the portal; check that the
        # patched client's POST was actually used.
        mock_client.post.assert_called()

    @pytest.mark.asyncio
    async def test_run_once_reports_error(self, scheduler):
        """Failed execution should report error back to portal."""
        job = {
            "id": "j1",
            "name": "Test",
            "jobType": "brave_search",
            "config": {},  # Missing query = error
            "targetRoom": "!room:test",
            "dedupKeys": [],
        }

        with patch("httpx.AsyncClient") as mock_client_cls:
            mock_client = AsyncMock()
            mock_client_cls.return_value.__aenter__ = AsyncMock(return_value=mock_client)
            mock_client_cls.return_value.__aexit__ = AsyncMock(return_value=False)
            mock_client.post = AsyncMock()

            await scheduler._run_once(job)

        # Should not have sent a message to the room (error in executor)
        scheduler.send_text.assert_not_called()
        # But should have reported back
        # The report happens via httpx post
        mock_client.post.assert_called()
|