Files
matrix-ai-agent/pipelines/steps/__init__.py
Christian Gick 0c0a424004
Some checks failed
Build & Deploy / test (push) Successful in 10s
Tests / test (push) Successful in 10s
Build & Deploy / build-and-deploy (push) Failing after 11m26s
fix(MAT-273): remove Skyvern (archived) + fix CI test failures
- Remove Skyvern service + DB from docker-compose.yml
- Remove cron/browser_executor.py and pipelines/steps/skyvern.py
- Remove browser_scrape from cron executor dispatch
- Update tests to reflect Skyvern removal
- Fix test_needs_query_rewrite false positive ('das' is a valid trigger)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-04-16 13:23:41 +03:00

46 lines
1.2 KiB
Python

"""Step type registry and dispatcher."""
import logging
from .script import execute_script
from .claude_prompt import execute_claude_prompt
from .template import execute_template
from .api_call import execute_api_call
from .pitrader_step import execute_pitrader
logger = logging.getLogger(__name__)
STEP_EXECUTORS = {
"script": execute_script,
"claude_prompt": execute_claude_prompt,
"template": execute_template,
"api_call": execute_api_call,
"pitrader_script": execute_pitrader,
}
async def execute_step(
step_type: str,
step_config: dict,
context: dict,
send_text,
target_room: str,
llm=None,
default_model: str = "claude-haiku",
escalation_model: str = "claude-sonnet",
) -> str:
"""Execute a pipeline step and return its output as a string."""
executor = STEP_EXECUTORS.get(step_type)
if not executor:
raise ValueError(f"Unknown step type: {step_type}")
return await executor(
config=step_config,
context=context,
send_text=send_text,
target_room=target_room,
llm=llm,
default_model=default_model,
escalation_model=escalation_model,
)