feat: add pipeline engine with approval flow and file triggers
Adds a sequential step executor (script, claude_prompt, approval, api_call, template, and a skyvern placeholder), reaction-based approvals, file-upload trigger matching, and portal API state sync. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -7,6 +7,7 @@ from datetime import datetime, timezone
|
||||
import httpx
|
||||
|
||||
from .executor import execute_job
|
||||
from pipelines import PipelineEngine, PipelineStateManager
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@@ -17,7 +18,9 @@ PENDING_CHECK_INTERVAL = 15 # 15 seconds — fast check for manual triggers
|
||||
class CronScheduler:
|
||||
"""Fetches enabled cron jobs from the matrixhost portal and runs them on schedule."""
|
||||
|
||||
def __init__(self, portal_url: str, api_key: str, matrix_client, send_text_fn):
|
||||
def __init__(self, portal_url: str, api_key: str, matrix_client, send_text_fn,
|
||||
llm_client=None, default_model: str = "claude-haiku",
|
||||
escalation_model: str = "claude-sonnet"):
|
||||
self.portal_url = portal_url.rstrip("/")
|
||||
self.api_key = api_key
|
||||
self.matrix_client = matrix_client
|
||||
@@ -26,6 +29,19 @@ class CronScheduler:
|
||||
self._tasks: dict[str, asyncio.Task] = {} # id -> scheduler task
|
||||
self._running = False
|
||||
|
||||
# Pipeline engine
|
||||
self._pipeline_state = PipelineStateManager(portal_url, api_key)
|
||||
self.pipeline_engine = PipelineEngine(
|
||||
state=self._pipeline_state,
|
||||
send_text=send_text_fn,
|
||||
matrix_client=matrix_client,
|
||||
llm_client=llm_client,
|
||||
default_model=default_model,
|
||||
escalation_model=escalation_model,
|
||||
)
|
||||
self._pipelines: dict[str, dict] = {} # id -> pipeline data
|
||||
self._pipeline_tasks: dict[str, asyncio.Task] = {} # id -> scheduler task
|
||||
|
||||
async def start(self):
|
||||
"""Start the scheduler background loops."""
|
||||
self._running = True
|
||||
@@ -35,6 +51,8 @@ class CronScheduler:
|
||||
await asyncio.gather(
|
||||
self._full_sync_loop(),
|
||||
self._pending_check_loop(),
|
||||
self._pipeline_sync_loop(),
|
||||
self._pipeline_pending_check_loop(),
|
||||
)
|
||||
|
||||
async def _full_sync_loop(self):
|
||||
@@ -71,11 +89,86 @@ class CronScheduler:
|
||||
logger.info("Pending trigger: %s", job["name"])
|
||||
asyncio.create_task(self._run_once(job))
|
||||
|
||||
async def _pipeline_sync_loop(self):
    """Reconcile the locally scheduled pipelines against the portal.

    Runs until stop() flips self._running. A failed sync is logged and
    retried on the next SYNC_INTERVAL tick rather than killing the loop.
    """
    while True:
        if not self._running:
            break
        try:
            await self._sync_pipelines()
        except Exception:
            # Best-effort: portal may be briefly unreachable; keep looping.
            logger.warning("Pipeline sync failed", exc_info=True)
        await asyncio.sleep(SYNC_INTERVAL)
|
||||
|
||||
async def _pipeline_pending_check_loop(self):
    """Fast-poll the portal for manually triggered pipelines.

    Companion to _pipeline_sync_loop: every PENDING_CHECK_INTERVAL seconds,
    look for pipelines flagged as pending and fire them. Errors are logged
    at debug level only (this loop is noisy by design) and never stop it.
    """
    while True:
        if not self._running:
            break
        try:
            await self._check_pending_pipelines()
        except Exception:
            logger.debug("Pipeline pending check failed", exc_info=True)
        await asyncio.sleep(PENDING_CHECK_INTERVAL)
|
||||
|
||||
async def _sync_pipelines(self):
    """Fetch active pipelines from the portal and reconcile local state.

    - Cancels and forgets scheduler tasks for pipelines no longer active.
    - For new or changed pipelines (detected via the ``updatedAt`` field),
      restarts the cron loop when ``triggerType == "cron"``. Other trigger
      types are only cached: file_upload pipelines are served via
      get_file_upload_pipelines(), pending ones via _check_pending_pipelines().

    Fix over the original: a cancelled task is always *removed* from
    self._pipeline_tasks. Previously, a pipeline whose trigger changed away
    from cron left its dead, cancelled task in the registry (the pid stays in
    ``remote``, so the removal pass never cleaned it up and stop() re-cancelled it).
    """
    pipelines = await self._pipeline_state.fetch_active_pipelines()
    remote = {p["id"]: p for p in pipelines}

    # Remove pipelines no longer active
    for pid in list(self._pipeline_tasks):
        if pid not in remote:
            logger.info("Removing pipeline %s (no longer active)", pid)
            self._pipeline_tasks.pop(pid).cancel()
            self._pipelines.pop(pid, None)

    # Add/update pipelines; skip anything already cached at this revision.
    for pid, pipeline in remote.items():
        existing = self._pipelines.get(pid)
        if existing and existing.get("updatedAt") == pipeline.get("updatedAt"):
            continue

        # Drop any stale task *and* its registry entry before rescheduling.
        old_task = self._pipeline_tasks.pop(pid, None)
        if old_task is not None:
            old_task.cancel()

        self._pipelines[pid] = pipeline

        if pipeline.get("triggerType") == "cron":
            self._pipeline_tasks[pid] = asyncio.create_task(
                self._pipeline_cron_loop(pipeline), name=f"pipeline-{pid}"
            )
            logger.info("Scheduled pipeline: %s (%s @ %s)",
                        pipeline["name"], pipeline.get("schedule", ""), pipeline.get("scheduleAt", ""))
|
||||
|
||||
async def _check_pending_pipelines(self):
    """Fire every pipeline the portal has flagged with lastStatus == 'pending'."""
    for pipeline in await self._pipeline_state.fetch_active_pipelines():
        if pipeline.get("lastStatus") != "pending":
            continue
        logger.info("Pending pipeline trigger: %s", pipeline["name"])
        # Fire-and-forget, mirroring how pending cron jobs are launched.
        asyncio.create_task(self.pipeline_engine.run(pipeline))
|
||||
|
||||
async def _pipeline_cron_loop(self, pipeline: dict):
|
||||
"""Run a pipeline on its cron schedule."""
|
||||
try:
|
||||
while True:
|
||||
sleep_secs = self._seconds_until_next_run(pipeline)
|
||||
if sleep_secs > 0:
|
||||
await asyncio.sleep(sleep_secs)
|
||||
await self.pipeline_engine.run(pipeline)
|
||||
except asyncio.CancelledError:
|
||||
pass
|
||||
|
||||
def get_file_upload_pipelines(self) -> list[dict]:
|
||||
"""Return all active file_upload-triggered pipelines."""
|
||||
return [p for p in self._pipelines.values() if p.get("triggerType") == "file_upload"]
|
||||
|
||||
async def stop(self):
|
||||
self._running = False
|
||||
for task in self._tasks.values():
|
||||
task.cancel()
|
||||
self._tasks.clear()
|
||||
for task in self._pipeline_tasks.values():
|
||||
task.cancel()
|
||||
self._pipeline_tasks.clear()
|
||||
|
||||
async def _sync_jobs(self):
|
||||
"""Fetch active jobs from portal and reconcile with running tasks."""
|
||||
|
||||
Reference in New Issue
Block a user