129 lines
3.6 KiB
TypeScript
129 lines
3.6 KiB
TypeScript
import { WebSocketServer, WebSocket } from 'ws';
|
|
import { logger } from '../logger.js';
|
|
import type { Transport, IncomingMessage, OutgoingMessage, MessageHandler } from './types.js';
|
|
|
|
const HEARTBEAT_INTERVAL = 30_000;
|
|
|
|
export class WebSocketTransport implements Transport {
|
|
private wss: WebSocketServer | null = null;
|
|
private handler: MessageHandler | null = null;
|
|
private heartbeatTimer: ReturnType<typeof setInterval> | null = null;
|
|
private clients = new Map<string, WebSocket>();
|
|
private clientCounter = 0;
|
|
|
|
constructor(private port: number) {}
|
|
|
|
onMessage(handler: MessageHandler): void {
|
|
this.handler = handler;
|
|
}
|
|
|
|
send(clientId: string, msg: OutgoingMessage): void {
|
|
const ws = this.clients.get(clientId);
|
|
if (!ws || ws.readyState !== WebSocket.OPEN) return;
|
|
ws.send(JSON.stringify(msg));
|
|
}
|
|
|
|
async start(): Promise<void> {
|
|
return new Promise((resolve) => {
|
|
this.wss = new WebSocketServer({ port: this.port }, () => {
|
|
logger.info({ port: this.port }, 'WebSocket server listening');
|
|
resolve();
|
|
});
|
|
|
|
this.wss.on('connection', (ws) => {
|
|
const clientId = `client-${++this.clientCounter}`;
|
|
this.clients.set(clientId, ws);
|
|
(ws as WebSocket & { isAlive: boolean }).isAlive = true;
|
|
logger.info({ clientId }, 'Client connected');
|
|
|
|
ws.on('pong', () => {
|
|
(ws as WebSocket & { isAlive: boolean }).isAlive = true;
|
|
});
|
|
|
|
ws.on('message', (data) => {
|
|
this.handleRawMessage(clientId, data.toString());
|
|
});
|
|
|
|
ws.on('close', () => {
|
|
this.clients.delete(clientId);
|
|
logger.info({ clientId }, 'Client disconnected');
|
|
});
|
|
|
|
ws.on('error', (err) => {
|
|
logger.error({ err, clientId }, 'WebSocket error');
|
|
this.clients.delete(clientId);
|
|
});
|
|
});
|
|
|
|
this.heartbeatTimer = setInterval(() => {
|
|
for (const [clientId, ws] of this.clients) {
|
|
const alive = ws as WebSocket & { isAlive: boolean };
|
|
if (!alive.isAlive) {
|
|
logger.warn({ clientId }, 'Client heartbeat timeout, terminating');
|
|
ws.terminate();
|
|
this.clients.delete(clientId);
|
|
continue;
|
|
}
|
|
alive.isAlive = false;
|
|
ws.ping();
|
|
}
|
|
}, HEARTBEAT_INTERVAL);
|
|
});
|
|
}
|
|
|
|
async stop(): Promise<void> {
|
|
if (this.heartbeatTimer) {
|
|
clearInterval(this.heartbeatTimer);
|
|
this.heartbeatTimer = null;
|
|
}
|
|
for (const ws of this.clients.values()) {
|
|
ws.close(1001, 'Server shutting down');
|
|
}
|
|
this.clients.clear();
|
|
return new Promise((resolve) => {
|
|
if (this.wss) {
|
|
this.wss.close(() => resolve());
|
|
} else {
|
|
resolve();
|
|
}
|
|
});
|
|
}
|
|
|
|
private handleRawMessage(clientId: string, raw: string): void {
|
|
let parsed: IncomingMessage & { _clientId?: string };
|
|
try {
|
|
parsed = JSON.parse(raw);
|
|
} catch {
|
|
this.send(clientId, {
|
|
id: 'error',
|
|
type: 'error',
|
|
content: 'Invalid JSON',
|
|
});
|
|
return;
|
|
}
|
|
|
|
if (!parsed.id || !parsed.content) {
|
|
this.send(clientId, {
|
|
id: parsed.id || 'error',
|
|
type: 'error',
|
|
content: 'Missing required fields: id, content',
|
|
});
|
|
return;
|
|
}
|
|
|
|
// Attach clientId so the handler knows where to send responses
|
|
parsed._clientId = clientId;
|
|
|
|
if (this.handler) {
|
|
this.handler(parsed).catch((err) => {
|
|
logger.error({ err, clientId, messageId: parsed.id }, 'Handler error');
|
|
this.send(clientId, {
|
|
id: parsed.id,
|
|
type: 'error',
|
|
content: err instanceof Error ? err.message : String(err),
|
|
});
|
|
});
|
|
}
|
|
}
|
|
}
|