Sequential step executor (script, claude_prompt, approval, api_call, template, skyvern placeholder), reaction-based approvals, file upload trigger matching, portal API state sync. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
28 lines
831 B
Python
28 lines
831 B
Python
"""Script step — execute a shell command and capture output."""
|
|
|
|
import asyncio
|
|
import logging
|
|
|
|
# Module-level logger named after this module (not referenced by the code
# visible in this file section).
logger = logging.getLogger(__name__)
|
|
|
|
|
|
async def execute_script(config: dict, **_kwargs) -> str:
    """Execute a shell script and return its stdout.

    The command line stored under ``config["script"]`` is run through the
    system shell with stdout and stderr captured. If the awaiting task is
    interrupted (for example cancelled by an outer timeout) while the command
    is still running, the child process is killed instead of being left
    running in the background.

    Args:
        config: Step configuration. Must contain a non-empty ``"script"``
            entry holding the shell command line to run.
        **_kwargs: Accepted for call-site compatibility and ignored.

    Returns:
        The command's stdout decoded as UTF-8 (undecodable bytes replaced),
        with leading and trailing whitespace stripped.

    Raises:
        ValueError: If ``"script"`` is missing or empty.
        RuntimeError: If the command exits with a non-zero status. The
            message carries the exit code and stderr, or stdout when stderr
            is empty.
    """
    script = config.get("script", "")
    if not script:
        raise ValueError("Script step requires 'script' field")

    # Upper bound (seconds) on how long cleanup waits for a killed child to
    # be reaped, so an interrupted call can never block indefinitely.
    kill_grace_seconds = 5.0

    # NOTE(security): the string is handed to the shell verbatim, so it must
    # come from a trusted source; nothing is escaped here.
    proc = await asyncio.create_subprocess_shell(
        script,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    try:
        stdout, stderr = await proc.communicate()
    finally:
        # returncode is still None only when communicate() was interrupted
        # before the child finished; without this the child would be leaked.
        if proc.returncode is None:
            try:
                proc.kill()
            except ProcessLookupError:
                # The child exited between the check and the kill.
                pass
            try:
                await asyncio.wait_for(proc.wait(), timeout=kill_grace_seconds)
            except asyncio.TimeoutError:
                # Give up waiting rather than stall the caller's cancellation.
                pass

    output = stdout.decode("utf-8", errors="replace").strip()
    if proc.returncode != 0:
        err = stderr.decode("utf-8", errors="replace").strip()
        raise RuntimeError(f"Script exited with code {proc.returncode}: {err or output}")

    return output
|