runner/runner.py
Markov 8b6ea6c462 Initial runner: agent container + host runner + IPC
- agent/: Claude Agent SDK inside Docker container
  - Persistent sessions (resume/checkpoint)
  - MCP tools for Tracker (chat, tasks)
  - File-based IPC protocol
- runner.py: Host-side container manager
  - Docker lifecycle management
  - IPC file processing → Tracker REST API
  - Interactive CLI for testing
- Dockerfile: node:22-slim + Claude Agent SDK
- Based on NanoClaw architecture, stripped to essentials
2026-02-16 22:31:30 +01:00

313 lines
10 KiB
Python

#!/usr/bin/env python3
"""
Team Board Agent Runner (host-side)
Manages agent containers:
- Starts Docker container with Claude Agent SDK
- Sends the initial prompt via stdin (follow-up messages go through IPC files)
- Reads results from stdout (marker-based protocol)
- Handles IPC files for chat/task operations
- Forwards IPC to Tracker API
Future: WebSocket connection to Tracker for real-time events.
Currently: CLI-based for testing.
"""
import asyncio
import json
import logging
import os
import queue
import signal
import subprocess
import sys
import threading
import time
from pathlib import Path

import httpx
# Process-wide logging: INFO and above, with timestamp, level and logger name.
logging.basicConfig(
    format="%(asctime)s %(levelname)s %(name)s: %(message)s",
    level=logging.INFO,
)
logger = logging.getLogger("runner")

# --- Config ---
# Each setting is read once at import time from the environment, falling back
# to the default shown here.
# NOTE(review): the TRACKER_TOKEN default looks like a development placeholder;
# confirm that real deployments always set the variable.
TRACKER_URL = os.environ.get("TRACKER_URL", "http://localhost:8100")
TRACKER_TOKEN = os.environ.get("TRACKER_TOKEN", "tb-tracker-dev-token")
DATA_DIR = Path(os.environ.get("RUNNER_DATA_DIR", "/opt/team-board-runner/data"))
DOCKER_IMAGE = os.environ.get("RUNNER_IMAGE", "team-board-agent:latest")
IDLE_TIMEOUT = int(os.environ.get("RUNNER_IDLE_TIMEOUT", "300"))  # 5 min

# Markers that frame each JSON result on the container's stdout.
OUTPUT_START = "---AGENT_OUTPUT_START---"
OUTPUT_END = "---AGENT_OUTPUT_END---"
class AgentContainer:
    """Manages a single agent container.

    The container is launched with ``docker run -i`` and the agent workspace
    mounted at ``/workspace``. The initial prompt is written to the
    container's stdin as one JSON document, results are read back from stdout
    framed by the OUTPUT_START / OUTPUT_END markers, and follow-up messages
    are delivered as files in the workspace's ``ipc/input`` directory.
    """

    def __init__(self, slug: str, name: str, secrets: dict):
        self.slug = slug
        self.name = name
        self.secrets = secrets
        self.session_id: str | None = None
        self.process: subprocess.Popen | None = None
        self.workspace = DATA_DIR / "agents" / slug
        self.ipc_dir = self.workspace / "ipc"
        # Lines copied from the container's stdout by a background thread.
        self._stdout_lines: queue.Queue = queue.Queue()
        # Unparsed stdout text carried over between read_outputs() calls.
        self._buffer = ""
        self._stdout_thread: threading.Thread | None = None

    def ensure_dirs(self) -> None:
        """Create workspace directories."""
        for sub in ("ipc/input", "ipc/messages", "ipc/tasks", "conversations"):
            (self.workspace / sub).mkdir(parents=True, exist_ok=True)

    def start(self, prompt: str) -> None:
        """Start container with initial prompt.

        The prompt, current session id, agent identity and secrets are sent
        as a single JSON document on stdin, which is then closed.
        """
        self.ensure_dirs()
        container_name = f"tb-agent-{self.slug}-{int(time.time())}"
        cmd = [
            "docker", "run", "-i", "--rm",
            "--name", container_name,
            "-v", f"{self.workspace}:/workspace",
            DOCKER_IMAGE,
        ]
        logger.info("Starting container %s for agent '%s'", container_name, self.name)
        self.process = subprocess.Popen(
            cmd,
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            # Decode explicitly as UTF-8 instead of the locale encoding so
            # non-ASCII agent output cannot break the reader.
            encoding="utf-8",
            errors="replace",
        )

        # Fresh parsing state for the new process.
        self._stdout_lines = queue.Queue()
        self._buffer = ""

        # stdout is pumped by a thread so read_outputs() never blocks.
        self._stdout_thread = threading.Thread(
            target=self._pump_lines,
            args=(self.process.stdout, self._stdout_lines),
            daemon=True,
        )
        self._stdout_thread.start()
        # stderr must be drained too: an unread PIPE eventually fills up and
        # stalls the container.
        threading.Thread(
            target=self._drain_stderr,
            args=(self.process.stderr,),
            daemon=True,
        ).start()

        # Send initial input
        input_data = json.dumps({
            "prompt": prompt,
            "sessionId": self.session_id,
            "agentSlug": self.slug,
            "agentName": self.name,
            "secrets": self.secrets,
        })
        self.process.stdin.write(input_data)
        self.process.stdin.close()

    @staticmethod
    def _pump_lines(stream, sink: "queue.Queue") -> None:
        """Copy every line of *stream* into *sink* until EOF (thread target)."""
        for line in stream:
            sink.put(line)

    def _drain_stderr(self, stream) -> None:
        """Forward the container's stderr to the debug log (thread target)."""
        for line in stream:
            logger.debug("[%s] stderr: %s", self.slug, line.rstrip())

    def send_message(self, text: str) -> None:
        """Send follow-up message via IPC file.

        The file is written under a temporary name and renamed afterwards so
        a reader never sees a half-written message.
        """
        ipc_input = self.ipc_dir / "input"
        ipc_input.mkdir(parents=True, exist_ok=True)
        filename = f"{int(time.time() * 1000)}-{os.urandom(4).hex()}.json"
        tmp_path = ipc_input / f"{filename}.tmp"
        final_path = ipc_input / filename
        tmp_path.write_text(
            json.dumps({"type": "message", "text": text}), encoding="utf-8"
        )
        tmp_path.rename(final_path)
        logger.info("IPC message sent to %s (%d chars)", self.slug, len(text))

    def close(self) -> None:
        """Send close signal by creating the ``_close`` sentinel file."""
        ipc_input = self.ipc_dir / "input"
        # The directory may not exist yet if close() is called before start().
        ipc_input.mkdir(parents=True, exist_ok=True)
        (ipc_input / "_close").touch()
        logger.info("Close signal sent to %s", self.slug)

    @staticmethod
    def _extract_frames(
        buffer: str, start_marker: str, end_marker: str
    ) -> tuple[list[str], str]:
        """Split complete marker-delimited frames off the front of *buffer*.

        Returns ``(frames, remainder)``: *frames* holds the stripped text
        between each start/end marker pair, and *remainder* is what must be
        kept for the next call (an unfinished frame, or a short tail that
        could be the beginning of a start marker).
        """
        frames: list[str] = []
        while True:
            start = buffer.find(start_marker)
            if start == -1:
                # No frame in sight: drop the noise, keeping only a tail that
                # might be a start marker split across reads.
                keep = len(start_marker) - 1
                buffer = buffer[-keep:] if keep > 0 else ""
                break
            end = buffer.find(end_marker, start + len(start_marker))
            if end == -1:
                # Frame started but not finished yet; wait for more data.
                buffer = buffer[start:]
                break
            frames.append(buffer[start + len(start_marker):end].strip())
            buffer = buffer[end + len(end_marker):]
        return frames, buffer

    def read_outputs(self) -> list[dict]:
        """Return the results that have arrived on stdout since the last call.

        Never blocks: it only consumes lines the background reader has
        already collected. Incomplete frames stay buffered until their end
        marker arrives. ``session_id`` is updated whenever a result carries a
        ``newSessionId``.
        """
        if not self.process or not self.process.stdout:
            return []

        pending = []
        while True:
            try:
                pending.append(self._stdout_lines.get_nowait())
            except queue.Empty:
                break
        if pending:
            self._buffer += "".join(pending)

        frames, self._buffer = self._extract_frames(
            self._buffer, OUTPUT_START, OUTPUT_END
        )

        outputs = []
        for frame in frames:
            try:
                output = json.loads(frame)
            except json.JSONDecodeError as e:
                logger.error("Failed to parse output: %s", e)
                continue
            if not isinstance(output, dict):
                logger.error(
                    "Failed to parse output: expected a JSON object, got %s",
                    type(output).__name__,
                )
                continue
            outputs.append(output)
            if output.get("newSessionId"):
                self.session_id = output["newSessionId"]
        return outputs

    def is_running(self) -> bool:
        """Return True while the container is alive or has unread stdout.

        After the process exits this keeps returning True until
        read_outputs() has consumed everything the reader thread collected,
        so a ``while is_running(): read_outputs()`` loop cannot miss the
        final result.
        """
        if not self.process:
            return False
        if self.process.poll() is None:
            return True
        reader = self._stdout_thread
        if reader is not None and reader.is_alive():
            return True
        return not self._stdout_lines.empty()
class IpcProcessor:
    """Processes IPC files from agent containers and forwards to Tracker.

    Each ``*.json`` file in an agent's ``ipc/messages`` and ``ipc/tasks``
    directories is read, deleted, and dispatched on its ``type`` field.
    Delivery is at-most-once: a file is removed even when forwarding fails.
    """

    def __init__(self, tracker_url: str, tracker_token: str):
        self.tracker_url = tracker_url
        self.tracker_token = tracker_token

    async def process_agent_ipc(self, agent: "AgentContainer"):
        """Process all pending IPC files for an agent."""
        await self._process_messages(agent)
        await self._process_tasks(agent)

    async def _process_messages(self, agent: "AgentContainer"):
        """Forward pending chat messages from ``ipc/messages``."""
        await self._drain(
            agent.ipc_dir / "messages",
            {"chat_message": self._send_chat_message},
            "IPC message error: %s",
        )

    async def _process_tasks(self, agent: "AgentContainer"):
        """Apply pending task operations from ``ipc/tasks``."""
        await self._drain(
            agent.ipc_dir / "tasks",
            {
                "task_status": self._update_task_status,
                "task_comment": self._add_task_comment,
                "task_take": self._take_task,
                "task_complete": self._complete_task,
            },
            "IPC task error: %s",
        )

    async def _drain(self, directory, handlers: dict, error_format: str):
        """Consume every ``*.json`` file in *directory*, oldest name first.

        *handlers* maps an IPC ``type`` to the coroutine that handles it;
        *error_format* is the log format used when a file cannot be handled.
        """
        if not directory.exists():
            return
        for path in sorted(directory.glob("*.json")):
            try:
                data = json.loads(path.read_text(encoding="utf-8"))
                # Remove before dispatch so a failing handler is not retried
                # on every poll.
                path.unlink()
                ipc_type = data.get("type")
                handler = handlers.get(ipc_type)
                if handler is None:
                    logger.warning(
                        "Ignoring IPC file %s: unknown type %r", path.name, ipc_type
                    )
                    continue
                await handler(data)
            except Exception as e:
                # Boundary handler: one bad file must not stop the others.
                logger.error(error_format, e)
                path.unlink(missing_ok=True)

    def _auth_headers(self) -> dict:
        """Return the bearer-token header sent with every Tracker request."""
        return {"Authorization": f"Bearer {self.tracker_token}"}

    async def _patch_task(self, task_id, payload: dict) -> bool:
        """PATCH a task on the Tracker; return True on a non-error status."""
        async with httpx.AsyncClient() as client:
            resp = await client.patch(
                f"{self.tracker_url}/api/v1/tasks/{task_id}",
                headers=self._auth_headers(),
                json=payload,
            )
        if resp.status_code < 300:
            return True
        logger.error(
            "Task %s update failed: %s %s", task_id, resp.status_code, resp.text
        )
        return False

    async def _send_chat_message(self, data: dict):
        """Forward chat message to Tracker REST API."""
        async with httpx.AsyncClient() as client:
            resp = await client.post(
                f"{self.tracker_url}/api/v1/chats/{data['chat_id']}/messages",
                headers=self._auth_headers(),
                json={
                    "sender_type": data.get("sender_type", "agent"),
                    "sender_name": data.get("sender_name", "Agent"),
                    "content": data["content"],
                },
            )
        if resp.status_code < 300:
            logger.info("Chat message forwarded to %s", data["chat_id"])
        else:
            logger.error("Chat forward failed: %s %s", resp.status_code, resp.text)

    async def _update_task_status(self, data: dict):
        """Set a task's status to the value carried in the IPC payload."""
        if await self._patch_task(data["task_id"], {"status": data["status"]}):
            logger.info("Task %s -> %s", data["task_id"], data["status"])

    async def _add_task_comment(self, data: dict):
        """Log the request only; no Tracker call is made yet."""
        logger.info("Task comment: %s (TODO: implement endpoint)", data["task_id"])

    async def _take_task(self, data: dict):
        """Mark a task as in progress and assign it to the requesting agent."""
        payload = {"status": "in_progress", "assigned_to": data["agent_slug"]}
        if await self._patch_task(data["task_id"], payload):
            logger.info("Task %s taken by %s", data["task_id"], data["agent_slug"])

    async def _complete_task(self, data: dict):
        """Mark a task as done."""
        if await self._patch_task(data["task_id"], {"status": "done"}):
            logger.info("Task %s completed", data["task_id"])
# --- CLI Interface (for testing) ---
async def interactive_session(agent_slug: str, agent_name: str):
    """Run an interactive chat session with an agent.

    Reads credentials from CLAUDE_CODE_OAUTH_TOKEN / ANTHROPIC_API_KEY
    (exits with status 1 if neither is set), asks for one prompt on the
    terminal, starts the agent container, then polls for results and IPC
    files every 0.5 s until the container stops or the user interrupts.
    """
    secrets = {}
    for key in ("CLAUDE_CODE_OAUTH_TOKEN", "ANTHROPIC_API_KEY"):
        value = os.getenv(key, "")
        if value:
            secrets[key] = value
    if not secrets:
        logger.error("Set CLAUDE_CODE_OAUTH_TOKEN or ANTHROPIC_API_KEY")
        sys.exit(1)

    agent = AgentContainer(agent_slug, agent_name, secrets)
    ipc = IpcProcessor(TRACKER_URL, TRACKER_TOKEN)

    print(f"\n🤖 Agent '{agent_name}' ({agent_slug})")
    print("Type your messages. Ctrl+C to exit.\n")
    prompt = input("You: ").strip()
    if not prompt:
        return

    def show(outputs):
        """Print every output that carries a non-empty result."""
        for out in outputs:
            if out.get("result"):
                print(f"\n🤖 {agent_name}: {out['result']}\n")

    agent.start(prompt)

    # Read loop
    try:
        while agent.is_running():
            show(agent.read_outputs())
            # Process IPC
            await ipc.process_agent_ipc(agent)
            if not agent.is_running():
                break
            await asyncio.sleep(0.5)
        # Pick up anything produced between the last poll and the exit.
        show(agent.read_outputs())
        await ipc.process_agent_ipc(agent)
    except (KeyboardInterrupt, asyncio.CancelledError):
        # Under asyncio.run() on Python 3.11+, Ctrl+C reaches the coroutine
        # as CancelledError rather than KeyboardInterrupt; treat both as a
        # normal end of the session.
        pass
    finally:
        # Always ask a still-running container to shut down, including when
        # an unexpected error is propagating.
        if agent.is_running():
            agent.close()
    print("\nSession ended.")
def main():
    """Parse the command-line options and run one interactive session."""
    import argparse

    cli = argparse.ArgumentParser(description="Team Board Agent Runner")
    # (flag, default, help) for each supported option.
    option_specs = (
        ("--slug", "coder", "Agent slug"),
        ("--name", "Кодер", "Agent name"),
    )
    for flag, fallback, description in option_specs:
        cli.add_argument(flag, default=fallback, help=description)
    opts = cli.parse_args()

    session = interactive_session(opts.slug, opts.name)
    asyncio.run(session)


if __name__ == "__main__":
    main()