From c2c595224c28552dc722d14553e207afc961dd99 Mon Sep 17 00:00:00 2001 From: markov Date: Fri, 27 Feb 2026 06:55:36 +0100 Subject: [PATCH] Phase 1: agent streaming WS events + thinking field in messages - New WS events: agent.stream.start/delta/tool/end - Tracker relays agent stream events to project subscribers - Message model: added 'thinking' column (nullable text) - MessageCreate, MessageOut, converters: thinking support - chat.send WS handler: accepts thinking field - broadcast_message: custom event_type parameter --- src/tracker/api/converters.py | 1 + src/tracker/api/messages.py | 2 ++ src/tracker/api/schemas.py | 1 + src/tracker/enums.py | 4 +++ src/tracker/models/chat.py | 1 + src/tracker/ws/handler.py | 54 +++++++++++++++++++++++++++++++++++ src/tracker/ws/manager.py | 6 ++-- 7 files changed, 66 insertions(+), 3 deletions(-) diff --git a/src/tracker/api/converters.py b/src/tracker/api/converters.py index 96cece3..b718039 100644 --- a/src/tracker/api/converters.py +++ b/src/tracker/api/converters.py @@ -74,6 +74,7 @@ def message_out(m: Message) -> MessageOut: author_id=str(m.author_id) if m.author_id else None, author=member_brief(m.author) if m.author else None, content=m.content, + thinking=m.thinking, mentions=m.mentions or [], voice_url=m.voice_url, attachments=[attachment_out(a) for a in (m.attachments or [])], diff --git a/src/tracker/api/messages.py b/src/tracker/api/messages.py index b527044..2209041 100644 --- a/src/tracker/api/messages.py +++ b/src/tracker/api/messages.py @@ -37,6 +37,7 @@ class MessageCreate(BaseModel): task_id: str | None = None parent_id: str | None = None content: str + thinking: str | None = None mentions: list[str] = [] voice_url: str | None = None attachments: list[AttachmentInput] = [] @@ -93,6 +94,7 @@ async def create_message(req: MessageCreate, request: Request, db: AsyncSession author_type=author_type, author_id=author_id, content=req.content, + thinking=req.thinking, mentions=req.mentions, voice_url=req.voice_url, ) diff --git a/src/tracker/api/schemas.py b/src/tracker/api/schemas.py index 47a116a..ae5714f 100644 --- a/src/tracker/api/schemas.py +++ b/src/tracker/api/schemas.py @@ -60,6 +60,7 @@ class MessageOut(BaseModel): author_id: str | None = None author: MemberBrief | None = None content: str + thinking: str | None = None mentions: list[str] = [] voice_url: str | None = None attachments: list[AttachmentOut] = [] diff --git a/src/tracker/enums.py b/src/tracker/enums.py index f9ed9bb..c452967 100644 --- a/src/tracker/enums.py +++ b/src/tracker/enums.py @@ -99,3 +99,7 @@ class WSEventType(StrEnum): TASK_ASSIGNED = "task.assigned" TASK_DELETED = "task.deleted" AGENT_STATUS = "agent.status" + AGENT_STREAM_START = "agent.stream.start" + AGENT_STREAM_DELTA = "agent.stream.delta" + AGENT_STREAM_TOOL = "agent.stream.tool" + AGENT_STREAM_END = "agent.stream.end" diff --git a/src/tracker/models/chat.py b/src/tracker/models/chat.py index d29efbf..3e020ea 100644 --- a/src/tracker/models/chat.py +++ b/src/tracker/models/chat.py @@ -46,6 +46,7 @@ class Message(Base): # Content content: Mapped[str] = mapped_column(Text, nullable=False) + thinking: Mapped[str | None] = mapped_column(Text, nullable=True) # LLM reasoning/thinking block mentions: Mapped[list[str]] = mapped_column(ARRAY(String), default=list) voice_url: Mapped[str | None] = mapped_column(String(500)) diff --git a/src/tracker/ws/handler.py b/src/tracker/ws/handler.py index 0ef8a6e..c46cf5a 100644 --- a/src/tracker/ws/handler.py +++ b/src/tracker/ws/handler.py @@ -40,6 +40,7 @@ def _to_message_out(msg: Message, author: Member | None = None) -> MessageOut: author_id=str(msg.author_id) if msg.author_id else None, author=_to_member_brief(author) if author else None, content=msg.content, + thinking=msg.thinking, mentions=msg.mentions or [], voice_url=msg.voice_url, attachments=[], # WS broadcasts don't include full attachments @@ -95,6 +96,14 @@ async def websocket_endpoint(ws: WebSocket, token: str = ""): elif msg_type == WSEventType.PROJECT_UNSUBSCRIBE: await _handle_unsubscribe(session_id, data) + elif msg_type in ( + WSEventType.AGENT_STREAM_START, + WSEventType.AGENT_STREAM_DELTA, + WSEventType.AGENT_STREAM_TOOL, + WSEventType.AGENT_STREAM_END, + ): + await _handle_agent_stream(session_id, msg_type, data) + else: await ws.send_json({"type": WSEventType.ERROR, "message": f"Unknown type: {msg_type}"}) @@ -289,6 +298,7 @@ async def _handle_chat_send(session_id: str, data: dict): chat_id = data.get("chat_id") task_id = data.get("task_id") content = data.get("content", "") + thinking = data.get("thinking") mentions = data.get("mentions", []) if not content: @@ -307,6 +317,7 @@ async def _handle_chat_send(session_id: str, data: dict): author_type=member.type, author_id=member.id, content=content, + thinking=thinking, mentions=mentions, ) db.add(msg) @@ -338,6 +349,49 @@ async def _handle_chat_send(session_id: str, data: dict): ) +async def _handle_agent_stream(session_id: str, event_type: str, data: dict): + """Relay agent streaming events to project subscribers.""" + client = manager.sessions.get(session_id) + if not client: + return + + # Only agents can stream + if client.member_type != MemberType.AGENT: + return + + # Determine project_id from chat_id or task_id in the data + project_id = data.get("project_id") + chat_id = data.get("chat_id") + task_id = data.get("task_id") + + if not project_id and (chat_id or task_id): + async with async_session() as db: + if chat_id: + chat_result = await db.execute(select(Chat).where(Chat.id == uuid.UUID(chat_id))) + chat = chat_result.scalar_one_or_none() + if chat and chat.project_id: + project_id = str(chat.project_id) + elif task_id: + from ..models import Task + task_result = await db.execute(select(Task).where(Task.id == uuid.UUID(task_id))) + task = task_result.scalar_one_or_none() + if task: + project_id = str(task.project_id) + + payload = { + "type": event_type, + "data": { + **data, + "agent_slug": client.member_slug, + }, + } + + if project_id: + await manager.broadcast_message(project_id, payload["data"], author_slug=client.member_slug, author_session_id=session_id, event_type=event_type) + else: + await manager.broadcast_all(payload, exclude_slug=client.member_slug) + + async def _handle_subscribe(session_id: str, data: dict): """Subscribe this session to project events.""" project_id = data.get("project_id") diff --git a/src/tracker/ws/manager.py b/src/tracker/ws/manager.py index 69364e7..9ebbd2e 100644 --- a/src/tracker/ws/manager.py +++ b/src/tracker/ws/manager.py @@ -79,8 +79,8 @@ class ConnectionManager: except Exception: await self.disconnect(client.session_id) - async def broadcast_message(self, project_id: str, message: dict, author_slug: str, author_session_id: str | None = None): - """Broadcast message.new. Humans get everything, agents filtered. + async def broadcast_message(self, project_id: str, message: dict, author_slug: str, author_session_id: str | None = None, event_type: str | None = None): + """Broadcast message/stream event. Humans get everything, agents filtered. Filtering for agents: - Skip author's session (by session_id if available, else by slug) @@ -92,7 +92,7 @@ class ConnectionManager: mentions = message.get("mentions", []) content = message.get("content", "") author_type = message.get("author_type", "") - payload = {"type": WSEventType.MESSAGE_NEW, "data": message} + payload = {"type": event_type or WSEventType.MESSAGE_NEW, "data": message} for session_id, client in list(self.sessions.items()): # Skip only the sending session, not all sessions of the same user