feat(CF-2411): Pipeline hardening — Sentry, retry, concurrent limits, audit log
- Sentry transactions wrapping pipeline execution with tags - Retry with exponential backoff for transient failures (connect, timeout, 5xx) - Concurrent execution limit (3/user) enforced in scheduler - Audit log events fired at each pipeline lifecycle point - Resume support: skip already-completed steps on restart Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -13,6 +13,7 @@ logger = logging.getLogger(__name__)
|
|||||||
|
|
||||||
SYNC_INTERVAL = 300 # 5 minutes — full job reconciliation
|
SYNC_INTERVAL = 300 # 5 minutes — full job reconciliation
|
||||||
PENDING_CHECK_INTERVAL = 15 # 15 seconds — fast check for manual triggers
|
PENDING_CHECK_INTERVAL = 15 # 15 seconds — fast check for manual triggers
|
||||||
|
MAX_CONCURRENT_PER_USER = 3 # CF-2411: prevent runaway pipelines
|
||||||
|
|
||||||
|
|
||||||
class CronScheduler:
|
class CronScheduler:
|
||||||
@@ -145,6 +146,16 @@ class CronScheduler:
|
|||||||
pipelines = await self._pipeline_state.fetch_active_pipelines()
|
pipelines = await self._pipeline_state.fetch_active_pipelines()
|
||||||
for pipeline in pipelines:
|
for pipeline in pipelines:
|
||||||
if pipeline.get("lastStatus") == "pending":
|
if pipeline.get("lastStatus") == "pending":
|
||||||
|
# CF-2411: concurrent limit check
|
||||||
|
user_id = pipeline.get("userId", "")
|
||||||
|
if user_id:
|
||||||
|
active = await self._pipeline_state.count_active_executions(user_id)
|
||||||
|
if active >= MAX_CONCURRENT_PER_USER:
|
||||||
|
logger.warning(
|
||||||
|
"Pipeline %s skipped: user %s has %d active executions (limit %d)",
|
||||||
|
pipeline["name"], user_id, active, MAX_CONCURRENT_PER_USER,
|
||||||
|
)
|
||||||
|
continue
|
||||||
logger.info("Pending pipeline trigger: %s", pipeline["name"])
|
logger.info("Pending pipeline trigger: %s", pipeline["name"])
|
||||||
asyncio.create_task(self.pipeline_engine.run(pipeline))
|
asyncio.create_task(self.pipeline_engine.run(pipeline))
|
||||||
|
|
||||||
@@ -155,6 +166,16 @@ class CronScheduler:
|
|||||||
sleep_secs = self._seconds_until_next_run(pipeline)
|
sleep_secs = self._seconds_until_next_run(pipeline)
|
||||||
if sleep_secs > 0:
|
if sleep_secs > 0:
|
||||||
await asyncio.sleep(sleep_secs)
|
await asyncio.sleep(sleep_secs)
|
||||||
|
# CF-2411: concurrent limit check
|
||||||
|
user_id = pipeline.get("userId", "")
|
||||||
|
if user_id:
|
||||||
|
active = await self._pipeline_state.count_active_executions(user_id)
|
||||||
|
if active >= MAX_CONCURRENT_PER_USER:
|
||||||
|
logger.warning(
|
||||||
|
"Pipeline %s cron skipped: user %s at limit (%d/%d)",
|
||||||
|
pipeline["name"], user_id, active, MAX_CONCURRENT_PER_USER,
|
||||||
|
)
|
||||||
|
continue
|
||||||
await self.pipeline_engine.run(pipeline)
|
await self.pipeline_engine.run(pipeline)
|
||||||
except asyncio.CancelledError:
|
except asyncio.CancelledError:
|
||||||
pass
|
pass
|
||||||
|
|||||||
@@ -6,11 +6,26 @@ import re
|
|||||||
import time
|
import time
|
||||||
from datetime import datetime, timezone, timedelta
|
from datetime import datetime, timezone, timedelta
|
||||||
|
|
||||||
|
import httpx
|
||||||
|
import sentry_sdk
|
||||||
|
|
||||||
from .state import PipelineStateManager
|
from .state import PipelineStateManager
|
||||||
from .steps import execute_step
|
from .steps import execute_step
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
# Transient errors eligible for retry
|
||||||
|
TRANSIENT_EXCEPTIONS = (
|
||||||
|
httpx.ConnectError,
|
||||||
|
httpx.ConnectTimeout,
|
||||||
|
httpx.ReadTimeout,
|
||||||
|
ConnectionError,
|
||||||
|
OSError,
|
||||||
|
)
|
||||||
|
|
||||||
|
# Sentinel for failed step (distinct from None output)
|
||||||
|
_STEP_FAILED = object()
|
||||||
|
|
||||||
|
|
||||||
class PipelineEngine:
|
class PipelineEngine:
|
||||||
"""Executes pipeline steps sequentially, managing state and output chaining."""
|
"""Executes pipeline steps sequentially, managing state and output chaining."""
|
||||||
@@ -74,14 +89,28 @@ class PipelineEngine:
|
|||||||
pipeline_name = pipeline["name"]
|
pipeline_name = pipeline["name"]
|
||||||
target_room = pipeline["targetRoom"]
|
target_room = pipeline["targetRoom"]
|
||||||
steps = pipeline.get("steps", [])
|
steps = pipeline.get("steps", [])
|
||||||
|
user_id = pipeline.get("userId", "")
|
||||||
|
max_retries = pipeline.get("maxRetries", 0)
|
||||||
|
|
||||||
if not steps:
|
if not steps:
|
||||||
logger.warning("Pipeline %s has no steps", pipeline_name)
|
logger.warning("Pipeline %s has no steps", pipeline_name)
|
||||||
return
|
return
|
||||||
|
|
||||||
|
with sentry_sdk.start_transaction(op="pipeline.execute", name=pipeline_name) as txn:
|
||||||
|
txn.set_tag("pipeline_id", pipeline_id)
|
||||||
|
txn.set_tag("user_id", user_id)
|
||||||
|
txn.set_tag("trigger_type", pipeline.get("triggerType", "unknown"))
|
||||||
|
txn.set_tag("step_count", len(steps))
|
||||||
|
|
||||||
# Create execution record
|
# Create execution record
|
||||||
execution = await self.state.create_execution(pipeline_id, trigger_data)
|
execution = await self.state.create_execution(pipeline_id, trigger_data)
|
||||||
execution_id = execution["id"]
|
execution_id = execution["id"]
|
||||||
|
txn.set_tag("execution_id", execution_id)
|
||||||
|
|
||||||
|
# Audit: execution started
|
||||||
|
asyncio.create_task(self.state.log_event(
|
||||||
|
execution_id, "execution_started", message=f"Trigger: {pipeline.get('triggerType', 'manual')}"
|
||||||
|
))
|
||||||
|
|
||||||
context: dict[str, dict] = {}
|
context: dict[str, dict] = {}
|
||||||
if trigger_data:
|
if trigger_data:
|
||||||
@@ -94,6 +123,14 @@ class PipelineEngine:
|
|||||||
step_name = step.get("name", f"step_{i}")
|
step_name = step.get("name", f"step_{i}")
|
||||||
step_type = step.get("type", "")
|
step_type = step.get("type", "")
|
||||||
|
|
||||||
|
# Resume support: skip already-completed steps
|
||||||
|
if i < len(step_results) and step_results[i].get("status") in ("success", "skipped"):
|
||||||
|
context[step_name] = {
|
||||||
|
"output": step_results[i].get("output", ""),
|
||||||
|
"status": step_results[i]["status"],
|
||||||
|
}
|
||||||
|
continue
|
||||||
|
|
||||||
# Evaluate condition
|
# Evaluate condition
|
||||||
condition = step.get("if")
|
condition = step.get("if")
|
||||||
if condition and not self.evaluate_condition(condition, context):
|
if condition and not self.evaluate_condition(condition, context):
|
||||||
@@ -126,10 +163,77 @@ class PipelineEngine:
|
|||||||
else:
|
else:
|
||||||
rendered_step[key] = value
|
rendered_step[key] = value
|
||||||
|
|
||||||
# Execute step
|
# Execute step with retry logic
|
||||||
try:
|
output = await self._execute_step_with_retry(
|
||||||
timeout_s = step.get("timeout_s", 60)
|
step_type=step_type,
|
||||||
|
step_name=step_name,
|
||||||
|
rendered_step=rendered_step,
|
||||||
|
context=context,
|
||||||
|
target_room=target_room,
|
||||||
|
execution_id=execution_id,
|
||||||
|
timeout_s=step.get("timeout_s", 60),
|
||||||
|
max_retries=max_retries,
|
||||||
|
step_results=step_results,
|
||||||
|
pipeline_name=pipeline_name,
|
||||||
|
)
|
||||||
|
|
||||||
|
if output is _STEP_FAILED:
|
||||||
|
return # step handler already updated state
|
||||||
|
|
||||||
|
result = {
|
||||||
|
"name": step_name,
|
||||||
|
"output": output,
|
||||||
|
"status": "success",
|
||||||
|
"timestamp": time.time(),
|
||||||
|
}
|
||||||
|
step_results.append(result)
|
||||||
|
context[step_name] = {"output": output, "status": "success"}
|
||||||
|
|
||||||
|
# For approval steps, also store the response field
|
||||||
|
if step_type == "approval":
|
||||||
|
context[step_name]["response"] = output
|
||||||
|
|
||||||
|
# Audit: step completed
|
||||||
|
asyncio.create_task(self.state.log_event(
|
||||||
|
execution_id, "step_completed", step_name=step_name, status="success"
|
||||||
|
))
|
||||||
|
|
||||||
|
# All steps completed
|
||||||
|
await self.state.update_execution(
|
||||||
|
execution_id,
|
||||||
|
state="complete",
|
||||||
|
stepResults=step_results,
|
||||||
|
)
|
||||||
|
logger.info("Pipeline %s completed successfully", pipeline_name)
|
||||||
|
|
||||||
|
# Audit: execution completed
|
||||||
|
asyncio.create_task(self.state.log_event(
|
||||||
|
execution_id, "execution_completed",
|
||||||
|
message=f"{len(steps)} steps completed",
|
||||||
|
))
|
||||||
|
|
||||||
|
except Exception as exc:
|
||||||
|
logger.error("Pipeline %s failed unexpectedly: %s", pipeline_name, exc, exc_info=True)
|
||||||
|
sentry_sdk.capture_exception(exc)
|
||||||
|
await self.state.update_execution(
|
||||||
|
execution_id,
|
||||||
|
state="failed",
|
||||||
|
stepResults=step_results,
|
||||||
|
error=str(exc),
|
||||||
|
)
|
||||||
|
asyncio.create_task(self.state.log_event(
|
||||||
|
execution_id, "execution_failed", message=str(exc)
|
||||||
|
))
|
||||||
|
|
||||||
|
async def _execute_step_with_retry(
|
||||||
|
self, *, step_type, step_name, rendered_step, context, target_room,
|
||||||
|
execution_id, timeout_s, max_retries, step_results, pipeline_name,
|
||||||
|
):
|
||||||
|
"""Execute a step with retry on transient failures. Returns output or _STEP_FAILED."""
|
||||||
|
last_exc = None
|
||||||
|
|
||||||
|
for attempt in range(max_retries + 1):
|
||||||
|
try:
|
||||||
if step_type == "approval":
|
if step_type == "approval":
|
||||||
output = await self._execute_approval_step(
|
output = await self._execute_approval_step(
|
||||||
rendered_step, target_room, execution_id, timeout_s
|
rendered_step, target_room, execution_id, timeout_s
|
||||||
@@ -148,74 +252,74 @@ class PipelineEngine:
|
|||||||
),
|
),
|
||||||
timeout=timeout_s,
|
timeout=timeout_s,
|
||||||
)
|
)
|
||||||
|
return output
|
||||||
|
|
||||||
result = {
|
except TRANSIENT_EXCEPTIONS as exc:
|
||||||
"name": step_name,
|
last_exc = exc
|
||||||
"output": output,
|
if attempt < max_retries:
|
||||||
"status": "success",
|
backoff = 2 ** attempt # 1s, 2s, 4s
|
||||||
"timestamp": time.time(),
|
logger.warning(
|
||||||
}
|
"Pipeline %s: step %s transient failure (attempt %d/%d), retrying in %ds: %s",
|
||||||
step_results.append(result)
|
pipeline_name, step_name, attempt + 1, max_retries + 1, backoff, exc,
|
||||||
context[step_name] = {"output": output, "status": "success"}
|
)
|
||||||
|
asyncio.create_task(self.state.log_event(
|
||||||
# For approval steps, also store the response field
|
execution_id, "retry_attempted", step_name=step_name,
|
||||||
if step_type == "approval":
|
message=f"Attempt {attempt + 1}/{max_retries + 1}: {exc}",
|
||||||
context[step_name]["response"] = output
|
))
|
||||||
|
await asyncio.sleep(backoff)
|
||||||
|
continue
|
||||||
|
# Exhausted retries — fall through to failure
|
||||||
|
|
||||||
except asyncio.TimeoutError:
|
except asyncio.TimeoutError:
|
||||||
error_msg = f"Step {step_name} timed out after {step.get('timeout_s', 60)}s"
|
error_msg = f"Step {step_name} timed out after {timeout_s}s"
|
||||||
logger.error("Pipeline %s: %s", pipeline_name, error_msg)
|
logger.error("Pipeline %s: %s", pipeline_name, error_msg)
|
||||||
|
sentry_sdk.capture_message(error_msg, level="error")
|
||||||
step_results.append({
|
step_results.append({
|
||||||
"name": step_name,
|
"name": step_name, "output": None, "status": "timeout",
|
||||||
"output": None,
|
"error": error_msg, "timestamp": time.time(),
|
||||||
"status": "timeout",
|
|
||||||
"error": error_msg,
|
|
||||||
"timestamp": time.time(),
|
|
||||||
})
|
})
|
||||||
await self.state.update_execution(
|
await self.state.update_execution(
|
||||||
execution_id,
|
execution_id, state="failed", stepResults=step_results, error=error_msg,
|
||||||
state="failed",
|
|
||||||
stepResults=step_results,
|
|
||||||
error=error_msg,
|
|
||||||
)
|
)
|
||||||
await self.send_text(target_room, f"**{pipeline_name}**: {error_msg}")
|
await self.send_text(target_room, f"**{pipeline_name}**: {error_msg}")
|
||||||
return
|
asyncio.create_task(self.state.log_event(
|
||||||
|
execution_id, "step_failed", step_name=step_name, status="timeout", message=error_msg,
|
||||||
|
))
|
||||||
|
return _STEP_FAILED
|
||||||
|
|
||||||
except Exception as exc:
|
except Exception as exc:
|
||||||
error_msg = f"Step {step_name} failed: {exc}"
|
error_msg = f"Step {step_name} failed: {exc}"
|
||||||
logger.error("Pipeline %s: %s", pipeline_name, error_msg, exc_info=True)
|
logger.error("Pipeline %s: %s", pipeline_name, error_msg, exc_info=True)
|
||||||
|
sentry_sdk.capture_exception(exc)
|
||||||
step_results.append({
|
step_results.append({
|
||||||
"name": step_name,
|
"name": step_name, "output": None, "status": "error",
|
||||||
"output": None,
|
"error": str(exc), "timestamp": time.time(),
|
||||||
"status": "error",
|
|
||||||
"error": str(exc),
|
|
||||||
"timestamp": time.time(),
|
|
||||||
})
|
})
|
||||||
await self.state.update_execution(
|
await self.state.update_execution(
|
||||||
execution_id,
|
execution_id, state="failed", stepResults=step_results, error=error_msg,
|
||||||
state="failed",
|
|
||||||
stepResults=step_results,
|
|
||||||
error=error_msg,
|
|
||||||
)
|
)
|
||||||
await self.send_text(target_room, f"**{pipeline_name}**: {error_msg}")
|
await self.send_text(target_room, f"**{pipeline_name}**: {error_msg}")
|
||||||
return
|
asyncio.create_task(self.state.log_event(
|
||||||
|
execution_id, "step_failed", step_name=step_name, status="error", message=str(exc),
|
||||||
|
))
|
||||||
|
return _STEP_FAILED
|
||||||
|
|
||||||
# All steps completed
|
# Retries exhausted on transient failure
|
||||||
|
error_msg = f"Step {step_name} failed after {max_retries + 1} attempts: {last_exc}"
|
||||||
|
logger.error("Pipeline %s: %s", pipeline_name, error_msg)
|
||||||
|
sentry_sdk.capture_exception(last_exc)
|
||||||
|
step_results.append({
|
||||||
|
"name": step_name, "output": None, "status": "error",
|
||||||
|
"error": error_msg, "timestamp": time.time(), "retries": max_retries,
|
||||||
|
})
|
||||||
await self.state.update_execution(
|
await self.state.update_execution(
|
||||||
execution_id,
|
execution_id, state="failed", stepResults=step_results, error=error_msg,
|
||||||
state="complete",
|
|
||||||
stepResults=step_results,
|
|
||||||
)
|
|
||||||
logger.info("Pipeline %s completed successfully", pipeline_name)
|
|
||||||
|
|
||||||
except Exception as exc:
|
|
||||||
logger.error("Pipeline %s failed unexpectedly: %s", pipeline_name, exc, exc_info=True)
|
|
||||||
await self.state.update_execution(
|
|
||||||
execution_id,
|
|
||||||
state="failed",
|
|
||||||
stepResults=step_results,
|
|
||||||
error=str(exc),
|
|
||||||
)
|
)
|
||||||
|
await self.send_text(target_room, f"**{pipeline_name}**: {error_msg}")
|
||||||
|
asyncio.create_task(self.state.log_event(
|
||||||
|
execution_id, "step_failed", step_name=step_name, status="error", message=error_msg,
|
||||||
|
))
|
||||||
|
return _STEP_FAILED
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def _md_to_html(text: str) -> str:
|
def _md_to_html(text: str) -> str:
|
||||||
@@ -275,10 +379,21 @@ class PipelineEngine:
|
|||||||
|
|
||||||
try:
|
try:
|
||||||
result = await asyncio.wait_for(future, timeout=timeout_s)
|
result = await asyncio.wait_for(future, timeout=timeout_s)
|
||||||
|
asyncio.create_task(self.state.log_event(
|
||||||
|
execution_id, "approval_resolved", message=f"Response: {result}",
|
||||||
|
))
|
||||||
return result # "approve" or "decline"
|
return result # "approve" or "decline"
|
||||||
except asyncio.TimeoutError:
|
except asyncio.TimeoutError:
|
||||||
|
sentry_sdk.capture_message(
|
||||||
|
f"Approval timeout: execution={execution_id}",
|
||||||
|
level="warning",
|
||||||
|
)
|
||||||
await self.send_text(target_room, "Approval timed out. Pipeline aborted.")
|
await self.send_text(target_room, "Approval timed out. Pipeline aborted.")
|
||||||
await self.state.update_execution(execution_id, state="aborted", error="Approval timed out")
|
await self.state.update_execution(execution_id, state="aborted", error="Approval timed out")
|
||||||
|
asyncio.create_task(self.state.log_event(
|
||||||
|
execution_id, "step_failed", step_name="approval", status="timeout",
|
||||||
|
message="Approval timed out",
|
||||||
|
))
|
||||||
raise
|
raise
|
||||||
finally:
|
finally:
|
||||||
self._approval_futures.pop(execution_id, None)
|
self._approval_futures.pop(execution_id, None)
|
||||||
|
|||||||
@@ -60,3 +60,43 @@ class PipelineStateManager:
|
|||||||
except Exception:
|
except Exception:
|
||||||
logger.debug("Failed to fetch pending approvals", exc_info=True)
|
logger.debug("Failed to fetch pending approvals", exc_info=True)
|
||||||
return []
|
return []
|
||||||
|
|
||||||
|
async def count_active_executions(self, user_id: str) -> int:
|
||||||
|
"""Count running/waiting_approval executions for a user."""
|
||||||
|
try:
|
||||||
|
async with httpx.AsyncClient(timeout=10.0) as client:
|
||||||
|
resp = await client.get(
|
||||||
|
f"{self.portal_url}/api/pipelines/executions/active",
|
||||||
|
headers={"x-api-key": self.api_key},
|
||||||
|
params={"userId": user_id},
|
||||||
|
)
|
||||||
|
resp.raise_for_status()
|
||||||
|
data = resp.json()
|
||||||
|
return data.get("count", 0)
|
||||||
|
except Exception:
|
||||||
|
logger.warning("Failed to count active executions for user %s", user_id, exc_info=True)
|
||||||
|
return 0
|
||||||
|
|
||||||
|
async def log_event(
|
||||||
|
self, execution_id: str, action: str, *,
|
||||||
|
step_name: str | None = None,
|
||||||
|
status: str | None = None,
|
||||||
|
message: str | None = None,
|
||||||
|
) -> None:
|
||||||
|
"""Log an audit event for a pipeline execution (fire-and-forget)."""
|
||||||
|
try:
|
||||||
|
payload = {"action": action}
|
||||||
|
if step_name:
|
||||||
|
payload["stepName"] = step_name
|
||||||
|
if status:
|
||||||
|
payload["status"] = status
|
||||||
|
if message:
|
||||||
|
payload["message"] = message
|
||||||
|
async with httpx.AsyncClient(timeout=10.0) as client:
|
||||||
|
await client.post(
|
||||||
|
f"{self.portal_url}/api/pipelines/executions/{execution_id}/audit-log",
|
||||||
|
headers={"x-api-key": self.api_key},
|
||||||
|
json=payload,
|
||||||
|
)
|
||||||
|
except Exception:
|
||||||
|
logger.debug("Failed to log audit event %s for %s", action, execution_id)
|
||||||
|
|||||||
Reference in New Issue
Block a user