Add pitrader_script executor for running PITrader scripts (pi-scan, playbook, execute_trades) as pipeline steps with vault credential injection and JSON output capture. Extend claude_prompt step with vision support (image_b64 in trigger context). Add image pipeline trigger to on_image_message handler. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
48 lines
1.2 KiB
Python
48 lines
1.2 KiB
Python
"""Step type registry and dispatcher."""
|
|
|
|
import logging
|
|
|
|
from .script import execute_script
|
|
from .claude_prompt import execute_claude_prompt
|
|
from .template import execute_template
|
|
from .api_call import execute_api_call
|
|
from .skyvern import execute_skyvern
|
|
from .pitrader_step import execute_pitrader
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
STEP_EXECUTORS = {
|
|
"script": execute_script,
|
|
"claude_prompt": execute_claude_prompt,
|
|
"template": execute_template,
|
|
"api_call": execute_api_call,
|
|
"skyvern": execute_skyvern,
|
|
"pitrader_script": execute_pitrader,
|
|
}
|
|
|
|
|
|
async def execute_step(
|
|
step_type: str,
|
|
step_config: dict,
|
|
context: dict,
|
|
send_text,
|
|
target_room: str,
|
|
llm=None,
|
|
default_model: str = "claude-haiku",
|
|
escalation_model: str = "claude-sonnet",
|
|
) -> str:
|
|
"""Execute a pipeline step and return its output as a string."""
|
|
executor = STEP_EXECUTORS.get(step_type)
|
|
if not executor:
|
|
raise ValueError(f"Unknown step type: {step_type}")
|
|
|
|
return await executor(
|
|
config=step_config,
|
|
context=context,
|
|
send_text=send_text,
|
|
target_room=target_room,
|
|
llm=llm,
|
|
default_model=default_model,
|
|
escalation_model=escalation_model,
|
|
)
|