From 006ed48cbed6656d783ee9b416479bcb07573ae8 Mon Sep 17 00:00:00 2001 From: Christian Gick Date: Thu, 19 Mar 2026 10:41:07 +0200 Subject: [PATCH] fix: prevent re-triggering pending jobs while already running Track running job IDs to avoid creating duplicate Skyvern tasks when the pending check runs faster than the task completes. Co-Authored-By: Claude Opus 4.6 (1M context) --- cron/scheduler.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/cron/scheduler.py b/cron/scheduler.py index fcb6aae..40d267d 100644 --- a/cron/scheduler.py +++ b/cron/scheduler.py @@ -41,6 +41,7 @@ class CronScheduler: ) self._pipelines: dict[str, dict] = {} # id -> pipeline data self._pipeline_tasks: dict[str, asyncio.Task] = {} # id -> scheduler task + self._running_jobs: set[str] = set() # job IDs currently executing async def start(self): """Start the scheduler background loops.""" @@ -85,8 +86,9 @@ class CronScheduler: data = resp.json() for job in data.get("jobs", []): - if job.get("lastStatus") == "pending": + if job.get("lastStatus") == "pending" and job["id"] not in self._running_jobs: logger.info("Pending trigger: %s", job["name"]) + self._running_jobs.add(job["id"]) asyncio.create_task(self._run_once(job)) async def _pipeline_sync_loop(self): @@ -235,6 +237,8 @@ class CronScheduler: "status": "error", "error": str(exc), }) + finally: + self._running_jobs.discard(job_id) async def _report_result(self, job_id: str, result: dict): """Report job execution result back to the portal."""