From 0ae59c8ebed1a011080c3427ac0a3e558cf6bf0f Mon Sep 17 00:00:00 2001
From: Christian Gick
Date: Tue, 24 Mar 2026 08:29:24 +0200
Subject: [PATCH] =?UTF-8?q?feat(CF-2411):=20Pipeline=20hardening=20?=
 =?UTF-8?q?=E2=80=94=20Sentry,=20retry,=20concurrent=20limits,=20audit=20l?=
 =?UTF-8?q?og?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

- Sentry transactions wrapping pipeline execution with tags
- Retry with exponential backoff for transient failures (connect, timeout, 5xx)
- Concurrent execution limit (3/user) enforced in scheduler
- Audit log events fired at each pipeline lifecycle point
- Resume support: skip already-completed steps on restart

Co-Authored-By: Claude Opus 4.6 (1M context)
---
Review notes for v2 (this section is ignored by `git am`):

- engine: handle asyncio.TimeoutError before TRANSIENT_EXCEPTIONS. On
  Python 3.11+ it is the builtin TimeoutError (an OSError subclass), so
  the OSError entry made the timeout handler unreachable and timeouts
  were handled as transient failures (retried, then reported as errors).
- engine: audit events are scheduled through _spawn_background(), which
  keeps a reference to each task until it finishes.
- scheduler: the pending pass also counts runs it launched itself, so
  several pending pipelines of one user cannot all pass the same check.
- state: log_event() now calls raise_for_status() and logs the traceback.
- NOTE(review): run() still starts from an empty step_results, so the
  resume branch looks unreachable until prior results are loaded.
- NOTE(review): nothing here treats HTTP 5xx responses as transient;
  confirm steps.py maps them to one of TRANSIENT_EXCEPTIONS.
- NOTE(review): the cron path `continue`s straight back to
  _seconds_until_next_run(); confirm that cannot return 0 repeatedly.
- The commit and blob hashes are those of v1; regenerate the patch with
  `git format-patch` after applying.

 cron/scheduler.py   |  26 +++
 pipelines/engine.py | 364 ++++++++++++++++++++++++++++++++-----------------
 pipelines/state.py  |  49 ++++++
 3 files changed, 329 insertions(+), 110 deletions(-)

diff --git a/cron/scheduler.py b/cron/scheduler.py
index 40d267d..5855127 100644
--- a/cron/scheduler.py
+++ b/cron/scheduler.py
@@ -13,6 +13,7 @@ logger = logging.getLogger(__name__)
 
 SYNC_INTERVAL = 300  # 5 minutes — full job reconciliation
 PENDING_CHECK_INTERVAL = 15  # 15 seconds — fast check for manual triggers
+MAX_CONCURRENT_PER_USER = 3  # CF-2411: prevent runaway pipelines
 
 
 class CronScheduler:
@@ -145,6 +146,21 @@ class CronScheduler:
                 pipelines = await self._pipeline_state.fetch_active_pipelines()
+                # Runs started in this pass may not be counted by the portal
+                # yet, so also track them locally per user.
+                launched: dict[str, int] = {}
                 for pipeline in pipelines:
                     if pipeline.get("lastStatus") == "pending":
+                        # CF-2411: concurrent limit check
+                        user_id = pipeline.get("userId", "")
+                        if user_id:
+                            active = await self._pipeline_state.count_active_executions(user_id)
+                            active += launched.get(user_id, 0)
+                            if active >= MAX_CONCURRENT_PER_USER:
+                                logger.warning(
+                                    "Pipeline %s skipped: user %s has %d active executions (limit %d)",
+                                    pipeline["name"], user_id, active, MAX_CONCURRENT_PER_USER,
+                                )
+                                continue
+                            launched[user_id] = launched.get(user_id, 0) + 1
                         logger.info("Pending pipeline trigger: %s", pipeline["name"])
                         asyncio.create_task(self.pipeline_engine.run(pipeline))
 
@@ -155,6 +171,16 @@ class CronScheduler:
                 sleep_secs = self._seconds_until_next_run(pipeline)
                 if sleep_secs > 0:
                     await asyncio.sleep(sleep_secs)
+                # CF-2411: concurrent limit check
+                user_id = pipeline.get("userId", "")
+                if user_id:
+                    active = await self._pipeline_state.count_active_executions(user_id)
+                    if active >= MAX_CONCURRENT_PER_USER:
+                        logger.warning(
+                            "Pipeline %s cron skipped: user %s at limit (%d/%d)",
+                            pipeline["name"], user_id, active, MAX_CONCURRENT_PER_USER,
+                        )
+                        continue
                 await self.pipeline_engine.run(pipeline)
         except asyncio.CancelledError:
             pass
diff --git a/pipelines/engine.py b/pipelines/engine.py
index 6bfddc4..7b5d382 100644
--- a/pipelines/engine.py
+++ b/pipelines/engine.py
@@ -6,11 +6,38 @@ import re
 import time
 from datetime import datetime, timezone, timedelta
 
+import httpx
+import sentry_sdk
+
 from .state import PipelineStateManager
 from .steps import execute_step
 
 logger = logging.getLogger(__name__)
 
+# Transient errors eligible for retry
+TRANSIENT_EXCEPTIONS = (
+    httpx.ConnectError,
+    httpx.ConnectTimeout,
+    httpx.ReadTimeout,
+    ConnectionError,
+    OSError,  # broad: also covers the builtin TimeoutError
+)
+
+# Sentinel for failed step (distinct from None output)
+_STEP_FAILED = object()
+
+# Strong references to fire-and-forget tasks: the event loop only keeps weak
+# references, so an unreferenced task can be garbage-collected mid-flight.
+_BACKGROUND_TASKS: set = set()
+
+
+def _spawn_background(coro):
+    """Schedule *coro* as a task and keep it referenced until it finishes."""
+    task = asyncio.create_task(coro)
+    _BACKGROUND_TASKS.add(task)
+    task.add_done_callback(_BACKGROUND_TASKS.discard)
+    return task
+
 
 class PipelineEngine:
     """Executes pipeline steps sequentially, managing state and output chaining."""
@@ -74,80 +101,96 @@ class PipelineEngine:
         pipeline_name = pipeline["name"]
         target_room = pipeline["targetRoom"]
         steps = pipeline.get("steps", [])
+        user_id = pipeline.get("userId", "")
+        max_retries = pipeline.get("maxRetries", 0)
 
         if not steps:
             logger.warning("Pipeline %s has no steps", pipeline_name)
             return
 
-        # Create execution record
-        execution = await self.state.create_execution(pipeline_id, trigger_data)
-        execution_id = execution["id"]
+        with sentry_sdk.start_transaction(op="pipeline.execute", name=pipeline_name) as txn:
+            txn.set_tag("pipeline_id", pipeline_id)
+            txn.set_tag("user_id", user_id)
+            txn.set_tag("trigger_type", pipeline.get("triggerType", "unknown"))
+            txn.set_tag("step_count", len(steps))
 
-        context: dict[str, dict] = {}
-        if trigger_data:
-            context["trigger"] = trigger_data
+            # Create execution record
+            execution = await self.state.create_execution(pipeline_id, trigger_data)
+            execution_id = execution["id"]
+            txn.set_tag("execution_id", execution_id)
 
-        step_results: list[dict] = []
+            # Audit: execution started
+            _spawn_background(self.state.log_event(
+                execution_id, "execution_started", message=f"Trigger: {pipeline.get('triggerType', 'manual')}"
+            ))
 
-        try:
-            for i, step in enumerate(steps):
-                step_name = step.get("name", f"step_{i}")
-                step_type = step.get("type", "")
+            context: dict[str, dict] = {}
+            if trigger_data:
+                context["trigger"] = trigger_data
 
-                # Evaluate condition
-                condition = step.get("if")
-                if condition and not self.evaluate_condition(condition, context):
-                    logger.info("Pipeline %s: skipping step %s (condition not met)", pipeline_name, step_name)
-                    result = {"name": step_name, "output": "skipped", "status": "skipped", "timestamp": time.time()}
-                    step_results.append(result)
-                    context[step_name] = {"output": "skipped", "status": "skipped"}
-                    continue
+            step_results: list[dict] = []
 
-                # Update execution state
-                await self.state.update_execution(
-                    execution_id,
-                    currentStep=i,
-                    stepResults=step_results,
-                    state="running",
-                )
+            try:
+                for i, step in enumerate(steps):
+                    step_name = step.get("name", f"step_{i}")
+                    step_type = step.get("type", "")
 
-                logger.info("Pipeline %s: executing step %s (%s)", pipeline_name, step_name, step_type)
-
-                # Render templates in step config
-                rendered_step = {}
-                for key, value in step.items():
-                    if isinstance(value, str):
-                        rendered_step[key] = self.render_template(value, context)
-                    elif isinstance(value, dict):
-                        rendered_step[key] = {
-                            k: self.render_template(v, context) if isinstance(v, str) else v
-                            for k, v in value.items()
+                    # Resume support: skip already-completed steps
+                    if i < len(step_results) and step_results[i].get("status") in ("success", "skipped"):
+                        context[step_name] = {
+                            "output": step_results[i].get("output", ""),
+                            "status": step_results[i]["status"],
                         }
-                    else:
-                        rendered_step[key] = value
+                        continue
 
-                # Execute step
-                try:
-                    timeout_s = step.get("timeout_s", 60)
+                    # Evaluate condition
+                    condition = step.get("if")
+                    if condition and not self.evaluate_condition(condition, context):
+                        logger.info("Pipeline %s: skipping step %s (condition not met)", pipeline_name, step_name)
+                        result = {"name": step_name, "output": "skipped", "status": "skipped", "timestamp": time.time()}
+                        step_results.append(result)
+                        context[step_name] = {"output": "skipped", "status": "skipped"}
+                        continue
 
-                    if step_type == "approval":
-                        output = await self._execute_approval_step(
-                            rendered_step, target_room, execution_id, timeout_s
-                        )
-                    else:
-                        output = await asyncio.wait_for(
-                            execute_step(
-                                step_type=step_type,
-                                step_config=rendered_step,
-                                context=context,
-                                send_text=self.send_text,
-                                target_room=target_room,
-                                llm=self.llm,
-                                default_model=self.default_model,
-                                escalation_model=self.escalation_model,
-                            ),
-                            timeout=timeout_s,
-                        )
+                    # Update execution state
+                    await self.state.update_execution(
+                        execution_id,
+                        currentStep=i,
+                        stepResults=step_results,
+                        state="running",
+                    )
+
+                    logger.info("Pipeline %s: executing step %s (%s)", pipeline_name, step_name, step_type)
+
+                    # Render templates in step config
+                    rendered_step = {}
+                    for key, value in step.items():
+                        if isinstance(value, str):
+                            rendered_step[key] = self.render_template(value, context)
+                        elif isinstance(value, dict):
+                            rendered_step[key] = {
+                                k: self.render_template(v, context) if isinstance(v, str) else v
+                                for k, v in value.items()
+                            }
+                        else:
+                            rendered_step[key] = value
+
+                    # Execute step with retry logic
+                    output = await self._execute_step_with_retry(
+                        step_type=step_type,
+                        step_name=step_name,
+                        rendered_step=rendered_step,
+                        context=context,
+                        target_room=target_room,
+                        execution_id=execution_id,
+                        timeout_s=step.get("timeout_s", 60),
+                        max_retries=max_retries,
+                        step_results=step_results,
+                        pipeline_name=pipeline_name,
+                    )
+
+                    if output is _STEP_FAILED:
+                        return  # step handler already updated state
 
                     result = {
                         "name": step_name,
@@ -162,60 +205,150 @@ class PipelineEngine:
                     if step_type == "approval":
                         context[step_name]["response"] = output
 
-                except asyncio.TimeoutError:
-                    error_msg = f"Step {step_name} timed out after {step.get('timeout_s', 60)}s"
-                    logger.error("Pipeline %s: %s", pipeline_name, error_msg)
-                    step_results.append({
-                        "name": step_name,
-                        "output": None,
-                        "status": "timeout",
-                        "error": error_msg,
-                        "timestamp": time.time(),
-                    })
-                    await self.state.update_execution(
-                        execution_id,
-                        state="failed",
-                        stepResults=step_results,
-                        error=error_msg,
+                    # Audit: step completed
+                    _spawn_background(self.state.log_event(
+                        execution_id, "step_completed", step_name=step_name, status="success"
+                    ))
+
+                # All steps completed
+                await self.state.update_execution(
+                    execution_id,
+                    state="complete",
+                    stepResults=step_results,
+                )
+                logger.info("Pipeline %s completed successfully", pipeline_name)
+
+                # Audit: execution completed
+                _spawn_background(self.state.log_event(
+                    execution_id, "execution_completed",
+                    message=f"{len(steps)} steps completed",
+                ))
+
+            except Exception as exc:
+                logger.error("Pipeline %s failed unexpectedly: %s", pipeline_name, exc, exc_info=True)
+                sentry_sdk.capture_exception(exc)
+                await self.state.update_execution(
+                    execution_id,
+                    state="failed",
+                    stepResults=step_results,
+                    error=str(exc),
+                )
+                _spawn_background(self.state.log_event(
+                    execution_id, "execution_failed", message=str(exc)
+                ))
+
+    async def _execute_step_with_retry(
+        self, *, step_type, step_name, rendered_step, context, target_room,
+        execution_id, timeout_s, max_retries, step_results, pipeline_name,
+    ):
+        """Execute one step, retrying transient failures with exponential backoff.
+
+        Up to ``max_retries`` extra attempts are made when the step raises one
+        of ``TRANSIENT_EXCEPTIONS``; attempt ``n`` (0-based) waits ``2 ** n``
+        seconds before the next try. Step timeouts and all other exceptions
+        fail immediately.
+
+        On failure the error is appended to ``step_results``, the execution
+        is marked ``failed``, the target room is notified and an audit event
+        is queued.
+
+        Returns:
+            The step output on success, or ``_STEP_FAILED`` once the failure
+            has been recorded.
+        """
+        last_exc = None
+
+        for attempt in range(max_retries + 1):
+            try:
+                if step_type == "approval":
+                    output = await self._execute_approval_step(
+                        rendered_step, target_room, execution_id, timeout_s
                     )
-                    await self.send_text(target_room, f"**{pipeline_name}**: {error_msg}")
-                    return
-
-                except Exception as exc:
-                    error_msg = f"Step {step_name} failed: {exc}"
-                    logger.error("Pipeline %s: %s", pipeline_name, error_msg, exc_info=True)
-                    step_results.append({
-                        "name": step_name,
-                        "output": None,
-                        "status": "error",
-                        "error": str(exc),
-                        "timestamp": time.time(),
-                    })
-                    await self.state.update_execution(
-                        execution_id,
-                        state="failed",
-                        stepResults=step_results,
-                        error=error_msg,
+                else:
+                    output = await asyncio.wait_for(
+                        execute_step(
+                            step_type=step_type,
+                            step_config=rendered_step,
+                            context=context,
+                            send_text=self.send_text,
+                            target_room=target_room,
+                            llm=self.llm,
+                            default_model=self.default_model,
+                            escalation_model=self.escalation_model,
+                        ),
+                        timeout=timeout_s,
                     )
-                    await self.send_text(target_room, f"**{pipeline_name}**: {error_msg}")
-                    return
+                return output
 
-            # All steps completed
-            await self.state.update_execution(
-                execution_id,
-                state="complete",
-                stepResults=step_results,
-            )
-            logger.info("Pipeline %s completed successfully", pipeline_name)
+            except asyncio.TimeoutError:
+                # Must come before TRANSIENT_EXCEPTIONS: since Python 3.11
+                # asyncio.TimeoutError is the builtin TimeoutError, an OSError
+                # subclass, so the transient handler would otherwise catch it.
+                error_msg = f"Step {step_name} timed out after {timeout_s}s"
+                logger.error("Pipeline %s: %s", pipeline_name, error_msg)
+                sentry_sdk.capture_message(error_msg, level="error")
+                step_results.append({
+                    "name": step_name, "output": None, "status": "timeout",
+                    "error": error_msg, "timestamp": time.time(),
+                })
+                await self.state.update_execution(
+                    execution_id, state="failed", stepResults=step_results, error=error_msg,
+                )
+                await self.send_text(target_room, f"**{pipeline_name}**: {error_msg}")
+                _spawn_background(self.state.log_event(
+                    execution_id, "step_failed", step_name=step_name, status="timeout", message=error_msg,
+                ))
+                return _STEP_FAILED
 
-        except Exception as exc:
-            logger.error("Pipeline %s failed unexpectedly: %s", pipeline_name, exc, exc_info=True)
-            await self.state.update_execution(
-                execution_id,
-                state="failed",
-                stepResults=step_results,
-                error=str(exc),
-            )
+            except TRANSIENT_EXCEPTIONS as exc:
+                last_exc = exc
+                if attempt < max_retries:
+                    backoff = 2 ** attempt  # 1s, 2s, 4s
+                    logger.warning(
+                        "Pipeline %s: step %s transient failure (attempt %d/%d), retrying in %ds: %s",
+                        pipeline_name, step_name, attempt + 1, max_retries + 1, backoff, exc,
+                    )
+                    _spawn_background(self.state.log_event(
+                        execution_id, "retry_attempted", step_name=step_name,
+                        message=f"Attempt {attempt + 1}/{max_retries + 1}: {exc}",
+                    ))
+                    await asyncio.sleep(backoff)
+                    continue
+                # Exhausted retries — fall through to failure
+
+            except Exception as exc:
+                error_msg = f"Step {step_name} failed: {exc}"
+                logger.error("Pipeline %s: %s", pipeline_name, error_msg, exc_info=True)
+                sentry_sdk.capture_exception(exc)
+                step_results.append({
+                    "name": step_name, "output": None, "status": "error",
+                    "error": str(exc), "timestamp": time.time(),
+                })
+                await self.state.update_execution(
+                    execution_id, state="failed", stepResults=step_results, error=error_msg,
+                )
+                await self.send_text(target_room, f"**{pipeline_name}**: {error_msg}")
+                _spawn_background(self.state.log_event(
+                    execution_id, "step_failed", step_name=step_name, status="error", message=str(exc),
+                ))
+                return _STEP_FAILED
+
+        # Retries exhausted on transient failure
+        error_msg = f"Step {step_name} failed after {max_retries + 1} attempts: {last_exc}"
+        logger.error("Pipeline %s: %s", pipeline_name, error_msg)
+        sentry_sdk.capture_exception(last_exc)
+        step_results.append({
+            "name": step_name, "output": None, "status": "error",
+            "error": error_msg, "timestamp": time.time(), "retries": max_retries,
+        })
+        await self.state.update_execution(
+            execution_id, state="failed", stepResults=step_results, error=error_msg,
+        )
+        await self.send_text(target_room, f"**{pipeline_name}**: {error_msg}")
+        _spawn_background(self.state.log_event(
+            execution_id, "step_failed", step_name=step_name, status="error", message=error_msg,
+        ))
+        return _STEP_FAILED
 
     @staticmethod
     def _md_to_html(text: str) -> str:
@@ -275,10 +408,21 @@ class PipelineEngine:
 
         try:
             result = await asyncio.wait_for(future, timeout=timeout_s)
+            _spawn_background(self.state.log_event(
+                execution_id, "approval_resolved", message=f"Response: {result}",
+            ))
             return result  # "approve" or "decline"
         except asyncio.TimeoutError:
+            sentry_sdk.capture_message(
+                f"Approval timeout: execution={execution_id}",
+                level="warning",
+            )
             await self.send_text(target_room, "Approval timed out. Pipeline aborted.")
             await self.state.update_execution(execution_id, state="aborted", error="Approval timed out")
+            _spawn_background(self.state.log_event(
+                execution_id, "step_failed", step_name="approval", status="timeout",
+                message="Approval timed out",
+            ))
             raise
         finally:
             self._approval_futures.pop(execution_id, None)
diff --git a/pipelines/state.py b/pipelines/state.py
index 4f9ccb6..5f20672 100644
--- a/pipelines/state.py
+++ b/pipelines/state.py
@@ -60,3 +60,52 @@ class PipelineStateManager:
         except Exception:
             logger.debug("Failed to fetch pending approvals", exc_info=True)
             return []
+
+    async def count_active_executions(self, user_id: str) -> int:
+        """Count running/waiting_approval executions for a user.
+
+        Fails open: any request or decoding error is logged and reported
+        as 0, so callers then see the user as having no active executions.
+        """
+        try:
+            async with httpx.AsyncClient(timeout=10.0) as client:
+                resp = await client.get(
+                    f"{self.portal_url}/api/pipelines/executions/active",
+                    headers={"x-api-key": self.api_key},
+                    params={"userId": user_id},
+                )
+                resp.raise_for_status()
+                data = resp.json()
+                return data.get("count", 0)
+        except Exception:
+            logger.warning("Failed to count active executions for user %s", user_id, exc_info=True)
+            return 0
+
+    async def log_event(
+        self, execution_id: str, action: str, *,
+        step_name: str | None = None,
+        status: str | None = None,
+        message: str | None = None,
+    ) -> None:
+        """Log an audit event for a pipeline execution (fire-and-forget).
+
+        Best-effort: any failure, including a non-2xx response, is logged at
+        debug level and swallowed so audit logging cannot fail a pipeline.
+        """
+        try:
+            payload = {"action": action}
+            if step_name:
+                payload["stepName"] = step_name
+            if status:
+                payload["status"] = status
+            if message:
+                payload["message"] = message
+            async with httpx.AsyncClient(timeout=10.0) as client:
+                resp = await client.post(
+                    f"{self.portal_url}/api/pipelines/executions/{execution_id}/audit-log",
+                    headers={"x-api-key": self.api_key},
+                    json=payload,
+                )
+                resp.raise_for_status()
+        except Exception:
+            logger.debug("Failed to log audit event %s for %s", action, execution_id, exc_info=True)