"""Pipeline execution engine — runs steps sequentially with output chaining.""" import asyncio import logging import re import time from datetime import datetime, timezone, timedelta import httpx import sentry_sdk from .state import PipelineStateManager from .steps import execute_step logger = logging.getLogger(__name__) # Transient errors eligible for retry TRANSIENT_EXCEPTIONS = ( httpx.ConnectError, httpx.ConnectTimeout, httpx.ReadTimeout, ConnectionError, OSError, ) # Sentinel for failed step (distinct from None output) _STEP_FAILED = object() class PipelineEngine: """Executes pipeline steps sequentially, managing state and output chaining.""" def __init__( self, state: PipelineStateManager, send_text, matrix_client, llm_client=None, default_model: str = "claude-haiku", escalation_model: str = "claude-sonnet", on_approval_registered=None, ): self.state = state self.send_text = send_text self.matrix_client = matrix_client self.llm = llm_client self.default_model = default_model self.escalation_model = escalation_model self.on_approval_registered = on_approval_registered # callback(event_id, execution_id) # Track active approval listeners: execution_id -> asyncio.Future self._approval_futures: dict[str, asyncio.Future] = {} def render_template(self, template: str, context: dict) -> str: """Simple Jinja2-like template rendering: {{ step_name.output }}""" def replacer(match): expr = match.group(1).strip() parts = expr.split(".") try: value = context for part in parts: if isinstance(value, dict): value = value[part] else: return match.group(0) return str(value) except (KeyError, TypeError): return match.group(0) return re.sub(r"\{\{\s*(.+?)\s*\}\}", replacer, template) def evaluate_condition(self, condition: str, context: dict) -> bool: """Evaluate a simple condition like {{ step.response == 'approve' }}""" rendered = self.render_template(condition, context) # Strip template markers if still present rendered = rendered.strip().strip("{}").strip() # Simple equality check if "==" in rendered: left, right = rendered.split("==", 1) return left.strip().strip("'\"") == right.strip().strip("'\"") if "!=" in rendered: left, right = rendered.split("!=", 1) return left.strip().strip("'\"") != right.strip().strip("'\"") # Truthy check return bool(rendered) and rendered.lower() not in ("false", "none", "0", "") async def run(self, pipeline: dict, trigger_data: dict | None = None) -> None: """Execute a full pipeline run.""" pipeline_id = pipeline["id"] pipeline_name = pipeline["name"] target_room = pipeline["targetRoom"] steps = pipeline.get("steps", []) user_id = pipeline.get("userId", "") max_retries = pipeline.get("maxRetries", 0) if not steps: logger.warning("Pipeline %s has no steps", pipeline_name) return with sentry_sdk.start_transaction(op="pipeline.execute", name=pipeline_name) as txn: txn.set_tag("pipeline_id", pipeline_id) txn.set_tag("user_id", user_id) txn.set_tag("trigger_type", pipeline.get("triggerType", "unknown")) txn.set_tag("step_count", len(steps)) # Create execution record execution = await self.state.create_execution(pipeline_id, trigger_data) execution_id = execution["id"] txn.set_tag("execution_id", execution_id) # Audit: execution started asyncio.create_task(self.state.log_event( execution_id, "execution_started", message=f"Trigger: {pipeline.get('triggerType', 'manual')}" )) context: dict[str, dict] = {} if trigger_data: context["trigger"] = trigger_data step_results: list[dict] = [] try: for i, step in enumerate(steps): step_name = step.get("name", f"step_{i}") step_type = step.get("type", "") # Resume support: skip already-completed steps if i < len(step_results) and step_results[i].get("status") in ("success", "skipped"): context[step_name] = { "output": step_results[i].get("output", ""), "status": step_results[i]["status"], } continue # Evaluate condition condition = step.get("if") if condition and not self.evaluate_condition(condition, context): logger.info("Pipeline %s: skipping step %s (condition not met)", pipeline_name, step_name) result = {"name": step_name, "output": "skipped", "status": "skipped", "timestamp": time.time()} step_results.append(result) context[step_name] = {"output": "skipped", "status": "skipped"} continue # Update execution state await self.state.update_execution( execution_id, currentStep=i, stepResults=step_results, state="running", ) logger.info("Pipeline %s: executing step %s (%s)", pipeline_name, step_name, step_type) # Render templates in step config rendered_step = {} for key, value in step.items(): if isinstance(value, str): rendered_step[key] = self.render_template(value, context) elif isinstance(value, dict): rendered_step[key] = { k: self.render_template(v, context) if isinstance(v, str) else v for k, v in value.items() } else: rendered_step[key] = value # Execute step with retry logic output = await self._execute_step_with_retry( step_type=step_type, step_name=step_name, rendered_step=rendered_step, context=context, target_room=target_room, execution_id=execution_id, timeout_s=step.get("timeout_s", 60), max_retries=max_retries, step_results=step_results, pipeline_name=pipeline_name, ) if output is _STEP_FAILED: return # step handler already updated state result = { "name": step_name, "output": output, "status": "success", "timestamp": time.time(), } step_results.append(result) context[step_name] = {"output": output, "status": "success"} # For approval steps, also store the response field if step_type == "approval": context[step_name]["response"] = output # Audit: step completed asyncio.create_task(self.state.log_event( execution_id, "step_completed", step_name=step_name, status="success" )) # All steps completed await self.state.update_execution( execution_id, state="complete", stepResults=step_results, ) logger.info("Pipeline %s completed successfully", pipeline_name) # Audit: execution completed asyncio.create_task(self.state.log_event( execution_id, "execution_completed", message=f"{len(steps)} steps completed", )) except Exception as exc: logger.error("Pipeline %s failed unexpectedly: %s", pipeline_name, exc, exc_info=True) sentry_sdk.capture_exception(exc) await self.state.update_execution( execution_id, state="failed", stepResults=step_results, error=str(exc), ) asyncio.create_task(self.state.log_event( execution_id, "execution_failed", message=str(exc) )) async def _execute_step_with_retry( self, *, step_type, step_name, rendered_step, context, target_room, execution_id, timeout_s, max_retries, step_results, pipeline_name, ): """Execute a step with retry on transient failures. Returns output or _STEP_FAILED.""" last_exc = None for attempt in range(max_retries + 1): try: if step_type == "approval": output = await self._execute_approval_step( rendered_step, target_room, execution_id, timeout_s ) else: output = await asyncio.wait_for( execute_step( step_type=step_type, step_config=rendered_step, context=context, send_text=self.send_text, target_room=target_room, llm=self.llm, default_model=self.default_model, escalation_model=self.escalation_model, ), timeout=timeout_s, ) return output except TRANSIENT_EXCEPTIONS as exc: last_exc = exc if attempt < max_retries: backoff = 2 ** attempt # 1s, 2s, 4s logger.warning( "Pipeline %s: step %s transient failure (attempt %d/%d), retrying in %ds: %s", pipeline_name, step_name, attempt + 1, max_retries + 1, backoff, exc, ) asyncio.create_task(self.state.log_event( execution_id, "retry_attempted", step_name=step_name, message=f"Attempt {attempt + 1}/{max_retries + 1}: {exc}", )) await asyncio.sleep(backoff) continue # Exhausted retries — fall through to failure except asyncio.TimeoutError: error_msg = f"Step {step_name} timed out after {timeout_s}s" logger.error("Pipeline %s: %s", pipeline_name, error_msg) sentry_sdk.capture_message(error_msg, level="error") step_results.append({ "name": step_name, "output": None, "status": "timeout", "error": error_msg, "timestamp": time.time(), }) await self.state.update_execution( execution_id, state="failed", stepResults=step_results, error=error_msg, ) await self.send_text(target_room, f"**{pipeline_name}**: {error_msg}") asyncio.create_task(self.state.log_event( execution_id, "step_failed", step_name=step_name, status="timeout", message=error_msg, )) return _STEP_FAILED except Exception as exc: error_msg = f"Step {step_name} failed: {exc}" logger.error("Pipeline %s: %s", pipeline_name, error_msg, exc_info=True) sentry_sdk.capture_exception(exc) step_results.append({ "name": step_name, "output": None, "status": "error", "error": str(exc), "timestamp": time.time(), }) await self.state.update_execution( execution_id, state="failed", stepResults=step_results, error=error_msg, ) await self.send_text(target_room, f"**{pipeline_name}**: {error_msg}") asyncio.create_task(self.state.log_event( execution_id, "step_failed", step_name=step_name, status="error", message=str(exc), )) return _STEP_FAILED # Retries exhausted on transient failure error_msg = f"Step {step_name} failed after {max_retries + 1} attempts: {last_exc}" logger.error("Pipeline %s: %s", pipeline_name, error_msg) sentry_sdk.capture_exception(last_exc) step_results.append({ "name": step_name, "output": None, "status": "error", "error": error_msg, "timestamp": time.time(), "retries": max_retries, }) await self.state.update_execution( execution_id, state="failed", stepResults=step_results, error=error_msg, ) await self.send_text(target_room, f"**{pipeline_name}**: {error_msg}") asyncio.create_task(self.state.log_event( execution_id, "step_failed", step_name=step_name, status="error", message=error_msg, )) return _STEP_FAILED @staticmethod def _md_to_html(text: str) -> str: """Convert basic markdown to HTML for Matrix formatted_body.""" import re as _re html = text # Bold: **text** -> text html = _re.sub(r'\*\*(.+?)\*\*', r'\1', html) # Italic: *text* -> text html = _re.sub(r'(?\1', html) # Code: `text` -> text html = _re.sub(r'`(.+?)`', r'\1', html) # Newlines html = html.replace("\n", "
") return html async def _execute_approval_step( self, step: dict, target_room: str, execution_id: str, timeout_s: int ) -> str: """Post approval message and wait for reaction.""" message = step.get("message", "Approve this action?") body = f"**Approval Required**\n\n{message}\n\nReact with \U0001f44d to approve or \U0001f44e to decline." html = self._md_to_html(body) # Send message and get event ID resp = await self.matrix_client.room_send( room_id=target_room, message_type="m.room.message", content={ "msgtype": "m.text", "body": body, "format": "org.matrix.custom.html", "formatted_body": html, }, ) event_id = resp.event_id if hasattr(resp, "event_id") else None if not event_id: raise RuntimeError("Failed to send approval message") # Notify bot of approval event mapping if self.on_approval_registered: self.on_approval_registered(event_id, execution_id) # Register approval listener future = asyncio.get_event_loop().create_future() self._approval_futures[execution_id] = future # Update execution with approval tracking expires_at = (datetime.now(timezone.utc) + timedelta(seconds=timeout_s)).isoformat() await self.state.update_execution( execution_id, state="waiting_approval", approvalMsgId=event_id, approvalExpiresAt=expires_at, ) try: result = await asyncio.wait_for(future, timeout=timeout_s) asyncio.create_task(self.state.log_event( execution_id, "approval_resolved", message=f"Response: {result}", )) return result # "approve" or "decline" except asyncio.TimeoutError: sentry_sdk.capture_message( f"Approval timeout: execution={execution_id}", level="warning", ) await self.send_text(target_room, "Approval timed out. Pipeline aborted.") await self.state.update_execution(execution_id, state="aborted", error="Approval timed out") asyncio.create_task(self.state.log_event( execution_id, "step_failed", step_name="approval", status="timeout", message="Approval timed out", )) raise finally: self._approval_futures.pop(execution_id, None) def resolve_approval(self, execution_id: str, response: str) -> bool: """Resolve a pending approval. Called by reaction handler.""" future = self._approval_futures.get(execution_id) if future and not future.done(): future.set_result(response) return True return False