diff --git a/alembic/versions/a1b2c3d4e5f6_remove_adapters_update_agents.py b/alembic/versions/a1b2c3d4e5f6_remove_adapters_update_agents.py new file mode 100644 index 0000000..547e955 --- /dev/null +++ b/alembic/versions/a1b2c3d4e5f6_remove_adapters_update_agents.py @@ -0,0 +1,64 @@ +"""remove adapters, update agents: capabilities + token on agent + +Revision ID: a1b2c3d4e5f6 +Revises: 864b0ce5d657 +Create Date: 2026-02-15 22:30:00.000000 +""" +from typing import Sequence, Union + +from alembic import op +import sqlalchemy as sa +from sqlalchemy.dialects import postgresql + +revision: str = 'a1b2c3d4e5f6' +down_revision: Union[str, None] = '864b0ce5d657' +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + # Add new columns to agents + op.add_column('agents', sa.Column('token', sa.String(255), unique=True)) + op.add_column('agents', sa.Column('capabilities', postgresql.ARRAY(sa.String()), server_default='{}', nullable=False)) + + # Generate tokens for existing agents + op.execute("UPDATE agents SET token = 'agent-' || md5(random()::text || clock_timestamp()::text) WHERE token IS NULL") + op.alter_column('agents', 'token', nullable=False) + + # Drop old columns from agents + op.drop_constraint('agents_adapter_id_fkey', 'agents', type_='foreignkey') + op.drop_column('agents', 'adapter_id') + op.drop_column('agents', 'system_prompt') + op.drop_column('agents', 'host') + op.drop_column('agents', 'pid') + op.drop_column('agents', 'restart_count') + + # Change default subscription_mode + op.alter_column('agents', 'subscription_mode', server_default='assigned') + + # Drop adapters table + op.drop_table('adapters') + + +def downgrade() -> None: + # Recreate adapters + op.create_table( + 'adapters', + sa.Column('id', postgresql.UUID(as_uuid=True), primary_key=True, server_default=sa.text('gen_random_uuid()')), + sa.Column('name', sa.String(255), nullable=False), + sa.Column('provider', sa.String(50), nullable=False), + sa.Column('config', postgresql.JSONB, nullable=False, server_default='{}'), + sa.Column('capabilities', postgresql.ARRAY(sa.String()), nullable=False, server_default='{}'), + sa.Column('created_at', sa.DateTime(timezone=True), server_default=sa.text('now()')), + sa.Column('updated_at', sa.DateTime(timezone=True), server_default=sa.text('now()')), + ) + + # Re-add columns to agents + op.add_column('agents', sa.Column('adapter_id', postgresql.UUID(as_uuid=True))) + op.add_column('agents', sa.Column('system_prompt', sa.Text())) + op.add_column('agents', sa.Column('host', sa.String(255), server_default='localhost')) + op.add_column('agents', sa.Column('pid', sa.Integer())) + op.add_column('agents', sa.Column('restart_count', sa.Integer(), server_default='0')) + + op.drop_column('agents', 'token') + op.drop_column('agents', 'capabilities') diff --git a/src/tracker/api/agents.py b/src/tracker/api/agents.py index ebcc4c8..b068f1d 100644 --- a/src/tracker/api/agents.py +++ b/src/tracker/api/agents.py @@ -1,5 +1,6 @@ -"""Agents and Adapters CRUD API.""" +"""Agents CRUD API.""" +import secrets import uuid from fastapi import APIRouter, Depends, HTTPException @@ -8,59 +9,23 @@ from sqlalchemy import select from sqlalchemy.ext.asyncio import AsyncSession from tracker.database import get_db -from tracker.models import Adapter, Agent +from tracker.models import Agent router = APIRouter(prefix="/agents", tags=["agents"]) -# --- Adapters --- - -class AdapterCreate(BaseModel): - name: str - provider: str - config: dict = {} - capabilities: list[str] = [] - - -class AdapterOut(BaseModel): - id: uuid.UUID - name: str - provider: str - capabilities: list[str] - - model_config = {"from_attributes": True} - - -@router.get("/adapters", response_model=list[AdapterOut]) -async def list_adapters(db: AsyncSession = Depends(get_db)): - result = await db.execute(select(Adapter).order_by(Adapter.created_at)) - return result.scalars().all() - - -@router.post("/adapters", response_model=AdapterOut, status_code=201) -async def create_adapter(data: AdapterCreate, db: AsyncSession = Depends(get_db)): - adapter = Adapter(**data.model_dump()) - db.add(adapter) - await db.commit() - await db.refresh(adapter) - return adapter - - -# --- Agents --- - class AgentCreate(BaseModel): name: str slug: str - adapter_id: uuid.UUID - system_prompt: str | None = None - subscription_mode: str = "mentions" + capabilities: list[str] = [] + subscription_mode: str = "assigned" max_concurrent: int = 1 timeout_seconds: int = 600 class AgentUpdate(BaseModel): name: str | None = None - system_prompt: str | None = None + capabilities: list[str] | None = None subscription_mode: str | None = None max_concurrent: int | None = None timeout_seconds: int | None = None @@ -71,14 +36,12 @@ class AgentOut(BaseModel): id: uuid.UUID name: str slug: str - adapter_id: uuid.UUID + token: str + capabilities: list[str] subscription_mode: str max_concurrent: int timeout_seconds: int status: str - host: str | None - pid: int | None - restart_count: int model_config = {"from_attributes": True} @@ -91,7 +54,9 @@ async def list_agents(db: AsyncSession = Depends(get_db)): @router.post("/", response_model=AgentOut, status_code=201) async def create_agent(data: AgentCreate, db: AsyncSession = Depends(get_db)): - agent = Agent(**data.model_dump()) + dump = data.model_dump() + dump["token"] = f"agent-{secrets.token_hex(16)}" + agent = Agent(**dump) db.add(agent) await db.commit() await db.refresh(agent) @@ -116,3 +81,12 @@ async def update_agent(agent_id: uuid.UUID, data: AgentUpdate, db: AsyncSession await db.commit() await db.refresh(agent) return agent + + +@router.delete("/{agent_id}", status_code=204) +async def delete_agent(agent_id: uuid.UUID, db: AsyncSession = Depends(get_db)): + agent = await db.get(Agent, agent_id) + if not agent: + raise HTTPException(404, "Agent not found") + await db.delete(agent) + await db.commit() diff --git a/src/tracker/api/chats.py b/src/tracker/api/chats.py new file mode 100644 index 0000000..efc8204 --- /dev/null +++ b/src/tracker/api/chats.py @@ -0,0 +1,113 @@ +"""Chats and messages API.""" + +import uuid + +from fastapi import APIRouter, Depends, HTTPException, Query +from pydantic import BaseModel +from sqlalchemy import select +from sqlalchemy.ext.asyncio import AsyncSession +from sqlalchemy.orm import selectinload + +from tracker.database import get_db +from tracker.models import Chat, ChatMessage, Project + +router = APIRouter(prefix="/chats", tags=["chats"]) + + +class ChatOut(BaseModel): + id: uuid.UUID + project_id: uuid.UUID | None + task_id: uuid.UUID | None + kind: str + + model_config = {"from_attributes": True} + + +class MessageCreate(BaseModel): + content: str + sender_type: str = "human" # human, agent, system + sender_id: uuid.UUID | None = None + sender_name: str | None = None + + +class MessageOut(BaseModel): + id: uuid.UUID + chat_id: uuid.UUID + sender_type: str + sender_id: uuid.UUID | None + sender_name: str | None + content: str + created_at: str + + model_config = {"from_attributes": True} + + +@router.get("/lobby", response_model=ChatOut) +async def get_or_create_lobby(db: AsyncSession = Depends(get_db)): + """Get or create the global lobby chat.""" + result = await db.execute(select(Chat).where(Chat.kind == "lobby")) + chat = result.scalar_one_or_none() + if not chat: + chat = Chat(kind="lobby") + db.add(chat) + await db.commit() + await db.refresh(chat) + return chat + + +@router.get("/project/{project_id}", response_model=ChatOut) +async def get_or_create_project_chat(project_id: uuid.UUID, db: AsyncSession = Depends(get_db)): + """Get or create a project chat.""" + project = await db.get(Project, project_id) + if not project: + raise HTTPException(404, "Project not found") + + result = await db.execute( + select(Chat).where(Chat.project_id == project_id, Chat.kind == "project") + ) + chat = result.scalar_one_or_none() + if not chat: + chat = Chat(project_id=project_id, kind="project") + db.add(chat) + await db.commit() + await db.refresh(chat) + return chat + + +@router.get("/{chat_id}/messages", response_model=list[MessageOut]) +async def list_messages( + chat_id: uuid.UUID, + limit: int = Query(50, ge=1, le=200), + before: uuid.UUID | None = None, + db: AsyncSession = Depends(get_db), +): + """Get messages for a chat (newest first, paginated).""" + chat = await db.get(Chat, chat_id) + if not chat: + raise HTTPException(404, "Chat not found") + + q = select(ChatMessage).where(ChatMessage.chat_id == chat_id) + if before: + ref = await db.get(ChatMessage, before) + if ref: + q = q.where(ChatMessage.created_at < ref.created_at) + q = q.order_by(ChatMessage.created_at.desc()).limit(limit) + + result = await db.execute(q) + messages = list(result.scalars().all()) + messages.reverse() # return oldest-first + return messages + + +@router.post("/{chat_id}/messages", response_model=MessageOut, status_code=201) +async def create_message(chat_id: uuid.UUID, data: MessageCreate, db: AsyncSession = Depends(get_db)): + """Post a message to a chat (REST fallback — prefer WebSocket).""" + chat = await db.get(Chat, chat_id) + if not chat: + raise HTTPException(404, "Chat not found") + + msg = ChatMessage(chat_id=chat_id, **data.model_dump()) + db.add(msg) + await db.commit() + await db.refresh(msg) + return msg diff --git a/src/tracker/app.py b/src/tracker/app.py index 742f434..a89fd86 100644 --- a/src/tracker/app.py +++ b/src/tracker/app.py @@ -8,7 +8,7 @@ from fastapi import FastAPI, Request from fastapi.middleware.cors import CORSMiddleware from fastapi.responses import JSONResponse -from tracker.api import agents, labels, projects, tasks +from tracker.api import agents, chats, labels, projects, tasks from tracker.config import settings from tracker.ws.handler import router as ws_router @@ -61,6 +61,7 @@ app.include_router(projects.router, prefix="/api/v1") app.include_router(tasks.router, prefix="/api/v1") app.include_router(agents.router, prefix="/api/v1") app.include_router(labels.router, prefix="/api/v1") +app.include_router(chats.router, prefix="/api/v1") # WebSocket app.include_router(ws_router) diff --git a/src/tracker/models/__init__.py b/src/tracker/models/__init__.py index 9fd4159..28126f7 100644 --- a/src/tracker/models/__init__.py +++ b/src/tracker/models/__init__.py @@ -1,7 +1,7 @@ from tracker.models.base import Base from tracker.models.project import Project from tracker.models.task import Task, TaskDependency, TaskFile, TaskLabel -from tracker.models.agent import Agent, Adapter +from tracker.models.agent import Agent from tracker.models.label import Label from tracker.models.chat import Chat, ChatMessage @@ -13,7 +13,6 @@ __all__ = [ "TaskFile", "TaskLabel", "Agent", - "Adapter", "Label", "Chat", "ChatMessage", diff --git a/src/tracker/models/agent.py b/src/tracker/models/agent.py index 775fb02..c933461 100644 --- a/src/tracker/models/agent.py +++ b/src/tracker/models/agent.py @@ -1,42 +1,22 @@ -"""Agent and Adapter models.""" +"""Agent model.""" import uuid -from typing import TYPE_CHECKING -from sqlalchemy import ForeignKey, Integer, String, Text -from sqlalchemy.dialects.postgresql import ARRAY, JSONB, UUID -from sqlalchemy.orm import Mapped, mapped_column, relationship +from sqlalchemy import Integer, String, Text +from sqlalchemy.dialects.postgresql import ARRAY +from sqlalchemy.orm import Mapped, mapped_column from tracker.models.base import Base -if TYPE_CHECKING: - pass - - -class Adapter(Base): - __tablename__ = "adapters" - - name: Mapped[str] = mapped_column(String(255), nullable=False) - provider: Mapped[str] = mapped_column(String(50), nullable=False) # anthropic, openai, openclaw, cli, ... - config: Mapped[dict] = mapped_column(JSONB, nullable=False, default=dict) - capabilities: Mapped[list[str]] = mapped_column(ARRAY(String), nullable=False, default=list) - - agents: Mapped[list["Agent"]] = relationship(back_populates="adapter") - class Agent(Base): __tablename__ = "agents" name: Mapped[str] = mapped_column(String(255), nullable=False) slug: Mapped[str] = mapped_column(String(255), unique=True, nullable=False) - adapter_id: Mapped[uuid.UUID] = mapped_column(UUID(as_uuid=True), ForeignKey("adapters.id"), nullable=False) - system_prompt: Mapped[str | None] = mapped_column(Text) - subscription_mode: Mapped[str] = mapped_column(String(20), default="mentions") # all, mentions, assigned + token: Mapped[str] = mapped_column(String(255), unique=True, nullable=False) # for WS auth + capabilities: Mapped[list[str]] = mapped_column(ARRAY(String), nullable=False, default=list) + subscription_mode: Mapped[str] = mapped_column(String(20), default="assigned") # all, mentions, assigned max_concurrent: Mapped[int] = mapped_column(Integer, default=1) timeout_seconds: Mapped[int] = mapped_column(Integer, default=600) - status: Mapped[str] = mapped_column(String(20), default="offline") # online, offline, busy, dead - host: Mapped[str | None] = mapped_column(String(255), default="localhost") - pid: Mapped[int | None] = mapped_column(Integer) - restart_count: Mapped[int] = mapped_column(Integer, default=0) - - adapter: Mapped["Adapter"] = relationship(back_populates="agents") + status: Mapped[str] = mapped_column(String(20), default="offline") # online, offline, busy diff --git a/src/tracker/ws/handler.py b/src/tracker/ws/handler.py index 78d5546..829d0f0 100644 --- a/src/tracker/ws/handler.py +++ b/src/tracker/ws/handler.py @@ -5,6 +5,8 @@ import logging from fastapi import APIRouter, WebSocket, WebSocketDisconnect +from tracker.database import async_session +from tracker.models import Chat, ChatMessage from tracker.ws.manager import manager logger = logging.getLogger(__name__) @@ -25,18 +27,77 @@ async def websocket_endpoint(ws: WebSocket, client_type: str = "human", client_i continue event = msg.get("event") + if event == "agent.heartbeat": await manager.send(ws_id, "agent.heartbeat.ack", {}) + elif event == "chat.send": - # Broadcast to all for now; will scope to project/task later - await manager.broadcast("chat.message", { - "sender": ws_id, - "content": msg.get("content", ""), - "chat_id": msg.get("chat_id"), - }, exclude=ws_id) + await _handle_chat_send(ws_id, msg) + + elif event == "chat.subscribe": + # Subscribe to a chat room + chat_id = msg.get("chat_id") + if chat_id: + await manager.subscribe(ws_id, f"chat:{chat_id}") + await manager.send(ws_id, "chat.subscribed", {"chat_id": chat_id}) + + elif event == "chat.unsubscribe": + chat_id = msg.get("chat_id") + if chat_id: + await manager.unsubscribe(ws_id, f"chat:{chat_id}") + else: logger.debug("Unknown WS event: %s", event) + except WebSocketDisconnect: pass finally: await manager.disconnect(ws_id) + + +async def _handle_chat_send(ws_id: str, msg: dict): + """Save message to DB and broadcast to chat subscribers.""" + chat_id = msg.get("chat_id") + content = msg.get("content", "").strip() + sender_type = msg.get("sender_type", "human") + sender_id = msg.get("sender_id") + sender_name = msg.get("sender_name") + + if not chat_id or not content: + await manager.send(ws_id, "error", {"message": "chat_id and content required"}) + return + + # Save to DB + try: + async with async_session() as db: + chat = await db.get(Chat, chat_id) + if not chat: + await manager.send(ws_id, "error", {"message": "Chat not found"}) + return + + db_msg = ChatMessage( + chat_id=chat_id, + sender_type=sender_type, + sender_id=sender_id, + sender_name=sender_name, + content=content, + ) + db.add(db_msg) + await db.commit() + await db.refresh(db_msg) + + # Broadcast to all subscribers of this chat + payload = { + "id": str(db_msg.id), + "chat_id": str(chat_id), + "sender_type": sender_type, + "sender_id": str(sender_id) if sender_id else None, + "sender_name": sender_name, + "content": content, + "created_at": str(db_msg.created_at), + } + await manager.broadcast_to_room(f"chat:{chat_id}", "chat.message", payload) + + except Exception as e: + logger.error("Error saving chat message: %s", e) + await manager.send(ws_id, "error", {"message": "Failed to save message"}) diff --git a/src/tracker/ws/manager.py b/src/tracker/ws/manager.py index 88688c1..3b80f26 100644 --- a/src/tracker/ws/manager.py +++ b/src/tracker/ws/manager.py @@ -1,4 +1,4 @@ -"""WebSocket connection manager.""" +"""WebSocket connection manager with room subscriptions.""" import asyncio import json @@ -15,12 +15,13 @@ class Connection: ws: WebSocket client_type: str = "unknown" # human, agent client_id: str | None = None - subscriptions: set[str] = field(default_factory=set) # project slugs + rooms: set[str] = field(default_factory=set) # subscribed rooms: "chat:" class ConnectionManager: def __init__(self): - self._connections: dict[str, Connection] = {} # ws id -> Connection + self._connections: dict[str, Connection] = {} + self._rooms: dict[str, set[str]] = {} # room -> set of ws_ids self._lock = asyncio.Lock() async def connect(self, ws: WebSocket, client_type: str = "unknown", client_id: str | None = None) -> str: @@ -34,9 +35,34 @@ class ConnectionManager: async def disconnect(self, ws_id: str): async with self._lock: - self._connections.pop(ws_id, None) + conn = self._connections.pop(ws_id, None) + if conn: + for room in conn.rooms: + s = self._rooms.get(room) + if s: + s.discard(ws_id) + if not s: + del self._rooms[room] logger.info("WS disconnected: %s", ws_id) + async def subscribe(self, ws_id: str, room: str): + async with self._lock: + conn = self._connections.get(ws_id) + if conn: + conn.rooms.add(room) + self._rooms.setdefault(room, set()).add(ws_id) + + async def unsubscribe(self, ws_id: str, room: str): + async with self._lock: + conn = self._connections.get(ws_id) + if conn: + conn.rooms.discard(room) + s = self._rooms.get(room) + if s: + s.discard(ws_id) + if not s: + del self._rooms[room] + async def send(self, ws_id: str, event: str, data: dict): conn = self._connections.get(ws_id) if conn: @@ -58,6 +84,25 @@ class ConnectionManager: for ws_id in dead: await self.disconnect(ws_id) + async def broadcast_to_room(self, room: str, event: str, data: dict, exclude: str | None = None): + """Send to all connections subscribed to a room.""" + ws_ids = self._rooms.get(room, set()) + msg = {"event": event, **data} + dead = [] + for ws_id in list(ws_ids): + if ws_id == exclude: + continue + conn = self._connections.get(ws_id) + if not conn: + dead.append(ws_id) + continue + try: + await conn.ws.send_json(msg) + except Exception: + dead.append(ws_id) + for ws_id in dead: + await self.disconnect(ws_id) + @property def active_count(self) -> int: return len(self._connections)