picogent/src/transport/ws-client.ts
Markov 6dcae9a320 Phase 1: streaming via WS + thinking support
- Router streams text deltas via WS (agent.stream.delta)
- Router streams tool calls (agent.stream.tool)
- Auto-reply via WS chat.send (not REST)
- Thinking blocks collected and sent with final message
- WsClientTransport: sendStreamEvent + sendChatMessage with thinking
- Router receives WS transport reference via setWsTransport()
2026-02-27 06:56:44 +01:00

307 lines
9.7 KiB
TypeScript

import WebSocket from 'ws';
import { logger } from '../logger.js';
import type { AgentConfig } from '../config.js';
import type { TaskTracker } from '../router.js';
import type { TrackerEvent } from '../tracker/types.js';
export type EventHandler = (event: TrackerEvent) => Promise<void>;
/**
* WebSocket client transport for connecting to the tracker.
*
* The tracker WS handler supports two field conventions:
* - WEBSOCKET-PROTOCOL.md uses "event" field
* - TRACKER-PROTOCOL.md uses "type" field
*
* We send BOTH fields in every message for compatibility.
* We read whichever is present on incoming messages.
*/
export class WsClientTransport implements TaskTracker {
private log = logger.child({ component: 'ws' });
private ws: WebSocket | null = null;
private handler: EventHandler | null = null;
private currentTasks = new Set<string>();
private processedIds = new Set<string>();
private heartbeatTimer: ReturnType<typeof setInterval> | null = null;
private reconnectTimer: ReturnType<typeof setTimeout> | null = null;
private reconnectDelay = 1000;
private stopped = false;
private resolveStart: (() => void) | null = null;
private rejectStart: ((err: Error) => void) | null = null;
private authenticated = false;
/** Lobby chat ID returned by auth.ok */
lobbyChatId: string | null = null;
/** Projects returned by auth.ok */
projects: Array<{ id: string; slug: string; name: string; chat_id?: string }> = [];
/** Online members from auth.ok */
online: string[] = [];
constructor(private config: AgentConfig) {}
onEvent(handler: EventHandler): void {
this.handler = handler;
}
addTask(taskId: string): void {
this.currentTasks.add(taskId);
}
removeTask(taskId: string): void {
this.currentTasks.delete(taskId);
}
async start(): Promise<void> {
this.stopped = false;
return new Promise<void>((resolve, reject) => {
this.resolveStart = resolve;
this.rejectStart = reject;
this.connect();
});
}
async stop(): Promise<void> {
this.stopped = true;
if (this.heartbeatTimer) {
clearInterval(this.heartbeatTimer);
this.heartbeatTimer = null;
}
if (this.reconnectTimer) {
clearTimeout(this.reconnectTimer);
this.reconnectTimer = null;
}
if (this.ws) {
this.ws.close(1000, 'Agent shutting down');
this.ws = null;
}
this.log.info('WS client transport stopped');
}
private buildWsUrl(): string {
if (this.config.wsUrl) return this.config.wsUrl;
const url = this.config.trackerUrl;
if (url.startsWith('ws://') || url.startsWith('wss://')) return url;
const base = url
.replace(/^http:\/\//, 'ws://')
.replace(/^https:\/\//, 'wss://');
return `${base.replace(/\/$/, '')}/ws`;
}
private connect(): void {
const url = this.buildWsUrl();
this.log.info('━━━ CONNECTING to %s ━━━', url);
const ws = new WebSocket(url);
this.ws = ws;
ws.on('open', () => {
this.log.info('━━━ WS CONNECTED ━━━');
// Send auth — Tracker expects "type" field only
this.send('auth', {
token: this.config.token,
});
});
ws.on('message', (data) => {
const raw = data.toString();
this.log.info('← RAW: %s', raw.slice(0, 500));
let msg: Record<string, unknown>;
try {
msg = JSON.parse(raw);
} catch {
this.log.warn('Non-JSON message, ignoring');
return;
}
this.handleMessage(msg);
});
ws.on('close', (code, reason) => {
this.log.info('━━━ WS CLOSED (code=%d reason=%s) ━━━', code, reason.toString());
this.authenticated = false;
if (this.heartbeatTimer) {
clearInterval(this.heartbeatTimer);
this.heartbeatTimer = null;
}
if (!this.stopped) {
this.scheduleReconnect();
}
});
ws.on('error', (err) => {
this.log.error('━━━ WS ERROR: %s ━━━', err.message);
});
}
/**
* Send a JSON message with "type" field (Tracker protocol).
*/
private send(eventType: string, payload: Record<string, unknown> = {}): void {
const msg = { type: eventType, ...payload };
const json = JSON.stringify(msg);
this.log.info('→ SEND: %s', json.slice(0, 500));
if (this.ws?.readyState === WebSocket.OPEN) {
this.ws.send(json);
} else {
this.log.warn('WS not open, cannot send');
}
}
private handleMessage(msg: Record<string, unknown>): void {
// Read either "event" or "type" — tracker may use either
const msgType = (msg.event as string) || (msg.type as string) || '';
if (!msgType) {
this.log.warn('Message without event/type field: %s', JSON.stringify(msg).slice(0, 200));
return;
}
switch (msgType) {
case 'auth.ok':
case 'auth_ok':
this.onAuthOk(msg);
break;
case 'auth.error':
case 'auth_error':
this.log.error('━━━ AUTH FAILED: %s ━━━', msg.message || msg.data);
if (this.rejectStart) {
this.rejectStart(new Error(`Auth failed: ${msg.message}`));
this.resolveStart = null;
this.rejectStart = null;
}
if (this.ws) {
this.ws.close(1000, 'Auth failed');
this.ws = null;
}
break;
// Heartbeat ack (tracker may send as any of these)
case 'agent.heartbeat.ack':
case 'heartbeat.ack':
case 'heartbeat_ack':
this.log.info('← HEARTBEAT ACK');
break;
// Subscribe confirmations
case 'subscribe.ok':
case 'chat.subscribe.ok':
case 'project.subscribe.ok':
this.log.info('← SUBSCRIBE OK: %s', JSON.stringify(msg.data || {}).slice(0, 200));
break;
default:
// Everything else is a tracker event — forward to handler
this.onTrackerEvent(msgType, msg);
break;
}
}
private onAuthOk(msg: Record<string, unknown>): void {
const data = (msg.data || msg.init || {}) as Record<string, unknown>;
this.lobbyChatId = (data.lobby_chat_id as string) || null;
this.projects = (data.projects as Array<{ id: string; slug: string; name: string; chat_id?: string }>) || [];
this.online = (data.online as string[]) || (data.agents_online as string[]) || [];
this.log.info('━━━ AUTH OK ━━━');
this.log.info(' Lobby chat: %s', this.lobbyChatId || '(none)');
this.log.info(' Projects: %s', this.projects.map(p => `${p.slug}(${p.id})`).join(', ') || '(none)');
this.log.info(' Online: %s', this.online.join(', ') || '(nobody)');
this.log.info(' Full auth data: %s', JSON.stringify(data, null, 2));
this.authenticated = true;
this.reconnectDelay = 1000;
this.startHeartbeat();
// Projects are auto-subscribed by Tracker on auth
this.log.info('Auto-subscribed to %d projects: %s', this.projects.length,
this.projects.map(p => p.slug).join(', ') || '(none)');
if (this.resolveStart) {
this.resolveStart();
this.resolveStart = null;
this.rejectStart = null;
}
}
private onTrackerEvent(eventType: string, msg: Record<string, unknown>): void {
const eventId = (msg.id as string) || `${eventType}-${Date.now()}`;
const data = (msg.data || msg) as Record<string, unknown>;
const ts = (msg.ts as number) || Date.now();
this.log.info('┌── EVENT: %s', eventType);
this.log.info('│ id: %s', eventId);
this.log.info('│ data: %s', JSON.stringify(data, null, 2));
const event: TrackerEvent = { event: eventType, data, ts, id: eventId };
// Dedup
if (msg.id) {
if (this.processedIds.has(eventId)) {
this.log.info('└── DUPLICATE, skipping');
return;
}
this.processedIds.add(eventId);
if (this.processedIds.size > 10000) {
const entries = [...this.processedIds];
this.processedIds = new Set(entries.slice(entries.length - 5000));
}
}
// Ack
this.send('ack', {});
this.log.info('└── → forwarding to router');
if (this.handler) {
this.handler(event).catch((err) => {
this.log.error({ err, eventId }, 'Event handler failed');
});
}
}
/** Send a chat message via WebSocket (with optional thinking) */
sendChatMessage(chatId: string, content: string, mentions: string[] = [], thinking?: string, taskId?: string): void {
const payload: Record<string, unknown> = { content, mentions };
if (chatId) payload.chat_id = chatId;
if (taskId) payload.task_id = taskId;
if (thinking) payload.thinking = thinking;
this.send('chat.send', payload);
}
/** Send a task comment via WebSocket */
sendTaskComment(taskId: string, content: string, mentions: string[] = [], thinking?: string): void {
this.send('chat.send', { task_id: taskId, content, mentions, ...(thinking ? { thinking } : {}) });
}
/** Send agent stream event via WebSocket */
sendStreamEvent(eventType: string, data: Record<string, unknown>): void {
this.send(eventType, data);
}
private startHeartbeat(): void {
if (this.heartbeatTimer) clearInterval(this.heartbeatTimer);
const intervalMs = this.config.heartbeatIntervalSec * 1000;
this.sendHeartbeat();
this.heartbeatTimer = setInterval(() => this.sendHeartbeat(), intervalMs);
this.log.info('Heartbeat started (every %ds)', this.config.heartbeatIntervalSec);
}
private sendHeartbeat(): void {
const status = this.currentTasks.size > 0 ? 'busy' : 'online';
this.send('heartbeat', { status });
}
private scheduleReconnect(): void {
this.log.info('━━━ RECONNECT in %dms ━━━', this.reconnectDelay);
this.reconnectTimer = setTimeout(() => {
this.reconnectTimer = null;
this.connect();
}, this.reconnectDelay);
this.reconnectDelay = Math.min(this.reconnectDelay * 2, 30000);
}
}