Files
matrix-ai-agent/pipelines/steps/__init__.py
Christian Gick bd8d96335e feat: add pipeline engine with approval flow and file triggers
Sequential step executor (script, claude_prompt, approval, api_call,
template, skyvern placeholder), reaction-based approvals, file upload
trigger matching, portal API state sync.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-18 17:06:07 +02:00

46 lines
1.1 KiB
Python

"""Step type registry and dispatcher."""
import logging
from .script import execute_script
from .claude_prompt import execute_claude_prompt
from .template import execute_template
from .api_call import execute_api_call
from .skyvern import execute_skyvern
logger = logging.getLogger(__name__)
STEP_EXECUTORS = {
"script": execute_script,
"claude_prompt": execute_claude_prompt,
"template": execute_template,
"api_call": execute_api_call,
"skyvern": execute_skyvern,
}
async def execute_step(
step_type: str,
step_config: dict,
context: dict,
send_text,
target_room: str,
llm=None,
default_model: str = "claude-haiku",
escalation_model: str = "claude-sonnet",
) -> str:
"""Execute a pipeline step and return its output as a string."""
executor = STEP_EXECUTORS.get(step_type)
if not executor:
raise ValueError(f"Unknown step type: {step_type}")
return await executor(
config=step_config,
context=context,
send_text=send_text,
target_room=target_room,
llm=llm,
default_model=default_model,
escalation_model=escalation_model,
)