picogent-py/picogent/tools/bash.py
Markov 5417980b76 Initial implementation of PicoGent - minimal AI coding agent
- Implemented ReAct loop in agent.py
- Added Anthropic provider using httpx (no SDK)
- Created tools: read, write, edit, bash
- Added session management with JSONL format
- Included configuration system with env var support
- Added CLI interface and example usage
- Minimal dependencies: only httpx
2026-02-22 23:18:02 +01:00

106 lines
3.5 KiB
Python

"""
Bash execution tool
"""
import subprocess
import asyncio
import os
from typing import Dict, Any
from .registry import Tool
class BashTool(Tool):
    """Tool that runs a shell command in a subprocess and reports its result.

    The command is handed to the system shell unchanged, so callers are
    responsible for only passing commands they are willing to execute.
    All failures are reported as ``"Error: ..."`` strings rather than raised.
    """

    # Timeout (in seconds) used when the caller supplies none or an unusable one.
    _DEFAULT_TIMEOUT = 30

    def __init__(self):
        """Register the tool's name, description and parameter schema."""
        super().__init__(
            name="bash",
            description="Execute shell commands with timeout. Use for running system commands, scripts, and terminal operations.",
            parameters={
                "type": "object",
                "properties": {
                    "command": {
                        "type": "string",
                        "description": "Shell command to execute"
                    },
                    "timeout": {
                        "type": "number",
                        "description": "Timeout in seconds (default: 30)",
                        "default": self._DEFAULT_TIMEOUT
                    },
                    "cwd": {
                        "type": "string",
                        "description": "Working directory to execute command in (optional)"
                    }
                },
                "required": ["command"]
            }
        )

    @classmethod
    def _normalize_timeout(cls, raw: Any) -> Any:
        """Return a usable positive timeout, falling back to the default.

        Numeric values are passed through unchanged so the timeout message
        shows them exactly as supplied. Anything else (``None``, numeric
        strings, ...) is coerced with ``float()``; values that cannot be
        coerced, are NaN, or are not positive yield the default.
        """
        if not isinstance(raw, (int, float)):
            try:
                raw = float(raw)
            except (TypeError, ValueError):
                return cls._DEFAULT_TIMEOUT
        # ``raw != raw`` is only true for NaN, which would otherwise slip
        # past the ``<= 0`` comparison.
        if raw != raw or raw <= 0:
            return cls._DEFAULT_TIMEOUT
        return raw

    async def execute(self, args: Dict[str, Any]) -> str:
        """Run the requested shell command and return a formatted report.

        Args:
            args: Tool arguments. ``command`` (required) is the shell command
                line; ``timeout`` (optional) is the limit in seconds and
                defaults to 30 when missing or invalid; ``cwd`` (optional) is
                the working directory, resolved against the current directory
                when relative.

        Returns:
            A report containing the command, the working directory (when
            given), the exit code and any captured STDOUT/STDERR, or a string
            starting with ``"Error:"`` when the command could not be run or
            timed out.
        """
        command = args.get("command")
        if not command:
            return "Error: command is required"

        timeout = self._normalize_timeout(
            args.get("timeout", self._DEFAULT_TIMEOUT)
        )

        # Validate and resolve cwd
        cwd = args.get("cwd")
        if cwd:
            # os.path functions raise TypeError on non-path values; report
            # that as a tool error instead of letting it escape.
            if not isinstance(cwd, (str, os.PathLike)):
                return (
                    "Error: Working directory must be a path string, "
                    f"got {type(cwd).__name__}"
                )
            if not os.path.isabs(cwd):
                cwd = os.path.abspath(cwd)
            if not os.path.exists(cwd):
                return f"Error: Working directory '{cwd}' does not exist"
            if not os.path.isdir(cwd):
                return f"Error: Working directory '{cwd}' is not a directory"

        try:
            # NOTE: the command string is interpreted by the shell as-is.
            process = await asyncio.create_subprocess_shell(
                command,
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.PIPE,
                cwd=cwd,
                env=os.environ.copy()
            )
            try:
                stdout, stderr = await asyncio.wait_for(
                    process.communicate(),
                    timeout=timeout
                )
            except asyncio.TimeoutError:
                # Kill the process if it times out. It may have exited on its
                # own between the deadline firing and this call; that must
                # still be reported as a timeout, not as a launch failure.
                try:
                    process.kill()
                except ProcessLookupError:
                    pass
                await process.wait()
                return f"Error: Command timed out after {timeout} seconds"

            # Decode output, replacing undecodable bytes rather than failing.
            stdout_text = stdout.decode('utf-8', errors='replace').strip()
            stderr_text = stderr.decode('utf-8', errors='replace').strip()

            # Format result
            result_parts = [f"Command: {command}"]
            if cwd:
                result_parts.append(f"Working directory: {cwd}")
            result_parts.append(f"Exit code: {process.returncode}")
            if stdout_text:
                result_parts.append(f"STDOUT:\n{stdout_text}")
            if stderr_text:
                result_parts.append(f"STDERR:\n{stderr_text}")
            if not stdout_text and not stderr_text:
                result_parts.append("(No output)")
            return "\n\n".join(result_parts)
        except Exception as e:
            # Tool boundary: report any launch/runtime failure as text so the
            # caller always receives a string result.
            return f"Error: Could not execute command '{command}': {e}"