Matrix needs formatted_body as HTML, not raw markdown. Added _md_to_html for bold/italic/code conversion. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
293 lines
12 KiB
Python
293 lines
12 KiB
Python
"""Pipeline execution engine — runs steps sequentially with output chaining."""
|
|
|
|
import asyncio
|
|
import logging
|
|
import re
|
|
import time
|
|
from datetime import datetime, timezone, timedelta
|
|
|
|
from .state import PipelineStateManager
|
|
from .steps import execute_step
|
|
|
|
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class PipelineEngine:
|
|
"""Executes pipeline steps sequentially, managing state and output chaining."""
|
|
|
|
def __init__(
|
|
self,
|
|
state: PipelineStateManager,
|
|
send_text,
|
|
matrix_client,
|
|
llm_client=None,
|
|
default_model: str = "claude-haiku",
|
|
escalation_model: str = "claude-sonnet",
|
|
on_approval_registered=None,
|
|
):
|
|
self.state = state
|
|
self.send_text = send_text
|
|
self.matrix_client = matrix_client
|
|
self.llm = llm_client
|
|
self.default_model = default_model
|
|
self.escalation_model = escalation_model
|
|
self.on_approval_registered = on_approval_registered # callback(event_id, execution_id)
|
|
# Track active approval listeners: execution_id -> asyncio.Future
|
|
self._approval_futures: dict[str, asyncio.Future] = {}
|
|
|
|
def render_template(self, template: str, context: dict) -> str:
|
|
"""Simple Jinja2-like template rendering: {{ step_name.output }}"""
|
|
def replacer(match):
|
|
expr = match.group(1).strip()
|
|
parts = expr.split(".")
|
|
try:
|
|
value = context
|
|
for part in parts:
|
|
if isinstance(value, dict):
|
|
value = value[part]
|
|
else:
|
|
return match.group(0)
|
|
return str(value)
|
|
except (KeyError, TypeError):
|
|
return match.group(0)
|
|
|
|
return re.sub(r"\{\{\s*(.+?)\s*\}\}", replacer, template)
|
|
|
|
def evaluate_condition(self, condition: str, context: dict) -> bool:
|
|
"""Evaluate a simple condition like {{ step.response == 'approve' }}"""
|
|
rendered = self.render_template(condition, context)
|
|
# Strip template markers if still present
|
|
rendered = rendered.strip().strip("{}").strip()
|
|
# Simple equality check
|
|
if "==" in rendered:
|
|
left, right = rendered.split("==", 1)
|
|
return left.strip().strip("'\"") == right.strip().strip("'\"")
|
|
if "!=" in rendered:
|
|
left, right = rendered.split("!=", 1)
|
|
return left.strip().strip("'\"") != right.strip().strip("'\"")
|
|
# Truthy check
|
|
return bool(rendered) and rendered.lower() not in ("false", "none", "0", "")
|
|
|
|
async def run(self, pipeline: dict, trigger_data: dict | None = None) -> None:
|
|
"""Execute a full pipeline run."""
|
|
pipeline_id = pipeline["id"]
|
|
pipeline_name = pipeline["name"]
|
|
target_room = pipeline["targetRoom"]
|
|
steps = pipeline.get("steps", [])
|
|
|
|
if not steps:
|
|
logger.warning("Pipeline %s has no steps", pipeline_name)
|
|
return
|
|
|
|
# Create execution record
|
|
execution = await self.state.create_execution(pipeline_id, trigger_data)
|
|
execution_id = execution["id"]
|
|
|
|
context: dict[str, dict] = {}
|
|
if trigger_data:
|
|
context["trigger"] = trigger_data
|
|
|
|
step_results: list[dict] = []
|
|
|
|
try:
|
|
for i, step in enumerate(steps):
|
|
step_name = step.get("name", f"step_{i}")
|
|
step_type = step.get("type", "")
|
|
|
|
# Evaluate condition
|
|
condition = step.get("if")
|
|
if condition and not self.evaluate_condition(condition, context):
|
|
logger.info("Pipeline %s: skipping step %s (condition not met)", pipeline_name, step_name)
|
|
result = {"name": step_name, "output": "skipped", "status": "skipped", "timestamp": time.time()}
|
|
step_results.append(result)
|
|
context[step_name] = {"output": "skipped", "status": "skipped"}
|
|
continue
|
|
|
|
# Update execution state
|
|
await self.state.update_execution(
|
|
execution_id,
|
|
currentStep=i,
|
|
stepResults=step_results,
|
|
state="running",
|
|
)
|
|
|
|
logger.info("Pipeline %s: executing step %s (%s)", pipeline_name, step_name, step_type)
|
|
|
|
# Render templates in step config
|
|
rendered_step = {}
|
|
for key, value in step.items():
|
|
if isinstance(value, str):
|
|
rendered_step[key] = self.render_template(value, context)
|
|
elif isinstance(value, dict):
|
|
rendered_step[key] = {
|
|
k: self.render_template(v, context) if isinstance(v, str) else v
|
|
for k, v in value.items()
|
|
}
|
|
else:
|
|
rendered_step[key] = value
|
|
|
|
# Execute step
|
|
try:
|
|
timeout_s = step.get("timeout_s", 60)
|
|
|
|
if step_type == "approval":
|
|
output = await self._execute_approval_step(
|
|
rendered_step, target_room, execution_id, timeout_s
|
|
)
|
|
else:
|
|
output = await asyncio.wait_for(
|
|
execute_step(
|
|
step_type=step_type,
|
|
step_config=rendered_step,
|
|
context=context,
|
|
send_text=self.send_text,
|
|
target_room=target_room,
|
|
llm=self.llm,
|
|
default_model=self.default_model,
|
|
escalation_model=self.escalation_model,
|
|
),
|
|
timeout=timeout_s,
|
|
)
|
|
|
|
result = {
|
|
"name": step_name,
|
|
"output": output,
|
|
"status": "success",
|
|
"timestamp": time.time(),
|
|
}
|
|
step_results.append(result)
|
|
context[step_name] = {"output": output, "status": "success"}
|
|
|
|
# For approval steps, also store the response field
|
|
if step_type == "approval":
|
|
context[step_name]["response"] = output
|
|
|
|
except asyncio.TimeoutError:
|
|
error_msg = f"Step {step_name} timed out after {step.get('timeout_s', 60)}s"
|
|
logger.error("Pipeline %s: %s", pipeline_name, error_msg)
|
|
step_results.append({
|
|
"name": step_name,
|
|
"output": None,
|
|
"status": "timeout",
|
|
"error": error_msg,
|
|
"timestamp": time.time(),
|
|
})
|
|
await self.state.update_execution(
|
|
execution_id,
|
|
state="failed",
|
|
stepResults=step_results,
|
|
error=error_msg,
|
|
)
|
|
await self.send_text(target_room, f"**{pipeline_name}**: {error_msg}")
|
|
return
|
|
|
|
except Exception as exc:
|
|
error_msg = f"Step {step_name} failed: {exc}"
|
|
logger.error("Pipeline %s: %s", pipeline_name, error_msg, exc_info=True)
|
|
step_results.append({
|
|
"name": step_name,
|
|
"output": None,
|
|
"status": "error",
|
|
"error": str(exc),
|
|
"timestamp": time.time(),
|
|
})
|
|
await self.state.update_execution(
|
|
execution_id,
|
|
state="failed",
|
|
stepResults=step_results,
|
|
error=error_msg,
|
|
)
|
|
await self.send_text(target_room, f"**{pipeline_name}**: {error_msg}")
|
|
return
|
|
|
|
# All steps completed
|
|
await self.state.update_execution(
|
|
execution_id,
|
|
state="complete",
|
|
stepResults=step_results,
|
|
)
|
|
logger.info("Pipeline %s completed successfully", pipeline_name)
|
|
|
|
except Exception as exc:
|
|
logger.error("Pipeline %s failed unexpectedly: %s", pipeline_name, exc, exc_info=True)
|
|
await self.state.update_execution(
|
|
execution_id,
|
|
state="failed",
|
|
stepResults=step_results,
|
|
error=str(exc),
|
|
)
|
|
|
|
@staticmethod
|
|
def _md_to_html(text: str) -> str:
|
|
"""Convert basic markdown to HTML for Matrix formatted_body."""
|
|
import re as _re
|
|
html = text
|
|
# Bold: **text** -> <strong>text</strong>
|
|
html = _re.sub(r'\*\*(.+?)\*\*', r'<strong>\1</strong>', html)
|
|
# Italic: *text* -> <em>text</em>
|
|
html = _re.sub(r'(?<!\*)\*(?!\*)(.+?)(?<!\*)\*(?!\*)', r'<em>\1</em>', html)
|
|
# Code: `text` -> <code>text</code>
|
|
html = _re.sub(r'`(.+?)`', r'<code>\1</code>', html)
|
|
# Newlines
|
|
html = html.replace("\n", "<br>")
|
|
return html
|
|
|
|
async def _execute_approval_step(
|
|
self, step: dict, target_room: str, execution_id: str, timeout_s: int
|
|
) -> str:
|
|
"""Post approval message and wait for reaction."""
|
|
message = step.get("message", "Approve this action?")
|
|
body = f"**Approval Required**\n\n{message}\n\nReact with \U0001f44d to approve or \U0001f44e to decline."
|
|
html = self._md_to_html(body)
|
|
|
|
# Send message and get event ID
|
|
resp = await self.matrix_client.room_send(
|
|
room_id=target_room,
|
|
message_type="m.room.message",
|
|
content={
|
|
"msgtype": "m.text",
|
|
"body": body,
|
|
"format": "org.matrix.custom.html",
|
|
"formatted_body": html,
|
|
},
|
|
)
|
|
event_id = resp.event_id if hasattr(resp, "event_id") else None
|
|
|
|
if not event_id:
|
|
raise RuntimeError("Failed to send approval message")
|
|
|
|
# Notify bot of approval event mapping
|
|
if self.on_approval_registered:
|
|
self.on_approval_registered(event_id, execution_id)
|
|
|
|
# Register approval listener
|
|
future = asyncio.get_event_loop().create_future()
|
|
self._approval_futures[execution_id] = future
|
|
|
|
# Update execution with approval tracking
|
|
expires_at = (datetime.now(timezone.utc) + timedelta(seconds=timeout_s)).isoformat()
|
|
await self.state.update_execution(
|
|
execution_id,
|
|
state="waiting_approval",
|
|
approvalMsgId=event_id,
|
|
approvalExpiresAt=expires_at,
|
|
)
|
|
|
|
try:
|
|
result = await asyncio.wait_for(future, timeout=timeout_s)
|
|
return result # "approve" or "decline"
|
|
except asyncio.TimeoutError:
|
|
await self.send_text(target_room, "Approval timed out. Pipeline aborted.")
|
|
await self.state.update_execution(execution_id, state="aborted", error="Approval timed out")
|
|
raise
|
|
finally:
|
|
self._approval_futures.pop(execution_id, None)
|
|
|
|
def resolve_approval(self, execution_id: str, response: str) -> bool:
|
|
"""Resolve a pending approval. Called by reaction handler."""
|
|
future = self._approval_futures.get(execution_id)
|
|
if future and not future.done():
|
|
future.set_result(response)
|
|
return True
|
|
return False
|