diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..5f99054 --- /dev/null +++ b/.gitignore @@ -0,0 +1,7 @@ +node_modules/ +dist/ +venv/ +__pycache__/ +*.pyc +.env +data/ diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..cfee99e --- /dev/null +++ b/Dockerfile @@ -0,0 +1,26 @@ +FROM node:22-slim + +# System deps for Claude Code tools +RUN apt-get update && apt-get install -y \ + git curl jq ripgrep \ + python3 python3-pip \ + && rm -rf /var/lib/apt/lists/* + +# Working directory +WORKDIR /app + +# Install agent dependencies +COPY agent/package.json agent/package-lock.json* ./ +RUN npm install + +# Copy and build agent code +COPY agent/tsconfig.json ./ +COPY agent/src/ ./src/ +RUN npx tsc + +# Workspace for agent data (mounted from host) +RUN mkdir -p /workspace /workspace/ipc/input /workspace/ipc/messages /workspace/ipc/tasks /workspace/conversations + +WORKDIR /workspace + +ENTRYPOINT ["node", "/app/dist/index.js"] diff --git a/README.md b/README.md index abfe6c7..d5d5df0 100644 --- a/README.md +++ b/README.md @@ -1,3 +1,56 @@ -# runner +# Team Board Runner -Универсальный runner для AI-агентов Team Board. Контейнеризованный, multi-model. \ No newline at end of file +Универсальный runner для AI-агентов Team Board. Запускает агентов в Docker-контейнерах с изоляцией. + +## Архитектура + +``` +Host (runner.py) Docker Container (agent) +┌──────────────┐ stdin/stdout ┌──────────────────────┐ +│ Runner │◄──────────────────►│ Claude Agent SDK │ +│ │ │ + MCP Tools │ +│ - Container │ IPC files │ + Persistent │ +│ manager │◄──────────────────►│ Sessions │ +│ - IPC → │ │ + SQLite (internal) │ +│ Tracker │ │ │ +└──────────────┘ └──────────────────────┘ +``` + +## Компоненты + +### agent/ — код внутри контейнера +- `index.ts` — Claude Agent SDK query loop с persistent sessions +- `mcp.ts` — MCP tools для работы с Tracker (chat, tasks) + +### runner.py — код на хосте +- Запускает Docker контейнеры +- Маршрутизирует IPC файлы → Tracker REST API +- Будущее: WebSocket к Tracker для real-time событий + +## Использование + +```bash +# Собрать Docker образ +docker build -t team-board-agent . + +# Запустить интерактивную сессию +CLAUDE_CODE_OAUTH_TOKEN=xxx python3 runner.py --slug coder --name "Кодер" +``` + +## IPC Protocol + +Агент внутри контейнера общается с runner'ом через файлы: + +- `/workspace/ipc/input/*.json` — сообщения от runner'а к агенту +- `/workspace/ipc/messages/*.json` — чат-сообщения от агента +- `/workspace/ipc/tasks/*.json` — операции с задачами от агента +- `/workspace/ipc/input/_close` — сигнал завершения + +## Multi-model (будущее) + +Архитектура позволяет подключить другие модели: +- Gemini (через MCP или прямой API) +- ChatGPT (через API) +- Локальные модели (ollama) + +Для этого нужно заменить Claude Agent SDK на соответствующий SDK внутри контейнера. diff --git a/agent/package.json b/agent/package.json new file mode 100644 index 0000000..e4cda7d --- /dev/null +++ b/agent/package.json @@ -0,0 +1,20 @@ +{ + "name": "team-board-agent", + "version": "0.1.0", + "type": "module", + "description": "Team Board Agent — runs inside container with Claude Agent SDK", + "main": "dist/index.js", + "scripts": { + "build": "tsc", + "start": "node dist/index.js" + }, + "dependencies": { + "@anthropic-ai/claude-agent-sdk": "^0.2.34", + "@modelcontextprotocol/sdk": "^1.12.1", + "zod": "^4.0.0" + }, + "devDependencies": { + "@types/node": "^22.10.7", + "typescript": "^5.7.3" + } +} diff --git a/agent/src/index.ts b/agent/src/index.ts new file mode 100644 index 0000000..5aca824 --- /dev/null +++ b/agent/src/index.ts @@ -0,0 +1,364 @@ +/** + * Team Board Agent Runner + * Runs inside a container. Receives prompts via stdin, outputs results to stdout. + * Supports persistent sessions via Claude Agent SDK. + * + * Input: JSON on stdin (ContainerInput) + * Follow-up messages: JSON files in /workspace/ipc/input/ + * Output: JSON wrapped in markers on stdout + * Close signal: /workspace/ipc/input/_close + */ + +import fs from 'fs'; +import path from 'path'; +import { query, HookCallback, PreCompactHookInput, PreToolUseHookInput } from '@anthropic-ai/claude-agent-sdk'; + +// --- Types --- + +interface ContainerInput { + prompt: string; + sessionId?: string; + agentSlug: string; + agentName: string; + secrets?: Record; +} + +interface ContainerOutput { + status: 'success' | 'error'; + result: string | null; + newSessionId?: string; + error?: string; +} + +interface SDKUserMessage { + type: 'user'; + message: { role: 'user'; content: string }; + parent_tool_use_id: null; + session_id: string; +} + +// --- Constants --- + +const IPC_INPUT_DIR = '/workspace/ipc/input'; +const IPC_CLOSE_SENTINEL = path.join(IPC_INPUT_DIR, '_close'); +const IPC_POLL_MS = 500; +const OUTPUT_START = '---AGENT_OUTPUT_START---'; +const OUTPUT_END = '---AGENT_OUTPUT_END---'; + +// Secrets to strip from Bash subprocesses +const SECRET_ENV_VARS = ['ANTHROPIC_API_KEY', 'CLAUDE_CODE_OAUTH_TOKEN']; + +// --- MessageStream: push-based async iterable for streaming prompts --- + +class MessageStream { + private queue: SDKUserMessage[] = []; + private waiting: (() => void) | null = null; + private done = false; + + push(text: string): void { + this.queue.push({ + type: 'user', + message: { role: 'user', content: text }, + parent_tool_use_id: null, + session_id: '', + }); + this.waiting?.(); + } + + end(): void { + this.done = true; + this.waiting?.(); + } + + async *[Symbol.asyncIterator](): AsyncGenerator { + while (true) { + while (this.queue.length > 0) { + yield this.queue.shift()!; + } + if (this.done) return; + await new Promise(r => { this.waiting = r; }); + this.waiting = null; + } + } +} + +// --- Helpers --- + +function log(msg: string): void { + console.error(`[agent] ${msg}`); +} + +function writeOutput(output: ContainerOutput): void { + console.log(OUTPUT_START); + console.log(JSON.stringify(output)); + console.log(OUTPUT_END); +} + +async function readStdin(): Promise { + return new Promise((resolve, reject) => { + let data = ''; + process.stdin.setEncoding('utf8'); + process.stdin.on('data', chunk => { data += chunk; }); + process.stdin.on('end', () => resolve(data)); + process.stdin.on('error', reject); + }); +} + +function shouldClose(): boolean { + if (fs.existsSync(IPC_CLOSE_SENTINEL)) { + try { fs.unlinkSync(IPC_CLOSE_SENTINEL); } catch {} + return true; + } + return false; +} + +function drainIpcInput(): string[] { + try { + fs.mkdirSync(IPC_INPUT_DIR, { recursive: true }); + const files = fs.readdirSync(IPC_INPUT_DIR) + .filter(f => f.endsWith('.json')) + .sort(); + + const messages: string[] = []; + for (const file of files) { + const filePath = path.join(IPC_INPUT_DIR, file); + try { + const data = JSON.parse(fs.readFileSync(filePath, 'utf-8')); + fs.unlinkSync(filePath); + if (data.type === 'message' && data.text) { + messages.push(data.text); + } + } catch (err) { + log(`Failed to process IPC file ${file}: ${err}`); + try { fs.unlinkSync(filePath); } catch {} + } + } + return messages; + } catch { + return []; + } +} + +function waitForIpcMessage(): Promise { + return new Promise((resolve) => { + const poll = () => { + if (shouldClose()) { resolve(null); return; } + const msgs = drainIpcInput(); + if (msgs.length > 0) { resolve(msgs.join('\n')); return; } + setTimeout(poll, IPC_POLL_MS); + }; + poll(); + }); +} + +// --- Hooks --- + +/** Strip API keys from Bash subprocess environments */ +function createSanitizeBashHook(): HookCallback { + return async (input) => { + const preInput = input as PreToolUseHookInput; + const command = (preInput.tool_input as { command?: string })?.command; + if (!command) return {}; + const unsetPrefix = `unset ${SECRET_ENV_VARS.join(' ')} 2>/dev/null; `; + return { + hookSpecificOutput: { + hookEventName: 'PreToolUse', + updatedInput: { + ...(preInput.tool_input as Record), + command: unsetPrefix + command, + }, + }, + }; + }; +} + +/** Archive transcript before compaction */ +function createPreCompactHook(): HookCallback { + return async (input) => { + const preCompact = input as PreCompactHookInput; + const transcriptPath = preCompact.transcript_path; + if (!transcriptPath || !fs.existsSync(transcriptPath)) return {}; + + try { + const content = fs.readFileSync(transcriptPath, 'utf-8'); + const archiveDir = '/workspace/conversations'; + fs.mkdirSync(archiveDir, { recursive: true }); + const date = new Date().toISOString().split('T')[0]; + const time = new Date().toISOString().replace(/[:.]/g, '-'); + fs.writeFileSync( + path.join(archiveDir, `${date}-${time}.jsonl`), + content + ); + log(`Archived transcript before compaction`); + } catch (err) { + log(`Failed to archive: ${err}`); + } + return {}; + }; +} + +// --- Main query loop --- + +async function runQuery( + prompt: string, + sessionId: string | undefined, + input: ContainerInput, + sdkEnv: Record, + mcpServerPath: string, + resumeAt?: string, +): Promise<{ newSessionId?: string; lastAssistantUuid?: string; closedDuringQuery: boolean }> { + const stream = new MessageStream(); + stream.push(prompt); + + // Poll IPC during query + let ipcPolling = true; + let closedDuringQuery = false; + const pollIpc = () => { + if (!ipcPolling) return; + if (shouldClose()) { + closedDuringQuery = true; + stream.end(); + ipcPolling = false; + return; + } + const msgs = drainIpcInput(); + for (const text of msgs) { + log(`IPC message piped (${text.length} chars)`); + stream.push(text); + } + setTimeout(pollIpc, IPC_POLL_MS); + }; + setTimeout(pollIpc, IPC_POLL_MS); + + let newSessionId: string | undefined; + let lastAssistantUuid: string | undefined; + + // Load system prompt from CLAUDE.md if exists + const claudeMdPath = '/workspace/CLAUDE.md'; + let systemPromptAppend: string | undefined; + if (fs.existsSync(claudeMdPath)) { + systemPromptAppend = fs.readFileSync(claudeMdPath, 'utf-8'); + } + + for await (const message of query({ + prompt: stream, + options: { + cwd: '/workspace', + resume: sessionId, + resumeSessionAt: resumeAt, + systemPrompt: systemPromptAppend + ? { type: 'preset' as const, preset: 'claude_code' as const, append: systemPromptAppend } + : undefined, + allowedTools: [ + 'Bash', 'Read', 'Write', 'Edit', 'Glob', 'Grep', + 'WebSearch', 'WebFetch', + 'Task', 'TaskOutput', 'TaskStop', + 'TeamCreate', 'TeamDelete', 'SendMessage', + 'TodoWrite', 'NotebookEdit', + 'mcp__tracker__*', + ], + env: sdkEnv, + permissionMode: 'bypassPermissions', + allowDangerouslySkipPermissions: true, + mcpServers: { + tracker: { + command: 'node', + args: [mcpServerPath], + env: { + AGENT_SLUG: input.agentSlug, + AGENT_NAME: input.agentName, + }, + }, + }, + hooks: { + PreCompact: [{ hooks: [createPreCompactHook()] }], + PreToolUse: [{ matcher: 'Bash', hooks: [createSanitizeBashHook()] }], + }, + }, + })) { + if (message.type === 'system' && message.subtype === 'init') { + newSessionId = message.session_id; + log(`Session: ${newSessionId}`); + } + if (message.type === 'assistant' && 'uuid' in message) { + lastAssistantUuid = (message as { uuid: string }).uuid; + } + if (message.type === 'result') { + const text = 'result' in message ? (message as { result?: string }).result : null; + if (text) { + const clean = text.replace(/[\s\S]*?<\/internal>/g, '').trim(); + if (clean) { + writeOutput({ status: 'success', result: clean, newSessionId }); + } + } + } + } + + ipcPolling = false; + return { newSessionId, lastAssistantUuid, closedDuringQuery }; +} + +// --- Entry point --- + +async function main(): Promise { + let input: ContainerInput; + try { + const raw = await readStdin(); + input = JSON.parse(raw); + try { fs.unlinkSync('/tmp/input.json'); } catch {} + log(`Agent '${input.agentName}' (${input.agentSlug}) starting`); + } catch (err) { + writeOutput({ status: 'error', result: null, error: `Bad input: ${err}` }); + process.exit(1); + } + + // Build SDK env with secrets (never exposed to Bash) + const sdkEnv: Record = { ...process.env }; + for (const [key, value] of Object.entries(input.secrets || {})) { + sdkEnv[key] = value; + } + + const mcpServerPath = path.join( + path.dirname(new URL(import.meta.url).pathname), + 'mcp.js' + ); + + let sessionId = input.sessionId; + fs.mkdirSync(IPC_INPUT_DIR, { recursive: true }); + try { fs.unlinkSync(IPC_CLOSE_SENTINEL); } catch {} + + // Drain pending IPC + let prompt = input.prompt; + const pending = drainIpcInput(); + if (pending.length > 0) { + prompt += '\n' + pending.join('\n'); + } + + // Query loop: run → wait for IPC → run again (persistent session) + let resumeAt: string | undefined; + try { + while (true) { + log(`Query start (session: ${sessionId || 'new'})`); + const result = await runQuery(prompt, sessionId, input, sdkEnv, mcpServerPath, resumeAt); + + if (result.newSessionId) sessionId = result.newSessionId; + if (result.lastAssistantUuid) resumeAt = result.lastAssistantUuid; + if (result.closedDuringQuery) { log('Closed during query'); break; } + + // Session update marker + writeOutput({ status: 'success', result: null, newSessionId: sessionId }); + + log('Waiting for next message...'); + const next = await waitForIpcMessage(); + if (next === null) { log('Close signal received'); break; } + + log(`New message (${next.length} chars)`); + prompt = next; + } + } catch (err) { + writeOutput({ status: 'error', result: null, newSessionId: sessionId, error: String(err) }); + process.exit(1); + } +} + +main(); diff --git a/agent/src/ipc-mcp-stdio.ts b/agent/src/ipc-mcp-stdio.ts new file mode 100644 index 0000000..d894fe8 --- /dev/null +++ b/agent/src/ipc-mcp-stdio.ts @@ -0,0 +1,279 @@ +/** + * Stdio MCP Server for NanoClaw + * Standalone process that agent teams subagents can inherit. + * Reads context from environment variables, writes IPC files for the host. + */ + +import { McpServer } from '@modelcontextprotocol/sdk/server/mcp.js'; +import { StdioServerTransport } from '@modelcontextprotocol/sdk/server/stdio.js'; +import { z } from 'zod'; +import fs from 'fs'; +import path from 'path'; +import { CronExpressionParser } from 'cron-parser'; + +const IPC_DIR = '/workspace/ipc'; +const MESSAGES_DIR = path.join(IPC_DIR, 'messages'); +const TASKS_DIR = path.join(IPC_DIR, 'tasks'); + +// Context from environment variables (set by the agent runner) +const chatJid = process.env.NANOCLAW_CHAT_JID!; +const groupFolder = process.env.NANOCLAW_GROUP_FOLDER!; +const isMain = process.env.NANOCLAW_IS_MAIN === '1'; + +function writeIpcFile(dir: string, data: object): string { + fs.mkdirSync(dir, { recursive: true }); + + const filename = `${Date.now()}-${Math.random().toString(36).slice(2, 8)}.json`; + const filepath = path.join(dir, filename); + + // Atomic write: temp file then rename + const tempPath = `${filepath}.tmp`; + fs.writeFileSync(tempPath, JSON.stringify(data, null, 2)); + fs.renameSync(tempPath, filepath); + + return filename; +} + +const server = new McpServer({ + name: 'nanoclaw', + version: '1.0.0', +}); + +server.tool( + 'send_message', + "Send a message to the user or group immediately while you're still running. Use this for progress updates or to send multiple messages. You can call this multiple times. Note: when running as a scheduled task, your final output is NOT sent to the user — use this tool if you need to communicate with the user or group.", + { + text: z.string().describe('The message text to send'), + sender: z.string().optional().describe('Your role/identity name (e.g. "Researcher"). When set, messages appear from a dedicated bot in Telegram.'), + }, + async (args) => { + const data: Record = { + type: 'message', + chatJid, + text: args.text, + sender: args.sender || undefined, + groupFolder, + timestamp: new Date().toISOString(), + }; + + writeIpcFile(MESSAGES_DIR, data); + + return { content: [{ type: 'text' as const, text: 'Message sent.' }] }; + }, +); + +server.tool( + 'schedule_task', + `Schedule a recurring or one-time task. The task will run as a full agent with access to all tools. + +CONTEXT MODE - Choose based on task type: +\u2022 "group": Task runs in the group's conversation context, with access to chat history. Use for tasks that need context about ongoing discussions, user preferences, or recent interactions. +\u2022 "isolated": Task runs in a fresh session with no conversation history. Use for independent tasks that don't need prior context. When using isolated mode, include all necessary context in the prompt itself. + +If unsure which mode to use, you can ask the user. Examples: +- "Remind me about our discussion" \u2192 group (needs conversation context) +- "Check the weather every morning" \u2192 isolated (self-contained task) +- "Follow up on my request" \u2192 group (needs to know what was requested) +- "Generate a daily report" \u2192 isolated (just needs instructions in prompt) + +MESSAGING BEHAVIOR - The task agent's output is sent to the user or group. It can also use send_message for immediate delivery, or wrap output in tags to suppress it. Include guidance in the prompt about whether the agent should: +\u2022 Always send a message (e.g., reminders, daily briefings) +\u2022 Only send a message when there's something to report (e.g., "notify me if...") +\u2022 Never send a message (background maintenance tasks) + +SCHEDULE VALUE FORMAT (all times are LOCAL timezone): +\u2022 cron: Standard cron expression (e.g., "*/5 * * * *" for every 5 minutes, "0 9 * * *" for daily at 9am LOCAL time) +\u2022 interval: Milliseconds between runs (e.g., "300000" for 5 minutes, "3600000" for 1 hour) +\u2022 once: Local time WITHOUT "Z" suffix (e.g., "2026-02-01T15:30:00"). Do NOT use UTC/Z suffix.`, + { + prompt: z.string().describe('What the agent should do when the task runs. For isolated mode, include all necessary context here.'), + schedule_type: z.enum(['cron', 'interval', 'once']).describe('cron=recurring at specific times, interval=recurring every N ms, once=run once at specific time'), + schedule_value: z.string().describe('cron: "*/5 * * * *" | interval: milliseconds like "300000" | once: local timestamp like "2026-02-01T15:30:00" (no Z suffix!)'), + context_mode: z.enum(['group', 'isolated']).default('group').describe('group=runs with chat history and memory, isolated=fresh session (include context in prompt)'), + target_group_jid: z.string().optional().describe('(Main group only) JID of the group to schedule the task for. Defaults to the current group.'), + }, + async (args) => { + // Validate schedule_value before writing IPC + if (args.schedule_type === 'cron') { + try { + CronExpressionParser.parse(args.schedule_value); + } catch { + return { + content: [{ type: 'text' as const, text: `Invalid cron: "${args.schedule_value}". Use format like "0 9 * * *" (daily 9am) or "*/5 * * * *" (every 5 min).` }], + isError: true, + }; + } + } else if (args.schedule_type === 'interval') { + const ms = parseInt(args.schedule_value, 10); + if (isNaN(ms) || ms <= 0) { + return { + content: [{ type: 'text' as const, text: `Invalid interval: "${args.schedule_value}". Must be positive milliseconds (e.g., "300000" for 5 min).` }], + isError: true, + }; + } + } else if (args.schedule_type === 'once') { + const date = new Date(args.schedule_value); + if (isNaN(date.getTime())) { + return { + content: [{ type: 'text' as const, text: `Invalid timestamp: "${args.schedule_value}". Use ISO 8601 format like "2026-02-01T15:30:00.000Z".` }], + isError: true, + }; + } + } + + // Non-main groups can only schedule for themselves + const targetJid = isMain && args.target_group_jid ? args.target_group_jid : chatJid; + + const data = { + type: 'schedule_task', + prompt: args.prompt, + schedule_type: args.schedule_type, + schedule_value: args.schedule_value, + context_mode: args.context_mode || 'group', + targetJid, + createdBy: groupFolder, + timestamp: new Date().toISOString(), + }; + + const filename = writeIpcFile(TASKS_DIR, data); + + return { + content: [{ type: 'text' as const, text: `Task scheduled (${filename}): ${args.schedule_type} - ${args.schedule_value}` }], + }; + }, +); + +server.tool( + 'list_tasks', + "List all scheduled tasks. From main: shows all tasks. From other groups: shows only that group's tasks.", + {}, + async () => { + const tasksFile = path.join(IPC_DIR, 'current_tasks.json'); + + try { + if (!fs.existsSync(tasksFile)) { + return { content: [{ type: 'text' as const, text: 'No scheduled tasks found.' }] }; + } + + const allTasks = JSON.parse(fs.readFileSync(tasksFile, 'utf-8')); + + const tasks = isMain + ? allTasks + : allTasks.filter((t: { groupFolder: string }) => t.groupFolder === groupFolder); + + if (tasks.length === 0) { + return { content: [{ type: 'text' as const, text: 'No scheduled tasks found.' }] }; + } + + const formatted = tasks + .map( + (t: { id: string; prompt: string; schedule_type: string; schedule_value: string; status: string; next_run: string }) => + `- [${t.id}] ${t.prompt.slice(0, 50)}... (${t.schedule_type}: ${t.schedule_value}) - ${t.status}, next: ${t.next_run || 'N/A'}`, + ) + .join('\n'); + + return { content: [{ type: 'text' as const, text: `Scheduled tasks:\n${formatted}` }] }; + } catch (err) { + return { + content: [{ type: 'text' as const, text: `Error reading tasks: ${err instanceof Error ? err.message : String(err)}` }], + }; + } + }, +); + +server.tool( + 'pause_task', + 'Pause a scheduled task. It will not run until resumed.', + { task_id: z.string().describe('The task ID to pause') }, + async (args) => { + const data = { + type: 'pause_task', + taskId: args.task_id, + groupFolder, + isMain, + timestamp: new Date().toISOString(), + }; + + writeIpcFile(TASKS_DIR, data); + + return { content: [{ type: 'text' as const, text: `Task ${args.task_id} pause requested.` }] }; + }, +); + +server.tool( + 'resume_task', + 'Resume a paused task.', + { task_id: z.string().describe('The task ID to resume') }, + async (args) => { + const data = { + type: 'resume_task', + taskId: args.task_id, + groupFolder, + isMain, + timestamp: new Date().toISOString(), + }; + + writeIpcFile(TASKS_DIR, data); + + return { content: [{ type: 'text' as const, text: `Task ${args.task_id} resume requested.` }] }; + }, +); + +server.tool( + 'cancel_task', + 'Cancel and delete a scheduled task.', + { task_id: z.string().describe('The task ID to cancel') }, + async (args) => { + const data = { + type: 'cancel_task', + taskId: args.task_id, + groupFolder, + isMain, + timestamp: new Date().toISOString(), + }; + + writeIpcFile(TASKS_DIR, data); + + return { content: [{ type: 'text' as const, text: `Task ${args.task_id} cancellation requested.` }] }; + }, +); + +server.tool( + 'register_group', + `Register a new WhatsApp group so the agent can respond to messages there. Main group only. + +Use available_groups.json to find the JID for a group. The folder name should be lowercase with hyphens (e.g., "family-chat").`, + { + jid: z.string().describe('The WhatsApp JID (e.g., "120363336345536173@g.us")'), + name: z.string().describe('Display name for the group'), + folder: z.string().describe('Folder name for group files (lowercase, hyphens, e.g., "family-chat")'), + trigger: z.string().describe('Trigger word (e.g., "@Andy")'), + }, + async (args) => { + if (!isMain) { + return { + content: [{ type: 'text' as const, text: 'Only the main group can register new groups.' }], + isError: true, + }; + } + + const data = { + type: 'register_group', + jid: args.jid, + name: args.name, + folder: args.folder, + trigger: args.trigger, + timestamp: new Date().toISOString(), + }; + + writeIpcFile(TASKS_DIR, data); + + return { + content: [{ type: 'text' as const, text: `Group "${args.name}" registered. It will start receiving messages immediately.` }], + }; + }, +); + +// Start the stdio transport +const transport = new StdioServerTransport(); +await server.connect(transport); diff --git a/agent/src/mcp.ts b/agent/src/mcp.ts new file mode 100644 index 0000000..c520346 --- /dev/null +++ b/agent/src/mcp.ts @@ -0,0 +1,146 @@ +/** + * MCP Server for Team Board Agent + * Provides tools for agent to interact with Tracker: + * - send_message: send chat message via IPC + * - update_task: update task status + * - create_comment: comment on a task + * + * Runs as a child process of the agent, communicates via stdio. + * Uses file-based IPC to communicate with the host runner. + */ + +import { McpServer } from '@modelcontextprotocol/sdk/server/mcp.js'; +import { StdioServerTransport } from '@modelcontextprotocol/sdk/server/stdio.js'; +import { z } from 'zod'; +import fs from 'fs'; +import path from 'path'; + +const IPC_DIR = '/workspace/ipc'; +const MESSAGES_DIR = path.join(IPC_DIR, 'messages'); +const TASKS_DIR = path.join(IPC_DIR, 'tasks'); + +const agentSlug = process.env.AGENT_SLUG || 'unknown'; +const agentName = process.env.AGENT_NAME || 'Agent'; + +function writeIpcFile(dir: string, data: object): string { + fs.mkdirSync(dir, { recursive: true }); + const filename = `${Date.now()}-${Math.random().toString(36).slice(2, 8)}.json`; + const filepath = path.join(dir, filename); + const tmpPath = `${filepath}.tmp`; + fs.writeFileSync(tmpPath, JSON.stringify(data, null, 2)); + fs.renameSync(tmpPath, filepath); + return filename; +} + +const server = new McpServer({ + name: 'tracker', + version: '1.0.0', +}); + +// --- Chat --- + +server.tool( + 'send_message', + 'Send a message to a chat room (lobby, project, or task chat). Use for progress updates or responses.', + { + chat_id: z.string().describe('Chat ID to send the message to'), + text: z.string().describe('Message text'), + }, + async (args) => { + writeIpcFile(MESSAGES_DIR, { + type: 'chat_message', + chat_id: args.chat_id, + content: args.text, + sender_type: 'agent', + sender_name: agentName, + sender_slug: agentSlug, + timestamp: new Date().toISOString(), + }); + return { content: [{ type: 'text' as const, text: 'Message sent.' }] }; + }, +); + +// --- Tasks --- + +server.tool( + 'update_task_status', + 'Move a task to a different status (backlog, todo, in_progress, in_review, done)', + { + task_id: z.string().describe('Task ID'), + status: z.enum(['backlog', 'todo', 'in_progress', 'in_review', 'done']).describe('New status'), + comment: z.string().optional().describe('Optional comment about the status change'), + }, + async (args) => { + writeIpcFile(TASKS_DIR, { + type: 'task_status', + task_id: args.task_id, + status: args.status, + comment: args.comment, + agent_slug: agentSlug, + timestamp: new Date().toISOString(), + }); + return { content: [{ type: 'text' as const, text: `Task ${args.task_id} → ${args.status}` }] }; + }, +); + +server.tool( + 'add_task_comment', + 'Add a comment to a task (for progress updates, questions, or results)', + { + task_id: z.string().describe('Task ID'), + comment: z.string().describe('Comment text'), + }, + async (args) => { + writeIpcFile(TASKS_DIR, { + type: 'task_comment', + task_id: args.task_id, + content: args.comment, + sender_type: 'agent', + sender_name: agentName, + agent_slug: agentSlug, + timestamp: new Date().toISOString(), + }); + return { content: [{ type: 'text' as const, text: 'Comment added.' }] }; + }, +); + +server.tool( + 'take_task', + 'Take a task and start working on it. Changes status to in_progress.', + { + task_id: z.string().describe('Task ID to take'), + }, + async (args) => { + writeIpcFile(TASKS_DIR, { + type: 'task_take', + task_id: args.task_id, + agent_slug: agentSlug, + timestamp: new Date().toISOString(), + }); + return { content: [{ type: 'text' as const, text: `Task ${args.task_id} taken.` }] }; + }, +); + +server.tool( + 'complete_task', + 'Mark a task as done.', + { + task_id: z.string().describe('Task ID'), + summary: z.string().optional().describe('Completion summary'), + }, + async (args) => { + writeIpcFile(TASKS_DIR, { + type: 'task_complete', + task_id: args.task_id, + summary: args.summary, + agent_slug: agentSlug, + timestamp: new Date().toISOString(), + }); + return { content: [{ type: 'text' as const, text: `Task ${args.task_id} completed.` }] }; + }, +); + +// --- Start --- + +const transport = new StdioServerTransport(); +await server.connect(transport); diff --git a/agent/tsconfig.json b/agent/tsconfig.json new file mode 100644 index 0000000..b7374c7 --- /dev/null +++ b/agent/tsconfig.json @@ -0,0 +1,14 @@ +{ + "compilerOptions": { + "target": "ES2022", + "module": "NodeNext", + "moduleResolution": "NodeNext", + "outDir": "dist", + "rootDir": "src", + "strict": true, + "esModuleInterop": true, + "skipLibCheck": true, + "resolveJsonModule": true + }, + "include": ["src"] +} diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..6ecf620 --- /dev/null +++ b/requirements.txt @@ -0,0 +1 @@ +httpx>=0.27 diff --git a/runner.py b/runner.py new file mode 100644 index 0000000..7a451fd --- /dev/null +++ b/runner.py @@ -0,0 +1,312 @@ +#!/usr/bin/env python3 +""" +Team Board Agent Runner (host-side) + +Manages agent containers: +- Starts Docker container with Claude Agent SDK +- Sends prompts via stdin +- Reads results from stdout (marker-based protocol) +- Handles IPC files for chat/task operations +- Forwards IPC to Tracker API + +Future: WebSocket connection to Tracker for real-time events. +Currently: CLI-based for testing. +""" + +import asyncio +import json +import logging +import os +import signal +import subprocess +import sys +import time +from pathlib import Path + +import httpx + +logging.basicConfig( + level=logging.INFO, + format="%(asctime)s %(levelname)s %(name)s: %(message)s", +) +logger = logging.getLogger("runner") + +# --- Config --- + +TRACKER_URL = os.getenv("TRACKER_URL", "http://localhost:8100") +TRACKER_TOKEN = os.getenv("TRACKER_TOKEN", "tb-tracker-dev-token") +DATA_DIR = Path(os.getenv("RUNNER_DATA_DIR", "/opt/team-board-runner/data")) +DOCKER_IMAGE = os.getenv("RUNNER_IMAGE", "team-board-agent:latest") +IDLE_TIMEOUT = int(os.getenv("RUNNER_IDLE_TIMEOUT", "300")) # 5 min + +OUTPUT_START = "---AGENT_OUTPUT_START---" +OUTPUT_END = "---AGENT_OUTPUT_END---" + + +class AgentContainer: + """Manages a single agent container.""" + + def __init__(self, slug: str, name: str, secrets: dict): + self.slug = slug + self.name = name + self.secrets = secrets + self.session_id: str | None = None + self.process: subprocess.Popen | None = None + self.workspace = DATA_DIR / "agents" / slug + self.ipc_dir = self.workspace / "ipc" + + def ensure_dirs(self): + """Create workspace directories.""" + for d in ["ipc/input", "ipc/messages", "ipc/tasks", "conversations"]: + (self.workspace / d).mkdir(parents=True, exist_ok=True) + + def start(self, prompt: str) -> None: + """Start container with initial prompt.""" + self.ensure_dirs() + + container_name = f"tb-agent-{self.slug}-{int(time.time())}" + + cmd = [ + "docker", "run", "-i", "--rm", + "--name", container_name, + "-v", f"{self.workspace}:/workspace", + DOCKER_IMAGE, + ] + + logger.info("Starting container %s for agent '%s'", container_name, self.name) + + self.process = subprocess.Popen( + cmd, + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + text=True, + ) + + # Send initial input + input_data = json.dumps({ + "prompt": prompt, + "sessionId": self.session_id, + "agentSlug": self.slug, + "agentName": self.name, + "secrets": self.secrets, + }) + + self.process.stdin.write(input_data) + self.process.stdin.close() + + def send_message(self, text: str) -> None: + """Send follow-up message via IPC file.""" + ipc_input = self.ipc_dir / "input" + ipc_input.mkdir(parents=True, exist_ok=True) + + filename = f"{int(time.time() * 1000)}-{os.urandom(4).hex()}.json" + tmp_path = ipc_input / f"{filename}.tmp" + final_path = ipc_input / filename + + tmp_path.write_text(json.dumps({"type": "message", "text": text})) + tmp_path.rename(final_path) + logger.info("IPC message sent to %s (%d chars)", self.slug, len(text)) + + def close(self) -> None: + """Send close signal.""" + sentinel = self.ipc_dir / "input" / "_close" + sentinel.touch() + logger.info("Close signal sent to %s", self.slug) + + def read_outputs(self) -> list[dict]: + """Read all available outputs from stdout (non-blocking-ish).""" + if not self.process or not self.process.stdout: + return [] + + outputs = [] + buffer = "" + + while True: + line = self.process.stdout.readline() + if not line: + break + + buffer += line + + if OUTPUT_START in buffer: + start_idx = buffer.index(OUTPUT_START) + if OUTPUT_END in buffer[start_idx:]: + end_idx = buffer.index(OUTPUT_END, start_idx) + json_str = buffer[start_idx + len(OUTPUT_START):end_idx].strip() + try: + output = json.loads(json_str) + outputs.append(output) + if output.get("newSessionId"): + self.session_id = output["newSessionId"] + except json.JSONDecodeError as e: + logger.error("Failed to parse output: %s", e) + buffer = buffer[end_idx + len(OUTPUT_END):] + + return outputs + + def is_running(self) -> bool: + if not self.process: + return False + return self.process.poll() is None + + +class IpcProcessor: + """Processes IPC files from agent containers and forwards to Tracker.""" + + def __init__(self, tracker_url: str, tracker_token: str): + self.tracker_url = tracker_url + self.tracker_token = tracker_token + + async def process_agent_ipc(self, agent: AgentContainer): + """Process all pending IPC files for an agent.""" + await self._process_messages(agent) + await self._process_tasks(agent) + + async def _process_messages(self, agent: AgentContainer): + msg_dir = agent.ipc_dir / "messages" + if not msg_dir.exists(): + return + + for f in sorted(msg_dir.glob("*.json")): + try: + data = json.loads(f.read_text()) + f.unlink() + + if data.get("type") == "chat_message": + await self._send_chat_message(data) + except Exception as e: + logger.error("IPC message error: %s", e) + f.unlink(missing_ok=True) + + async def _process_tasks(self, agent: AgentContainer): + task_dir = agent.ipc_dir / "tasks" + if not task_dir.exists(): + return + + for f in sorted(task_dir.glob("*.json")): + try: + data = json.loads(f.read_text()) + f.unlink() + + ipc_type = data.get("type") + if ipc_type == "task_status": + await self._update_task_status(data) + elif ipc_type == "task_comment": + await self._add_task_comment(data) + elif ipc_type == "task_take": + await self._take_task(data) + elif ipc_type == "task_complete": + await self._complete_task(data) + except Exception as e: + logger.error("IPC task error: %s", e) + f.unlink(missing_ok=True) + + async def _send_chat_message(self, data: dict): + """Forward chat message to Tracker REST API.""" + async with httpx.AsyncClient() as client: + resp = await client.post( + f"{self.tracker_url}/api/v1/chats/{data['chat_id']}/messages", + headers={"Authorization": f"Bearer {self.tracker_token}"}, + json={ + "sender_type": data.get("sender_type", "agent"), + "sender_name": data.get("sender_name", "Agent"), + "content": data["content"], + }, + ) + if resp.status_code < 300: + logger.info("Chat message forwarded to %s", data["chat_id"]) + else: + logger.error("Chat forward failed: %s %s", resp.status_code, resp.text) + + async def _update_task_status(self, data: dict): + async with httpx.AsyncClient() as client: + await client.patch( + f"{self.tracker_url}/api/v1/tasks/{data['task_id']}", + headers={"Authorization": f"Bearer {self.tracker_token}"}, + json={"status": data["status"]}, + ) + logger.info("Task %s → %s", data["task_id"], data["status"]) + + async def _add_task_comment(self, data: dict): + logger.info("Task comment: %s (TODO: implement endpoint)", data["task_id"]) + + async def _take_task(self, data: dict): + async with httpx.AsyncClient() as client: + await client.patch( + f"{self.tracker_url}/api/v1/tasks/{data['task_id']}", + headers={"Authorization": f"Bearer {self.tracker_token}"}, + json={"status": "in_progress", "assigned_to": data["agent_slug"]}, + ) + logger.info("Task %s taken by %s", data["task_id"], data["agent_slug"]) + + async def _complete_task(self, data: dict): + async with httpx.AsyncClient() as client: + await client.patch( + f"{self.tracker_url}/api/v1/tasks/{data['task_id']}", + headers={"Authorization": f"Bearer {self.tracker_token}"}, + json={"status": "done"}, + ) + logger.info("Task %s completed", data["task_id"]) + + +# --- CLI Interface (for testing) --- + +async def interactive_session(agent_slug: str, agent_name: str): + """Run an interactive chat session with an agent.""" + secrets = {} + oauth_token = os.getenv("CLAUDE_CODE_OAUTH_TOKEN", "") + api_key = os.getenv("ANTHROPIC_API_KEY", "") + if oauth_token: + secrets["CLAUDE_CODE_OAUTH_TOKEN"] = oauth_token + if api_key: + secrets["ANTHROPIC_API_KEY"] = api_key + + if not secrets: + logger.error("Set CLAUDE_CODE_OAUTH_TOKEN or ANTHROPIC_API_KEY") + sys.exit(1) + + agent = AgentContainer(agent_slug, agent_name, secrets) + ipc = IpcProcessor(TRACKER_URL, TRACKER_TOKEN) + + print(f"\n🤖 Agent '{agent_name}' ({agent_slug})") + print("Type your messages. Ctrl+C to exit.\n") + + prompt = input("You: ").strip() + if not prompt: + return + + agent.start(prompt) + + # Read loop + try: + while agent.is_running(): + outputs = agent.read_outputs() + for out in outputs: + if out.get("result"): + print(f"\n🤖 {agent_name}: {out['result']}\n") + + # Process IPC + await ipc.process_agent_ipc(agent) + + if not agent.is_running(): + break + + await asyncio.sleep(0.5) + except KeyboardInterrupt: + agent.close() + print("\nSession ended.") + + +def main(): + import argparse + parser = argparse.ArgumentParser(description="Team Board Agent Runner") + parser.add_argument("--slug", default="coder", help="Agent slug") + parser.add_argument("--name", default="Кодер", help="Agent name") + args = parser.parse_args() + + asyncio.run(interactive_session(args.slug, args.name)) + + +if __name__ == "__main__": + main()