diff --git a/src/tracker/api/members.py b/src/tracker/api/members.py index 252aa3b..914e6a5 100644 --- a/src/tracker/api/members.py +++ b/src/tracker/api/members.py @@ -221,8 +221,8 @@ async def update_my_status( from ..ws.manager import manager await manager.broadcast_all( - {"type": "agent.status", "data": {"slug": current_member.slug, "status": status}}, - exclude_slug=current_member.slug, + {"type": "agent.status", "data": {"id": str(current_member.id), "slug": current_member.slug, "status": status}}, + exclude_member_id=str(current_member.id), ) return {"status": status} diff --git a/src/tracker/api/messages.py b/src/tracker/api/messages.py index 2209041..74cb87e 100644 --- a/src/tracker/api/messages.py +++ b/src/tracker/api/messages.py @@ -121,7 +121,7 @@ async def create_message(req: MessageCreate, request: Request, db: AsyncSession ) msg = result2.scalar_one() - author_slug = msg.author.slug if msg.author else "system" + author_id = str(msg.author_id) if msg.author_id else None # Build response using shared converter msg_data = message_out(msg).model_dump() @@ -138,16 +138,16 @@ async def create_message(req: MessageCreate, request: Request, db: AsyncSession elif chat and chat.kind == ChatKind.LOBBY: await manager.broadcast_all( {"type": "message.new", "data": msg_data}, - exclude_slug=author_slug, + exclude_member_id=author_id, ) return message_out(msg) if project_id: - await manager.broadcast_message(project_id, msg_data, author_slug=author_slug) + await manager.broadcast_message(project_id, msg_data, author_id=author_id) else: await manager.broadcast_all( {"type": "message.new", "data": msg_data}, - exclude_slug=author_slug, + exclude_member_id=author_id, ) return message_out(msg) diff --git a/src/tracker/api/tasks.py b/src/tracker/api/tasks.py index 32cf368..c6e3dcb 100644 --- a/src/tracker/api/tasks.py +++ b/src/tracker/api/tasks.py @@ -160,7 +160,7 @@ async def _system_message( await manager.broadcast_message( project_id, task_msg_out.model_dump(), - author_slug="system", + author_id=None, # system has no member_id ) # Broadcast chat message @@ -177,7 +177,7 @@ async def _system_message( attachments=[], created_at=chat_msg.created_at.isoformat() if chat_msg.created_at else now_iso, ) - await manager.broadcast_message(project_id, chat_msg_out.model_dump(), author_slug="system") + await manager.broadcast_message(project_id, chat_msg_out.model_dump(), author_id=None) async def _get_task(task_id: str, db: AsyncSession) -> Task: @@ -461,6 +461,7 @@ async def take_task( chat_text=f"{key}: @{current_member.slug} взял в работу", task_text=f"@{current_member.slug} взял задачу в работу", project_slug=proj_slug, actor=current_member, + mentioned_members=[current_member], ) await db.commit() @@ -498,6 +499,7 @@ async def reject_task( chat_text=f"{key}: отклонена @{current_member.slug} — {req.reason}", task_text=f"@{current_member.slug} отклонил задачу: {req.reason}", project_slug=proj_slug, actor=current_member, + mentioned_members=[current_member], ) await db.commit() diff --git a/src/tracker/ws/handler.py b/src/tracker/ws/handler.py index 84708cf..8dd40f4 100644 --- a/src/tracker/ws/handler.py +++ b/src/tracker/ws/handler.py @@ -31,7 +31,6 @@ def _to_member_brief(member: Member) -> MemberBrief: def _to_message_out(msg: Message, author: Member | None = None) -> MessageOut: - # Convert mention slugs to MemberBrief (slug-only for WS handler; full resolution in REST) mention_briefs = [MemberBrief(id="", slug=s, name=s) for s in (msg.mentions or [])] return MessageOut( id=str(msg.id), @@ -46,7 +45,7 @@ def _to_message_out(msg: Message, author: Member | None = None) -> MessageOut: mentions=mention_briefs, actor=_to_member_brief(msg.actor) if hasattr(msg, 'actor') and msg.actor else None, voice_url=msg.voice_url, - attachments=[], # WS broadcasts don't include full attachments + attachments=[], created_at=msg.created_at.isoformat() if msg.created_at else "", ) @@ -57,13 +56,11 @@ async def websocket_endpoint(ws: WebSocket, token: str = ""): session_id = None try: - # Try query param token first (for direct JWT auth) if token: session_id = await _authenticate(ws, token) if not session_id: return else: - # Wait for auth message (backward compatibility with agents) auth_msg = await ws.receive_json() if auth_msg.get("type") != WSEventType.AUTH: await ws.send_json({"type": WSEventType.AUTH_ERROR, "message": "First message must be auth"}) @@ -77,7 +74,6 @@ async def websocket_endpoint(ws: WebSocket, token: str = ""): return client = manager.sessions.get(session_id) - slug = client.member_slug if client else None # Main loop while True: @@ -86,19 +82,14 @@ async def websocket_endpoint(ws: WebSocket, token: str = ""): if msg_type == WSEventType.HEARTBEAT: await _handle_heartbeat(session_id, data) - elif msg_type == WSEventType.ACK: pass - elif msg_type == WSEventType.CHAT_SEND: await _handle_chat_send(session_id, data) - elif msg_type == WSEventType.PROJECT_SUBSCRIBE: await _handle_subscribe(session_id, data) - elif msg_type == WSEventType.PROJECT_UNSUBSCRIBE: await _handle_unsubscribe(session_id, data) - elif msg_type in ( WSEventType.AGENT_STREAM_START, WSEventType.AGENT_STREAM_DELTA, @@ -106,7 +97,6 @@ async def websocket_endpoint(ws: WebSocket, token: str = ""): WSEventType.AGENT_STREAM_END, ): await _handle_agent_stream(session_id, msg_type, data) - else: await ws.send_json({"type": WSEventType.ERROR, "message": f"Unknown type: {msg_type}"}) @@ -119,17 +109,19 @@ async def websocket_endpoint(ws: WebSocket, token: str = ""): finally: if session_id: client = await manager.disconnect(session_id) - if client and not manager.is_online(client.member_slug): - # Last session for this slug — mark offline + if client and not manager.is_online(client.member_id): + # Last session for this member — mark offline async with async_session() as db: - result = await db.execute(select(Member).where(Member.slug == client.member_slug)) + result = await db.execute(select(Member).where(Member.id == uuid.UUID(client.member_id))) member = result.scalar_one_or_none() if member: member.status = MemberStatus.OFFLINE await db.commit() await manager.broadcast_all( - {"type": WSEventType.AGENT_STATUS, "data": {"slug": client.member_slug, "status": MemberStatus.OFFLINE}}, - exclude_slug=client.member_slug, + {"type": WSEventType.AGENT_STATUS, "data": { + "id": client.member_id, "slug": client.member_slug, "status": MemberStatus.OFFLINE, + }}, + exclude_member_id=client.member_id, ) @@ -137,34 +129,24 @@ async def _authenticate(ws: WebSocket, token: str, on_behalf_of: str | None = No """Authenticate and register session. Returns session_id or None.""" async with async_session() as db: member = None - - # Check if it's an agent token (starts with 'tb-') + if token.startswith("tb-"): result = await db.execute( select(Member).where(Member.token == token).options(selectinload(Member.agent_config)) ) member = result.scalar_one_or_none() else: - # Try JWT decode from ..api.auth import decode_jwt try: payload = decode_jwt(token) member_id = payload["sub"] - # sub can be UUID (Tracker JWT) or slug (legacy BFF JWT) result = await db.execute( select(Member).where(Member.id == member_id) .options(selectinload(Member.agent_config)) ) member = result.scalar_one_or_none() - if not member and payload.get("slug"): - # Fallback: try by slug - result = await db.execute( - select(Member).where(Member.slug == payload["slug"]) - .options(selectinload(Member.agent_config)) - ) - member = result.scalar_one_or_none() if member: - logger.info("JWT auth successful for %s", member.slug) + logger.info("JWT auth successful for %s (id=%s)", member.slug, str(member.id)[:8]) except Exception as e: logger.warning("JWT decode failed: %s", e) @@ -173,64 +155,57 @@ async def _authenticate(ws: WebSocket, token: str, on_behalf_of: str | None = No await ws.close() return None - # BFF proxy: bridge acts on behalf of user - effective_slug = member.slug - effective_type = member.type + # Bridge proxy + effective_member = member if on_behalf_of and member.type == MemberType.BRIDGE: user_result = await db.execute( - select(Member).where(Member.slug == on_behalf_of) + select(Member).where(Member.id == uuid.UUID(on_behalf_of)) .options(selectinload(Member.agent_config)) ) user_member = user_result.scalar_one_or_none() if user_member: - effective_slug = user_member.slug - effective_type = user_member.type - member = user_member - logger.info("Bridge acting on behalf of %s", effective_slug) + effective_member = user_member + logger.info("Bridge acting on behalf of %s (id=%s)", effective_member.slug, str(effective_member.id)[:8]) else: - effective_slug = f"web-{on_behalf_of}" - logger.info("Bridge acting on behalf of unknown user → %s", effective_slug) + logger.warning("Bridge on_behalf_of member not found: %s", on_behalf_of) # Listen modes chat_listen = ListenMode.ALL task_listen = ListenMode.ALL - if member.agent_config: - chat_listen = member.agent_config.chat_listen - task_listen = member.agent_config.task_listen + if effective_member.agent_config: + chat_listen = effective_member.agent_config.chat_listen + task_listen = effective_member.agent_config.task_listen - # Register connection with unique session_id + # Register session_id = str(uuid.uuid4()) client = ConnectedClient( ws=ws, session_id=session_id, - member_id=str(member.id), - member_slug=effective_slug, - member_type=effective_type, + member_id=str(effective_member.id), + member_slug=effective_member.slug, + member_type=effective_member.type, chat_listen=chat_listen, task_listen=task_listen, ) await manager.connect(client) # Update status - member.status = MemberStatus.ONLINE + effective_member.status = MemberStatus.ONLINE await db.commit() # Get lobby chat + projects lobby = await db.execute(select(Chat).where(Chat.kind == ChatKind.LOBBY)) lobby_chat = lobby.scalar_one_or_none() - # Filter projects by membership (show all for owners) - if member.role == MemberRole.OWNER: - # Owners see all projects + if effective_member.role == MemberRole.OWNER: projects = await db.execute(select(Project).where(Project.status == ProjectStatus.ACTIVE)) else: - # Members see only projects they belong to projects = await db.execute( select(Project) .join(ProjectMember, Project.id == ProjectMember.project_id) - .where(Project.status == ProjectStatus.ACTIVE, ProjectMember.member_id == member.id) + .where(Project.status == ProjectStatus.ACTIVE, ProjectMember.member_id == effective_member.id) ) - + project_list = [] for p in projects.scalars(): chat_result = await db.execute( @@ -244,31 +219,41 @@ async def _authenticate(ws: WebSocket, token: str, on_behalf_of: str | None = No "chat_id": str(chat.id) if chat else None, }) - # Auto-subscribe to all member's projects + # Auto-subscribe for p in project_list: client.subscribed_projects.add(p["id"]) + # Build online list as [{id, slug}] + online_list = [] + seen = set() + for s in manager.sessions.values(): + if s.member_id not in seen: + seen.add(s.member_id) + online_list.append({"id": s.member_id, "slug": s.member_slug}) + await ws.send_json({ "type": WSEventType.AUTH_OK, "data": { - "slug": effective_slug, + "member_id": str(effective_member.id), + "slug": effective_member.slug, "lobby_chat_id": str(lobby_chat.id) if lobby_chat else None, "projects": project_list, - "online": manager.online_slugs, + "online": online_list, }, }) # Notify others await manager.broadcast_all( - {"type": WSEventType.AGENT_STATUS, "data": {"slug": effective_slug, "status": MemberStatus.ONLINE}}, - exclude_slug=effective_slug, + {"type": WSEventType.AGENT_STATUS, "data": { + "id": str(effective_member.id), "slug": effective_member.slug, "status": MemberStatus.ONLINE, + }}, + exclude_member_id=str(effective_member.id), ) return session_id async def _handle_heartbeat(session_id: str, data: dict): - """Update heartbeat timestamp.""" from datetime import datetime, timezone client = manager.sessions.get(session_id) @@ -279,25 +264,25 @@ async def _handle_heartbeat(session_id: str, data: dict): client.last_heartbeat = datetime.now(timezone.utc) async with async_session() as db: - result = await db.execute(select(Member).where(Member.slug == client.member_slug)) + result = await db.execute(select(Member).where(Member.id == uuid.UUID(client.member_id))) member = result.scalar_one_or_none() if member: member.status = status await db.commit() await manager.broadcast_all( - {"type": WSEventType.AGENT_STATUS, "data": {"slug": client.member_slug, "status": status}}, - exclude_slug=client.member_slug, + {"type": WSEventType.AGENT_STATUS, "data": { + "id": client.member_id, "slug": client.member_slug, "status": status, + }}, + exclude_member_id=client.member_id, ) async def _handle_chat_send(session_id: str, data: dict): - """Handle chat message sent via WS.""" client = manager.sessions.get(session_id) if not client: return - slug = client.member_slug chat_id = data.get("chat_id") task_id = data.get("task_id") content = data.get("content", "") @@ -308,7 +293,7 @@ async def _handle_chat_send(session_id: str, data: dict): return async with async_session() as db: - result = await db.execute(select(Member).where(Member.slug == slug, Member.is_active == True)) + result = await db.execute(select(Member).where(Member.id == uuid.UUID(client.member_id), Member.is_active == True)) member = result.scalar_one_or_none() if not member: await client.ws.send_json({"type": WSEventType.ERROR, "message": "Member not found or inactive"}) @@ -329,7 +314,6 @@ async def _handle_chat_send(session_id: str, data: dict): msg_data = _to_message_out(msg, member).model_dump() - # Determine project_id for filtering project_id = None if chat_id: chat_result = await db.execute(select(Chat).where(Chat.id == uuid.UUID(chat_id))) @@ -339,33 +323,33 @@ async def _handle_chat_send(session_id: str, data: dict): elif chat and chat.kind == ChatKind.LOBBY: await manager.broadcast_all( {"type": WSEventType.MESSAGE_NEW, "data": msg_data}, - exclude_slug=slug, + exclude_member_id=client.member_id, ) return if project_id: - await manager.broadcast_message(project_id, msg_data, author_slug=slug, author_session_id=session_id) + await manager.broadcast_message( + project_id, msg_data, + author_id=client.member_id, + author_session_id=session_id, + ) else: await manager.broadcast_all( {"type": WSEventType.MESSAGE_NEW, "data": msg_data}, - exclude_slug=slug, + exclude_member_id=client.member_id, ) async def _handle_agent_stream(session_id: str, event_type: str, data: dict): - """Relay agent streaming events to project subscribers.""" logger.info("STREAM event: %s data=%s", event_type, str(data)[:200]) client = manager.sessions.get(session_id) if not client: - logger.warning("STREAM: no client for session %s", session_id[:8]) return - # Only agents can stream if client.member_type != MemberType.AGENT: logger.warning("STREAM: non-agent %s tried to stream", client.member_slug) return - # Determine project_id from chat_id or task_id in the data project_id = data.get("project_id") chat_id = data.get("chat_id") task_id = data.get("task_id") @@ -388,18 +372,23 @@ async def _handle_agent_stream(session_id: str, event_type: str, data: dict): "type": event_type, "data": { **data, - "agent_slug": client.member_slug, + "agent_id": client.member_id, + "agent_slug": client.member_slug, # kept for display convenience }, } if project_id: - await manager.broadcast_message(project_id, payload["data"], author_slug=client.member_slug, author_session_id=session_id, event_type=event_type) + await manager.broadcast_message( + project_id, payload["data"], + author_id=client.member_id, + author_session_id=session_id, + event_type=event_type, + ) else: - await manager.broadcast_all(payload, exclude_slug=client.member_slug) + await manager.broadcast_all(payload, exclude_member_id=client.member_id) async def _handle_subscribe(session_id: str, data: dict): - """Subscribe this session to project events.""" project_id = data.get("project_id") if not project_id: return @@ -410,7 +399,6 @@ async def _handle_subscribe(session_id: str, data: dict): async def _handle_unsubscribe(session_id: str, data: dict): - """Unsubscribe this session from project events.""" project_id = data.get("project_id") if not project_id: return diff --git a/src/tracker/ws/manager.py b/src/tracker/ws/manager.py index cab6514..d8b80d8 100644 --- a/src/tracker/ws/manager.py +++ b/src/tracker/ws/manager.py @@ -16,8 +16,8 @@ logger = logging.getLogger("tracker.ws") class ConnectedClient: ws: WebSocket session_id: str # unique per connection - member_id: str | None = None # UUID as string - member_slug: str = "" + member_id: str = "" # UUID as string — primary identifier + member_slug: str = "" # for display/logging only member_type: str = "" # human | agent | bridge chat_listen: str = ListenMode.ALL task_listen: str = ListenMode.ALL @@ -29,38 +29,39 @@ class ConnectionManager: def __init__(self): # session_id → client (one entry per WS connection) self.sessions: dict[str, ConnectedClient] = {} - # slug → set of session_ids (for quick lookup) - self.slug_sessions: dict[str, set[str]] = {} + # member_id (UUID) → set of session_ids (for quick lookup) + self.member_sessions: dict[str, set[str]] = {} async def connect(self, client: ConnectedClient): self.sessions[client.session_id] = client - if client.member_slug not in self.slug_sessions: - self.slug_sessions[client.member_slug] = set() - self.slug_sessions[client.member_slug].add(client.session_id) - logger.info("WS connected: %s session=%s (%s)", client.member_slug, client.session_id[:8], client.member_type) + if client.member_id not in self.member_sessions: + self.member_sessions[client.member_id] = set() + self.member_sessions[client.member_id].add(client.session_id) + logger.info("WS connected: %s (id=%s) session=%s (%s)", + client.member_slug, client.member_id[:8], client.session_id[:8], client.member_type) async def disconnect(self, session_id: str): client = self.sessions.pop(session_id, None) if client: - slug_set = self.slug_sessions.get(client.member_slug) - if slug_set: - slug_set.discard(session_id) - if not slug_set: - del self.slug_sessions[client.member_slug] + id_set = self.member_sessions.get(client.member_id) + if id_set: + id_set.discard(session_id) + if not id_set: + del self.member_sessions[client.member_id] logger.info("WS disconnected: %s session=%s", client.member_slug, session_id[:8]) return client - def get_sessions_for_slug(self, slug: str) -> list[ConnectedClient]: - """Get all active sessions for a member slug.""" - session_ids = self.slug_sessions.get(slug, set()) + def get_sessions_for_member(self, member_id: str) -> list[ConnectedClient]: + """Get all active sessions for a member by UUID.""" + session_ids = self.member_sessions.get(member_id, set()) return [self.sessions[sid] for sid in session_ids if sid in self.sessions] - def is_online(self, slug: str) -> bool: - return bool(self.slug_sessions.get(slug)) + def is_online(self, member_id: str) -> bool: + return bool(self.member_sessions.get(member_id)) @property - def online_slugs(self) -> list[str]: - return list(self.slug_sessions.keys()) + def online_member_ids(self) -> list[str]: + return list(self.member_sessions.keys()) async def send_to_session(self, session_id: str, data: dict): """Send to a specific session.""" @@ -71,62 +72,78 @@ class ConnectionManager: except Exception: await self.disconnect(session_id) - async def send_to_slug(self, slug: str, data: dict): - """Send to ALL sessions of a member.""" - for client in self.get_sessions_for_slug(slug): + async def send_to_member(self, member_id: str, data: dict): + """Send to ALL sessions of a member by UUID.""" + for client in self.get_sessions_for_member(member_id): try: await client.ws.send_json(data) except Exception: await self.disconnect(client.session_id) - async def broadcast_message(self, project_id: str, message: dict, author_slug: str, author_session_id: str | None = None, event_type: str | None = None): + async def broadcast_message(self, project_id: str, message: dict, + author_id: str | None = None, + author_session_id: str | None = None, + event_type: str | None = None): """Broadcast message/stream event. Humans get everything, agents filtered. - + Filtering for agents: - - Skip author's session (by session_id if available, else by slug) + - Skip author's session (by session_id if available, else by member_id) - Skip if not subscribed to project - Skip if chat_listen == "none" - - If chat_listen == "mentions": only if @slug in mentions - - System messages: only if @slug mentioned in content + - If chat_listen == "mentions": only if member_id in mention IDs + - System messages: only if member_id in mention IDs """ raw_mentions = message.get("mentions", []) - # mentions can be list of strings (slugs) or list of dicts (MemberBrief objects) - mentions: list[str] = [] + # Extract member IDs from mentions (objects or strings) + mention_ids: set[str] = set() + mention_slugs: set[str] = set() for m in raw_mentions: - if isinstance(m, str): - mentions.append(m) - elif isinstance(m, dict): - mentions.append(m.get("slug", "")) + if isinstance(m, dict): + if m.get("id"): + mention_ids.add(m["id"]) + if m.get("slug"): + mention_slugs.add(m["slug"]) + elif isinstance(m, str): + mention_slugs.add(m) + content = message.get("content", "") author_type = message.get("author_type", "") payload = {"type": event_type or WSEventType.MESSAGE_NEW, "data": message} for session_id, client in list(self.sessions.items()): - # Skip only the sending session, not all sessions of the same user + # Skip the sending session (or all sessions of author if no session_id) if author_session_id and session_id == author_session_id: continue - if not author_session_id and client.member_slug == author_slug: + if not author_session_id and author_id and client.member_id == author_id: continue + # Humans/bridges get ALL messages if client.member_type in (MemberType.HUMAN, MemberType.BRIDGE): await self.send_to_session(session_id, payload) continue + # Agents: subscription check if project_id not in client.subscribed_projects: continue if client.chat_listen == ListenMode.NONE: continue - # System messages: only if agent is mentioned (by slug in mentions array, or @slug in content for compat) + + # System messages: only if agent is mentioned (by ID or slug for compat) if author_type == AuthorType.SYSTEM: - mentioned_in_mentions = client.member_slug in mentions - mentioned_in_content = f"@{client.member_slug}" in content - if not mentioned_in_mentions and not mentioned_in_content: + mentioned = ( + client.member_id in mention_ids or + client.member_slug in mention_slugs or + f"@{client.member_slug}" in content # text fallback for compat + ) + if not mentioned: continue await self.send_to_session(session_id, payload) continue + # Regular messages: chat_listen filter - if client.chat_listen == ListenMode.MENTIONS and client.member_slug not in mentions: - continue + if client.chat_listen == ListenMode.MENTIONS: + if client.member_id not in mention_ids and client.member_slug not in mention_slugs: + continue await self.send_to_session(session_id, payload) async def broadcast_task_event(self, project_id: str, event_type: str, data: dict): @@ -156,10 +173,10 @@ class ConnectionManager: ): await self.send_to_session(session_id, payload) - async def broadcast_all(self, data: dict, exclude_slug: str | None = None): + async def broadcast_all(self, data: dict, exclude_member_id: str | None = None): """Broadcast to all connected sessions.""" for session_id, client in list(self.sessions.items()): - if client.member_slug == exclude_slug: + if exclude_member_id and client.member_id == exclude_member_id: continue await self.send_to_session(session_id, data)