diff --git a/src/index.ts b/src/index.ts index a19f39c..0be6619 100644 --- a/src/index.ts +++ b/src/index.ts @@ -40,6 +40,7 @@ async function startAgentWs(config: AgentConfig, client: TrackerClient): Promise const wsTransport = new WsClientTransport(config); const router = new EventRouter(config, client); + router.setWsTransport(wsTransport); wsTransport.onEvent((event) => router.handleEvent(event)); await wsTransport.start(); diff --git a/src/router.ts b/src/router.ts index 438c223..0a86ec0 100644 --- a/src/router.ts +++ b/src/router.ts @@ -6,6 +6,7 @@ import { createTrackerTools } from './tools/index.js'; import type { ToolDefinition } from '@mariozechner/pi-coding-agent'; import type { AgentConfig } from './config.js'; import type { TrackerEvent } from './tracker/types.js'; +import type { WsClientTransport } from './transport/ws-client.js'; export interface TaskTracker { addTask(taskId: string): void; @@ -16,6 +17,7 @@ export class EventRouter { private log = logger.child({ component: 'event-router' }); private trackerTools: ToolDefinition[]; private trackerClient: TrackerClient; + private wsTransport: WsClientTransport | null = null; constructor( private config: AgentConfig, @@ -30,6 +32,11 @@ export class EventRouter { this.log.info({ toolCount: this.trackerTools.length }, 'Tracker tools registered'); } + /** Set WS transport for streaming events */ + setWsTransport(transport: WsClientTransport): void { + this.wsTransport = transport; + } + async handleEvent(event: TrackerEvent): Promise { this.log.info('┌── ROUTER: handling %s (id: %s)', event.event, event.id); @@ -53,7 +60,7 @@ export class EventRouter { /** * message.new / chat.message — forward to agent session. - * Agent uses send_message tool to reply when needed. Router posts nothing. + * Streams deltas via WS, then sends final message via WS chat.send. */ private async handleMessageNew(data: Record): Promise { const content = (data.content as string) || ''; @@ -63,6 +70,10 @@ export class EventRouter { const chatId = data.chat_id as string | undefined; const taskKey = data.task_key as string | undefined; + // Extract author info from nested author object if present + const author = data.author as Record | undefined; + const resolvedAuthorSlug = authorSlug || (author?.slug as string) || ''; + if (!content) { this.log.warn({ data }, 'message.new event missing content'); return; @@ -70,18 +81,33 @@ export class EventRouter { // Build context-rich prompt for the agent const ctx = taskId ? `[задача ${taskKey || taskId}]` : chatId ? '[чат]' : ''; - const from = authorType === 'system' ? '[система]' : `@${authorSlug}`; + const from = authorType === 'system' ? '[система]' : `@${resolvedAuthorSlug}`; const prompt = `${ctx} ${from}: ${content}`; this.log.info('│ %s %s: "%s"', ctx, from, content.slice(0, 200)); - const result = await this.runAgent(prompt); + const target = chatId ? { chat_id: chatId } : taskId ? { task_id: taskId } : null; - // Auto-reply: if agent produced text but didn't call send_message, send it automatically - if (result.text && !result.usedSendMessage) { - const target = chatId ? { chat_id: chatId } : taskId ? { task_id: taskId } : null; - if (target) { - this.log.info('│ Auto-sending agent reply (%d chars)', result.text.length); + // Stream start + if (this.wsTransport && target) { + this.wsTransport.sendStreamEvent('agent.stream.start', { ...target }); + } + + const result = await this.runAgent(prompt, target); + + // Auto-reply via WS: if agent produced text but didn't call send_message + if (result.text && !result.usedSendMessage && target) { + this.log.info('│ Auto-sending agent reply via WS (%d chars)', result.text.length); + if (this.wsTransport) { + this.wsTransport.sendChatMessage( + target.chat_id || '', + result.text, + [], + result.thinking || undefined, + target.task_id, + ); + } else { + // Fallback to REST if no WS transport try { await this.trackerClient.sendMessage({ ...target, content: result.text }); } catch (err) { @@ -90,15 +116,24 @@ export class EventRouter { } } + // Stream end + if (this.wsTransport && target) { + this.wsTransport.sendStreamEvent('agent.stream.end', { ...target }); + } + this.log.info('└── MESSAGE handled'); } /** - * Run agent session. Agent controls everything via tools (send_message, update_task, etc.) - * Returns collected text and whether send_message was used. + * Run agent session with streaming support. + * Streams text deltas and tool calls via WS transport. */ - private async runAgent(prompt: string): Promise<{ text: string; usedSendMessage: boolean }> { + private async runAgent( + prompt: string, + target: { chat_id?: string; task_id?: string } | null, + ): Promise<{ text: string; thinking: string; usedSendMessage: boolean }> { let text = ''; + let thinking = ''; let usedSendMessage = false; for await (const msg of runAgent(prompt, { @@ -106,7 +141,7 @@ export class EventRouter { sessionId: this.config.sessionId, model: this.config.model, provider: this.config.provider, - systemPrompt: this.config.prompt || undefined, // fallback if no AGENT.md + systemPrompt: this.config.prompt || undefined, skillsDir: this.config.agentHome, sessionDir: path.join(this.config.agentHome, 'sessions'), allowedPaths: this.config.allowedPaths.length > 0 @@ -118,16 +153,56 @@ export class EventRouter { if (msg.type === 'error') { this.log.error({ error: msg.content }, 'Agent error'); } - // Collect assistant text + + // Stream text deltas via WS if (msg.type === 'text') { text += msg.content || ''; + if (this.wsTransport && target) { + this.wsTransport.sendStreamEvent('agent.stream.delta', { + ...target, + block_type: 'text', + text: msg.content || '', + }); + } } - // Track if send_message tool was called - if (msg.type === 'tool_use' && msg.content?.startsWith('send_message')) { - usedSendMessage = true; + + // Collect thinking (if Pi Agent supports it) + if (msg.type === 'thinking' as string) { + thinking += msg.content || ''; + if (this.wsTransport && target) { + this.wsTransport.sendStreamEvent('agent.stream.delta', { + ...target, + block_type: 'thinking', + text: msg.content || '', + }); + } + } + + // Stream tool calls + if (msg.type === 'tool_use') { + if (msg.content?.startsWith('send_message')) { + usedSendMessage = true; + } + if (this.wsTransport && target) { + this.wsTransport.sendStreamEvent('agent.stream.tool', { + ...target, + tool: msg.content || '', + status: 'running', + }); + } + } + + if (msg.type === 'tool_result') { + if (this.wsTransport && target) { + this.wsTransport.sendStreamEvent('agent.stream.tool', { + ...target, + tool: msg.content || '', + status: 'done', + }); + } } } - return { text: text.trim(), usedSendMessage }; + return { text: text.trim(), thinking: thinking.trim(), usedSendMessage }; } } diff --git a/src/transport/ws-client.ts b/src/transport/ws-client.ts index ae8c11c..314e388 100644 --- a/src/transport/ws-client.ts +++ b/src/transport/ws-client.ts @@ -262,14 +262,23 @@ export class WsClientTransport implements TaskTracker { } } - /** Send a chat message via WebSocket */ - sendChatMessage(chatId: string, content: string, mentions: string[] = []): void { - this.send('chat.send', { chat_id: chatId, content, mentions }); + /** Send a chat message via WebSocket (with optional thinking) */ + sendChatMessage(chatId: string, content: string, mentions: string[] = [], thinking?: string, taskId?: string): void { + const payload: Record = { content, mentions }; + if (chatId) payload.chat_id = chatId; + if (taskId) payload.task_id = taskId; + if (thinking) payload.thinking = thinking; + this.send('chat.send', payload); } /** Send a task comment via WebSocket */ - sendTaskComment(taskId: string, content: string, mentions: string[] = []): void { - this.send('chat.send', { task_id: taskId, content, mentions }); + sendTaskComment(taskId: string, content: string, mentions: string[] = [], thinking?: string): void { + this.send('chat.send', { task_id: taskId, content, mentions, ...(thinking ? { thinking } : {}) }); + } + + /** Send agent stream event via WebSocket */ + sendStreamEvent(eventType: string, data: Record): void { + this.send(eventType, data); } private startHeartbeat(): void {