diff --git a/cron/scheduler.py b/cron/scheduler.py index cb4d84c..ee28fec 100644 --- a/cron/scheduler.py +++ b/cron/scheduler.py @@ -10,7 +10,8 @@ from .executor import execute_job logger = logging.getLogger(__name__) -SYNC_INTERVAL = 300 # 5 minutes +SYNC_INTERVAL = 300 # 5 minutes — full job reconciliation +PENDING_CHECK_INTERVAL = 15 # 15 seconds — fast check for manual triggers class CronScheduler: @@ -26,10 +27,18 @@ class CronScheduler: self._running = False async def start(self): - """Start the scheduler background loop.""" + """Start the scheduler background loops.""" self._running = True logger.info("Cron scheduler starting") await asyncio.sleep(15) # wait for bot to stabilize + # Run full sync + fast pending check in parallel + await asyncio.gather( + self._full_sync_loop(), + self._pending_check_loop(), + ) + + async def _full_sync_loop(self): + """Full job reconciliation every 5 minutes.""" while self._running: try: await self._sync_jobs() @@ -37,6 +46,31 @@ class CronScheduler: logger.warning("Cron job sync failed", exc_info=True) await asyncio.sleep(SYNC_INTERVAL) + async def _pending_check_loop(self): + """Fast poll for manual triggers every 15 seconds.""" + while self._running: + try: + await self._check_pending() + except Exception: + logger.debug("Pending check failed", exc_info=True) + await asyncio.sleep(PENDING_CHECK_INTERVAL) + + async def _check_pending(self): + """Quick check for jobs with lastStatus='pending' and run them.""" + async with httpx.AsyncClient(timeout=10.0) as client: + resp = await client.get( + f"{self.portal_url}/api/cron/jobs/active", + headers={"x-api-key": self.api_key}, + ) + if resp.status_code != 200: + return + data = resp.json() + + for job in data.get("jobs", []): + if job.get("lastStatus") == "pending": + logger.info("Pending trigger: %s", job["name"]) + asyncio.create_task(self._run_once(job)) + async def stop(self): self._running = False for task in self._tasks.values(): @@ -80,12 +114,6 @@ class CronScheduler: logger.info("Scheduled cron job: %s (%s @ %s %s)", job["name"], job["schedule"], job.get("scheduleAt", ""), job["timezone"]) - # Check for manual triggers (lastStatus == "pending") - for job_id, job in remote_jobs.items(): - if job.get("lastStatus") == "pending": - logger.info("Manual trigger detected for %s", job["name"]) - asyncio.create_task(self._run_once(job)) - async def _job_loop(self, job: dict): """Run a job on its schedule forever.""" try: