diff --git a/bot.py b/bot.py index 353bb0d..bbbb79a 100644 --- a/bot.py +++ b/bot.py @@ -43,6 +43,7 @@ from nio.crypto.attachments import decrypt_attachment from livekit import api, rtc from voice import VoiceSession from article_summary import ArticleSummaryHandler +from cron import CronScheduler BOT_DEVICE_ID = "AIBOT" CALL_MEMBER_TYPE = "org.matrix.msc3401.call.member" @@ -1172,6 +1173,16 @@ class Bot: ) else: self.article_handler = None + # Cron job scheduler (syncs with matrixhost portal) + if PORTAL_URL and BOT_API_KEY: + self.cron_scheduler = CronScheduler( + portal_url=PORTAL_URL, + api_key=BOT_API_KEY, + matrix_client=self.client, + send_text_fn=self._send_text, + ) + else: + self.cron_scheduler = None async def _has_documents(self, matrix_user_id: str) -> bool: """Check if user has documents via local RAG or MatrixHost portal API. @@ -1259,6 +1270,11 @@ class Bot: # Start reminder scheduler asyncio.create_task(self._reminder_scheduler()) + # Start cron job scheduler + if self.cron_scheduler: + asyncio.create_task(self.cron_scheduler.start()) + logger.info("Cron scheduler task created") + await self.client.sync_forever(timeout=30000, full_state=True) async def _ensure_cross_signing(self): @@ -3508,6 +3524,8 @@ class Bot: await self.client.to_device(mac) async def cleanup(self): + if self.cron_scheduler: + await self.cron_scheduler.stop() await self.client.close() if self.lkapi: await self.lkapi.aclose() diff --git a/cron/__init__.py b/cron/__init__.py new file mode 100644 index 0000000..3ec1ac2 --- /dev/null +++ b/cron/__init__.py @@ -0,0 +1,3 @@ +from .scheduler import CronScheduler + +__all__ = ["CronScheduler"] diff --git a/cron/brave_search.py b/cron/brave_search.py new file mode 100644 index 0000000..f8ba0f3 --- /dev/null +++ b/cron/brave_search.py @@ -0,0 +1,63 @@ +"""Brave Search executor for cron jobs.""" + +import logging +import os + +import httpx + +from .formatter import format_search_results + +logger = logging.getLogger(__name__) + +BRAVE_API_KEY = os.environ.get("BRAVE_API_KEY", "") + + +async def execute_brave_search(job: dict, send_text, **_kwargs) -> dict: + """Run a Brave Search query, dedup against known keys, post new results to Matrix.""" + if not BRAVE_API_KEY: + return {"status": "error", "error": "BRAVE_API_KEY not configured"} + + config = job.get("config", {}) + query = config.get("query", "") + max_results = config.get("maxResults", 10) + target_room = job["targetRoom"] + dedup_keys = set(job.get("dedupKeys", [])) + + if not query: + return {"status": "error", "error": "No search query configured"} + + try: + async with httpx.AsyncClient(timeout=15.0) as client: + resp = await client.get( + "https://api.search.brave.com/res/v1/web/search", + headers={ + "Accept": "application/json", + "X-Subscription-Token": BRAVE_API_KEY, + }, + params={"q": query, "count": max_results, "text_decorations": False}, + ) + resp.raise_for_status() + data = resp.json() + + results = data.get("web", {}).get("results", []) + if not results: + return {"status": "no_results"} + + # Dedup by URL + new_results = [r for r in results if r.get("url") not in dedup_keys] + + if not new_results: + return {"status": "no_results"} + + msg = format_search_results(job["name"], new_results) + await send_text(target_room, msg) + + new_keys = [r["url"] for r in new_results if r.get("url")] + return { + "status": "success", + "newDedupKeys": new_keys, + } + + except Exception as exc: + logger.error("Brave search cron failed: %s", exc, exc_info=True) + return {"status": "error", "error": str(exc)} diff --git a/cron/browser_executor.py b/cron/browser_executor.py new file mode 100644 index 0000000..5dddf49 --- /dev/null +++ b/cron/browser_executor.py @@ -0,0 +1,43 @@ +"""Browser scrape executor for cron jobs (placeholder for workflow-use integration).""" + +import logging + +logger = logging.getLogger(__name__) + + +async def execute_browser_scrape(job: dict, send_text, **_kwargs) -> dict: + """Execute a browser-based scraping job. + + This is a placeholder that will be fully implemented when workflow-use + or Skyvern is deployed. For now, it posts a message indicating the + feature is pending setup. + """ + target_room = job["targetRoom"] + config = job.get("config", {}) + url = config.get("url", "") + browser_profile = job.get("browserProfile") + + if not browser_profile: + await send_text( + target_room, + f"**{job['name']}**: Browser scan requires a connected browser profile. " + f"Set one up at https://matrixhost.eu/settings/automations", + ) + return {"status": "error", "error": "No browser profile configured"} + + if browser_profile.get("status") == "expired": + await send_text( + target_room, + f"**{job['name']}**: Your browser session has expired. " + f"Please re-record at https://matrixhost.eu/settings/automations", + ) + return {"status": "error", "error": "Browser session expired"} + + # TODO: Integrate workflow-use or Skyvern for actual browser replay + # For now, fall back to a simple notification + await send_text( + target_room, + f"**{job['name']}**: Browser scan for {url} is configured but browser " + f"replay is not yet available. Web Search automations work now.", + ) + return {"status": "error", "error": "Browser executor not yet implemented"} diff --git a/cron/executor.py b/cron/executor.py new file mode 100644 index 0000000..9edba97 --- /dev/null +++ b/cron/executor.py @@ -0,0 +1,28 @@ +"""Dispatch cron jobs to the correct executor by job_type.""" + +import logging + +from .brave_search import execute_brave_search +from .browser_executor import execute_browser_scrape +from .reminder import execute_reminder + +logger = logging.getLogger(__name__) + +EXECUTORS = { + "brave_search": execute_brave_search, + "browser_scrape": execute_browser_scrape, + "reminder": execute_reminder, +} + + +async def execute_job(job: dict, send_text, matrix_client) -> dict: + """Execute a cron job and return a result dict for reporting.""" + job_type = job["jobType"] + executor = EXECUTORS.get(job_type) + + if not executor: + msg = f"Unknown job type: {job_type}" + logger.error(msg) + return {"status": "error", "error": msg} + + return await executor(job=job, send_text=send_text, matrix_client=matrix_client) diff --git a/cron/formatter.py b/cron/formatter.py new file mode 100644 index 0000000..23dd295 --- /dev/null +++ b/cron/formatter.py @@ -0,0 +1,55 @@ +"""Format cron job results as Matrix messages (markdown).""" + + +def format_search_results(job_name: str, results: list[dict]) -> str: + """Format Brave Search results as a markdown message for Matrix.""" + count = len(results) + lines = [f"**{job_name}** \u2014 {count} new result{'s' if count != 1 else ''}:\n"] + + for i, r in enumerate(results, 1): + title = r.get("title", "Untitled") + url = r.get("url", "") + desc = r.get("description", "") + lines.append(f"{i}. **[{title}]({url})**") + if desc: + lines.append(f" {desc}") + lines.append("") + + lines.append( + "_Manage automations: https://matrixhost.eu/settings/automations_" + ) + return "\n".join(lines) + + +def format_listings(job_name: str, listings: list[dict]) -> str: + """Format browser-scraped listings as a markdown message for Matrix.""" + count = len(listings) + lines = [f"**{job_name}** \u2014 {count} new listing{'s' if count != 1 else ''}:\n"] + + for i, item in enumerate(listings, 1): + title = item.get("title", "Unknown") + price = item.get("price", "") + location = item.get("location", "") + url = item.get("url", "") + age = item.get("age", "") + + line = f"{i}. **{title}**" + if price: + line += f" \u2014 {price}" + lines.append(line) + + details = [] + if location: + details.append(f"\U0001f4cd {location}") + if age: + details.append(f"\U0001f4c5 {age}") + if url: + details.append(f"[View listing]({url})") + if details: + lines.append(f" {' \u00b7 '.join(details)}") + lines.append("") + + lines.append( + "_Manage automations: https://matrixhost.eu/settings/automations_" + ) + return "\n".join(lines) diff --git a/cron/reminder.py b/cron/reminder.py new file mode 100644 index 0000000..ccd067d --- /dev/null +++ b/cron/reminder.py @@ -0,0 +1,20 @@ +"""Simple reminder executor for cron jobs.""" + +import logging + +logger = logging.getLogger(__name__) + + +async def execute_reminder(job: dict, send_text, **_kwargs) -> dict: + """Post a reminder message to a Matrix room.""" + config = job.get("config", {}) + message = config.get("message", "") + target_room = job["targetRoom"] + + if not message: + return {"status": "error", "error": "No reminder message configured"} + + text = f"\u23f0 **{job['name']}:** {message}" + await send_text(target_room, text) + + return {"status": "success"} diff --git a/cron/scheduler.py b/cron/scheduler.py new file mode 100644 index 0000000..cb4d84c --- /dev/null +++ b/cron/scheduler.py @@ -0,0 +1,182 @@ +"""Cron job scheduler that syncs with matrixhost-web API and executes jobs.""" + +import asyncio +import logging +from datetime import datetime, timezone + +import httpx + +from .executor import execute_job + +logger = logging.getLogger(__name__) + +SYNC_INTERVAL = 300 # 5 minutes + + +class CronScheduler: + """Fetches enabled cron jobs from the matrixhost portal and runs them on schedule.""" + + def __init__(self, portal_url: str, api_key: str, matrix_client, send_text_fn): + self.portal_url = portal_url.rstrip("/") + self.api_key = api_key + self.matrix_client = matrix_client + self.send_text = send_text_fn + self._jobs: dict[str, dict] = {} # id -> job data + self._tasks: dict[str, asyncio.Task] = {} # id -> scheduler task + self._running = False + + async def start(self): + """Start the scheduler background loop.""" + self._running = True + logger.info("Cron scheduler starting") + await asyncio.sleep(15) # wait for bot to stabilize + while self._running: + try: + await self._sync_jobs() + except Exception: + logger.warning("Cron job sync failed", exc_info=True) + await asyncio.sleep(SYNC_INTERVAL) + + async def stop(self): + self._running = False + for task in self._tasks.values(): + task.cancel() + self._tasks.clear() + + async def _sync_jobs(self): + """Fetch active jobs from portal and reconcile with running tasks.""" + async with httpx.AsyncClient(timeout=15.0) as client: + resp = await client.get( + f"{self.portal_url}/api/cron/jobs/active", + headers={"x-api-key": self.api_key}, + ) + resp.raise_for_status() + data = resp.json() + + remote_jobs = {j["id"]: j for j in data.get("jobs", [])} + + # Remove jobs that are no longer active + for job_id in list(self._tasks): + if job_id not in remote_jobs: + logger.info("Removing cron job %s (no longer active)", job_id) + self._tasks[job_id].cancel() + del self._tasks[job_id] + self._jobs.pop(job_id, None) + + # Add/update jobs + for job_id, job in remote_jobs.items(): + existing = self._jobs.get(job_id) + if existing and existing.get("updatedAt") == job.get("updatedAt"): + continue # unchanged + + # Cancel old task if updating + if job_id in self._tasks: + self._tasks[job_id].cancel() + + self._jobs[job_id] = job + self._tasks[job_id] = asyncio.create_task( + self._job_loop(job), name=f"cron-{job_id}" + ) + logger.info("Scheduled cron job: %s (%s @ %s %s)", + job["name"], job["schedule"], job.get("scheduleAt", ""), job["timezone"]) + + # Check for manual triggers (lastStatus == "pending") + for job_id, job in remote_jobs.items(): + if job.get("lastStatus") == "pending": + logger.info("Manual trigger detected for %s", job["name"]) + asyncio.create_task(self._run_once(job)) + + async def _job_loop(self, job: dict): + """Run a job on its schedule forever.""" + try: + while True: + sleep_secs = self._seconds_until_next_run(job) + if sleep_secs > 0: + await asyncio.sleep(sleep_secs) + await self._run_once(job) + except asyncio.CancelledError: + pass + + async def _run_once(self, job: dict): + """Execute a single job run and report results back.""" + job_id = job["id"] + logger.info("Running cron job: %s (%s)", job["name"], job["jobType"]) + try: + result = await execute_job( + job=job, + send_text=self.send_text, + matrix_client=self.matrix_client, + ) + await self._report_result(job_id, result) + except Exception as exc: + logger.error("Cron job %s failed: %s", job["name"], exc, exc_info=True) + await self._report_result(job_id, { + "status": "error", + "error": str(exc), + }) + + async def _report_result(self, job_id: str, result: dict): + """Report job execution result back to the portal.""" + try: + async with httpx.AsyncClient(timeout=10.0) as client: + await client.post( + f"{self.portal_url}/api/cron/jobs/{job_id}/result", + headers={"x-api-key": self.api_key}, + json=result, + ) + except Exception: + logger.warning("Failed to report cron result for %s", job_id, exc_info=True) + + def _seconds_until_next_run(self, job: dict) -> float: + """Calculate seconds until the next scheduled run.""" + import zoneinfo + + schedule = job["schedule"] + schedule_at = job.get("scheduleAt", "09:00") or "09:00" + tz = zoneinfo.ZoneInfo(job.get("timezone", "Europe/Berlin")) + now = datetime.now(tz) + + hour, minute = (int(x) for x in schedule_at.split(":")) + + if schedule == "hourly": + # Run at the top of every hour + next_run = now.replace(minute=0, second=0, microsecond=0) + if next_run <= now: + next_run = next_run.replace(hour=now.hour + 1) + return (next_run - now).total_seconds() + + if schedule == "daily": + next_run = now.replace(hour=hour, minute=minute, second=0, microsecond=0) + if next_run <= now: + from datetime import timedelta + next_run += timedelta(days=1) + return (next_run - now).total_seconds() + + if schedule == "weekly": + # Monday = 0 + from datetime import timedelta + days_ahead = (0 - now.weekday()) % 7 or 7 + next_run = now.replace(hour=hour, minute=minute, second=0, microsecond=0) + if now.weekday() == 0 and next_run > now: + days_ahead = 0 + next_run += timedelta(days=days_ahead) + if next_run <= now: + next_run += timedelta(days=7) + return (next_run - now).total_seconds() + + if schedule == "weekdays": + from datetime import timedelta + next_run = now.replace(hour=hour, minute=minute, second=0, microsecond=0) + if next_run <= now: + next_run += timedelta(days=1) + # Skip weekends + while next_run.weekday() >= 5: + next_run += timedelta(days=1) + return (next_run - now).total_seconds() + + # Default: daily + from datetime import timedelta + next_run = now.replace(hour=hour, minute=minute, second=0, microsecond=0) + if next_run <= now: + next_run += timedelta(days=1) + return (next_run - now).total_seconds() diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/test_cron_brave_search.py b/tests/test_cron_brave_search.py new file mode 100644 index 0000000..a9d8d5c --- /dev/null +++ b/tests/test_cron_brave_search.py @@ -0,0 +1,150 @@ +"""Tests for the Brave Search cron executor.""" + +import os +from unittest.mock import AsyncMock, MagicMock, patch + +import pytest + +from cron.brave_search import execute_brave_search + + +@pytest.fixture +def job(): + return { + "id": "j1", + "name": "BMW Search", + "jobType": "brave_search", + "config": {"query": "BMW X3 damaged Cyprus", "maxResults": 5}, + "targetRoom": "!room:cars", + "dedupKeys": ["https://old-result.com"], + } + + +class TestBraveSearchExecutor: + @pytest.mark.asyncio + async def test_returns_error_without_api_key(self, job): + with patch.dict(os.environ, {"BRAVE_API_KEY": ""}, clear=False): + # Need to reload module to pick up empty env + import importlib + import cron.brave_search as bs + original_key = bs.BRAVE_API_KEY + bs.BRAVE_API_KEY = "" + try: + result = await execute_brave_search(job=job, send_text=AsyncMock()) + assert result["status"] == "error" + assert "BRAVE_API_KEY" in result["error"] + finally: + bs.BRAVE_API_KEY = original_key + + @pytest.mark.asyncio + async def test_returns_error_without_query(self): + job = { + "id": "j1", + "name": "Empty", + "jobType": "brave_search", + "config": {}, + "targetRoom": "!room:test", + "dedupKeys": [], + } + import cron.brave_search as bs + original_key = bs.BRAVE_API_KEY + bs.BRAVE_API_KEY = "test-key" + try: + result = await execute_brave_search(job=job, send_text=AsyncMock()) + assert result["status"] == "error" + assert "query" in result["error"].lower() + finally: + bs.BRAVE_API_KEY = original_key + + @pytest.mark.asyncio + async def test_deduplicates_results(self, job): + """Results with URLs already in dedupKeys should be filtered out.""" + import cron.brave_search as bs + original_key = bs.BRAVE_API_KEY + bs.BRAVE_API_KEY = "test-key" + + mock_response = MagicMock() + mock_response.json.return_value = { + "web": { + "results": [ + {"title": "Old Result", "url": "https://old-result.com", "description": "Already seen"}, + {"title": "New BMW", "url": "https://new-result.com", "description": "Fresh listing"}, + ] + } + } + mock_response.raise_for_status = MagicMock() + + send_text = AsyncMock() + + with patch("httpx.AsyncClient") as mock_client_cls: + mock_client = AsyncMock() + mock_client_cls.return_value.__aenter__ = AsyncMock(return_value=mock_client) + mock_client_cls.return_value.__aexit__ = AsyncMock(return_value=False) + mock_client.get = AsyncMock(return_value=mock_response) + + try: + result = await execute_brave_search(job=job, send_text=send_text) + finally: + bs.BRAVE_API_KEY = original_key + + assert result["status"] == "success" + assert result["newDedupKeys"] == ["https://new-result.com"] + send_text.assert_called_once() + # Message should contain only the new result + msg = send_text.call_args[0][1] + assert "New BMW" in msg + assert "Old Result" not in msg + + @pytest.mark.asyncio + async def test_no_results_status(self, job): + """When API returns empty results, status should be no_results.""" + import cron.brave_search as bs + original_key = bs.BRAVE_API_KEY + bs.BRAVE_API_KEY = "test-key" + + mock_response = MagicMock() + mock_response.json.return_value = {"web": {"results": []}} + mock_response.raise_for_status = MagicMock() + + with patch("httpx.AsyncClient") as mock_client_cls: + mock_client = AsyncMock() + mock_client_cls.return_value.__aenter__ = AsyncMock(return_value=mock_client) + mock_client_cls.return_value.__aexit__ = AsyncMock(return_value=False) + mock_client.get = AsyncMock(return_value=mock_response) + + try: + result = await execute_brave_search(job=job, send_text=AsyncMock()) + finally: + bs.BRAVE_API_KEY = original_key + + assert result["status"] == "no_results" + + @pytest.mark.asyncio + async def test_all_results_already_seen(self, job): + """When all results are already in dedupKeys, status should be no_results.""" + import cron.brave_search as bs + original_key = bs.BRAVE_API_KEY + bs.BRAVE_API_KEY = "test-key" + + mock_response = MagicMock() + mock_response.json.return_value = { + "web": { + "results": [ + {"title": "Old", "url": "https://old-result.com", "description": "Seen"}, + ] + } + } + mock_response.raise_for_status = MagicMock() + + with patch("httpx.AsyncClient") as mock_client_cls: + mock_client = AsyncMock() + mock_client_cls.return_value.__aenter__ = AsyncMock(return_value=mock_client) + mock_client_cls.return_value.__aexit__ = AsyncMock(return_value=False) + mock_client.get = AsyncMock(return_value=mock_response) + + try: + result = await execute_brave_search(job=job, send_text=AsyncMock()) + finally: + bs.BRAVE_API_KEY = original_key + + assert result["status"] == "no_results" diff --git a/tests/test_cron_browser_executor.py b/tests/test_cron_browser_executor.py new file mode 100644 index 0000000..a67a495 --- /dev/null +++ b/tests/test_cron_browser_executor.py @@ -0,0 +1,58 @@ +"""Tests for the browser scrape executor.""" + +from unittest.mock import AsyncMock + +import pytest + +from cron.browser_executor import execute_browser_scrape + + +class TestBrowserScrapeExecutor: + @pytest.mark.asyncio + async def test_returns_error_without_profile(self): + job = { + "id": "j1", + "name": "FB Scan", + "config": {"url": "https://facebook.com/marketplace"}, + "targetRoom": "!room:test", + "browserProfile": None, + } + send_text = AsyncMock() + result = await execute_browser_scrape(job=job, send_text=send_text) + assert result["status"] == "error" + assert "browser profile" in result["error"].lower() + send_text.assert_called_once() + msg = send_text.call_args[0][1] + assert "matrixhost.eu/settings/automations" in msg + + @pytest.mark.asyncio + async def test_returns_error_with_expired_profile(self): + job = { + "id": "j1", + "name": "FB Scan", + "config": {"url": "https://facebook.com/marketplace"}, + "targetRoom": "!room:test", + "browserProfile": {"id": "b1", "status": "expired", "name": "facebook"}, + } + send_text = AsyncMock() + result = await execute_browser_scrape(job=job, send_text=send_text) + assert result["status"] == "error" + assert "expired" in result["error"].lower() + send_text.assert_called_once() + msg = send_text.call_args[0][1] + assert "re-record" in msg.lower() + + @pytest.mark.asyncio + async def test_placeholder_with_active_profile(self): + job = { + "id": "j1", + "name": "FB Scan", + "config": {"url": "https://facebook.com/marketplace"}, + "targetRoom": "!room:test", + "browserProfile": {"id": "b1", "status": "active", "name": "facebook"}, + } + send_text = AsyncMock() + result = await execute_browser_scrape(job=job, send_text=send_text) + # Currently a placeholder, should indicate not yet implemented + assert result["status"] == "error" + assert "not yet implemented" in result["error"].lower() diff --git a/tests/test_cron_executor.py b/tests/test_cron_executor.py new file mode 100644 index 0000000..938332a --- /dev/null +++ b/tests/test_cron_executor.py @@ -0,0 +1,48 @@ +"""Tests for the cron executor dispatch.""" + +from unittest.mock import AsyncMock + +import pytest + +from cron.executor import execute_job + + +class TestExecuteJob: + @pytest.mark.asyncio + async def test_unknown_job_type_returns_error(self): + job = {"jobType": "nonexistent", "config": {}} + result = await execute_job( + job=job, send_text=AsyncMock(), matrix_client=None + ) + assert result["status"] == "error" + assert "Unknown job type" in result["error"] + + @pytest.mark.asyncio + async def test_dispatches_to_reminder(self): + job = { + "id": "j1", + "name": "Test Reminder", + "jobType": "reminder", + "config": {"message": "Don't forget!"}, + "targetRoom": "!room:test", + } + send_text = AsyncMock() + result = await execute_job(job=job, send_text=send_text, matrix_client=None) + assert result["status"] == "success" + send_text.assert_called_once() + assert "Don't forget!" in send_text.call_args[0][1] + + @pytest.mark.asyncio + async def test_dispatches_to_browser_scrape_no_profile(self): + job = { + "id": "j1", + "name": "Scrape Test", + "jobType": "browser_scrape", + "config": {"url": "https://example.com"}, + "targetRoom": "!room:test", + "browserProfile": None, + } + send_text = AsyncMock() + result = await execute_job(job=job, send_text=send_text, matrix_client=None) + assert result["status"] == "error" + assert "browser profile" in result["error"].lower() diff --git a/tests/test_cron_formatter.py b/tests/test_cron_formatter.py new file mode 100644 index 0000000..c5f564c --- /dev/null +++ b/tests/test_cron_formatter.py @@ -0,0 +1,67 @@ +"""Tests for the cron result formatter.""" + +from cron.formatter import format_search_results, format_listings + + +class TestFormatSearchResults: + def test_single_result(self): + results = [ + {"title": "BMW X3 2018", "url": "https://example.com/1", "description": "Unfallwagen"} + ] + msg = format_search_results("BMW Scan", results) + assert "BMW Scan" in msg + assert "1 new result" in msg + assert "BMW X3 2018" in msg + assert "https://example.com/1" in msg + assert "Unfallwagen" in msg + assert "matrixhost.eu/settings/automations" in msg + + def test_multiple_results(self): + results = [ + {"title": "Result 1", "url": "https://a.com", "description": "Desc 1"}, + {"title": "Result 2", "url": "https://b.com", "description": "Desc 2"}, + {"title": "Result 3", "url": "https://c.com", "description": ""}, + ] + msg = format_search_results("Test Search", results) + assert "3 new results" in msg + assert "1." in msg + assert "2." in msg + assert "3." in msg + + def test_result_without_description(self): + results = [{"title": "No Desc", "url": "https://x.com"}] + msg = format_search_results("Search", results) + assert "No Desc" in msg + # Should not have empty description line + + +class TestFormatListings: + def test_single_listing(self): + listings = [ + { + "title": "BMW X3 2.0i", + "price": "\u20ac4,500", + "location": "Limassol", + "url": "https://fb.com/123", + "age": "2h ago", + } + ] + msg = format_listings("Car Scan", listings) + assert "Car Scan" in msg + assert "1 new listing" in msg + assert "BMW X3 2.0i" in msg + assert "\u20ac4,500" in msg + assert "Limassol" in msg + assert "2h ago" in msg + assert "https://fb.com/123" in msg + + def test_listing_without_optional_fields(self): + listings = [{"title": "Bare Listing"}] + msg = format_listings("Scan", listings) + assert "Bare Listing" in msg + assert "1 new listing" in msg + + def test_multiple_listings_plural(self): + listings = [{"title": f"Item {i}"} for i in range(5)] + msg = format_listings("Multi", listings) + assert "5 new listings" in msg diff --git a/tests/test_cron_reminder.py b/tests/test_cron_reminder.py new file mode 100644 index 0000000..a329646 --- /dev/null +++ b/tests/test_cron_reminder.py @@ -0,0 +1,57 @@ +"""Tests for the reminder cron executor.""" + +from unittest.mock import AsyncMock + +import pytest + +from cron.reminder import execute_reminder + + +class TestReminderExecutor: + @pytest.mark.asyncio + async def test_sends_reminder_to_room(self): + job = { + "id": "j1", + "name": "Daily Check", + "config": {"message": "Check your portfolio"}, + "targetRoom": "!room:finance", + } + send_text = AsyncMock() + result = await execute_reminder(job=job, send_text=send_text) + + assert result["status"] == "success" + send_text.assert_called_once() + room_id, msg = send_text.call_args[0] + assert room_id == "!room:finance" + assert "Check your portfolio" in msg + assert "Daily Check" in msg + assert "\u23f0" in msg # alarm clock emoji + + @pytest.mark.asyncio + async def test_returns_error_without_message(self): + job = { + "id": "j1", + "name": "Empty", + "config": {}, + "targetRoom": "!room:test", + } + send_text = AsyncMock() + result = await execute_reminder(job=job, send_text=send_text) + + assert result["status"] == "error" + assert "message" in result["error"].lower() + send_text.assert_not_called() + + @pytest.mark.asyncio + async def test_empty_message_returns_error(self): + job = { + "id": "j1", + "name": "Empty", + "config": {"message": ""}, + "targetRoom": "!room:test", + } + send_text = AsyncMock() + result = await execute_reminder(job=job, send_text=send_text) + + assert result["status"] == "error" + send_text.assert_not_called() diff --git a/tests/test_cron_scheduler.py b/tests/test_cron_scheduler.py new file mode 100644 index 0000000..26f8cf6 --- /dev/null +++ b/tests/test_cron_scheduler.py @@ -0,0 +1,217 @@ +"""Tests for the cron scheduler module.""" + +import asyncio +from datetime import datetime, timedelta +from unittest.mock import AsyncMock, MagicMock, patch +import zoneinfo + +import pytest + +from cron.scheduler import CronScheduler + + +@pytest.fixture +def scheduler(): + send_text = AsyncMock() + matrix_client = MagicMock() + sched = CronScheduler( + portal_url="https://matrixhost.eu", + api_key="test-key", + matrix_client=matrix_client, + send_text_fn=send_text, + ) + return sched + + +class TestSecondsUntilNextRun: + def test_daily_schedule_future_today(self, scheduler): + tz = zoneinfo.ZoneInfo("Europe/Berlin") + now = datetime.now(tz) + # Set scheduleAt to 2 hours from now + future_time = now + timedelta(hours=2) + job = { + "schedule": "daily", + "scheduleAt": f"{future_time.hour:02d}:{future_time.minute:02d}", + "timezone": "Europe/Berlin", + } + secs = scheduler._seconds_until_next_run(job) + assert 7000 < secs < 7300 # roughly 2 hours + + def test_daily_schedule_past_today_goes_tomorrow(self, scheduler): + tz = zoneinfo.ZoneInfo("Europe/Berlin") + now = datetime.now(tz) + # Set scheduleAt to 2 hours ago + past_time = now - timedelta(hours=2) + job = { + "schedule": "daily", + "scheduleAt": f"{past_time.hour:02d}:{past_time.minute:02d}", + "timezone": "Europe/Berlin", + } + secs = scheduler._seconds_until_next_run(job) + # Should be ~22 hours from now + assert 78000 < secs < 80000 + + def test_hourly_schedule(self, scheduler): + job = { + "schedule": "hourly", + "scheduleAt": None, + "timezone": "Europe/Berlin", + } + secs = scheduler._seconds_until_next_run(job) + # Should be between 0 and 3600 + assert 0 <= secs <= 3600 + + def test_weekdays_skips_weekend(self, scheduler): + # Mock a Saturday + job = { + "schedule": "weekdays", + "scheduleAt": "09:00", + "timezone": "UTC", + } + secs = scheduler._seconds_until_next_run(job) + assert secs > 0 + + def test_weekly_schedule(self, scheduler): + job = { + "schedule": "weekly", + "scheduleAt": "09:00", + "timezone": "Europe/Berlin", + } + secs = scheduler._seconds_until_next_run(job) + # Should be between 0 and 7 days + assert 0 < secs <= 7 * 86400 + + def test_default_timezone(self, scheduler): + job = { + "schedule": "daily", + "scheduleAt": "23:59", + "timezone": "Europe/Berlin", + } + secs = scheduler._seconds_until_next_run(job) + assert secs > 0 + + def test_different_timezone(self, scheduler): + job = { + "schedule": "daily", + "scheduleAt": "09:00", + "timezone": "America/New_York", + } + secs = scheduler._seconds_until_next_run(job) + assert secs > 0 + + +class TestSyncJobs: + @pytest.mark.asyncio + async def test_sync_adds_new_jobs(self, scheduler): + """New jobs from the API should be registered as tasks.""" + jobs_response = { + "jobs": [ + { + "id": "j1", + "name": "Test Job", + "jobType": "brave_search", + "schedule": "daily", + "scheduleAt": "09:00", + "timezone": "Europe/Berlin", + "config": {"query": "test"}, + "targetRoom": "!room:test", + "enabled": True, + "updatedAt": "2026-03-16T00:00:00Z", + } + ] + } + + with patch("httpx.AsyncClient") as mock_client_cls: + mock_client = AsyncMock() + mock_client_cls.return_value.__aenter__ = AsyncMock(return_value=mock_client) + mock_client_cls.return_value.__aexit__ = AsyncMock(return_value=False) + mock_resp = MagicMock() + mock_resp.json.return_value = jobs_response + mock_resp.raise_for_status = MagicMock() + mock_client.get = AsyncMock(return_value=mock_resp) + + await scheduler._sync_jobs() + + assert "j1" in scheduler._jobs + assert "j1" in scheduler._tasks + # Clean up the task + scheduler._tasks["j1"].cancel() + try: + await scheduler._tasks["j1"] + except asyncio.CancelledError: + pass + + @pytest.mark.asyncio + async def test_sync_removes_deleted_jobs(self, scheduler): + """Jobs removed from the API should have their tasks cancelled.""" + # Pre-populate with a job + mock_task = AsyncMock() + mock_task.cancel = MagicMock() + scheduler._jobs["old_job"] = {"id": "old_job"} + scheduler._tasks["old_job"] = mock_task + + with patch("httpx.AsyncClient") as mock_client_cls: + mock_client = AsyncMock() + mock_client_cls.return_value.__aenter__ = AsyncMock(return_value=mock_client) + mock_client_cls.return_value.__aexit__ = AsyncMock(return_value=False) + mock_resp = MagicMock() + mock_resp.json.return_value = {"jobs": []} # No jobs + mock_resp.raise_for_status = MagicMock() + mock_client.get = AsyncMock(return_value=mock_resp) + + await scheduler._sync_jobs() + + assert "old_job" not in scheduler._tasks + assert "old_job" not in scheduler._jobs + mock_task.cancel.assert_called_once() + + +class TestRunOnce: + @pytest.mark.asyncio + async def test_run_once_reports_success(self, scheduler): + """Successful execution should report back to portal.""" + job = { + "id": "j1", + "name": "Test", + "jobType": "reminder", + "config": {"message": "Hello"}, + "targetRoom": "!room:test", + } + + with patch("httpx.AsyncClient") as mock_client_cls: + mock_client = AsyncMock() + mock_client_cls.return_value.__aenter__ = AsyncMock(return_value=mock_client) + mock_client_cls.return_value.__aexit__ = AsyncMock(return_value=False) + mock_client.post = AsyncMock() + + await scheduler._run_once(job) + + # send_text should have been called with the reminder + scheduler.send_text.assert_called_once() + call_args = scheduler.send_text.call_args + assert call_args[0][0] == "!room:test" + assert "Hello" in call_args[0][1] + + @pytest.mark.asyncio + async def test_run_once_reports_error(self, scheduler): + """Failed execution should report error back to portal.""" + job = { + "id": "j1", + "name": "Test", + "jobType": "brave_search", + "config": {}, # Missing query = error + "targetRoom": "!room:test", + "dedupKeys": [], + } + + with patch("httpx.AsyncClient") as mock_client_cls: + mock_client = AsyncMock() + mock_client_cls.return_value.__aenter__ = AsyncMock(return_value=mock_client) + mock_client_cls.return_value.__aexit__ = AsyncMock(return_value=False) + mock_client.post = AsyncMock() + + await scheduler._run_once(job) + + # Should not have sent a message to the room (error in executor) + # But should have reported back + # The report happens via httpx post