Add pitrader_script executor for running PITrader scripts (pi-scan, playbook, execute_trades) as pipeline steps with vault credential injection and JSON output capture. Extend claude_prompt step with vision support (image_b64 in trigger context). Add image pipeline trigger to on_image_message handler. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
111 lines
3.8 KiB
Python
111 lines
3.8 KiB
Python
"""PITrader step — execute PITrader scripts with JSON output capture."""
|
|
|
|
import asyncio
|
|
import json
|
|
import logging
|
|
import os
|
|
|
|
# Module-level logger, named after this module so log records can be filtered per step type.
logger = logging.getLogger(__name__)
|
|
|
|
# Root directory of the PITrader checkout. Read once at import time from the
# PITRADER_DIR environment variable, falling back to ~/Development/Apps/PITrader
# under the current user's home. execute_pitrader resolves relative script paths
# against it, prepends it to PYTHONPATH, and uses it as the child's working directory.
PITRADER_DIR = os.environ.get("PITRADER_DIR", os.path.expanduser("~/Development/Apps/PITrader"))
|
|
|
|
|
|
async def _kill_and_reap(proc) -> None:
    """Kill a child process and wait for it so it is not left unreaped."""
    try:
        proc.kill()
    except ProcessLookupError:
        # The process exited between the timeout firing and the kill.
        pass
    await proc.wait()


async def _fetch_vault_secret(key: str, timeout_s: float = 10) -> "str | None":
    """Best-effort lookup of one secret via the ``vault get <key>`` CLI.

    Returns the stripped stdout when the command exits with code 0 and prints
    something; returns None (after a debug log) when the CLI is missing,
    times out, fails, or prints nothing. Never raises for those cases, so a
    missing vault does not abort the step.
    """
    try:
        proc = await asyncio.create_subprocess_exec(
            "vault", "get", key,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )
    except OSError as exc:
        logger.debug("Could not fetch %s from vault: %s", key, exc)
        return None

    try:
        stdout, _ = await asyncio.wait_for(proc.communicate(), timeout=timeout_s)
    except asyncio.TimeoutError:
        await _kill_and_reap(proc)
        logger.debug("Could not fetch %s from vault: timed out after %ss", key, timeout_s)
        return None

    if proc.returncode != 0:
        logger.debug("Could not fetch %s from vault: exit code %s", key, proc.returncode)
        return None

    try:
        secret = stdout.decode().strip()
    except UnicodeDecodeError:
        logger.debug("Could not fetch %s from vault: output is not valid text", key)
        return None
    return secret or None


async def execute_pitrader(config: dict, **_kwargs) -> str:
    """Execute a PITrader script and return its captured stdout.

    Config fields:
        script: Script path relative to PITRADER_DIR (e.g., "scripts/pi-scan")
            or an absolute path. Required.
        args: CLI arguments, either a list or a single string that is split
            with shell-style quoting rules (default: []).
        timeout_s: Timeout in seconds for the script run (default: 300).
        json_output: If true, append ``--json`` unless already present in
            args, and warn when stdout is not valid JSON (default: true).
        env: Extra environment variables; they override inherited values
            (default: {}).

    The child runs with cwd=PITRADER_DIR, PITRADER_DIR prepended to
    PYTHONPATH, and ETORO_API_KEY / ETORO_USER_KEY filled from the ``vault``
    CLI when they are not already in the environment.

    Returns:
        The script's stdout, decoded as UTF-8 and stripped. It is returned
        as-is even when ``json_output`` is set and the text is not valid JSON.

    Raises:
        ValueError: ``script`` is missing or empty.
        RuntimeError: the script could not be started, timed out, or exited
            with a non-zero status.
    """
    # Imported here because the module's import block does not provide shlex.
    import shlex

    script = config.get("script", "")
    if not script:
        raise ValueError("pitrader_script step requires 'script' field")
    script = os.fspath(script)

    # Treat an explicit null the same as an omitted field.
    args = config.get("args") or []
    if isinstance(args, str):
        # shlex keeps quoted arguments ("two words") together; str.split would not.
        args = shlex.split(args)
    args = [str(arg) for arg in args]

    json_output = config.get("json_output", True)
    extra_env = config.get("env") or {}
    timeout_s = config.get("timeout_s")
    if timeout_s is None:
        timeout_s = 300

    # Build command
    if not os.path.isabs(script):
        script = os.path.join(PITRADER_DIR, script)

    cmd_parts = [script, *args]
    if json_output and "--json" not in args:
        cmd_parts.append("--json")

    # Quoted rendering is for logs and error messages only; the process is
    # started from the argv list, never from this string.
    cmd = " ".join(shlex.quote(part) for part in cmd_parts)
    logger.info("PITrader step: %s (cwd=%s, timeout=%ds)", cmd, PITRADER_DIR, timeout_s)

    # Build environment: inherit current + PYTHONPATH for imports + extras
    env = os.environ.copy()
    inherited_path = env.get("PYTHONPATH", "")
    env["PYTHONPATH"] = (
        PITRADER_DIR + os.pathsep + inherited_path if inherited_path else PITRADER_DIR
    )
    # Environment values must be strings for the subprocess machinery.
    env.update({str(name): str(value) for name, value in extra_env.items()})

    # Fetch vault credentials if not already in env
    for env_name, vault_key in (
        ("ETORO_API_KEY", "etoro.api_key"),
        ("ETORO_USER_KEY", "etoro.user_key"),
    ):
        if env_name not in env:
            secret = await _fetch_vault_secret(vault_key)
            if secret is not None:
                env[env_name] = secret

    # exec (not shell): arguments reach the script verbatim, so spaces and
    # shell metacharacters in config values cannot be re-split or interpreted.
    try:
        proc = await asyncio.create_subprocess_exec(
            *cmd_parts,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
            cwd=PITRADER_DIR,
            env=env,
        )
    except OSError as exc:
        # Keep launch failures (missing script, not executable, bad cwd) in
        # the RuntimeError family callers already handle for script failures.
        raise RuntimeError(f"PITrader script could not be started: {cmd}: {exc}") from exc

    try:
        stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=timeout_s)
    except asyncio.TimeoutError:
        await _kill_and_reap(proc)
        raise RuntimeError(f"PITrader script timed out after {timeout_s}s: {cmd}") from None

    output = stdout.decode("utf-8", errors="replace").strip()
    err_output = stderr.decode("utf-8", errors="replace").strip()

    if proc.returncode != 0:
        raise RuntimeError(
            f"PITrader script exited with code {proc.returncode}: {err_output or output}"
        )

    # Validate JSON output if expected
    if json_output and output:
        try:
            json.loads(output)
        except json.JSONDecodeError:
            logger.warning("PITrader output is not valid JSON, returning raw output")

    if err_output:
        logger.debug("PITrader stderr: %s", err_output[:500])

    return output
|