This commit is contained in:
Eugene 2026-02-23 15:48:14 +03:00
parent d04d9e0d3a
commit 368b9abf69
6 changed files with 346 additions and 129 deletions

3
.env
View File

@ -1,2 +1,5 @@
ANTHROPIC_API_KEY=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 ANTHROPIC_API_KEY=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
LOG_LEVEL=debug LOG_LEVEL=debug
; tb--HfzlIbf8h0Bhz9Dkzeb2at-y6Ag0mZh8_P0GeWqBBo

View File

@ -7,42 +7,45 @@
"slug": "coder", "slug": "coder",
"_slug_comment": "Уникальный идентификатор агента (латиница, без пробелов). Env: AGENT_SLUG", "_slug_comment": "Уникальный идентификатор агента (латиница, без пробелов). Env: AGENT_SLUG",
"prompt": "Ты опытный Go-разработчик. Пишешь чистый, идиоматичный Go-код.", "prompt": "Ты опытный разработчик. Пишешь чистый, идиоматичный код. Анализируешь задачу, пишешь код, отчитываешься.",
"_prompt_comment": "Системный промпт — описание роли и компетенций агента. Env: AGENT_PROMPT", "_prompt_comment": "Системный промпт — описание роли и компетенций агента. Env: AGENT_PROMPT",
"tracker_url": "http://localhost:8100", "tracker_url": "http://localhost:8100",
"_tracker_url_comment": "URL Team Board Tracker для регистрации и получения задач. Env: TRACKER_URL", "_tracker_url_comment": "REST API URL. Локально: http://localhost:8100. Через nginx: https://dev.team.uix.su/agent-api. Env: TRACKER_URL",
"token": "tb-agent-abc123", "ws_url": "",
"_token_comment": "Токен авторизации для Tracker API (Bearer token). Env: AGENT_TOKEN", "_ws_url_comment": "WebSocket URL (только для transport=ws). Локально: ws://localhost:8100/ws. Через nginx: wss://dev.team.uix.su/agent-ws. Пустой = авто из tracker_url. Env: AGENT_WS_URL",
"transport": "http", "token": "tb-agent-xxxxxxxx",
"_transport_comment": "Транспорт для связи с трекером: 'http' (agent слушает HTTP, трекер шлёт POST) или 'ws' (agent подключается к трекеру по WebSocket). Env: AGENT_TRANSPORT. Default: http", "_token_comment": "Токен агента (генерируется в Tracker UI или через POST /api/v1/agents/register). Env: AGENT_TOKEN",
"transport": "ws",
"_transport_comment": "Транспорт: 'ws' (WebSocket, рекомендуется) или 'http' (callback). Env: AGENT_TRANSPORT",
"listen_port": 3200, "listen_port": 3200,
"_listen_port_comment": "Порт HTTP-сервера для приёма событий от трекера (только для transport=http). Env: AGENT_PORT. Default: 3200", "_listen_port_comment": "Порт для transport=http (не нужен при ws). Env: AGENT_PORT",
"work_dir": "/projects/my-app", "work_dir": ".",
"_work_dir_comment": "Рабочая директория агента (где он выполняет задачи). Env: PICOGENT_WORK_DIR. Default: agentHome или cwd", "_work_dir_comment": "Рабочая директория (где агент выполняет код). Env: PICOGENT_WORK_DIR",
"model": "sonnet", "model": "sonnet",
"_model_comment": "Модель LLM. Алиасы: sonnet, opus, haiku, sonnet-4, opus-4. Полные ID тоже работают. Env: PICOGENT_MODEL. Default: sonnet", "_model_comment": "Модель: sonnet, opus, haiku, sonnet-4, opus-4, или полный ID. Env: PICOGENT_MODEL",
"provider": "anthropic", "provider": "anthropic",
"_provider_comment": "Провайдер LLM. Default: anthropic. Env: PICOGENT_PROVIDER", "_provider_comment": "Провайдер LLM. Env: PICOGENT_PROVIDER",
"api_key": "", "api_key": "",
"_api_key_comment": "API ключ провайдера. Лучше через .env файл. Env: PICOGENT_API_KEY или ANTHROPIC_API_KEY", "_api_key_comment": "API ключ. Лучше через .env: ANTHROPIC_API_KEY=sk-ant-... Env: PICOGENT_API_KEY",
"capabilities": ["coding", "review"], "capabilities": ["coding", "review"],
"_capabilities_comment": "Список возможностей агента — передаётся трекеру при регистрации. Default: [\"coding\"]", "_capabilities_comment": "Возможности агента (передаются трекеру). Примеры: coding, review, testing, docs",
"max_concurrent_tasks": 2, "max_concurrent_tasks": 2,
"_max_concurrent_tasks_comment": "Сколько задач агент может выполнять параллельно. Default: 2", "_max_concurrent_tasks_comment": "Сколько задач агент может выполнять параллельно",
"heartbeat_interval_sec": 30, "heartbeat_interval_sec": 30,
"_heartbeat_interval_sec_comment": "Интервал heartbeat к трекеру (секунды). Default: 30", "_heartbeat_interval_sec_comment": "Интервал heartbeat (сек). Timeout трекера: 90с",
"allowed_paths": [], "allowed_paths": [],
"_allowed_paths_comment": "Ограничение доступа к файлам. Пустой массив = без ограничений. Пример: [\"/projects/my-app/src\", \"/projects/my-app/tests\"]" "_allowed_paths_comment": "Ограничение файлового доступа. [] = без ограничений. Пример: [\"/projects/my-app\"]"
} }

View File

@ -16,7 +16,10 @@ export interface AgentConfig {
name: string; name: string;
slug: string; slug: string;
prompt: string; prompt: string;
/** REST API base URL (e.g. https://dev.team.uix.su/agent-api) */
trackerUrl: string; trackerUrl: string;
/** WebSocket URL (e.g. wss://dev.team.uix.su/agent-ws). Falls back to trackerUrl with http→ws conversion. */
wsUrl: string;
token: string; token: string;
transport: 'http' | 'ws'; transport: 'http' | 'ws';
listenPort: number; listenPort: number;
@ -136,11 +139,14 @@ export function loadAgentConfig(): AgentConfig {
|| resolvedHome || resolvedHome
|| process.cwd(); || process.cwd();
const wsUrl = process.env.AGENT_WS_URL || (file.ws_url as string) || '';
return { return {
name: (file.name as string) || process.env.AGENT_NAME || 'Agent', name: (file.name as string) || process.env.AGENT_NAME || 'Agent',
slug: (file.slug as string) || process.env.AGENT_SLUG || 'agent', slug: (file.slug as string) || process.env.AGENT_SLUG || 'agent',
prompt: (file.prompt as string) || process.env.AGENT_PROMPT || '', prompt: (file.prompt as string) || process.env.AGENT_PROMPT || '',
trackerUrl, trackerUrl,
wsUrl,
token, token,
transport: (process.env.AGENT_TRANSPORT || (file.transport as string) || 'http') as 'http' | 'ws', transport: (process.env.AGENT_TRANSPORT || (file.transport as string) || 'http') as 'http' | 'ws',
listenPort: parseInt(process.env.AGENT_PORT || String(file.listen_port || '3200'), 10), listenPort: parseInt(process.env.AGENT_PORT || String(file.listen_port || '3200'), 10),

View File

@ -21,14 +21,24 @@ export class EventRouter {
) {} ) {}
async handleEvent(event: TrackerEvent): Promise<void> { async handleEvent(event: TrackerEvent): Promise<void> {
this.log.info({ event: event.event, id: event.id }, 'Handling event'); this.log.info('┌── ROUTER: handling %s (id: %s)', event.event, event.id);
switch (event.event) { switch (event.event) {
case 'task.assigned': case 'task.assigned':
await this.handleTaskAssigned(event.data); await this.handleTaskAssigned(event.data);
break; break;
case 'message.new':
await this.handleMessageNew(event.data);
break;
case 'chat.message': case 'chat.message':
await this.handleChatMessage(event.data); await this.handleMessageNew(event.data);
break;
case 'task.created':
case 'task.updated':
case 'agent.status':
case 'agent.online':
case 'agent.offline':
this.log.info({ event: event.event, data: event.data }, 'Informational event');
break; break;
default: default:
this.log.warn({ event: event.event }, 'Unknown event type, ignoring'); this.log.warn({ event: event.event }, 'Unknown event type, ignoring');
@ -36,7 +46,8 @@ export class EventRouter {
} }
private async handleTaskAssigned(data: Record<string, unknown>): Promise<void> { private async handleTaskAssigned(data: Record<string, unknown>): Promise<void> {
const task = data.task as TrackerTask; // Protocol: data = { task: TaskOut } or data IS the task
const task = (data.task as TrackerTask) || (data as unknown as TrackerTask);
if (!task?.id) { if (!task?.id) {
this.log.error({ data }, 'task.assigned event missing task data'); this.log.error({ data }, 'task.assigned event missing task data');
return; return;
@ -49,10 +60,13 @@ export class EventRouter {
this.activeTasks++; this.activeTasks++;
this.taskTracker.addTask(task.id); this.taskTracker.addTask(task.id);
this.log.info({ taskId: task.id, key: task.key, title: task.title }, 'Processing task'); this.log.info('│ TASK ASSIGNED: %s — %s', task.key, task.title);
this.log.info('│ Priority: %s | Status: %s', task.priority || '-', task.status || '-');
if (task.description) this.log.info('│ Description: %s', task.description.slice(0, 200));
try { try {
// Update status → in_progress // Update status → in_progress
this.log.info('│ → Updating task status to in_progress...');
await this.client.updateTask(task.id, { status: 'in_progress' }).catch((err) => { await this.client.updateTask(task.id, { status: 'in_progress' }).catch((err) => {
this.log.warn({ err, taskId: task.id }, 'Failed to update task status to in_progress'); this.log.warn({ err, taskId: task.id }, 'Failed to update task status to in_progress');
}); });
@ -78,40 +92,58 @@ export class EventRouter {
} }
} }
// Post result as comment // Post result as comment to task
if (collectedText.trim()) { if (collectedText.trim()) {
await this.client.addComment(task.id, collectedText.trim()).catch((err) => { this.log.info('│ → Sending result comment (%d chars)...', collectedText.trim().length);
this.log.info('│ Result preview: %s', collectedText.trim().slice(0, 300));
await this.client.sendMessage({ task_id: task.id, content: collectedText.trim() }).catch((err) => {
this.log.error({ err, taskId: task.id }, 'Failed to add comment'); this.log.error({ err, taskId: task.id }, 'Failed to add comment');
}); });
} }
// Update status → review // Update status → in_review
await this.client.updateTask(task.id, { status: 'review' }).catch((err) => { this.log.info('│ → Updating task status to in_review...');
this.log.warn({ err, taskId: task.id }, 'Failed to update task status to review'); await this.client.updateTask(task.id, { status: 'in_review' }).catch((err) => {
this.log.warn({ err, taskId: task.id }, 'Failed to update task status to in_review');
}); });
this.log.info({ taskId: task.id, resultLength: collectedText.length }, 'Task completed'); this.log.info('└── TASK DONE: %s (%d chars output)', task.key, collectedText.length);
} catch (err) { } catch (err) {
this.log.error({ err, taskId: task.id }, 'Task processing failed'); this.log.error({ err, taskId: task.id }, 'Task processing failed');
await this.client.addComment(task.id, `Agent error: ${err instanceof Error ? err.message : String(err)}`).catch(() => {}); await this.client.sendMessage({
await this.client.updateTask(task.id, { status: 'error' }).catch(() => {}); task_id: task.id,
content: `Agent error: ${err instanceof Error ? err.message : String(err)}`,
}).catch(() => {});
} finally { } finally {
this.activeTasks--; this.activeTasks--;
this.taskTracker.removeTask(task.id); this.taskTracker.removeTask(task.id);
} }
} }
private async handleChatMessage(data: Record<string, unknown>): Promise<void> { private async handleMessageNew(data: Record<string, unknown>): Promise<void> {
const content = (data.content as string) || (data.message as string) || ''; // Protocol: message.new → { id, chat_id, task_id, author_slug, content, mentions, ... }
const content = (data.content as string) || '';
const authorSlug = (data.author_slug as string) || (data.sender_slug as string) || '';
const taskId = data.task_id as string | undefined; const taskId = data.task_id as string | undefined;
const chatId = data.chat_id as string | undefined;
const mentions = (data.mentions as string[]) || [];
if (!content) { // Don't respond to own messages
this.log.warn({ data }, 'chat.message event missing content'); if (authorSlug === this.config.slug) {
this.log.debug('Ignoring own message');
return; return;
} }
this.log.info({ taskId, contentLength: content.length }, 'Processing chat message'); if (!content) {
this.log.warn({ data }, 'message.new event missing content');
return;
}
// Check if agent is mentioned (for filtered modes)
const isMentioned = mentions.includes(this.config.slug);
this.log.info('│ MESSAGE from @%s: "%s"', authorSlug, content.slice(0, 200));
this.log.info('│ Context: %s | Mentioned: %s', taskId ? `task=${taskId}` : chatId ? `chat=${chatId}` : 'none', isMentioned);
let collectedText = ''; let collectedText = '';
for await (const msg of runAgent(content, { for await (const msg of runAgent(content, {
@ -128,10 +160,21 @@ export class EventRouter {
} }
} }
if (taskId && collectedText.trim()) { // Reply to the same context (task comment or chat message)
await this.client.addComment(taskId, collectedText.trim()).catch((err) => { if (collectedText.trim()) {
this.log.error({ err, taskId }, 'Failed to add chat reply comment'); this.log.info('│ → Sending reply (%d chars): %s', collectedText.trim().length, collectedText.trim().slice(0, 200));
}); if (taskId) {
await this.client.sendMessage({ task_id: taskId, content: collectedText.trim() }).catch((err) => {
this.log.error({ err, taskId }, 'Failed to send task comment reply');
});
} else if (chatId) {
await this.client.sendMessage({ chat_id: chatId, content: collectedText.trim() }).catch((err) => {
this.log.error({ err, chatId }, 'Failed to send chat reply');
});
}
this.log.info('└── MESSAGE REPLIED');
} else {
this.log.info('└── MESSAGE PROCESSED (no reply)');
} }
} }
} }

View File

@ -1,6 +1,14 @@
import { logger } from '../logger.js'; import { logger } from '../logger.js';
import type { RegistrationPayload, HeartbeatPayload } from './types.js'; import type { RegistrationPayload, HeartbeatPayload } from './types.js';
/**
* HTTP client to Tracker REST API.
*
* REST API base: {baseUrl}/api/v1
* Auth: Bearer token in header.
*
* Used in both WS and HTTP transports for mutations.
*/
export class TrackerClient { export class TrackerClient {
private log = logger.child({ component: 'tracker-client' }); private log = logger.child({ component: 'tracker-client' });
@ -11,6 +19,8 @@ export class TrackerClient {
private async request<T>(method: string, path: string, body?: unknown): Promise<T> { private async request<T>(method: string, path: string, body?: unknown): Promise<T> {
const url = `${this.baseUrl}${path}`; const url = `${this.baseUrl}${path}`;
this.log.info(' REST %s %s', method, path);
if (body) this.log.info(' body: %s', JSON.stringify(body).slice(0, 300));
const res = await fetch(url, { const res = await fetch(url, {
method, method,
headers: { headers: {
@ -21,8 +31,10 @@ export class TrackerClient {
}); });
if (!res.ok) { if (!res.ok) {
const text = await res.text().catch(() => ''); const text = await res.text().catch(() => '');
this.log.error(' REST FAIL %s %s → %d: %s', method, path, res.status, text.slice(0, 200));
throw new Error(`Tracker API ${method} ${path} failed: ${res.status} ${text}`); throw new Error(`Tracker API ${method} ${path} failed: ${res.status} ${text}`);
} }
this.log.info(' REST OK %s %s → %d', method, path, res.status);
const contentType = res.headers.get('content-type') || ''; const contentType = res.headers.get('content-type') || '';
if (contentType.includes('application/json')) { if (contentType.includes('application/json')) {
return (await res.json()) as T; return (await res.json()) as T;
@ -30,6 +42,8 @@ export class TrackerClient {
return undefined as T; return undefined as T;
} }
// --- Agent lifecycle ---
async register(payload: RegistrationPayload): Promise<void> { async register(payload: RegistrationPayload): Promise<void> {
this.log.info({ slug: payload.slug }, 'Registering agent'); this.log.info({ slug: payload.slug }, 'Registering agent');
await this.request('POST', '/api/v1/agents/register', payload); await this.request('POST', '/api/v1/agents/register', payload);
@ -39,25 +53,90 @@ export class TrackerClient {
await this.request('POST', '/api/v1/agents/heartbeat', payload); await this.request('POST', '/api/v1/agents/heartbeat', payload);
} }
async updateTask(taskId: string, fields: Record<string, unknown>): Promise<void> { // --- Tasks ---
this.log.info({ taskId, fields }, 'Updating task');
await this.request('PATCH', `/api/v1/tasks/${taskId}`, fields);
}
async addComment(taskId: string, content: string): Promise<void> { async listTasks(params: Record<string, string> = {}): Promise<Record<string, unknown>[]> {
this.log.info({ taskId, contentLength: content.length }, 'Adding comment'); const qs = new URLSearchParams(params).toString();
await this.request('POST', `/api/v1/tasks/${taskId}/comments`, { content }); const path = qs ? `/api/v1/tasks?${qs}` : '/api/v1/tasks';
} return this.request('GET', path);
async uploadFile(taskId: string, filename: string, content: string): Promise<void> {
await this.request('POST', `/api/v1/tasks/${taskId}/files`, { filename, content });
} }
async getTask(taskId: string): Promise<Record<string, unknown>> { async getTask(taskId: string): Promise<Record<string, unknown>> {
return this.request('GET', `/api/v1/tasks/${taskId}`); return this.request('GET', `/api/v1/tasks/${taskId}`);
} }
async createTask(projectSlug: string, task: Record<string, unknown>): Promise<Record<string, unknown>> {
return this.request('POST', `/api/v1/tasks?project_slug=${encodeURIComponent(projectSlug)}`, task);
}
async updateTask(taskId: string, fields: Record<string, unknown>): Promise<void> {
this.log.info({ taskId, fields }, 'Updating task');
await this.request('PATCH', `/api/v1/tasks/${taskId}`, fields);
}
async deleteTask(taskId: string): Promise<void> {
await this.request('DELETE', `/api/v1/tasks/${taskId}`);
}
async takeTask(taskId: string, slug: string): Promise<void> {
this.log.info({ taskId, slug }, 'Taking task');
await this.request('POST', `/api/v1/tasks/${taskId}/take?slug=${encodeURIComponent(slug)}`);
}
async rejectTask(taskId: string, slug: string, reason?: string): Promise<void> {
this.log.info({ taskId, slug, reason }, 'Rejecting task');
await this.request('POST', `/api/v1/tasks/${taskId}/reject`, { slug, reason });
}
async watchTask(taskId: string, slug: string): Promise<void> {
await this.request('POST', `/api/v1/tasks/${taskId}/watch?slug=${encodeURIComponent(slug)}`);
}
// --- Steps (checklist inside task) ---
async addStep(taskId: string, title: string): Promise<Record<string, unknown>> {
return this.request('POST', `/api/v1/tasks/${taskId}/steps`, { title });
}
async completeStep(taskId: string, stepId: string): Promise<void> {
await this.request('PATCH', `/api/v1/tasks/${taskId}/steps/${stepId}`, { done: true });
}
// --- Messages (unified: chat + task comments) ---
async sendMessage(payload: { chat_id?: string; task_id?: string; content: string; mentions?: string[] }): Promise<Record<string, unknown>> {
this.log.info({ chatId: payload.chat_id, taskId: payload.task_id, contentLength: payload.content.length }, 'Sending message');
return this.request('POST', '/api/v1/messages', payload);
}
async listMessages(params: Record<string, string>): Promise<Record<string, unknown>[]> {
const qs = new URLSearchParams(params).toString();
return this.request('GET', `/api/v1/messages?${qs}`);
}
// --- Files ---
async uploadFile(taskId: string, filename: string, content: string): Promise<void> {
await this.request('POST', `/api/v1/tasks/${taskId}/files`, { filename, content });
}
async listTaskFiles(taskId: string): Promise<Record<string, unknown>[]> { async listTaskFiles(taskId: string): Promise<Record<string, unknown>[]> {
return this.request('GET', `/api/v1/tasks/${taskId}/files`); return this.request('GET', `/api/v1/tasks/${taskId}/files`);
} }
// --- Projects ---
async listProjects(): Promise<Record<string, unknown>[]> {
return this.request('GET', '/api/v1/projects');
}
async getProject(slug: string): Promise<Record<string, unknown>> {
return this.request('GET', `/api/v1/projects/${slug}`);
}
// --- Members ---
async listMembers(): Promise<Record<string, unknown>[]> {
return this.request('GET', '/api/v1/members');
}
} }

View File

@ -9,12 +9,15 @@ export type EventHandler = (event: TrackerEvent) => Promise<void>;
/** /**
* WebSocket client transport for connecting to the tracker. * WebSocket client transport for connecting to the tracker.
* *
* Instead of running an HTTP server and receiving events via POST, * The tracker WS handler supports two field conventions:
* the agent connects to the tracker over WebSocket. The bidirectional * - WEBSOCKET-PROTOCOL.md uses "event" field
* channel handles auth, events, heartbeat, and ack no open port needed. * - TRACKER-PROTOCOL.md uses "type" field
*
* We send BOTH fields in every message for compatibility.
* We read whichever is present on incoming messages.
*/ */
export class WsClientTransport implements TaskTracker { export class WsClientTransport implements TaskTracker {
private log = logger.child({ component: 'ws-client-transport' }); private log = logger.child({ component: 'ws' });
private ws: WebSocket | null = null; private ws: WebSocket | null = null;
private handler: EventHandler | null = null; private handler: EventHandler | null = null;
private currentTasks = new Set<string>(); private currentTasks = new Set<string>();
@ -27,6 +30,13 @@ export class WsClientTransport implements TaskTracker {
private rejectStart: ((err: Error) => void) | null = null; private rejectStart: ((err: Error) => void) | null = null;
private authenticated = false; private authenticated = false;
/** Lobby chat ID returned by auth.ok */
lobbyChatId: string | null = null;
/** Projects returned by auth.ok */
projects: Array<{ id: string; slug: string; name: string; chat_id?: string }> = [];
/** Online members from auth.ok */
online: string[] = [];
constructor(private config: AgentConfig) {} constructor(private config: AgentConfig) {}
onEvent(handler: EventHandler): void { onEvent(handler: EventHandler): void {
@ -68,47 +78,51 @@ export class WsClientTransport implements TaskTracker {
} }
private buildWsUrl(): string { private buildWsUrl(): string {
const base = this.config.trackerUrl if (this.config.wsUrl) return this.config.wsUrl;
const url = this.config.trackerUrl;
if (url.startsWith('ws://') || url.startsWith('wss://')) return url;
const base = url
.replace(/^http:\/\//, 'ws://') .replace(/^http:\/\//, 'ws://')
.replace(/^https:\/\//, 'wss://'); .replace(/^https:\/\//, 'wss://');
// Use the existing /ws endpoint with client_type=agent return `${base.replace(/\/$/, '')}/ws`;
return `${base.replace(/\/$/, '')}/ws?client_type=agent&client_id=${encodeURIComponent(this.config.slug)}`;
} }
private connect(): void { private connect(): void {
const url = this.buildWsUrl(); const url = this.buildWsUrl();
this.log.info({ url }, 'Connecting to tracker via WebSocket'); this.log.info('━━━ CONNECTING to %s ━━━', url);
const ws = new WebSocket(url); const ws = new WebSocket(url);
this.ws = ws; this.ws = ws;
ws.on('open', () => { ws.on('open', () => {
this.log.info('WebSocket connected, sending auth'); this.log.info('━━━ WS CONNECTED ━━━');
this.sendJson({ // Send auth — use BOTH "event" and "type" for compatibility
type: 'auth', this.send('auth', {
token: this.config.token, token: this.config.token,
agent: { name: this.config.name,
name: this.config.name, slug: this.config.slug,
slug: this.config.slug, capabilities: this.config.capabilities,
capabilities: this.config.capabilities,
max_concurrent_tasks: this.config.maxConcurrentTasks,
},
}); });
}); });
ws.on('message', (data) => { ws.on('message', (data) => {
const raw = data.toString();
this.log.info('← RAW: %s', raw.slice(0, 500));
let msg: Record<string, unknown>; let msg: Record<string, unknown>;
try { try {
msg = JSON.parse(data.toString()); msg = JSON.parse(raw);
} catch { } catch {
this.log.warn('Received non-JSON message, ignoring'); this.log.warn('Non-JSON message, ignoring');
return; return;
} }
this.handleMessage(msg); this.handleMessage(msg);
}); });
ws.on('close', (code, reason) => { ws.on('close', (code, reason) => {
this.log.info({ code, reason: reason.toString() }, 'WebSocket closed'); this.log.info('━━━ WS CLOSED (code=%d reason=%s) ━━━', code, reason.toString());
this.authenticated = false; this.authenticated = false;
if (this.heartbeatTimer) { if (this.heartbeatTimer) {
clearInterval(this.heartbeatTimer); clearInterval(this.heartbeatTimer);
@ -120,115 +134,184 @@ export class WsClientTransport implements TaskTracker {
}); });
ws.on('error', (err) => { ws.on('error', (err) => {
this.log.warn({ err: err.message }, 'WebSocket error'); this.log.error('━━━ WS ERROR: %s ━━━', err.message);
// 'close' event will follow, triggering reconnect
}); });
} }
private handleMessage(msg: Record<string, unknown>): void { /**
const type = msg.type as string; * Send a JSON message with both "event" and "type" fields set,
* so the tracker picks up whichever field it uses.
*/
private send(eventType: string, payload: Record<string, unknown> = {}): void {
const msg = { event: eventType, type: eventType, ...payload };
const json = JSON.stringify(msg);
this.log.info('→ SEND: %s', json.slice(0, 500));
if (this.ws?.readyState === WebSocket.OPEN) {
this.ws.send(json);
} else {
this.log.warn('WS not open, cannot send');
}
}
switch (type) { private handleMessage(msg: Record<string, unknown>): void {
// Read either "event" or "type" — tracker may use either
const msgType = (msg.event as string) || (msg.type as string) || '';
if (!msgType) {
this.log.warn('Message without event/type field: %s', JSON.stringify(msg).slice(0, 200));
return;
}
switch (msgType) {
case 'auth.ok':
case 'auth_ok': case 'auth_ok':
this.log.info('Authenticated with tracker'); this.onAuthOk(msg);
this.authenticated = true;
this.reconnectDelay = 1000; // Reset backoff on successful auth
this.startHeartbeat();
// Resolve the start() promise on first successful auth
if (this.resolveStart) {
this.resolveStart();
this.resolveStart = null;
this.rejectStart = null;
}
break; break;
case 'auth.error':
case 'auth_error': case 'auth_error':
this.log.error({ message: msg.message }, 'Authentication failed'); this.log.error('━━━ AUTH FAILED: %s ━━━', msg.message || msg.data);
if (this.rejectStart) { if (this.rejectStart) {
this.rejectStart(new Error(`Auth failed: ${msg.message}`)); this.rejectStart(new Error(`Auth failed: ${msg.message}`));
this.resolveStart = null; this.resolveStart = null;
this.rejectStart = null; this.rejectStart = null;
} }
// Close — don't reconnect on auth error from initial start
if (this.ws) { if (this.ws) {
this.ws.close(1000, 'Auth failed'); this.ws.close(1000, 'Auth failed');
this.ws = null; this.ws = null;
} }
break; break;
case 'event': // Heartbeat ack (tracker may send as any of these)
this.handleTrackerEvent(msg); case 'agent.heartbeat.ack':
case 'heartbeat.ack':
case 'heartbeat_ack':
this.log.info('← HEARTBEAT ACK');
break;
// Subscribe confirmations
case 'subscribe.ok':
case 'chat.subscribe.ok':
case 'project.subscribe.ok':
this.log.info('← SUBSCRIBE OK: %s', JSON.stringify(msg.data || {}).slice(0, 200));
break; break;
default: default:
this.log.debug({ type }, 'Unknown message type'); // Everything else is a tracker event — forward to handler
this.onTrackerEvent(msgType, msg);
break;
} }
} }
private handleTrackerEvent(msg: Record<string, unknown>): void { private onAuthOk(msg: Record<string, unknown>): void {
const event: TrackerEvent = { const data = (msg.data || msg.init || {}) as Record<string, unknown>;
event: msg.event as string, this.lobbyChatId = (data.lobby_chat_id as string) || null;
data: msg.data as Record<string, unknown>, this.projects = (data.projects as Array<{ id: string; slug: string; name: string; chat_id?: string }>) || [];
ts: msg.ts as number, this.online = (data.online as string[]) || (data.agents_online as string[]) || [];
id: msg.id as string,
};
if (!event.event || !event.id) { this.log.info('━━━ AUTH OK ━━━');
this.log.warn({ msg }, 'Invalid event: missing event or id'); this.log.info(' Lobby chat: %s', this.lobbyChatId || '(none)');
return; this.log.info(' Projects: %s', this.projects.map(p => `${p.slug}(${p.id})`).join(', ') || '(none)');
this.log.info(' Online: %s', this.online.join(', ') || '(nobody)');
this.log.info(' Full auth data: %s', JSON.stringify(data, null, 2));
this.authenticated = true;
this.reconnectDelay = 1000;
this.startHeartbeat();
// Subscribe to lobby chat
if (this.lobbyChatId) {
this.log.info('→ Subscribing to lobby chat: %s', this.lobbyChatId);
this.send('chat.subscribe', { chat_id: this.lobbyChatId });
} }
// Deduplication // Subscribe to projects (try both protocols)
if (this.processedIds.has(event.id)) { for (const project of this.projects) {
this.log.debug({ eventId: event.id }, 'Duplicate event, skipping'); this.log.info('→ Subscribing to project: %s (%s)', project.slug, project.id);
return; // WEBSOCKET-PROTOCOL.md style: subscribe with channels
} this.send('subscribe', { channels: [`project:${project.slug}`] });
this.processedIds.add(event.id); // TRACKER-PROTOCOL.md style: project.subscribe
this.send('project.subscribe', { project_id: project.id });
// Cap dedup set size // If project has a chat_id, subscribe to it
if (this.processedIds.size > 10000) { if (project.chat_id) {
const entries = [...this.processedIds]; this.log.info('→ Subscribing to project chat: %s', project.chat_id);
this.processedIds = new Set(entries.slice(entries.length - 5000)); this.send('chat.subscribe', { chat_id: project.chat_id });
}
} }
// Send ack if (this.resolveStart) {
this.sendJson({ type: 'ack', id: event.id }); this.resolveStart();
this.resolveStart = null;
this.rejectStart = null;
}
}
this.log.info({ eventType: event.event, eventId: event.id }, 'Received event'); private onTrackerEvent(eventType: string, msg: Record<string, unknown>): void {
const eventId = (msg.id as string) || `${eventType}-${Date.now()}`;
const data = (msg.data || msg) as Record<string, unknown>;
const ts = (msg.ts as number) || Date.now();
this.log.info('┌── EVENT: %s', eventType);
this.log.info('│ id: %s', eventId);
this.log.info('│ data: %s', JSON.stringify(data, null, 2));
const event: TrackerEvent = { event: eventType, data, ts, id: eventId };
// Dedup
if (msg.id) {
if (this.processedIds.has(eventId)) {
this.log.info('└── DUPLICATE, skipping');
return;
}
this.processedIds.add(eventId);
if (this.processedIds.size > 10000) {
const entries = [...this.processedIds];
this.processedIds = new Set(entries.slice(entries.length - 5000));
}
}
// Ack
this.send('ack', {});
this.log.info('└── → forwarding to router');
if (this.handler) { if (this.handler) {
this.handler(event).catch((err) => { this.handler(event).catch((err) => {
this.log.error({ err, eventId: event.id }, 'Event handler failed'); this.log.error({ err, eventId }, 'Event handler failed');
}); });
} }
} }
/** Send a chat message via WebSocket */
sendChatMessage(chatId: string, content: string, mentions: string[] = []): void {
this.send('chat.send', { chat_id: chatId, content, mentions });
}
/** Send a task comment via WebSocket */
sendTaskComment(taskId: string, content: string, mentions: string[] = []): void {
this.send('chat.send', { task_id: taskId, content, mentions });
}
private startHeartbeat(): void { private startHeartbeat(): void {
if (this.heartbeatTimer) clearInterval(this.heartbeatTimer); if (this.heartbeatTimer) clearInterval(this.heartbeatTimer);
const intervalMs = this.config.heartbeatIntervalSec * 1000; const intervalMs = this.config.heartbeatIntervalSec * 1000;
this.heartbeatTimer = setInterval(() => {
this.sendJson({ this.sendHeartbeat();
type: 'heartbeat', this.heartbeatTimer = setInterval(() => this.sendHeartbeat(), intervalMs);
status: this.currentTasks.size > 0 ? 'busy' : 'idle', this.log.info('Heartbeat started (every %ds)', this.config.heartbeatIntervalSec);
current_tasks: [...this.currentTasks], }
});
}, intervalMs); private sendHeartbeat(): void {
this.log.info({ intervalSec: this.config.heartbeatIntervalSec }, 'Heartbeat started'); const status = this.currentTasks.size > 0 ? 'busy' : 'online';
this.send('agent.heartbeat', { status, current_tasks: [...this.currentTasks] });
} }
private scheduleReconnect(): void { private scheduleReconnect(): void {
this.log.info({ delayMs: this.reconnectDelay }, 'Scheduling reconnect'); this.log.info('━━━ RECONNECT in %dms ━━━', this.reconnectDelay);
this.reconnectTimer = setTimeout(() => { this.reconnectTimer = setTimeout(() => {
this.reconnectTimer = null; this.reconnectTimer = null;
this.connect(); this.connect();
}, this.reconnectDelay); }, this.reconnectDelay);
// Exponential backoff: 1s → 2s → 4s → ... → 30s cap
this.reconnectDelay = Math.min(this.reconnectDelay * 2, 30000); this.reconnectDelay = Math.min(this.reconnectDelay * 2, 30000);
} }
private sendJson(data: unknown): void {
if (this.ws?.readyState === WebSocket.OPEN) {
this.ws.send(JSON.stringify(data));
}
}
} }