From 6a6eaada3ed4619ce037e1362b5ed9e4443de51f Mon Sep 17 00:00:00 2001 From: Markov Date: Mon, 23 Feb 2026 17:12:33 +0100 Subject: [PATCH] fix: align tracker with TRACKER-PROTOCOL.md MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - auth.ok: add chat_id to project list (JOIN with Chat where kind=project) - message.new broadcast: include author_name field - chat_listen: add support for 'none' mode in manager.py - task events: add broadcast for task.created, task.updated, task.assigned via REST - heartbeat: implement 90s timeout → status=offline + agent.status broadcast - tokens: use tb-{hex} format instead of urlsafe encoding - models: support 'none' mode for chat_listen and task_listen - fix Dockerfile: remove missing alembic files --- Dockerfile | 4 +-- src/tracker/api/members.py | 4 +-- src/tracker/api/messages.py | 7 +++++ src/tracker/api/tasks.py | 50 ++++++++++++++++++++++++++++++ src/tracker/app.py | 59 ++++++++++++++++++++++++++++++++++++ src/tracker/models/member.py | 4 +-- src/tracker/ws/handler.py | 30 ++++++++++++++++-- src/tracker/ws/manager.py | 12 ++++++-- 8 files changed, 159 insertions(+), 11 deletions(-) diff --git a/Dockerfile b/Dockerfile index f10e4bb..80d513a 100644 --- a/Dockerfile +++ b/Dockerfile @@ -5,8 +5,6 @@ WORKDIR /app COPY requirements.txt . RUN pip install --no-cache-dir -r requirements.txt -COPY alembic.ini . -COPY alembic/ alembic/ COPY src/ src/ ENV PYTHONPATH=/app/src @@ -14,4 +12,4 @@ ENV PYTHONUNBUFFERED=1 EXPOSE 8100 -CMD ["sh", "-c", "alembic upgrade head && uvicorn tracker.app:app --host 0.0.0.0 --port 8100"] +CMD ["uvicorn", "tracker.app:app", "--host", "0.0.0.0", "--port", "8100"] diff --git a/src/tracker/api/members.py b/src/tracker/api/members.py index 957b60f..b55a208 100644 --- a/src/tracker/api/members.py +++ b/src/tracker/api/members.py @@ -122,7 +122,7 @@ async def create_member(req: MemberCreate, db: AsyncSession = Depends(get_db)): token = None if req.type in ("agent", "bridge"): - token = f"tb-{secrets.token_hex(16)}" + token = f"tb-{secrets.token_hex(32)}" member = Member( name=req.name, @@ -210,7 +210,7 @@ async def regenerate_token(slug: str, db: AsyncSession = Depends(get_db)): raise HTTPException(404, "Member not found") if member.type != "agent": raise HTTPException(400, "Only agent tokens can be regenerated") - token = f"tb-{secrets.token_urlsafe(32)}" + token = f"tb-{secrets.token_hex(32)}" member.token = token await db.commit() return {"token": token} diff --git a/src/tracker/api/messages.py b/src/tracker/api/messages.py index 52ee1d3..5b44e2d 100644 --- a/src/tracker/api/messages.py +++ b/src/tracker/api/messages.py @@ -121,6 +121,12 @@ async def create_message(req: MessageCreate, db: AsyncSession = Depends(get_db)) ) msg = result2.scalar_one() + # Get author name for broadcast + from tracker.models import Member + author_result = await db.execute(select(Member).where(Member.slug == msg.author_slug)) + author = author_result.scalar_one_or_none() + author_name = author.name if author else msg.author_slug + # Broadcast via WebSocket from tracker.ws.manager import manager msg_data = { @@ -129,6 +135,7 @@ async def create_message(req: MessageCreate, db: AsyncSession = Depends(get_db)) "task_id": req.task_id, "author_type": msg.author_type, "author_slug": msg.author_slug, + "author_name": author_name, "content": msg.content, "mentions": msg.mentions or [], "created_at": msg.created_at.isoformat(), diff --git a/src/tracker/api/tasks.py b/src/tracker/api/tasks.py index 3fcb444..9aac36c 100644 --- a/src/tracker/api/tasks.py +++ b/src/tracker/api/tasks.py @@ -184,6 +184,23 @@ async def create_task( await db.refresh(task) # Load steps task_full = await _get_task(str(task.id), db) + + # Broadcast task.created event + from tracker.ws.manager import manager + task_data = { + "id": str(task.id), + "project_id": str(project.id), + "number": task.number, + "key": f"{project.slug[:2].upper()}-{task.number}", + "title": task.title, + "status": task.status, + "assignee_slug": task.assignee_slug, + "reviewer_slug": task.reviewer_slug, + "watchers": task.watchers or [], + "created_at": task.created_at.isoformat() if task.created_at else "", + } + await manager.broadcast_task_event(str(project.id), "task.created", task_data) + return _task_out(task_full, task_full.project.slug if task_full.project else "") @@ -212,6 +229,17 @@ async def update_task(task_id: str, req: TaskUpdate, db: AsyncSession = Depends( await db.commit() await db.refresh(task) + + # Broadcast task.updated event + from tracker.ws.manager import manager + task_data = { + "id": str(task.id), + "status": task.status, + "assignee_slug": task.assignee_slug, + "updated_at": task.updated_at.isoformat() if task.updated_at else "", + } + await manager.broadcast_task_event(str(task.project_id), "task.updated", task_data) + return _task_out(task, task.project.slug if task.project else "") @@ -236,6 +264,17 @@ async def take_task(task_id: str, slug: str = Query(...), db: AsyncSession = Dep task.watchers = (task.watchers or []) + [slug] await db.commit() await db.refresh(task) + + # Broadcast task.assigned event + from tracker.ws.manager import manager + task_data = { + "id": str(task.id), + "assignee_slug": slug, + "assigner_slug": slug, # self-assigned + "assigned_at": task.updated_at.isoformat() if task.updated_at else "", + } + await manager.broadcast_task_event(str(task.project_id), "task.assigned", task_data) + return _task_out(task, task.project.slug if task.project else "") @@ -259,6 +298,17 @@ async def assign_task(task_id: str, req: AssignRequest, db: AsyncSession = Depen task.watchers = (task.watchers or []) + [req.assignee_slug] await db.commit() await db.refresh(task) + + # Broadcast task.assigned event + from tracker.ws.manager import manager + task_data = { + "id": str(task.id), + "assignee_slug": req.assignee_slug, + "assigner_slug": "admin", # TODO: get from auth context + "assigned_at": task.updated_at.isoformat() if task.updated_at else "", + } + await manager.broadcast_task_event(str(task.project_id), "task.assigned", task_data) + return _task_out(task, task.project.slug if task.project else "") diff --git a/src/tracker/app.py b/src/tracker/app.py index ada657c..19aceac 100644 --- a/src/tracker/app.py +++ b/src/tracker/app.py @@ -1,5 +1,6 @@ """FastAPI application.""" +import asyncio import logging import time import traceback @@ -21,6 +22,53 @@ logging.basicConfig( logger = logging.getLogger("tracker") +async def heartbeat_monitor(): + """Monitor heartbeat timeout — set status=offline after 90 seconds.""" + from tracker.ws.manager import manager + from tracker.database import async_session + from tracker.models import Member + from datetime import datetime, timezone, timedelta + from sqlalchemy import select, update + + while True: + try: + await asyncio.sleep(30) # check every 30 seconds + + # Get clients with last heartbeat timeout + timeout_threshold = datetime.now(timezone.utc) - timedelta(seconds=90) + timed_out_clients = [] + + for slug, client in list(manager.clients.items()): + if not hasattr(client, 'last_heartbeat'): + client.last_heartbeat = datetime.now(timezone.utc) + continue + + if client.last_heartbeat < timeout_threshold: + timed_out_clients.append(slug) + + if timed_out_clients: + async with async_session() as db: + # Update status to offline + await db.execute( + update(Member) + .where(Member.slug.in_(timed_out_clients)) + .values(status="offline") + ) + await db.commit() + + # Broadcast status changes and disconnect + for slug in timed_out_clients: + await manager.broadcast_all( + {"type": "agent.status", "data": {"slug": slug, "status": "offline"}}, + exclude=slug, + ) + await manager.disconnect(slug) + logger.info("Heartbeat timeout: %s set offline", slug) + + except Exception as e: + logger.error("Heartbeat monitor error: %s", e) + + @asynccontextmanager async def lifespan(app: FastAPI): """Create tables on startup (dev mode only).""" @@ -28,7 +76,18 @@ async def lifespan(app: FastAPI): async with engine.begin() as conn: await conn.run_sync(Base.metadata.create_all) logger.info("Database tables ensured.") + + # Start heartbeat monitor + heartbeat_task = asyncio.create_task(heartbeat_monitor()) + yield + + # Cleanup + heartbeat_task.cancel() + try: + await heartbeat_task + except asyncio.CancelledError: + pass app = FastAPI( diff --git a/src/tracker/models/member.py b/src/tracker/models/member.py index 735fc2f..8c67ff8 100644 --- a/src/tracker/models/member.py +++ b/src/tracker/models/member.py @@ -35,8 +35,8 @@ class AgentConfig(Base): UUID(as_uuid=True), ForeignKey("members.id"), nullable=False, unique=True ) capabilities: Mapped[list[str]] = mapped_column(ARRAY(String), default=list) - chat_listen: Mapped[str] = mapped_column(String(20), default="mentions") # all | mentions - task_listen: Mapped[str] = mapped_column(String(20), default="mentions") # all | mentions + chat_listen: Mapped[str] = mapped_column(String(20), default="mentions") # all | mentions | none + task_listen: Mapped[str] = mapped_column(String(20), default="mentions") # all | mentions | none prompt: Mapped[str | None] = mapped_column(Text) model: Mapped[str | None] = mapped_column(String(100)) diff --git a/src/tracker/ws/handler.py b/src/tracker/ws/handler.py index 9c5c74e..2e8e388 100644 --- a/src/tracker/ws/handler.py +++ b/src/tracker/ws/handler.py @@ -124,12 +124,24 @@ async def _authenticate(ws: WebSocket, token: str) -> str | None: member.status = "online" await db.commit() - # Get lobby chat + projects + # Get lobby chat + projects with chat_id lobby = await db.execute(select(Chat).where(Chat.kind == "lobby")) lobby_chat = lobby.scalar_one_or_none() projects = await db.execute(select(Project).where(Project.status == "active")) - project_list = [{"id": str(p.id), "slug": p.slug, "name": p.name} for p in projects.scalars()] + project_list = [] + for p in projects.scalars(): + # Get project chat + chat_result = await db.execute( + select(Chat).where(Chat.project_id == p.id, Chat.kind == "project") + ) + chat = chat_result.scalar_one_or_none() + project_list.append({ + "id": str(p.id), + "slug": p.slug, + "name": p.name, + "chat_id": str(chat.id) if chat else None, + }) online = list(manager.clients.keys()) @@ -154,13 +166,27 @@ async def _authenticate(ws: WebSocket, token: str) -> str | None: async def _handle_heartbeat(slug: str, data: dict): """Update member status from heartbeat.""" + from datetime import datetime, timezone + status = data.get("status", "online") + + # Update last heartbeat timestamp + client = manager.clients.get(slug) + if client: + client.last_heartbeat = datetime.now(timezone.utc) + async with async_session() as db: result = await db.execute(select(Member).where(Member.slug == slug)) member = result.scalar_one_or_none() if member: member.status = status await db.commit() + + # Broadcast status change if different + await manager.broadcast_all( + {"type": "agent.status", "data": {"slug": slug, "status": status}}, + exclude=slug, + ) async def _handle_chat_send(slug: str, data: dict): diff --git a/src/tracker/ws/manager.py b/src/tracker/ws/manager.py index 6ab21ca..734fd26 100644 --- a/src/tracker/ws/manager.py +++ b/src/tracker/ws/manager.py @@ -2,6 +2,7 @@ import logging from dataclasses import dataclass, field +from datetime import datetime, timezone from fastapi import WebSocket @@ -13,9 +14,10 @@ class ConnectedClient: ws: WebSocket member_slug: str member_type: str # human | agent | bridge - chat_listen: str = "all" # all | mentions - task_listen: str = "all" # all | mentions + chat_listen: str = "all" # all | mentions | none + task_listen: str = "all" # all | mentions | none subscribed_projects: set[str] = field(default_factory=set) # project_ids + last_heartbeat: datetime = field(default_factory=lambda: datetime.now(timezone.utc)) class ConnectionManager: @@ -49,6 +51,8 @@ class ConnectionManager: if project_id not in client.subscribed_projects: continue # Filter by chat_listen + if client.chat_listen == "none": + continue # don't send any chat messages if client.chat_listen == "mentions" and slug not in mentions: continue await self.send_to(slug, {"type": "message.new", "data": message}) @@ -63,6 +67,10 @@ class ConnectionManager: if project_id not in client.subscribed_projects: continue + # task_listen: none → skip all task events + if client.task_listen == "none": + continue + # task_listen: all → get everything if client.task_listen == "all": await self.send_to(slug, {"type": event_type, "data": data})