Phase 1: agent streaming WS events + thinking field in messages
Some checks failed
Deploy Tracker / deploy (push) Failing after 4s

- New WS events: agent.stream.start/delta/tool/end
- Tracker relays agent stream events to project subscribers
- Message model: added 'thinking' column (nullable text)
- MessageCreate, MessageOut, converters: thinking support
- chat.send WS handler: accepts thinking field
- broadcast_message: custom event_type parameter
This commit is contained in:
markov 2026-02-27 06:55:36 +01:00
parent 0c44a8b384
commit c2c595224c
7 changed files with 66 additions and 3 deletions

View File

@ -74,6 +74,7 @@ def message_out(m: Message) -> MessageOut:
author_id=str(m.author_id) if m.author_id else None,
author=member_brief(m.author) if m.author else None,
content=m.content,
thinking=m.thinking,
mentions=m.mentions or [],
voice_url=m.voice_url,
attachments=[attachment_out(a) for a in (m.attachments or [])],

View File

@ -37,6 +37,7 @@ class MessageCreate(BaseModel):
task_id: str | None = None
parent_id: str | None = None
content: str
thinking: str | None = None
mentions: list[str] = []
voice_url: str | None = None
attachments: list[AttachmentInput] = []
@ -93,6 +94,7 @@ async def create_message(req: MessageCreate, request: Request, db: AsyncSession
author_type=author_type,
author_id=author_id,
content=req.content,
thinking=req.thinking,
mentions=req.mentions,
voice_url=req.voice_url,
)

View File

@ -60,6 +60,7 @@ class MessageOut(BaseModel):
author_id: str | None = None
author: MemberBrief | None = None
content: str
thinking: str | None = None
mentions: list[str] = []
voice_url: str | None = None
attachments: list[AttachmentOut] = []

View File

@ -99,3 +99,7 @@ class WSEventType(StrEnum):
TASK_ASSIGNED = "task.assigned"
TASK_DELETED = "task.deleted"
AGENT_STATUS = "agent.status"
AGENT_STREAM_START = "agent.stream.start"
AGENT_STREAM_DELTA = "agent.stream.delta"
AGENT_STREAM_TOOL = "agent.stream.tool"
AGENT_STREAM_END = "agent.stream.end"

View File

@ -46,6 +46,7 @@ class Message(Base):
# Content
content: Mapped[str] = mapped_column(Text, nullable=False)
thinking: Mapped[str | None] = mapped_column(Text, nullable=True) # LLM reasoning/thinking block
mentions: Mapped[list[str]] = mapped_column(ARRAY(String), default=list)
voice_url: Mapped[str | None] = mapped_column(String(500))

View File

@ -40,6 +40,7 @@ def _to_message_out(msg: Message, author: Member | None = None) -> MessageOut:
author_id=str(msg.author_id) if msg.author_id else None,
author=_to_member_brief(author) if author else None,
content=msg.content,
thinking=msg.thinking,
mentions=msg.mentions or [],
voice_url=msg.voice_url,
attachments=[], # WS broadcasts don't include full attachments
@ -95,6 +96,14 @@ async def websocket_endpoint(ws: WebSocket, token: str = ""):
elif msg_type == WSEventType.PROJECT_UNSUBSCRIBE:
await _handle_unsubscribe(session_id, data)
elif msg_type in (
WSEventType.AGENT_STREAM_START,
WSEventType.AGENT_STREAM_DELTA,
WSEventType.AGENT_STREAM_TOOL,
WSEventType.AGENT_STREAM_END,
):
await _handle_agent_stream(session_id, msg_type, data)
else:
await ws.send_json({"type": WSEventType.ERROR, "message": f"Unknown type: {msg_type}"})
@ -289,6 +298,7 @@ async def _handle_chat_send(session_id: str, data: dict):
chat_id = data.get("chat_id")
task_id = data.get("task_id")
content = data.get("content", "")
thinking = data.get("thinking")
mentions = data.get("mentions", [])
if not content:
@ -307,6 +317,7 @@ async def _handle_chat_send(session_id: str, data: dict):
author_type=member.type,
author_id=member.id,
content=content,
thinking=thinking,
mentions=mentions,
)
db.add(msg)
@ -338,6 +349,49 @@ async def _handle_chat_send(session_id: str, data: dict):
)
async def _handle_agent_stream(session_id: str, event_type: str, data: dict):
"""Relay agent streaming events to project subscribers."""
client = manager.sessions.get(session_id)
if not client:
return
# Only agents can stream
if client.member_type != MemberType.AGENT:
return
# Determine project_id from chat_id or task_id in the data
project_id = data.get("project_id")
chat_id = data.get("chat_id")
task_id = data.get("task_id")
if not project_id and (chat_id or task_id):
async with async_session() as db:
if chat_id:
chat_result = await db.execute(select(Chat).where(Chat.id == uuid.UUID(chat_id)))
chat = chat_result.scalar_one_or_none()
if chat and chat.project_id:
project_id = str(chat.project_id)
elif task_id:
from ..models import Task
task_result = await db.execute(select(Task).where(Task.id == uuid.UUID(task_id)))
task = task_result.scalar_one_or_none()
if task:
project_id = str(task.project_id)
payload = {
"type": event_type,
"data": {
**data,
"agent_slug": client.member_slug,
},
}
if project_id:
await manager.broadcast_message(project_id, payload["data"], author_slug=client.member_slug, author_session_id=session_id, event_type=event_type)
else:
await manager.broadcast_all(payload, exclude_slug=client.member_slug)
async def _handle_subscribe(session_id: str, data: dict):
"""Subscribe this session to project events."""
project_id = data.get("project_id")

View File

@ -79,8 +79,8 @@ class ConnectionManager:
except Exception:
await self.disconnect(client.session_id)
async def broadcast_message(self, project_id: str, message: dict, author_slug: str, author_session_id: str | None = None):
"""Broadcast message.new. Humans get everything, agents filtered.
async def broadcast_message(self, project_id: str, message: dict, author_slug: str, author_session_id: str | None = None, event_type: str | None = None):
"""Broadcast message/stream event. Humans get everything, agents filtered.
Filtering for agents:
- Skip author's session (by session_id if available, else by slug)
@ -92,7 +92,7 @@ class ConnectionManager:
mentions = message.get("mentions", [])
content = message.get("content", "")
author_type = message.get("author_type", "")
payload = {"type": WSEventType.MESSAGE_NEW, "data": message}
payload = {"type": event_type or WSEventType.MESSAGE_NEW, "data": message}
for session_id, client in list(self.sessions.items()):
# Skip only the sending session, not all sessions of the same user