import path from 'path';
import { logger } from './logger.js';
import { runAgent } from './agent.js';
import { TrackerClient } from './tracker/client.js';
import { createTrackerTools } from './tools/index.js';
import type { ToolDefinition } from '@mariozechner/pi-coding-agent';
import type { AgentConfig } from './config.js';
import type { TrackerEvent } from './tracker/types.js';
import type { WsClientTransport } from './transport/ws-client.js';

export interface TaskTracker {
  addTask(taskId: string): void;
  removeTask(taskId: string): void;
}

/** One entry of the tool log collected while an agent session runs. */
type ToolLogEntry = { name: string; args?: string; result?: string; error?: boolean };

/** Aggregated outcome of a single agent session. */
type AgentRunResult = {
  text: string;
  thinking: string;
  toolLog: ToolLogEntry[];
  usedSendMessage: boolean;
};

/**
 * Dispatches tracker events to the agent.
 *
 * Message events are turned into a prompt and run through an agent session;
 * the listed task/agent lifecycle events are logged and skipped. When a WS
 * transport has been set, session progress is mirrored to it as
 * `agent.stream.*` events.
 */
export class EventRouter {
  private log = logger.child({ component: 'event-router' });
  private trackerTools: ToolDefinition[];
  private trackerClient: TrackerClient;
  private wsTransport: WsClientTransport | null = null;

  /**
   * @param config Agent configuration (slug, model, paths, ...).
   * @param client Tracker client handed to the tracker tools and used for the
   *   REST fallback when no WS transport is set.
   */
  constructor(
    private config: AgentConfig,
    private client: TrackerClient,
  ) {
    this.trackerClient = client;
    this.trackerTools = createTrackerTools({
      trackerClient: client,
      agentSlug: config.slug,
      selfAssignedTasks: new Set(),
    });
    this.log.info({ toolCount: this.trackerTools.length }, 'Tracker tools registered');
  }

  /** Set WS transport for streaming events */
  setWsTransport(transport: WsClientTransport): void {
    this.wsTransport = transport;
  }

  /**
   * Entry point for a single tracker event.
   *
   * `message.new` and `chat.message` are forwarded to the agent; the known
   * task/agent lifecycle events are only logged; anything else produces a
   * warning.
   */
  async handleEvent(event: TrackerEvent): Promise<void> {
    this.log.info('┌── ROUTER: handling %s (id: %s)', event.event, event.id);

    switch (event.event) {
      case 'message.new':
      case 'chat.message':
        await this.handleMessageNew(event.data);
        break;

      case 'task.assigned':
      case 'task.created':
      case 'task.updated':
      case 'agent.status':
      case 'agent.online':
      case 'agent.offline':
        this.log.info({ event: event.event }, 'Informational event, skipping');
        break;

      default:
        this.log.warn({ event: event.event }, 'Unknown event type, ignoring');
    }
  }

  /**
   * message.new / chat.message — forward to agent session.
   *
   * Skips events without textual content, the agent's own messages, and
   * system messages that do not mention `@<slug>`. Otherwise builds a prompt,
   * runs the agent and, if the agent produced text without calling a
   * `send_message*` tool itself, delivers that text via
   * `wsTransport.sendChatMessage` (or `trackerClient.sendMessage` when no WS
   * transport is set).
   *
   * @param data Raw event payload; fields are read defensively because the
   *   shape is not enforced by the type system.
   */
  private async handleMessageNew(data: Record<string, unknown>): Promise<void> {
    // Narrow instead of asserting: a non-string payload is treated as "no
    // content" rather than reaching .slice()/.includes() on a value that may
    // not have them.
    const content = typeof data.content === 'string' ? data.content : '';
    const authorSlug = (data.author_slug as string) || (data.sender_slug as string) || '';
    const authorType = (data.author_type as string) || 'member';
    const taskId = data.task_id as string | undefined;
    const chatId = data.chat_id as string | undefined;
    const taskKey = data.task_key as string | undefined;

    // Extract author info from nested author object if present
    const author = data.author as Record<string, unknown> | undefined;
    const resolvedAuthorSlug = authorSlug || (author?.slug as string) || '';

    if (!content) {
      this.log.warn({ data }, 'message.new event missing content');
      return;
    }

    // Ignore own messages (agent talking to itself)
    if (resolvedAuthorSlug === this.config.slug) {
      this.log.info('│ Skipping own message');
      return;
    }

    // System messages: only process if agent is explicitly mentioned (@slug)
    // This lets assignment/mention notifications through, but ignores generic lifecycle events
    if (authorType === 'system') {
      if (!content.includes(`@${this.config.slug}`)) {
        this.log.info('│ Skipping system message (not mentioned): "%s"', content.slice(0, 100));
        return;
      }
      this.log.info('│ Processing system message (mentioned): "%s"', content.slice(0, 100));
    }

    // Build context-rich prompt for the agent
    const ctx = taskId ? `[задача ${taskKey || taskId}]` : chatId ? '[чат]' : '';
    const from = authorType === 'system' ? '[система]' : `@${resolvedAuthorSlug}`;
    const prompt = `${ctx} ${from}: ${content}`;

    this.log.info('│ %s %s: "%s"', ctx, from, content.slice(0, 200));

    // A chat id takes precedence over a task id when both are present.
    const target = chatId ? { chat_id: chatId } : taskId ? { task_id: taskId } : null;

    // Stream start
    if (this.wsTransport && target) {
      this.wsTransport.sendStreamEvent('agent.stream.start', { ...target });
    }

    try {
      const result = await this.runAgent(prompt, target);

      // Auto-reply via WS: if agent produced text but didn't call send_message
      if (result.text && !result.usedSendMessage && target) {
        this.log.info('│ Auto-sending agent reply via WS (%d chars)', result.text.length);
        if (this.wsTransport) {
          this.wsTransport.sendChatMessage(
            target.chat_id || '',
            result.text,
            [],
            result.thinking || undefined,
            target.task_id,
            result.toolLog.length > 0 ? result.toolLog : undefined,
          );
        } else {
          // Fallback to REST if no WS transport
          try {
            await this.trackerClient.sendMessage({ ...target, content: result.text });
          } catch (err) {
            this.log.error({ err }, 'Failed to auto-send agent reply');
          }
        }
      }
    } finally {
      // Stream end — emitted even when the agent run or the reply throws, so
      // every 'agent.stream.start' is paired with an 'agent.stream.end'.
      if (this.wsTransport && target) {
        this.wsTransport.sendStreamEvent('agent.stream.end', { ...target });
      }
    }

    this.log.info('└── MESSAGE handled');
  }

  /**
   * Run agent session with streaming support.
   *
   * Iterates the messages yielded by `runAgent`, accumulating text, thinking
   * and a tool log. When both a WS transport and a target are available, each
   * text/thinking chunk is forwarded as `agent.stream.delta` and each tool
   * call/result as `agent.stream.tool`.
   *
   * @param prompt Prompt passed to the agent.
   * @param target Chat/task the stream events are addressed to; `null`
   *   disables streaming.
   * @returns Trimmed text and thinking, the collected tool log, and whether a
   *   tool whose name starts with `send_message` was invoked.
   */
  private async runAgent(
    prompt: string,
    target: { chat_id?: string; task_id?: string } | null,
  ): Promise<AgentRunResult> {
    let text = '';
    let thinking = '';
    let usedSendMessage = false;
    const toolLog: ToolLogEntry[] = [];
    let currentToolName = '';

    for await (const msg of runAgent(prompt, {
      workDir: this.config.workDir,
      sessionId: this.config.sessionId,
      model: this.config.model,
      provider: this.config.provider,
      systemPrompt: this.config.prompt || undefined,
      skillsDir: this.config.agentHome,
      sessionDir: path.join(this.config.agentHome, 'sessions'),
      // With no explicit allow-list, the agent home is the only allowed path.
      allowedPaths:
        this.config.allowedPaths.length > 0 ? this.config.allowedPaths : [this.config.agentHome],
      customTools: this.trackerTools,
      agentHome: this.config.agentHome,
    })) {
      if (msg.type === 'error') {
        this.log.error({ error: msg.content }, 'Agent error');
      }

      // Stream text deltas via WS
      if (msg.type === 'text') {
        text += msg.content || '';
        if (this.wsTransport && target) {
          this.wsTransport.sendStreamEvent('agent.stream.delta', {
            ...target,
            block_type: 'text',
            text: msg.content || '',
          });
        }
      }

      // Collect thinking (if Pi Agent supports it)
      // The widening cast keeps the comparison compiling when 'thinking' is
      // not part of the declared message-type union.
      if (msg.type === ('thinking' as string)) {
        thinking += msg.content || '';
        if (this.wsTransport && target) {
          this.wsTransport.sendStreamEvent('agent.stream.delta', {
            ...target,
            block_type: 'thinking',
            text: msg.content || '',
          });
        }
      }

      // Stream tool calls and collect for tool_log
      if (msg.type === 'tool_use') {
        currentToolName = msg.content || '';
        if (currentToolName.startsWith('send_message')) {
          usedSendMessage = true;
        }
        toolLog.push({ name: currentToolName });
        if (this.wsTransport && target) {
          this.wsTransport.sendStreamEvent('agent.stream.tool', {
            ...target,
            tool: currentToolName,
            status: 'running',
          });
        }
      }

      if (msg.type === 'tool_result') {
        // Update last tool log entry with result
        const last = toolLog[toolLog.length - 1];
        if (last) {
          last.result = (msg.content || '').slice(0, 500); // truncate long results
        }
        if (this.wsTransport && target) {
          this.wsTransport.sendStreamEvent('agent.stream.tool', {
            ...target,
            tool: currentToolName,
            status: 'done',
          });
        }
      }
    }

    return { text: text.trim(), thinking: thinking.trim(), toolLog, usedSendMessage };
  }
}