UUID as primary identifier everywhere (replace slug-based routing)
Some checks failed
Deploy Tracker / deploy (push) Failing after 3s

WS Manager:
- member_sessions keyed by member_id (UUID), not slug
- broadcast_message: author_id instead of author_slug
- broadcast_all: exclude_member_id instead of exclude_slug
- mention filtering by member_id from MemberBrief objects

WS Handler:
- All DB lookups by Member.id (UUID), not slug
- auth.ok includes member_id + online as [{id, slug}]
- agent.status events include id + slug
- Bridge on_behalf_of by UUID

API callers (messages, members, tasks):
- All broadcast calls use author_id/exclude_member_id
This commit is contained in:
markov 2026-02-27 09:20:19 +01:00
parent 5642f53e11
commit b8e836fd07
5 changed files with 136 additions and 129 deletions

View File

@ -221,8 +221,8 @@ async def update_my_status(
from ..ws.manager import manager
await manager.broadcast_all(
{"type": "agent.status", "data": {"slug": current_member.slug, "status": status}},
exclude_slug=current_member.slug,
{"type": "agent.status", "data": {"id": str(current_member.id), "slug": current_member.slug, "status": status}},
exclude_member_id=str(current_member.id),
)
return {"status": status}

View File

@ -121,7 +121,7 @@ async def create_message(req: MessageCreate, request: Request, db: AsyncSession
)
msg = result2.scalar_one()
author_slug = msg.author.slug if msg.author else "system"
author_id = str(msg.author_id) if msg.author_id else None
# Build response using shared converter
msg_data = message_out(msg).model_dump()
@ -138,16 +138,16 @@ async def create_message(req: MessageCreate, request: Request, db: AsyncSession
elif chat and chat.kind == ChatKind.LOBBY:
await manager.broadcast_all(
{"type": "message.new", "data": msg_data},
exclude_slug=author_slug,
exclude_member_id=author_id,
)
return message_out(msg)
if project_id:
await manager.broadcast_message(project_id, msg_data, author_slug=author_slug)
await manager.broadcast_message(project_id, msg_data, author_id=author_id)
else:
await manager.broadcast_all(
{"type": "message.new", "data": msg_data},
exclude_slug=author_slug,
exclude_member_id=author_id,
)
return message_out(msg)

View File

@ -160,7 +160,7 @@ async def _system_message(
await manager.broadcast_message(
project_id,
task_msg_out.model_dump(),
author_slug="system",
author_id=None, # system has no member_id
)
# Broadcast chat message
@ -177,7 +177,7 @@ async def _system_message(
attachments=[],
created_at=chat_msg.created_at.isoformat() if chat_msg.created_at else now_iso,
)
await manager.broadcast_message(project_id, chat_msg_out.model_dump(), author_slug="system")
await manager.broadcast_message(project_id, chat_msg_out.model_dump(), author_id=None)
async def _get_task(task_id: str, db: AsyncSession) -> Task:
@ -461,6 +461,7 @@ async def take_task(
chat_text=f"{key}: @{current_member.slug} взял в работу",
task_text=f"@{current_member.slug} взял задачу в работу",
project_slug=proj_slug, actor=current_member,
mentioned_members=[current_member],
)
await db.commit()
@ -498,6 +499,7 @@ async def reject_task(
chat_text=f"{key}: отклонена @{current_member.slug}{req.reason}",
task_text=f"@{current_member.slug} отклонил задачу: {req.reason}",
project_slug=proj_slug, actor=current_member,
mentioned_members=[current_member],
)
await db.commit()

View File

@ -31,7 +31,6 @@ def _to_member_brief(member: Member) -> MemberBrief:
def _to_message_out(msg: Message, author: Member | None = None) -> MessageOut:
# Convert mention slugs to MemberBrief (slug-only for WS handler; full resolution in REST)
mention_briefs = [MemberBrief(id="", slug=s, name=s) for s in (msg.mentions or [])]
return MessageOut(
id=str(msg.id),
@ -46,7 +45,7 @@ def _to_message_out(msg: Message, author: Member | None = None) -> MessageOut:
mentions=mention_briefs,
actor=_to_member_brief(msg.actor) if hasattr(msg, 'actor') and msg.actor else None,
voice_url=msg.voice_url,
attachments=[], # WS broadcasts don't include full attachments
attachments=[],
created_at=msg.created_at.isoformat() if msg.created_at else "",
)
@ -57,13 +56,11 @@ async def websocket_endpoint(ws: WebSocket, token: str = ""):
session_id = None
try:
# Try query param token first (for direct JWT auth)
if token:
session_id = await _authenticate(ws, token)
if not session_id:
return
else:
# Wait for auth message (backward compatibility with agents)
auth_msg = await ws.receive_json()
if auth_msg.get("type") != WSEventType.AUTH:
await ws.send_json({"type": WSEventType.AUTH_ERROR, "message": "First message must be auth"})
@ -77,7 +74,6 @@ async def websocket_endpoint(ws: WebSocket, token: str = ""):
return
client = manager.sessions.get(session_id)
slug = client.member_slug if client else None
# Main loop
while True:
@ -86,19 +82,14 @@ async def websocket_endpoint(ws: WebSocket, token: str = ""):
if msg_type == WSEventType.HEARTBEAT:
await _handle_heartbeat(session_id, data)
elif msg_type == WSEventType.ACK:
pass
elif msg_type == WSEventType.CHAT_SEND:
await _handle_chat_send(session_id, data)
elif msg_type == WSEventType.PROJECT_SUBSCRIBE:
await _handle_subscribe(session_id, data)
elif msg_type == WSEventType.PROJECT_UNSUBSCRIBE:
await _handle_unsubscribe(session_id, data)
elif msg_type in (
WSEventType.AGENT_STREAM_START,
WSEventType.AGENT_STREAM_DELTA,
@ -106,7 +97,6 @@ async def websocket_endpoint(ws: WebSocket, token: str = ""):
WSEventType.AGENT_STREAM_END,
):
await _handle_agent_stream(session_id, msg_type, data)
else:
await ws.send_json({"type": WSEventType.ERROR, "message": f"Unknown type: {msg_type}"})
@ -119,17 +109,19 @@ async def websocket_endpoint(ws: WebSocket, token: str = ""):
finally:
if session_id:
client = await manager.disconnect(session_id)
if client and not manager.is_online(client.member_slug):
# Last session for this slug — mark offline
if client and not manager.is_online(client.member_id):
# Last session for this member — mark offline
async with async_session() as db:
result = await db.execute(select(Member).where(Member.slug == client.member_slug))
result = await db.execute(select(Member).where(Member.id == uuid.UUID(client.member_id)))
member = result.scalar_one_or_none()
if member:
member.status = MemberStatus.OFFLINE
await db.commit()
await manager.broadcast_all(
{"type": WSEventType.AGENT_STATUS, "data": {"slug": client.member_slug, "status": MemberStatus.OFFLINE}},
exclude_slug=client.member_slug,
{"type": WSEventType.AGENT_STATUS, "data": {
"id": client.member_id, "slug": client.member_slug, "status": MemberStatus.OFFLINE,
}},
exclude_member_id=client.member_id,
)
@ -138,33 +130,23 @@ async def _authenticate(ws: WebSocket, token: str, on_behalf_of: str | None = No
async with async_session() as db:
member = None
# Check if it's an agent token (starts with 'tb-')
if token.startswith("tb-"):
result = await db.execute(
select(Member).where(Member.token == token).options(selectinload(Member.agent_config))
)
member = result.scalar_one_or_none()
else:
# Try JWT decode
from ..api.auth import decode_jwt
try:
payload = decode_jwt(token)
member_id = payload["sub"]
# sub can be UUID (Tracker JWT) or slug (legacy BFF JWT)
result = await db.execute(
select(Member).where(Member.id == member_id)
.options(selectinload(Member.agent_config))
)
member = result.scalar_one_or_none()
if not member and payload.get("slug"):
# Fallback: try by slug
result = await db.execute(
select(Member).where(Member.slug == payload["slug"])
.options(selectinload(Member.agent_config))
)
member = result.scalar_one_or_none()
if member:
logger.info("JWT auth successful for %s", member.slug)
logger.info("JWT auth successful for %s (id=%s)", member.slug, str(member.id)[:8])
except Exception as e:
logger.warning("JWT decode failed: %s", e)
@ -173,62 +155,55 @@ async def _authenticate(ws: WebSocket, token: str, on_behalf_of: str | None = No
await ws.close()
return None
# BFF proxy: bridge acts on behalf of user
effective_slug = member.slug
effective_type = member.type
# Bridge proxy
effective_member = member
if on_behalf_of and member.type == MemberType.BRIDGE:
user_result = await db.execute(
select(Member).where(Member.slug == on_behalf_of)
select(Member).where(Member.id == uuid.UUID(on_behalf_of))
.options(selectinload(Member.agent_config))
)
user_member = user_result.scalar_one_or_none()
if user_member:
effective_slug = user_member.slug
effective_type = user_member.type
member = user_member
logger.info("Bridge acting on behalf of %s", effective_slug)
effective_member = user_member
logger.info("Bridge acting on behalf of %s (id=%s)", effective_member.slug, str(effective_member.id)[:8])
else:
effective_slug = f"web-{on_behalf_of}"
logger.info("Bridge acting on behalf of unknown user → %s", effective_slug)
logger.warning("Bridge on_behalf_of member not found: %s", on_behalf_of)
# Listen modes
chat_listen = ListenMode.ALL
task_listen = ListenMode.ALL
if member.agent_config:
chat_listen = member.agent_config.chat_listen
task_listen = member.agent_config.task_listen
if effective_member.agent_config:
chat_listen = effective_member.agent_config.chat_listen
task_listen = effective_member.agent_config.task_listen
# Register connection with unique session_id
# Register
session_id = str(uuid.uuid4())
client = ConnectedClient(
ws=ws,
session_id=session_id,
member_id=str(member.id),
member_slug=effective_slug,
member_type=effective_type,
member_id=str(effective_member.id),
member_slug=effective_member.slug,
member_type=effective_member.type,
chat_listen=chat_listen,
task_listen=task_listen,
)
await manager.connect(client)
# Update status
member.status = MemberStatus.ONLINE
effective_member.status = MemberStatus.ONLINE
await db.commit()
# Get lobby chat + projects
lobby = await db.execute(select(Chat).where(Chat.kind == ChatKind.LOBBY))
lobby_chat = lobby.scalar_one_or_none()
# Filter projects by membership (show all for owners)
if member.role == MemberRole.OWNER:
# Owners see all projects
if effective_member.role == MemberRole.OWNER:
projects = await db.execute(select(Project).where(Project.status == ProjectStatus.ACTIVE))
else:
# Members see only projects they belong to
projects = await db.execute(
select(Project)
.join(ProjectMember, Project.id == ProjectMember.project_id)
.where(Project.status == ProjectStatus.ACTIVE, ProjectMember.member_id == member.id)
.where(Project.status == ProjectStatus.ACTIVE, ProjectMember.member_id == effective_member.id)
)
project_list = []
@ -244,31 +219,41 @@ async def _authenticate(ws: WebSocket, token: str, on_behalf_of: str | None = No
"chat_id": str(chat.id) if chat else None,
})
# Auto-subscribe to all member's projects
# Auto-subscribe
for p in project_list:
client.subscribed_projects.add(p["id"])
# Build online list as [{id, slug}]
online_list = []
seen = set()
for s in manager.sessions.values():
if s.member_id not in seen:
seen.add(s.member_id)
online_list.append({"id": s.member_id, "slug": s.member_slug})
await ws.send_json({
"type": WSEventType.AUTH_OK,
"data": {
"slug": effective_slug,
"member_id": str(effective_member.id),
"slug": effective_member.slug,
"lobby_chat_id": str(lobby_chat.id) if lobby_chat else None,
"projects": project_list,
"online": manager.online_slugs,
"online": online_list,
},
})
# Notify others
await manager.broadcast_all(
{"type": WSEventType.AGENT_STATUS, "data": {"slug": effective_slug, "status": MemberStatus.ONLINE}},
exclude_slug=effective_slug,
{"type": WSEventType.AGENT_STATUS, "data": {
"id": str(effective_member.id), "slug": effective_member.slug, "status": MemberStatus.ONLINE,
}},
exclude_member_id=str(effective_member.id),
)
return session_id
async def _handle_heartbeat(session_id: str, data: dict):
"""Update heartbeat timestamp."""
from datetime import datetime, timezone
client = manager.sessions.get(session_id)
@ -279,25 +264,25 @@ async def _handle_heartbeat(session_id: str, data: dict):
client.last_heartbeat = datetime.now(timezone.utc)
async with async_session() as db:
result = await db.execute(select(Member).where(Member.slug == client.member_slug))
result = await db.execute(select(Member).where(Member.id == uuid.UUID(client.member_id)))
member = result.scalar_one_or_none()
if member:
member.status = status
await db.commit()
await manager.broadcast_all(
{"type": WSEventType.AGENT_STATUS, "data": {"slug": client.member_slug, "status": status}},
exclude_slug=client.member_slug,
{"type": WSEventType.AGENT_STATUS, "data": {
"id": client.member_id, "slug": client.member_slug, "status": status,
}},
exclude_member_id=client.member_id,
)
async def _handle_chat_send(session_id: str, data: dict):
"""Handle chat message sent via WS."""
client = manager.sessions.get(session_id)
if not client:
return
slug = client.member_slug
chat_id = data.get("chat_id")
task_id = data.get("task_id")
content = data.get("content", "")
@ -308,7 +293,7 @@ async def _handle_chat_send(session_id: str, data: dict):
return
async with async_session() as db:
result = await db.execute(select(Member).where(Member.slug == slug, Member.is_active == True))
result = await db.execute(select(Member).where(Member.id == uuid.UUID(client.member_id), Member.is_active == True))
member = result.scalar_one_or_none()
if not member:
await client.ws.send_json({"type": WSEventType.ERROR, "message": "Member not found or inactive"})
@ -329,7 +314,6 @@ async def _handle_chat_send(session_id: str, data: dict):
msg_data = _to_message_out(msg, member).model_dump()
# Determine project_id for filtering
project_id = None
if chat_id:
chat_result = await db.execute(select(Chat).where(Chat.id == uuid.UUID(chat_id)))
@ -339,33 +323,33 @@ async def _handle_chat_send(session_id: str, data: dict):
elif chat and chat.kind == ChatKind.LOBBY:
await manager.broadcast_all(
{"type": WSEventType.MESSAGE_NEW, "data": msg_data},
exclude_slug=slug,
exclude_member_id=client.member_id,
)
return
if project_id:
await manager.broadcast_message(project_id, msg_data, author_slug=slug, author_session_id=session_id)
await manager.broadcast_message(
project_id, msg_data,
author_id=client.member_id,
author_session_id=session_id,
)
else:
await manager.broadcast_all(
{"type": WSEventType.MESSAGE_NEW, "data": msg_data},
exclude_slug=slug,
exclude_member_id=client.member_id,
)
async def _handle_agent_stream(session_id: str, event_type: str, data: dict):
"""Relay agent streaming events to project subscribers."""
logger.info("STREAM event: %s data=%s", event_type, str(data)[:200])
client = manager.sessions.get(session_id)
if not client:
logger.warning("STREAM: no client for session %s", session_id[:8])
return
# Only agents can stream
if client.member_type != MemberType.AGENT:
logger.warning("STREAM: non-agent %s tried to stream", client.member_slug)
return
# Determine project_id from chat_id or task_id in the data
project_id = data.get("project_id")
chat_id = data.get("chat_id")
task_id = data.get("task_id")
@ -388,18 +372,23 @@ async def _handle_agent_stream(session_id: str, event_type: str, data: dict):
"type": event_type,
"data": {
**data,
"agent_slug": client.member_slug,
"agent_id": client.member_id,
"agent_slug": client.member_slug, # kept for display convenience
},
}
if project_id:
await manager.broadcast_message(project_id, payload["data"], author_slug=client.member_slug, author_session_id=session_id, event_type=event_type)
await manager.broadcast_message(
project_id, payload["data"],
author_id=client.member_id,
author_session_id=session_id,
event_type=event_type,
)
else:
await manager.broadcast_all(payload, exclude_slug=client.member_slug)
await manager.broadcast_all(payload, exclude_member_id=client.member_id)
async def _handle_subscribe(session_id: str, data: dict):
"""Subscribe this session to project events."""
project_id = data.get("project_id")
if not project_id:
return
@ -410,7 +399,6 @@ async def _handle_subscribe(session_id: str, data: dict):
async def _handle_unsubscribe(session_id: str, data: dict):
"""Unsubscribe this session from project events."""
project_id = data.get("project_id")
if not project_id:
return

View File

@ -16,8 +16,8 @@ logger = logging.getLogger("tracker.ws")
class ConnectedClient:
ws: WebSocket
session_id: str # unique per connection
member_id: str | None = None # UUID as string
member_slug: str = ""
member_id: str = "" # UUID as string — primary identifier
member_slug: str = "" # for display/logging only
member_type: str = "" # human | agent | bridge
chat_listen: str = ListenMode.ALL
task_listen: str = ListenMode.ALL
@ -29,38 +29,39 @@ class ConnectionManager:
def __init__(self):
# session_id → client (one entry per WS connection)
self.sessions: dict[str, ConnectedClient] = {}
# slug → set of session_ids (for quick lookup)
self.slug_sessions: dict[str, set[str]] = {}
# member_id (UUID) → set of session_ids (for quick lookup)
self.member_sessions: dict[str, set[str]] = {}
async def connect(self, client: ConnectedClient):
self.sessions[client.session_id] = client
if client.member_slug not in self.slug_sessions:
self.slug_sessions[client.member_slug] = set()
self.slug_sessions[client.member_slug].add(client.session_id)
logger.info("WS connected: %s session=%s (%s)", client.member_slug, client.session_id[:8], client.member_type)
if client.member_id not in self.member_sessions:
self.member_sessions[client.member_id] = set()
self.member_sessions[client.member_id].add(client.session_id)
logger.info("WS connected: %s (id=%s) session=%s (%s)",
client.member_slug, client.member_id[:8], client.session_id[:8], client.member_type)
async def disconnect(self, session_id: str):
client = self.sessions.pop(session_id, None)
if client:
slug_set = self.slug_sessions.get(client.member_slug)
if slug_set:
slug_set.discard(session_id)
if not slug_set:
del self.slug_sessions[client.member_slug]
id_set = self.member_sessions.get(client.member_id)
if id_set:
id_set.discard(session_id)
if not id_set:
del self.member_sessions[client.member_id]
logger.info("WS disconnected: %s session=%s", client.member_slug, session_id[:8])
return client
def get_sessions_for_slug(self, slug: str) -> list[ConnectedClient]:
"""Get all active sessions for a member slug."""
session_ids = self.slug_sessions.get(slug, set())
def get_sessions_for_member(self, member_id: str) -> list[ConnectedClient]:
"""Get all active sessions for a member by UUID."""
session_ids = self.member_sessions.get(member_id, set())
return [self.sessions[sid] for sid in session_ids if sid in self.sessions]
def is_online(self, slug: str) -> bool:
return bool(self.slug_sessions.get(slug))
def is_online(self, member_id: str) -> bool:
return bool(self.member_sessions.get(member_id))
@property
def online_slugs(self) -> list[str]:
return list(self.slug_sessions.keys())
def online_member_ids(self) -> list[str]:
return list(self.member_sessions.keys())
async def send_to_session(self, session_id: str, data: dict):
"""Send to a specific session."""
@ -71,61 +72,77 @@ class ConnectionManager:
except Exception:
await self.disconnect(session_id)
async def send_to_slug(self, slug: str, data: dict):
"""Send to ALL sessions of a member."""
for client in self.get_sessions_for_slug(slug):
async def send_to_member(self, member_id: str, data: dict):
"""Send to ALL sessions of a member by UUID."""
for client in self.get_sessions_for_member(member_id):
try:
await client.ws.send_json(data)
except Exception:
await self.disconnect(client.session_id)
async def broadcast_message(self, project_id: str, message: dict, author_slug: str, author_session_id: str | None = None, event_type: str | None = None):
async def broadcast_message(self, project_id: str, message: dict,
author_id: str | None = None,
author_session_id: str | None = None,
event_type: str | None = None):
"""Broadcast message/stream event. Humans get everything, agents filtered.
Filtering for agents:
- Skip author's session (by session_id if available, else by slug)
- Skip author's session (by session_id if available, else by member_id)
- Skip if not subscribed to project
- Skip if chat_listen == "none"
- If chat_listen == "mentions": only if @slug in mentions
- System messages: only if @slug mentioned in content
- If chat_listen == "mentions": only if member_id in mention IDs
- System messages: only if member_id in mention IDs
"""
raw_mentions = message.get("mentions", [])
# mentions can be list of strings (slugs) or list of dicts (MemberBrief objects)
mentions: list[str] = []
# Extract member IDs from mentions (objects or strings)
mention_ids: set[str] = set()
mention_slugs: set[str] = set()
for m in raw_mentions:
if isinstance(m, str):
mentions.append(m)
elif isinstance(m, dict):
mentions.append(m.get("slug", ""))
if isinstance(m, dict):
if m.get("id"):
mention_ids.add(m["id"])
if m.get("slug"):
mention_slugs.add(m["slug"])
elif isinstance(m, str):
mention_slugs.add(m)
content = message.get("content", "")
author_type = message.get("author_type", "")
payload = {"type": event_type or WSEventType.MESSAGE_NEW, "data": message}
for session_id, client in list(self.sessions.items()):
# Skip only the sending session, not all sessions of the same user
# Skip the sending session (or all sessions of author if no session_id)
if author_session_id and session_id == author_session_id:
continue
if not author_session_id and client.member_slug == author_slug:
if not author_session_id and author_id and client.member_id == author_id:
continue
# Humans/bridges get ALL messages
if client.member_type in (MemberType.HUMAN, MemberType.BRIDGE):
await self.send_to_session(session_id, payload)
continue
# Agents: subscription check
if project_id not in client.subscribed_projects:
continue
if client.chat_listen == ListenMode.NONE:
continue
# System messages: only if agent is mentioned (by slug in mentions array, or @slug in content for compat)
# System messages: only if agent is mentioned (by ID or slug for compat)
if author_type == AuthorType.SYSTEM:
mentioned_in_mentions = client.member_slug in mentions
mentioned_in_content = f"@{client.member_slug}" in content
if not mentioned_in_mentions and not mentioned_in_content:
mentioned = (
client.member_id in mention_ids or
client.member_slug in mention_slugs or
f"@{client.member_slug}" in content # text fallback for compat
)
if not mentioned:
continue
await self.send_to_session(session_id, payload)
continue
# Regular messages: chat_listen filter
if client.chat_listen == ListenMode.MENTIONS and client.member_slug not in mentions:
if client.chat_listen == ListenMode.MENTIONS:
if client.member_id not in mention_ids and client.member_slug not in mention_slugs:
continue
await self.send_to_session(session_id, payload)
@ -156,10 +173,10 @@ class ConnectionManager:
):
await self.send_to_session(session_id, payload)
async def broadcast_all(self, data: dict, exclude_slug: str | None = None):
async def broadcast_all(self, data: dict, exclude_member_id: str | None = None):
"""Broadcast to all connected sessions."""
for session_id, client in list(self.sessions.items()):
if client.member_slug == exclude_slug:
if exclude_member_id and client.member_id == exclude_member_id:
continue
await self.send_to_session(session_id, data)