diff --git a/bot.py b/bot.py index 0cf2b2b..e7cb95b 100644 --- a/bot.py +++ b/bot.py @@ -2123,6 +2123,15 @@ class Bot: if time.time() - server_ts > 30: return + # Check for pipeline file_upload triggers (before DM/mention check) + source = event.source or {} + content = source.get("content", {}) + info = content.get("info", {}) + img_mime = info.get("mimetype", "image/png") + img_filename = content.get("body", "image.png") + if self.cron_scheduler: + await self._check_pipeline_file_trigger(room, event, img_filename, img_mime) + await self._load_room_settings(room.room_id) # In DMs respond to all images; in groups only if bot was recently @mentioned @@ -2280,13 +2289,17 @@ class Bot: # Download file content for trigger data mxc_url = event.url if hasattr(event, "url") else None file_text = "" + image_b64 = "" if mxc_url: try: resp = await self.client.download(mxc=mxc_url) if hasattr(resp, "body"): file_bytes = resp.body ext = os.path.splitext(filename.lower())[1] - if ext == ".pdf": + if mime_type.startswith("image/"): + # Encode image as base64 for vision analysis in claude_prompt steps + image_b64 = base64.b64encode(file_bytes).decode("utf-8") + elif ext == ".pdf": file_text = self._extract_pdf_text(file_bytes) elif ext == ".docx": file_text = self._extract_docx_text(file_bytes) @@ -2301,6 +2314,7 @@ class Bot: "filename": filename, "mime_type": mime_type, "file_content": file_text, + "image_b64": image_b64, "sender": event.sender, "room_id": room.room_id, } diff --git a/pipelines/steps/__init__.py b/pipelines/steps/__init__.py index 609b7c3..2c134b6 100644 --- a/pipelines/steps/__init__.py +++ b/pipelines/steps/__init__.py @@ -7,6 +7,7 @@ from .claude_prompt import execute_claude_prompt from .template import execute_template from .api_call import execute_api_call from .skyvern import execute_skyvern +from .pitrader_step import execute_pitrader logger = logging.getLogger(__name__) @@ -16,6 +17,7 @@ STEP_EXECUTORS = { "template": execute_template, "api_call": 
execute_api_call, "skyvern": execute_skyvern, + "pitrader_script": execute_pitrader, } diff --git a/pipelines/steps/claude_prompt.py b/pipelines/steps/claude_prompt.py index 89b1cf2..d3be397 100644 --- a/pipelines/steps/claude_prompt.py +++ b/pipelines/steps/claude_prompt.py @@ -7,12 +7,17 @@ logger = logging.getLogger(__name__) async def execute_claude_prompt( config: dict, + context: dict | None = None, llm=None, default_model: str = "claude-haiku", escalation_model: str = "claude-sonnet", **_kwargs, ) -> str: - """Send a prompt to Claude and return the response.""" + """Send a prompt to Claude and return the response. + + Supports vision: if config contains 'image_b64' or trigger context has + 'image_b64', the image is included as a vision content block. + """ if not llm: raise RuntimeError("LLM client not configured") @@ -23,9 +28,34 @@ async def execute_claude_prompt( model_name = config.get("model", "default") model = escalation_model if model_name == "escalation" else default_model + # Check for image data (from config or trigger context) + image_b64 = config.get("image_b64", "") + image_mime = config.get("image_mime", "image/png") + if not image_b64 and context: + trigger = context.get("trigger", {}) + image_b64 = trigger.get("image_b64", "") + image_mime = trigger.get("mime_type", "image/png") + + # Build message content + if image_b64: + content = [ + { + "type": "image_url", + "image_url": { + "url": f"data:{image_mime};base64,{image_b64}", + }, + }, + { + "type": "text", + "text": prompt, + }, + ] + else: + content = prompt + response = await llm.chat.completions.create( model=model, - messages=[{"role": "user", "content": prompt}], + messages=[{"role": "user", "content": content}], max_tokens=4096, ) diff --git a/pipelines/steps/pitrader_step.py b/pipelines/steps/pitrader_step.py new file mode 100644 index 0000000..17601fa --- /dev/null +++ b/pipelines/steps/pitrader_step.py @@ -0,0 +1,110 @@ +"""PITrader step — execute PITrader scripts with JSON 
import asyncio
import json
import logging
import os
import shlex

logger = logging.getLogger(__name__)

PITRADER_DIR = os.environ.get("PITRADER_DIR", os.path.expanduser("~/Development/Apps/PITrader"))

# Environment variable -> vault key used to populate it when it is not
# already present in the child environment.
_VAULT_CREDENTIALS = {
    "ETORO_API_KEY": "etoro.api_key",
    "ETORO_USER_KEY": "etoro.user_key",
}
_VAULT_TIMEOUT_S = 10


async def _kill_and_reap(proc) -> None:
    """Kill *proc* and wait for it so no zombie process is left behind."""
    try:
        proc.kill()
    except ProcessLookupError:
        # The process exited between the timeout firing and the kill.
        pass
    await proc.wait()


async def _fetch_vault_secret(key: str) -> str:
    """Return the secret printed by ``vault get <key>``, or "" if unavailable.

    This is best-effort: a missing ``vault`` binary, a non-zero exit status,
    a timeout or undecodable output all yield an empty string instead of an
    exception, so the step can still run without the credential.
    """
    try:
        proc = await asyncio.create_subprocess_exec(
            "vault", "get", key,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )
    except OSError:
        logger.debug("Could not fetch %s from vault", key)
        return ""

    try:
        stdout, _ = await asyncio.wait_for(proc.communicate(), timeout=_VAULT_TIMEOUT_S)
    except asyncio.TimeoutError:
        # Do not leave a hung vault process running in the background.
        await _kill_and_reap(proc)
        logger.debug("Could not fetch %s from vault", key)
        return ""

    if proc.returncode != 0:
        return ""
    try:
        return stdout.decode().strip()
    except UnicodeDecodeError:
        logger.debug("Could not fetch %s from vault", key)
        return ""


async def execute_pitrader(config: dict, **_kwargs) -> str:
    """Execute a PITrader script and return its stdout.

    The script is started directly (no shell), so every argument reaches the
    script exactly as given: values containing spaces stay one argument and
    shell metacharacters are never interpreted. The script file must
    therefore be directly executable; command-line flags belong in ``args``,
    not in ``script``.

    Config fields:
        script: Script path relative to PITrader dir (e.g., "scripts/pi-scan")
                or absolute path
        args: List of CLI arguments, or a single string that is split with
              shell-like quoting rules (default: [])
        timeout_s: Override timeout in seconds (default: 300)
        json_output: If true, append --json flag (default: true)
        env: Extra environment variables dict (default: {})

    Returns:
        The script's stdout, decoded as UTF-8 and stripped. When
        ``json_output`` is true the output is checked with ``json.loads``,
        but invalid JSON only produces a warning; the raw text is returned
        either way.

    Raises:
        ValueError: if ``script`` is missing, or a string ``args`` has
            unbalanced quotes.
        RuntimeError: if the script cannot be started, exceeds the timeout,
            or exits with a non-zero status.
    """
    script = config.get("script", "")
    if not script:
        raise ValueError("pitrader_script step requires 'script' field")

    args = config.get("args") or []
    if isinstance(args, str):
        args = shlex.split(args)
    else:
        # Config values may be numbers; the process API needs strings.
        args = [str(arg) for arg in args]
    json_output = config.get("json_output", True)
    extra_env = config.get("env") or {}
    timeout_s = config.get("timeout_s", 300)

    # Build command
    if not os.path.isabs(script):
        script = os.path.join(PITRADER_DIR, script)

    cmd_parts = [script, *args]
    if json_output and "--json" not in args:
        cmd_parts.append("--json")

    # Quoted rendering used for logs and error messages only.
    cmd = shlex.join(cmd_parts)
    logger.info("PITrader step: %s (cwd=%s, timeout=%ss)", cmd, PITRADER_DIR, timeout_s)

    # Build environment: inherit current + PYTHONPATH for imports + extras
    env = os.environ.copy()
    # Join only non-empty parts so PYTHONPATH never ends in a dangling separator.
    env["PYTHONPATH"] = os.pathsep.join(
        part for part in (PITRADER_DIR, env.get("PYTHONPATH", "")) if part
    )
    env.update({str(name): str(value) for name, value in extra_env.items()})

    # Fetch vault credentials if not already in env
    for env_name, vault_key in _VAULT_CREDENTIALS.items():
        if env_name in env:
            continue
        secret = await _fetch_vault_secret(vault_key)
        if secret:
            env[env_name] = secret

    try:
        proc = await asyncio.create_subprocess_exec(
            *cmd_parts,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
            cwd=PITRADER_DIR,
            env=env,
        )
    except OSError as exc:
        # Keep the RuntimeError contract for scripts that are missing or
        # not executable (a shell would have reported exit code 126/127).
        raise RuntimeError(f"PITrader script could not be started: {cmd}: {exc}") from exc

    try:
        stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=timeout_s)
    except asyncio.TimeoutError:
        await _kill_and_reap(proc)
        raise RuntimeError(f"PITrader script timed out after {timeout_s}s: {cmd}") from None

    output = stdout.decode("utf-8", errors="replace").strip()
    err_output = stderr.decode("utf-8", errors="replace").strip()

    if proc.returncode != 0:
        raise RuntimeError(
            f"PITrader script exited with code {proc.returncode}: {err_output or output}"
        )

    # Validate JSON output if expected
    if json_output and output:
        try:
            json.loads(output)
        except json.JSONDecodeError:
            logger.warning("PITrader output is not valid JSON, returning raw output")

    if err_output:
        logger.debug("PITrader stderr: %s", err_output[:500])

    return output