- Implemented ReAct loop in agent.py - Added Anthropic provider using httpx (no SDK) - Created tools: read, write, edit, bash - Added session management with JSONL format - Included configuration system with env var support - Added CLI interface and example usage - Minimal dependencies: only httpx
152 lines
5.3 KiB
Python
152 lines
5.3 KiB
Python
"""
|
|
Main agent loop with ReAct pattern
|
|
"""
|
|
|
|
import uuid
|
|
import asyncio
|
|
from typing import Dict, Any, Optional, List
|
|
from .config import Config
|
|
from .session import Session
|
|
from .context import Context
|
|
from .providers.anthropic import AnthropicProvider
|
|
from .tools.registry import registry
|
|
from .tools.read import ReadTool
|
|
from .tools.write import WriteTool
|
|
from .tools.edit import EditTool
|
|
from .tools.bash import BashTool
|
|
|
|
|
|
class Agent:
    """Main PicoGent agent with ReAct loop.

    The provider is created when the agent is entered as an async context
    manager and closed on exit, so ``run_agent`` is meant to be called
    inside the ``async with`` block::

        async with Agent(config) as agent:
            reply = await agent.run_agent("...")
    """

    def __init__(self, config: Config, context: Optional[Context] = None):
        """Store the configuration, pick a context and register the tools.

        Args:
            config: Agent configuration. ``config.workspace`` is read here
                (only when no context is supplied) and
                ``config.max_iterations`` is read by ``run_agent``.
            context: Optional pre-built context. When omitted (or falsy), a
                new ``Context`` is created from ``config.workspace``.
        """
        self.config = config
        self.context = context or Context(config.workspace)
        # Created in __aenter__; stays None until the agent has been entered.
        self.provider = None

        # Initialize tools
        self._initialize_tools()

    def _initialize_tools(self):
        """Register the read, write, edit and bash tools on the shared registry.

        Only the bash tool receives the workspace from the context.
        """
        # NOTE(review): ``registry`` is a module-level object, so every Agent
        # that is constructed registers these tools again -- confirm that
        # ``registry.register`` tolerates repeated registration.
        registry.register(ReadTool())
        registry.register(WriteTool())
        registry.register(EditTool())
        registry.register(BashTool(self.context.get_workspace()))

    async def __aenter__(self):
        """Async context manager entry: create and open the provider."""
        provider = AnthropicProvider(self.config)
        await provider.__aenter__()
        # Publish the provider only once it has opened successfully, so a
        # failed open never leaves a half-initialised provider on the agent.
        self.provider = provider
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Async context manager exit: close the provider if one was opened.

        The provider's own ``__aexit__`` result is not returned, so an
        exception raised inside the ``async with`` body is never suppressed
        by the agent.
        """
        if self.provider is not None:
            await self.provider.__aexit__(exc_type, exc_val, exc_tb)

    @staticmethod
    def _reply(session: Session, text: str) -> str:
        """Record ``text`` as an assistant message in ``session`` and return it."""
        session.add_message("assistant", text)
        return text

    async def _run_tool_calls(
        self, tool_calls: List[Dict[str, Any]], session: Session
    ) -> None:
        """Execute the tool calls in order, recording each call and its result."""
        for call in tool_calls:
            # A call without an id gets a generated one so that its result
            # can still be paired with the tool use in the session.
            call_id = call.get("id") or str(uuid.uuid4())
            name = call.get("name", "")
            arguments = call.get("arguments", {})

            session.add_tool_use(name, arguments, call_id)
            result = await registry.execute_tool(name, arguments)
            session.add_tool_result(call_id, result)

    async def run_agent(self, prompt: str, session: Optional[Session] = None) -> str:
        """
        Main ReAct loop

        Sends the conversation to the provider, executes any tool calls it
        asks for, feeds the results back through the session and repeats
        until the provider answers without tool calls or
        ``config.max_iterations`` is reached.

        Args:
            prompt: User prompt/question
            session: Optional session for conversation history. A new
                ``Session`` is created when omitted.

        Returns:
            Final response from the agent. Failures are reported as text
            rather than raised: ``"Provider error: ..."`` when the response
            contains an ``"error"`` key, ``"Empty response from provider"``
            when there is neither text nor a tool call,
            ``"Agent error in iteration N: ..."`` when an exception occurs
            inside an iteration, and a "Reached maximum iterations" notice
            when the loop runs out. Every returned string is also recorded
            in the session as an assistant message.
        """
        # Create session if not provided
        if session is None:
            session = Session()

        # Add user message to session
        session.add_message("user", prompt)

        max_iterations = self.config.max_iterations

        # ReAct loop
        for iteration in range(max_iterations):
            try:
                if self.provider is None:
                    # Reported through the handler below so the caller gets a
                    # clear message instead of an AttributeError on None.
                    raise RuntimeError(
                        "Agent provider is not initialized; use "
                        "'async with Agent(config) as agent:' before calling run_agent"
                    )

                # Get messages and system prompt
                messages = session.get_anthropic_messages()
                system_prompt = self.context.get_system_prompt()
                tools = registry.get_tool_definitions()

                # Call LLM
                response = await self.provider.chat_completion(
                    messages=messages,
                    tools=tools,
                    system_prompt=system_prompt,
                )

                # Handle errors
                if "error" in response:
                    return self._reply(session, f"Provider error: {response['error']}")

                # ``dict.get`` only applies its default when the key is
                # missing; ``or ""`` also covers a present-but-None content
                # value so a tool-call-only response does not crash here.
                content = (response.get("content") or "").strip()
                tool_calls = self.provider.parse_tool_calls(response)

                # No tool calls - return text response
                if not tool_calls:
                    return self._reply(
                        session, content or "Empty response from provider"
                    )

                # Keep any text that accompanied the tool calls
                if content:
                    session.add_message("assistant", content)

                # Execute the tools; the next iteration sends the results
                # back to the provider.
                await self._run_tool_calls(tool_calls, session)

            except Exception as e:
                return self._reply(
                    session, f"Agent error in iteration {iteration + 1}: {e}"
                )

        # Max iterations reached
        return self._reply(
            session,
            f"Reached maximum iterations ({max_iterations}). Task may be incomplete.",
        )
|
|
|
|
|
|
# Convenience function
|
|
async def run_agent(prompt: str, config: Config, session: Optional[Session] = None) -> str:
    """
    Run a single prompt through a short-lived agent.

    An ``Agent`` is built from ``config`` and entered as an async context
    manager for the duration of the call, so it is exited again before this
    function hands back the result.

    Args:
        prompt: User prompt
        config: Agent configuration
        session: Optional session for conversation history

    Returns:
        Agent response
    """
    async with Agent(config) as agent:
        reply = await agent.run_agent(prompt, session)
        return reply