picogent/src/transport/websocket.ts
2026-02-21 02:41:39 +03:00

129 lines
3.6 KiB
TypeScript

import { WebSocketServer, WebSocket } from 'ws';
import { logger } from '../logger.js';
import type { Transport, IncomingMessage, OutgoingMessage, MessageHandler } from './types.js';
/** Delay, in milliseconds, between two heartbeat sweeps over the connected clients. */
const HEARTBEAT_INTERVAL = 30 * 1_000;
/**
 * {@link Transport} implementation backed by a `ws` WebSocket server.
 *
 * Every accepted connection is given a generated id (`client-1`, `client-2`, …).
 * Incoming text frames are parsed as JSON, checked for the `id` and `content`
 * fields, tagged with `_clientId`, and passed to the registered handler.
 * A periodic ping/pong heartbeat terminates connections that stop answering.
 */
export class WebSocketTransport implements Transport {
  private wss: WebSocketServer | null = null;
  private handler: MessageHandler | null = null;
  private heartbeatTimer: ReturnType<typeof setInterval> | null = null;
  private clients = new Map<string, WebSocket>();
  /** Ids of clients that connected or answered a ping since the last heartbeat sweep. */
  private responsive = new Set<string>();
  private clientCounter = 0;

  /**
   * @param port TCP port the WebSocket server listens on once {@link start} is called.
   */
  constructor(private port: number) {}

  /**
   * Registers the callback that receives every valid incoming message.
   * A later call replaces the previously registered handler.
   */
  onMessage(handler: MessageHandler): void {
    this.handler = handler;
  }

  /**
   * Serializes `msg` as JSON and sends it to the given client.
   * The message is silently dropped when the client is unknown or its socket
   * is not in the OPEN state.
   */
  send(clientId: string, msg: OutgoingMessage): void {
    const ws = this.clients.get(clientId);
    if (!ws || ws.readyState !== WebSocket.OPEN) return;
    ws.send(JSON.stringify(msg));
  }

  /**
   * Starts the WebSocket server and the heartbeat timer.
   *
   * @returns A promise that resolves once the server is listening and rejects
   * if the server reports an error before that point (for example when the
   * port is already in use).
   */
  async start(): Promise<void> {
    return new Promise((resolve, reject) => {
      let listening = false;
      const wss = new WebSocketServer({ port: this.port }, () => {
        listening = true;
        logger.info({ port: this.port }, 'WebSocket server listening');
        resolve();
      });
      this.wss = wss;

      // Without an 'error' listener a failed listen() would surface as an
      // unhandled 'error' event and the returned promise would never settle.
      wss.on('error', (err) => {
        logger.error({ err, port: this.port }, 'WebSocket server error');
        if (listening) return;
        this.stopHeartbeat();
        if (this.wss === wss) this.wss = null;
        reject(err);
      });

      wss.on('connection', (ws) => this.registerClient(ws));
      this.startHeartbeat();
    });
  }

  /**
   * Stops the heartbeat, asks every connected client to close with code 1001,
   * and closes the server.
   *
   * @returns A promise that resolves once the server's close callback has run,
   * or immediately when the server was never started.
   */
  async stop(): Promise<void> {
    this.stopHeartbeat();
    for (const ws of this.clients.values()) {
      ws.close(1001, 'Server shutting down');
    }
    this.clients.clear();
    this.responsive.clear();

    const wss = this.wss;
    this.wss = null;
    if (!wss) return;
    return new Promise((resolve) => {
      wss.close(() => resolve());
    });
  }

  /** Assigns an id to a freshly accepted socket and wires up its event listeners. */
  private registerClient(ws: WebSocket): void {
    const clientId = `client-${++this.clientCounter}`;
    this.clients.set(clientId, ws);
    // A new connection counts as responsive until the first heartbeat sweep.
    this.responsive.add(clientId);
    logger.info({ clientId }, 'Client connected');

    ws.on('pong', () => {
      // Ignore late pongs from sockets that were already dropped.
      if (this.clients.has(clientId)) this.responsive.add(clientId);
    });
    ws.on('message', (data) => {
      this.handleRawMessage(clientId, data.toString());
    });
    ws.on('close', () => {
      this.forgetClient(clientId);
      logger.info({ clientId }, 'Client disconnected');
    });
    ws.on('error', (err) => {
      logger.error({ err, clientId }, 'WebSocket error');
      this.forgetClient(clientId);
    });
  }

  /** Removes all bookkeeping for a client id. */
  private forgetClient(clientId: string): void {
    this.clients.delete(clientId);
    this.responsive.delete(clientId);
  }

  /** (Re)starts the periodic heartbeat sweep, replacing any timer that is already running. */
  private startHeartbeat(): void {
    this.stopHeartbeat();
    this.heartbeatTimer = setInterval(() => {
      for (const [clientId, ws] of this.clients) {
        if (!this.responsive.has(clientId)) {
          // No pong since the previous sweep: treat the connection as dead.
          logger.warn({ clientId }, 'Client heartbeat timeout, terminating');
          ws.terminate();
          this.forgetClient(clientId);
          continue;
        }
        // Clear the mark; the client's pong must set it again before the next sweep.
        this.responsive.delete(clientId);
        ws.ping();
      }
    }, HEARTBEAT_INTERVAL);
  }

  /** Cancels the heartbeat timer if one is running. */
  private stopHeartbeat(): void {
    if (this.heartbeatTimer) {
      clearInterval(this.heartbeatTimer);
      this.heartbeatTimer = null;
    }
  }

  /**
   * Parses and validates one raw text frame, then dispatches it to the handler.
   *
   * Malformed JSON, non-object JSON values (including `null`), and objects
   * lacking a truthy `id` or `content` are answered with an `error` message
   * instead of being dispatched. Handler failures, whether thrown synchronously
   * or delivered as a rejected promise, are logged and reported to the client.
   */
  private handleRawMessage(clientId: string, raw: string): void {
    let decoded: unknown;
    try {
      decoded = JSON.parse(raw);
    } catch {
      this.send(clientId, {
        id: 'error',
        type: 'error',
        content: 'Invalid JSON',
      });
      return;
    }

    // JSON.parse('null') yields null; reading `.id` from it would throw, so
    // anything that is not a real object is rejected up front.
    if (typeof decoded !== 'object' || decoded === null) {
      this.send(clientId, {
        id: 'error',
        type: 'error',
        content: 'Missing required fields: id, content',
      });
      return;
    }

    const parsed = decoded as IncomingMessage & { _clientId?: string };
    if (!parsed.id || !parsed.content) {
      this.send(clientId, {
        id: parsed.id || 'error',
        type: 'error',
        content: 'Missing required fields: id, content',
      });
      return;
    }

    // Attach clientId so the handler knows where to send responses
    parsed._clientId = clientId;

    const handler = this.handler;
    if (!handler) return;

    // The async wrapper turns a synchronous throw from the handler into the
    // same error path as a rejected promise.
    void (async () => {
      try {
        await handler(parsed);
      } catch (err) {
        logger.error({ err, clientId, messageId: parsed.id }, 'Handler error');
        this.send(clientId, {
          id: parsed.id,
          type: 'error',
          content: err instanceof Error ? err.message : String(err),
        });
      }
    })();
  }
}