fix: prevent re-triggering pending jobs while already running
Track running job IDs to avoid creating duplicate Skyvern tasks when the pending check runs faster than the task completes. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -41,6 +41,7 @@ class CronScheduler:
|
||||
)
|
||||
self._pipelines: dict[str, dict] = {} # id -> pipeline data
|
||||
self._pipeline_tasks: dict[str, asyncio.Task] = {} # id -> scheduler task
|
||||
self._running_jobs: set[str] = set() # job IDs currently executing
|
||||
|
||||
async def start(self):
|
||||
"""Start the scheduler background loops."""
|
||||
@@ -85,8 +86,9 @@ class CronScheduler:
|
||||
data = resp.json()
|
||||
|
||||
for job in data.get("jobs", []):
|
||||
if job.get("lastStatus") == "pending":
|
||||
if job.get("lastStatus") == "pending" and job["id"] not in self._running_jobs:
|
||||
logger.info("Pending trigger: %s", job["name"])
|
||||
self._running_jobs.add(job["id"])
|
||||
asyncio.create_task(self._run_once(job))
|
||||
|
||||
async def _pipeline_sync_loop(self):
|
||||
@@ -235,6 +237,8 @@ class CronScheduler:
|
||||
"status": "error",
|
||||
"error": str(exc),
|
||||
})
|
||||
finally:
|
||||
self._running_jobs.discard(job_id)
|
||||
|
||||
async def _report_result(self, job_id: str, result: dict):
|
||||
"""Report job execution result back to the portal."""
|
||||
|
||||
Reference in New Issue
Block a user