- Sentry transactions wrapping pipeline execution with tags
- Retry with exponential backoff for transient failures (connect, timeout, 5xx)
- Concurrent execution limit (3/user) enforced in scheduler
- Audit log events fired at each pipeline lifecycle point
- Resume support: skip already-completed steps on restart

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
103 lines
3.9 KiB
Python
103 lines
3.9 KiB
Python
"""Pipeline state management — syncs with matrixhost portal API."""
|
|
|
|
import logging
|
|
from datetime import datetime, timezone
|
|
|
|
import httpx
|
|
|
|
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class PipelineStateManager:
|
|
"""Manages pipeline state via portal API."""
|
|
|
|
def __init__(self, portal_url: str, api_key: str):
|
|
self.portal_url = portal_url.rstrip("/")
|
|
self.api_key = api_key
|
|
|
|
async def fetch_active_pipelines(self) -> list[dict]:
|
|
async with httpx.AsyncClient(timeout=15.0) as client:
|
|
resp = await client.get(
|
|
f"{self.portal_url}/api/pipelines/active",
|
|
headers={"x-api-key": self.api_key},
|
|
)
|
|
resp.raise_for_status()
|
|
data = resp.json()
|
|
return data.get("pipelines", [])
|
|
|
|
async def create_execution(self, pipeline_id: str, trigger_data: dict | None = None) -> dict:
|
|
async with httpx.AsyncClient(timeout=10.0) as client:
|
|
resp = await client.post(
|
|
f"{self.portal_url}/api/pipelines/{pipeline_id}/execution",
|
|
headers={"x-api-key": self.api_key},
|
|
json={"triggerData": trigger_data},
|
|
)
|
|
resp.raise_for_status()
|
|
data = resp.json()
|
|
return data["execution"]
|
|
|
|
async def update_execution(self, execution_id: str, **kwargs) -> None:
|
|
try:
|
|
async with httpx.AsyncClient(timeout=10.0) as client:
|
|
await client.put(
|
|
f"{self.portal_url}/api/pipelines/executions/{execution_id}",
|
|
headers={"x-api-key": self.api_key},
|
|
json=kwargs,
|
|
)
|
|
except Exception:
|
|
logger.warning("Failed to update execution %s", execution_id, exc_info=True)
|
|
|
|
async def fetch_pending_approvals(self) -> list[dict]:
|
|
try:
|
|
async with httpx.AsyncClient(timeout=10.0) as client:
|
|
resp = await client.get(
|
|
f"{self.portal_url}/api/pipelines/executions/pending",
|
|
headers={"x-api-key": self.api_key},
|
|
)
|
|
resp.raise_for_status()
|
|
data = resp.json()
|
|
return data.get("executions", [])
|
|
except Exception:
|
|
logger.debug("Failed to fetch pending approvals", exc_info=True)
|
|
return []
|
|
|
|
async def count_active_executions(self, user_id: str) -> int:
|
|
"""Count running/waiting_approval executions for a user."""
|
|
try:
|
|
async with httpx.AsyncClient(timeout=10.0) as client:
|
|
resp = await client.get(
|
|
f"{self.portal_url}/api/pipelines/executions/active",
|
|
headers={"x-api-key": self.api_key},
|
|
params={"userId": user_id},
|
|
)
|
|
resp.raise_for_status()
|
|
data = resp.json()
|
|
return data.get("count", 0)
|
|
except Exception:
|
|
logger.warning("Failed to count active executions for user %s", user_id, exc_info=True)
|
|
return 0
|
|
|
|
async def log_event(
|
|
self, execution_id: str, action: str, *,
|
|
step_name: str | None = None,
|
|
status: str | None = None,
|
|
message: str | None = None,
|
|
) -> None:
|
|
"""Log an audit event for a pipeline execution (fire-and-forget)."""
|
|
try:
|
|
payload = {"action": action}
|
|
if step_name:
|
|
payload["stepName"] = step_name
|
|
if status:
|
|
payload["status"] = status
|
|
if message:
|
|
payload["message"] = message
|
|
async with httpx.AsyncClient(timeout=10.0) as client:
|
|
await client.post(
|
|
f"{self.portal_url}/api/pipelines/executions/{execution_id}/audit-log",
|
|
headers={"x-api-key": self.api_key},
|
|
json=payload,
|
|
)
|
|
except Exception:
|
|
logger.debug("Failed to log audit event %s for %s", action, execution_id)
|