From bd8d96335e41fcc0a2499587aa0b9b98b58a87e2 Mon Sep 17 00:00:00 2001 From: Christian Gick Date: Wed, 18 Mar 2026 17:06:07 +0200 Subject: [PATCH] feat: add pipeline engine with approval flow and file triggers Sequential step executor (script, claude_prompt, approval, api_call, template, skyvern placeholder), reaction-based approvals, file upload trigger matching, portal API state sync. Co-Authored-By: Claude Opus 4.6 (1M context) --- bot.py | 120 ++++++++++++++ cron/scheduler.py | 95 ++++++++++- pipelines/__init__.py | 6 + pipelines/approval.py | 18 ++ pipelines/engine.py | 276 +++++++++++++++++++++++++++++++ pipelines/state.py | 62 +++++++ pipelines/steps/__init__.py | 45 +++++ pipelines/steps/api_call.py | 33 ++++ pipelines/steps/claude_prompt.py | 32 ++++ pipelines/steps/script.py | 27 +++ pipelines/steps/skyvern.py | 24 +++ pipelines/steps/template.py | 18 ++ 12 files changed, 755 insertions(+), 1 deletion(-) create mode 100644 pipelines/__init__.py create mode 100644 pipelines/approval.py create mode 100644 pipelines/engine.py create mode 100644 pipelines/state.py create mode 100644 pipelines/steps/__init__.py create mode 100644 pipelines/steps/api_call.py create mode 100644 pipelines/steps/claude_prompt.py create mode 100644 pipelines/steps/script.py create mode 100644 pipelines/steps/skyvern.py create mode 100644 pipelines/steps/template.py diff --git a/bot.py b/bot.py index bbbb79a..1ac7cee 100644 --- a/bot.py +++ b/bot.py @@ -1180,9 +1180,16 @@ class Bot: api_key=BOT_API_KEY, matrix_client=self.client, send_text_fn=self._send_text, + llm_client=self.llm, + default_model=DEFAULT_MODEL, + escalation_model=ESCALATION_MODEL, ) else: self.cron_scheduler = None + # Pipeline approval tracking: event_id -> execution_id + self._pipeline_approval_events: dict[str, str] = {} + if self.cron_scheduler: + self.cron_scheduler.pipeline_engine.on_approval_registered = self._on_pipeline_approval_registered async def _has_documents(self, matrix_user_id: str) -> bool: """Check if user has documents via local RAG or MatrixHost portal API. @@ -1261,6 +1268,7 @@ class Bot: self.client.add_event_callback(self.on_file_message, RoomMessageFile) self.client.add_event_callback(self.on_encrypted_file_message, RoomEncryptedFile) self.client.add_event_callback(self.on_room_unknown, RoomMessageUnknown) + self.client.add_event_callback(self.on_reaction, UnknownEvent) self.client.add_response_callback(self.on_sync, SyncResponse) self.client.add_to_device_callback(self.on_key_verification, KeyVerificationStart) self.client.add_to_device_callback(self.on_key_verification, KeyVerificationKey) @@ -1456,6 +1464,57 @@ class Bot: self.client.verify_device(device) logger.info("Auto-trusted device %s of %s", device.device_id, user_id) + async def on_reaction(self, room, event: UnknownEvent): + """Handle reaction events for pipeline approval flow.""" + if event.sender == BOT_USER: + return + if not self._sync_token_received: + return + source = event.source or {} + if source.get("type") != "m.reaction": + return + + content = source.get("content", {}) + relates_to = content.get("m.relates_to", {}) + if relates_to.get("rel_type") != "m.annotation": + return + + event_id = relates_to.get("event_id", "") + reaction_key = relates_to.get("key", "") + + # Check if this reaction is for a pipeline approval + if not self.cron_scheduler: + return + + from pipelines.approval import reaction_to_response + response = reaction_to_response(reaction_key) + if not response: + return + + # Look up execution by approval event ID + execution_id = self._pipeline_approval_events.get(event_id) + if execution_id: + resolved = self.cron_scheduler.pipeline_engine.resolve_approval(execution_id, response) + if resolved: + self._pipeline_approval_events.pop(event_id, None) + logger.info("Pipeline approval resolved: %s -> %s", execution_id, response) + return + + # If not in local cache, check pending approvals from portal + try: + pending = await self.cron_scheduler._pipeline_state.fetch_pending_approvals() + for execution in pending: + if execution.get("approvalMsgId") == event_id: + eid = execution["id"] + self._pipeline_approval_events[event_id] = eid + resolved = self.cron_scheduler.pipeline_engine.resolve_approval(eid, response) + if resolved: + self._pipeline_approval_events.pop(event_id, None) + logger.info("Pipeline approval resolved (from portal): %s -> %s", eid, response) + break + except Exception: + logger.debug("Failed to check pending approvals for reaction", exc_info=True) + async def on_unknown(self, room, event: UnknownEvent): """Handle call member state events and in-room verification.""" # Route verification events @@ -2185,6 +2244,63 @@ class Bot: ".txt", ".md", ".csv", ".json", ".xml", ".html", ".yaml", ".yml", ".log", }) + def _on_pipeline_approval_registered(self, event_id: str, execution_id: str): + """Track approval event -> execution mapping for reaction handling.""" + self._pipeline_approval_events[event_id] = execution_id + + async def _check_pipeline_file_trigger(self, room, event, filename: str, mime_type: str): + """Check if an uploaded file matches any pipeline file_upload trigger.""" + pipelines = self.cron_scheduler.get_file_upload_pipelines() + for pipeline in pipelines: + target_room = pipeline.get("targetRoom", "") + # Pipeline must target this room + if target_room != room.room_id: + continue + + # Check mime type match + required_mime = pipeline.get("fileMimetype", "") + if required_mime and not mime_type.startswith(required_mime): + continue + + # Check filename pattern + pattern = pipeline.get("filePattern", "") + if pattern: + try: + if not re.match(pattern, filename): + continue + except re.error: + continue + + # Download file content for trigger data + mxc_url = event.url if hasattr(event, "url") else None + file_text = "" + if mxc_url: + try: + resp = await self.client.download(mxc=mxc_url) + if hasattr(resp, "body"): + file_bytes = resp.body + ext = os.path.splitext(filename.lower())[1] + if ext == ".pdf": + file_text = self._extract_pdf_text(file_bytes) + elif ext == ".docx": + file_text = self._extract_docx_text(file_bytes) + else: + file_text = self._extract_text_file(file_bytes) + if file_text and len(file_text) > 50000: + file_text = file_text[:50000] + except Exception: + logger.debug("Failed to extract file for pipeline trigger", exc_info=True) + + trigger_data = { + "filename": filename, + "mime_type": mime_type, + "file_content": file_text, + "sender": event.sender, + "room_id": room.room_id, + } + logger.info("File upload triggered pipeline: %s (file: %s)", pipeline["name"], filename) + asyncio.create_task(self.cron_scheduler.pipeline_engine.run(pipeline, trigger_data)) + async def on_file_message(self, room, event: RoomMessageFile): """Handle file messages: extract text from PDFs, docx, and text files.""" if event.sender == BOT_USER: @@ -2207,6 +2323,10 @@ class Bot: is_docx = mime_type == "application/vnd.openxmlformats-officedocument.wordprocessingml.document" or ext == ".docx" is_text = ext in self._TEXT_EXTENSIONS or mime_type.startswith("text/") + # Check for pipeline file_upload triggers (before DM/mention check) + if self.cron_scheduler: + await self._check_pipeline_file_trigger(room, event, filename, mime_type) + if not (is_pdf or is_docx or is_text): return diff --git a/cron/scheduler.py b/cron/scheduler.py index ee28fec..fcb6aae 100644 --- a/cron/scheduler.py +++ b/cron/scheduler.py @@ -7,6 +7,7 @@ from datetime import datetime, timezone import httpx from .executor import execute_job +from pipelines import PipelineEngine, PipelineStateManager logger = logging.getLogger(__name__) @@ -17,7 +18,9 @@ PENDING_CHECK_INTERVAL = 15 # 15 seconds — fast check for manual triggers class CronScheduler: """Fetches enabled cron jobs from the matrixhost portal and runs them on schedule.""" - def __init__(self, portal_url: str, api_key: str, matrix_client, send_text_fn): + def __init__(self, portal_url: str, api_key: str, matrix_client, send_text_fn, + llm_client=None, default_model: str = "claude-haiku", + escalation_model: str = "claude-sonnet"): self.portal_url = portal_url.rstrip("/") self.api_key = api_key self.matrix_client = matrix_client @@ -26,6 +29,19 @@ class CronScheduler: self._tasks: dict[str, asyncio.Task] = {} # id -> scheduler task self._running = False + # Pipeline engine + self._pipeline_state = PipelineStateManager(portal_url, api_key) + self.pipeline_engine = PipelineEngine( + state=self._pipeline_state, + send_text=send_text_fn, + matrix_client=matrix_client, + llm_client=llm_client, + default_model=default_model, + escalation_model=escalation_model, + ) + self._pipelines: dict[str, dict] = {} # id -> pipeline data + self._pipeline_tasks: dict[str, asyncio.Task] = {} # id -> scheduler task + async def start(self): """Start the scheduler background loops.""" self._running = True @@ -35,6 +51,8 @@ class CronScheduler: await asyncio.gather( self._full_sync_loop(), self._pending_check_loop(), + self._pipeline_sync_loop(), + self._pipeline_pending_check_loop(), ) async def _full_sync_loop(self): @@ -71,11 +89,86 @@ class CronScheduler: logger.info("Pending trigger: %s", job["name"]) asyncio.create_task(self._run_once(job)) + async def _pipeline_sync_loop(self): + """Full pipeline reconciliation every 5 minutes.""" + while self._running: + try: + await self._sync_pipelines() + except Exception: + logger.warning("Pipeline sync failed", exc_info=True) + await asyncio.sleep(SYNC_INTERVAL) + + async def _pipeline_pending_check_loop(self): + """Fast poll for manually triggered pipelines every 15 seconds.""" + while self._running: + try: + await self._check_pending_pipelines() + except Exception: + logger.debug("Pipeline pending check failed", exc_info=True) + await asyncio.sleep(PENDING_CHECK_INTERVAL) + + async def _sync_pipelines(self): + """Fetch active pipelines from portal and reconcile.""" + pipelines = await self._pipeline_state.fetch_active_pipelines() + remote = {p["id"]: p for p in pipelines} + + # Remove pipelines no longer active + for pid in list(self._pipeline_tasks): + if pid not in remote: + logger.info("Removing pipeline %s (no longer active)", pid) + self._pipeline_tasks[pid].cancel() + del self._pipeline_tasks[pid] + self._pipelines.pop(pid, None) + + # Add/update cron-triggered pipelines + for pid, pipeline in remote.items(): + existing = self._pipelines.get(pid) + if existing and existing.get("updatedAt") == pipeline.get("updatedAt"): + continue + + if pid in self._pipeline_tasks: + self._pipeline_tasks[pid].cancel() + + self._pipelines[pid] = pipeline + + if pipeline.get("triggerType") == "cron": + self._pipeline_tasks[pid] = asyncio.create_task( + self._pipeline_cron_loop(pipeline), name=f"pipeline-{pid}" + ) + logger.info("Scheduled pipeline: %s (%s @ %s)", + pipeline["name"], pipeline.get("schedule", ""), pipeline.get("scheduleAt", "")) + + async def _check_pending_pipelines(self): + """Check for pipelines with lastStatus='pending' and run them.""" + pipelines = await self._pipeline_state.fetch_active_pipelines() + for pipeline in pipelines: + if pipeline.get("lastStatus") == "pending": + logger.info("Pending pipeline trigger: %s", pipeline["name"]) + asyncio.create_task(self.pipeline_engine.run(pipeline)) + + async def _pipeline_cron_loop(self, pipeline: dict): + """Run a pipeline on its cron schedule.""" + try: + while True: + sleep_secs = self._seconds_until_next_run(pipeline) + if sleep_secs > 0: + await asyncio.sleep(sleep_secs) + await self.pipeline_engine.run(pipeline) + except asyncio.CancelledError: + pass + + def get_file_upload_pipelines(self) -> list[dict]: + """Return all active file_upload-triggered pipelines.""" + return [p for p in self._pipelines.values() if p.get("triggerType") == "file_upload"] + async def stop(self): self._running = False for task in self._tasks.values(): task.cancel() self._tasks.clear() + for task in self._pipeline_tasks.values(): + task.cancel() + self._pipeline_tasks.clear() async def _sync_jobs(self): """Fetch active jobs from portal and reconcile with running tasks.""" diff --git a/pipelines/__init__.py b/pipelines/__init__.py new file mode 100644 index 0000000..f64a0cc --- /dev/null +++ b/pipelines/__init__.py @@ -0,0 +1,6 @@ +"""Pipeline orchestration engine for Matrix bot.""" + +from .engine import PipelineEngine +from .state import PipelineStateManager + +__all__ = ["PipelineEngine", "PipelineStateManager"] diff --git a/pipelines/approval.py b/pipelines/approval.py new file mode 100644 index 0000000..b32f33b --- /dev/null +++ b/pipelines/approval.py @@ -0,0 +1,18 @@ +"""Approval handling — maps Matrix reactions to pipeline approvals.""" + +import logging + +logger = logging.getLogger(__name__) + +# Reaction emoji to response mapping +APPROVAL_REACTIONS = { + "\U0001f44d": "approve", # thumbs up + "\U0001f44e": "decline", # thumbs down + "\u2705": "approve", # check mark + "\u274c": "decline", # cross mark +} + + +def reaction_to_response(reaction_key: str) -> str | None: + """Map a reaction emoji to an approval response.""" + return APPROVAL_REACTIONS.get(reaction_key) diff --git a/pipelines/engine.py b/pipelines/engine.py new file mode 100644 index 0000000..06833d8 --- /dev/null +++ b/pipelines/engine.py @@ -0,0 +1,276 @@ +"""Pipeline execution engine — runs steps sequentially with output chaining.""" + +import asyncio +import logging +import re +import time +from datetime import datetime, timezone, timedelta + +from .state import PipelineStateManager +from .steps import execute_step + +logger = logging.getLogger(__name__) + + +class PipelineEngine: + """Executes pipeline steps sequentially, managing state and output chaining.""" + + def __init__( + self, + state: PipelineStateManager, + send_text, + matrix_client, + llm_client=None, + default_model: str = "claude-haiku", + escalation_model: str = "claude-sonnet", + on_approval_registered=None, + ): + self.state = state + self.send_text = send_text + self.matrix_client = matrix_client + self.llm = llm_client + self.default_model = default_model + self.escalation_model = escalation_model + self.on_approval_registered = on_approval_registered # callback(event_id, execution_id) + # Track active approval listeners: execution_id -> asyncio.Future + self._approval_futures: dict[str, asyncio.Future] = {} + + def render_template(self, template: str, context: dict) -> str: + """Simple Jinja2-like template rendering: {{ step_name.output }}""" + def replacer(match): + expr = match.group(1).strip() + parts = expr.split(".") + try: + value = context + for part in parts: + if isinstance(value, dict): + value = value[part] + else: + return match.group(0) + return str(value) + except (KeyError, TypeError): + return match.group(0) + + return re.sub(r"\{\{\s*(.+?)\s*\}\}", replacer, template) + + def evaluate_condition(self, condition: str, context: dict) -> bool: + """Evaluate a simple condition like {{ step.response == 'approve' }}""" + rendered = self.render_template(condition, context) + # Strip template markers if still present + rendered = rendered.strip().strip("{}").strip() + # Simple equality check + if "==" in rendered: + left, right = rendered.split("==", 1) + return left.strip().strip("'\"") == right.strip().strip("'\"") + if "!=" in rendered: + left, right = rendered.split("!=", 1) + return left.strip().strip("'\"") != right.strip().strip("'\"") + # Truthy check + return bool(rendered) and rendered.lower() not in ("false", "none", "0", "") + + async def run(self, pipeline: dict, trigger_data: dict | None = None) -> None: + """Execute a full pipeline run.""" + pipeline_id = pipeline["id"] + pipeline_name = pipeline["name"] + target_room = pipeline["targetRoom"] + steps = pipeline.get("steps", []) + + if not steps: + logger.warning("Pipeline %s has no steps", pipeline_name) + return + + # Create execution record + execution = await self.state.create_execution(pipeline_id, trigger_data) + execution_id = execution["id"] + + context: dict[str, dict] = {} + if trigger_data: + context["trigger"] = trigger_data + + step_results: list[dict] = [] + + try: + for i, step in enumerate(steps): + step_name = step.get("name", f"step_{i}") + step_type = step.get("type", "") + + # Evaluate condition + condition = step.get("if") + if condition and not self.evaluate_condition(condition, context): + logger.info("Pipeline %s: skipping step %s (condition not met)", pipeline_name, step_name) + result = {"name": step_name, "output": "skipped", "status": "skipped", "timestamp": time.time()} + step_results.append(result) + context[step_name] = {"output": "skipped", "status": "skipped"} + continue + + # Update execution state + await self.state.update_execution( + execution_id, + currentStep=i, + stepResults=step_results, + state="running", + ) + + logger.info("Pipeline %s: executing step %s (%s)", pipeline_name, step_name, step_type) + + # Render templates in step config + rendered_step = {} + for key, value in step.items(): + if isinstance(value, str): + rendered_step[key] = self.render_template(value, context) + elif isinstance(value, dict): + rendered_step[key] = { + k: self.render_template(v, context) if isinstance(v, str) else v + for k, v in value.items() + } + else: + rendered_step[key] = value + + # Execute step + try: + timeout_s = step.get("timeout_s", 60) + + if step_type == "approval": + output = await self._execute_approval_step( + rendered_step, target_room, execution_id, timeout_s + ) + else: + output = await asyncio.wait_for( + execute_step( + step_type=step_type, + step_config=rendered_step, + context=context, + send_text=self.send_text, + target_room=target_room, + llm=self.llm, + default_model=self.default_model, + escalation_model=self.escalation_model, + ), + timeout=timeout_s, + ) + + result = { + "name": step_name, + "output": output, + "status": "success", + "timestamp": time.time(), + } + step_results.append(result) + context[step_name] = {"output": output, "status": "success"} + + # For approval steps, also store the response field + if step_type == "approval": + context[step_name]["response"] = output + + except asyncio.TimeoutError: + error_msg = f"Step {step_name} timed out after {step.get('timeout_s', 60)}s" + logger.error("Pipeline %s: %s", pipeline_name, error_msg) + step_results.append({ + "name": step_name, + "output": None, + "status": "timeout", + "error": error_msg, + "timestamp": time.time(), + }) + await self.state.update_execution( + execution_id, + state="failed", + stepResults=step_results, + error=error_msg, + ) + await self.send_text(target_room, f"**{pipeline_name}**: {error_msg}") + return + + except Exception as exc: + error_msg = f"Step {step_name} failed: {exc}" + logger.error("Pipeline %s: %s", pipeline_name, error_msg, exc_info=True) + step_results.append({ + "name": step_name, + "output": None, + "status": "error", + "error": str(exc), + "timestamp": time.time(), + }) + await self.state.update_execution( + execution_id, + state="failed", + stepResults=step_results, + error=error_msg, + ) + await self.send_text(target_room, f"**{pipeline_name}**: {error_msg}") + return + + # All steps completed + await self.state.update_execution( + execution_id, + state="complete", + stepResults=step_results, + ) + logger.info("Pipeline %s completed successfully", pipeline_name) + + except Exception as exc: + logger.error("Pipeline %s failed unexpectedly: %s", pipeline_name, exc, exc_info=True) + await self.state.update_execution( + execution_id, + state="failed", + stepResults=step_results, + error=str(exc), + ) + + async def _execute_approval_step( + self, step: dict, target_room: str, execution_id: str, timeout_s: int + ) -> str: + """Post approval message and wait for reaction.""" + message = step.get("message", "Approve this action?") + formatted_msg = f"**Approval Required**\n\n{message}\n\nReact with \U0001f44d to approve or \U0001f44e to decline." + + # Send message and get event ID + resp = await self.matrix_client.room_send( + room_id=target_room, + message_type="m.room.message", + content={ + "msgtype": "m.text", + "body": formatted_msg, + "format": "org.matrix.custom.html", + "formatted_body": formatted_msg.replace("\n", "
"), + }, + ) + event_id = resp.event_id if hasattr(resp, "event_id") else None + + if not event_id: + raise RuntimeError("Failed to send approval message") + + # Notify bot of approval event mapping + if self.on_approval_registered: + self.on_approval_registered(event_id, execution_id) + + # Register approval listener + future = asyncio.get_event_loop().create_future() + self._approval_futures[execution_id] = future + + # Update execution with approval tracking + expires_at = (datetime.now(timezone.utc) + timedelta(seconds=timeout_s)).isoformat() + await self.state.update_execution( + execution_id, + state="waiting_approval", + approvalMsgId=event_id, + approvalExpiresAt=expires_at, + ) + + try: + result = await asyncio.wait_for(future, timeout=timeout_s) + return result # "approve" or "decline" + except asyncio.TimeoutError: + await self.send_text(target_room, "Approval timed out. Pipeline aborted.") + await self.state.update_execution(execution_id, state="aborted", error="Approval timed out") + raise + finally: + self._approval_futures.pop(execution_id, None) + + def resolve_approval(self, execution_id: str, response: str) -> bool: + """Resolve a pending approval. Called by reaction handler.""" + future = self._approval_futures.get(execution_id) + if future and not future.done(): + future.set_result(response) + return True + return False diff --git a/pipelines/state.py b/pipelines/state.py new file mode 100644 index 0000000..4f9ccb6 --- /dev/null +++ b/pipelines/state.py @@ -0,0 +1,62 @@ +"""Pipeline state management — syncs with matrixhost portal API.""" + +import logging +from datetime import datetime, timezone + +import httpx + +logger = logging.getLogger(__name__) + + +class PipelineStateManager: + """Manages pipeline state via portal API.""" + + def __init__(self, portal_url: str, api_key: str): + self.portal_url = portal_url.rstrip("/") + self.api_key = api_key + + async def fetch_active_pipelines(self) -> list[dict]: + async with httpx.AsyncClient(timeout=15.0) as client: + resp = await client.get( + f"{self.portal_url}/api/pipelines/active", + headers={"x-api-key": self.api_key}, + ) + resp.raise_for_status() + data = resp.json() + return data.get("pipelines", []) + + async def create_execution(self, pipeline_id: str, trigger_data: dict | None = None) -> dict: + async with httpx.AsyncClient(timeout=10.0) as client: + resp = await client.post( + f"{self.portal_url}/api/pipelines/{pipeline_id}/execution", + headers={"x-api-key": self.api_key}, + json={"triggerData": trigger_data}, + ) + resp.raise_for_status() + data = resp.json() + return data["execution"] + + async def update_execution(self, execution_id: str, **kwargs) -> None: + try: + async with httpx.AsyncClient(timeout=10.0) as client: + await client.put( + f"{self.portal_url}/api/pipelines/executions/{execution_id}", + headers={"x-api-key": self.api_key}, + json=kwargs, + ) + except Exception: + logger.warning("Failed to update execution %s", execution_id, exc_info=True) + + async def fetch_pending_approvals(self) -> list[dict]: + try: + async with httpx.AsyncClient(timeout=10.0) as client: + resp = await client.get( + f"{self.portal_url}/api/pipelines/executions/pending", + headers={"x-api-key": self.api_key}, + ) + resp.raise_for_status() + data = resp.json() + return data.get("executions", []) + except Exception: + logger.debug("Failed to fetch pending approvals", exc_info=True) + return [] diff --git a/pipelines/steps/__init__.py b/pipelines/steps/__init__.py new file mode 100644 index 0000000..609b7c3 --- /dev/null +++ b/pipelines/steps/__init__.py @@ -0,0 +1,45 @@ +"""Step type registry and dispatcher.""" + +import logging + +from .script import execute_script +from .claude_prompt import execute_claude_prompt +from .template import execute_template +from .api_call import execute_api_call +from .skyvern import execute_skyvern + +logger = logging.getLogger(__name__) + +STEP_EXECUTORS = { + "script": execute_script, + "claude_prompt": execute_claude_prompt, + "template": execute_template, + "api_call": execute_api_call, + "skyvern": execute_skyvern, +} + + +async def execute_step( + step_type: str, + step_config: dict, + context: dict, + send_text, + target_room: str, + llm=None, + default_model: str = "claude-haiku", + escalation_model: str = "claude-sonnet", +) -> str: + """Execute a pipeline step and return its output as a string.""" + executor = STEP_EXECUTORS.get(step_type) + if not executor: + raise ValueError(f"Unknown step type: {step_type}") + + return await executor( + config=step_config, + context=context, + send_text=send_text, + target_room=target_room, + llm=llm, + default_model=default_model, + escalation_model=escalation_model, + ) diff --git a/pipelines/steps/api_call.py b/pipelines/steps/api_call.py new file mode 100644 index 0000000..dec4103 --- /dev/null +++ b/pipelines/steps/api_call.py @@ -0,0 +1,33 @@ +"""API call step — make HTTP requests.""" + +import logging + +import httpx + +logger = logging.getLogger(__name__) + + +async def execute_api_call(config: dict, **_kwargs) -> str: + """Make an HTTP request and return the response body.""" + url = config.get("url", "") + if not url: + raise ValueError("api_call step requires 'url' field") + + method = config.get("method", "GET").upper() + headers = config.get("headers", {}) + body = config.get("body") + + async with httpx.AsyncClient(timeout=30.0) as client: + if method == "GET": + resp = await client.get(url, headers=headers) + elif method == "POST": + resp = await client.post(url, headers=headers, content=body) + elif method == "PUT": + resp = await client.put(url, headers=headers, content=body) + elif method == "DELETE": + resp = await client.delete(url, headers=headers) + else: + raise ValueError(f"Unsupported HTTP method: {method}") + + resp.raise_for_status() + return resp.text diff --git a/pipelines/steps/claude_prompt.py b/pipelines/steps/claude_prompt.py new file mode 100644 index 0000000..89b1cf2 --- /dev/null +++ b/pipelines/steps/claude_prompt.py @@ -0,0 +1,32 @@ +"""Claude prompt step — call LLM via LiteLLM proxy.""" + +import logging + +logger = logging.getLogger(__name__) + + +async def execute_claude_prompt( + config: dict, + llm=None, + default_model: str = "claude-haiku", + escalation_model: str = "claude-sonnet", + **_kwargs, +) -> str: + """Send a prompt to Claude and return the response.""" + if not llm: + raise RuntimeError("LLM client not configured") + + prompt = config.get("prompt", "") + if not prompt: + raise ValueError("claude_prompt step requires 'prompt' field") + + model_name = config.get("model", "default") + model = escalation_model if model_name == "escalation" else default_model + + response = await llm.chat.completions.create( + model=model, + messages=[{"role": "user", "content": prompt}], + max_tokens=4096, + ) + + return response.choices[0].message.content or "" diff --git a/pipelines/steps/script.py b/pipelines/steps/script.py new file mode 100644 index 0000000..d696c5d --- /dev/null +++ b/pipelines/steps/script.py @@ -0,0 +1,27 @@ +"""Script step — execute a shell command and capture output.""" + +import asyncio +import logging + +logger = logging.getLogger(__name__) + + +async def execute_script(config: dict, **_kwargs) -> str: + """Execute a shell script and return stdout.""" + script = config.get("script", "") + if not script: + raise ValueError("Script step requires 'script' field") + + proc = await asyncio.create_subprocess_shell( + script, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE, + ) + stdout, stderr = await proc.communicate() + + output = stdout.decode("utf-8", errors="replace").strip() + if proc.returncode != 0: + err = stderr.decode("utf-8", errors="replace").strip() + raise RuntimeError(f"Script exited with code {proc.returncode}: {err or output}") + + return output diff --git a/pipelines/steps/skyvern.py b/pipelines/steps/skyvern.py new file mode 100644 index 0000000..6b2e2f0 --- /dev/null +++ b/pipelines/steps/skyvern.py @@ -0,0 +1,24 @@ +"""Skyvern step — browser automation via Skyvern API (Phase 2 placeholder).""" + +import logging + +logger = logging.getLogger(__name__) + + +async def execute_skyvern(config: dict, send_text=None, target_room: str = "", **_kwargs) -> str: + """Dispatch a browser task to Skyvern. + + Phase 2: Will integrate with self-hosted Skyvern on matrixhost. + """ + task = config.get("task", {}) + url = task.get("url", "") + goal = task.get("goal", "") + + if send_text and target_room: + await send_text( + target_room, + f"**Browser Task**: Skyvern integration pending setup.\n" + f"URL: {url}\nGoal: {goal}", + ) + + raise NotImplementedError("Skyvern step not yet implemented (Phase 2)") diff --git a/pipelines/steps/template.py b/pipelines/steps/template.py new file mode 100644 index 0000000..e7edd53 --- /dev/null +++ b/pipelines/steps/template.py @@ -0,0 +1,18 @@ +"""Template step — format and post a message to the target room.""" + +import logging + +logger = logging.getLogger(__name__) + + +async def execute_template(config: dict, send_text=None, target_room: str = "", **_kwargs) -> str: + """Render a template message and post it to the target room.""" + template = config.get("template", config.get("message", "")) + if not template: + raise ValueError("template step requires 'template' or 'message' field") + + # Template is already rendered by the engine before reaching here + if send_text and target_room: + await send_text(target_room, template) + + return template