feat: remove adapters, add chat API + WS rooms, capabilities on agents
All checks were successful
Deploy Tracker / deploy (push) Successful in 7s

This commit is contained in:
Markov 2026-02-15 23:07:10 +01:00
parent 5f9ea80621
commit b777623320
8 changed files with 324 additions and 87 deletions

View File

@ -0,0 +1,64 @@
"""remove adapters, update agents: capabilities + token on agent
Revision ID: a1b2c3d4e5f6
Revises: 864b0ce5d657
Create Date: 2026-02-15 22:30:00.000000
"""
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
from sqlalchemy.dialects import postgresql
revision: str = 'a1b2c3d4e5f6'
down_revision: Union[str, None] = '864b0ce5d657'
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
# Add new columns to agents
op.add_column('agents', sa.Column('token', sa.String(255), unique=True))
op.add_column('agents', sa.Column('capabilities', postgresql.ARRAY(sa.String()), server_default='{}', nullable=False))
# Generate tokens for existing agents
op.execute("UPDATE agents SET token = 'agent-' || md5(random()::text || clock_timestamp()::text) WHERE token IS NULL")
op.alter_column('agents', 'token', nullable=False)
# Drop old columns from agents
op.drop_constraint('agents_adapter_id_fkey', 'agents', type_='foreignkey')
op.drop_column('agents', 'adapter_id')
op.drop_column('agents', 'system_prompt')
op.drop_column('agents', 'host')
op.drop_column('agents', 'pid')
op.drop_column('agents', 'restart_count')
# Change default subscription_mode
op.alter_column('agents', 'subscription_mode', server_default='assigned')
# Drop adapters table
op.drop_table('adapters')
def downgrade() -> None:
# Recreate adapters
op.create_table(
'adapters',
sa.Column('id', postgresql.UUID(as_uuid=True), primary_key=True, server_default=sa.text('gen_random_uuid()')),
sa.Column('name', sa.String(255), nullable=False),
sa.Column('provider', sa.String(50), nullable=False),
sa.Column('config', postgresql.JSONB, nullable=False, server_default='{}'),
sa.Column('capabilities', postgresql.ARRAY(sa.String()), nullable=False, server_default='{}'),
sa.Column('created_at', sa.DateTime(timezone=True), server_default=sa.text('now()')),
sa.Column('updated_at', sa.DateTime(timezone=True), server_default=sa.text('now()')),
)
# Re-add columns to agents
op.add_column('agents', sa.Column('adapter_id', postgresql.UUID(as_uuid=True)))
op.add_column('agents', sa.Column('system_prompt', sa.Text()))
op.add_column('agents', sa.Column('host', sa.String(255), server_default='localhost'))
op.add_column('agents', sa.Column('pid', sa.Integer()))
op.add_column('agents', sa.Column('restart_count', sa.Integer(), server_default='0'))
op.drop_column('agents', 'token')
op.drop_column('agents', 'capabilities')

View File

@ -1,5 +1,6 @@
"""Agents and Adapters CRUD API.""" """Agents CRUD API."""
import secrets
import uuid import uuid
from fastapi import APIRouter, Depends, HTTPException from fastapi import APIRouter, Depends, HTTPException
@ -8,59 +9,23 @@ from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.ext.asyncio import AsyncSession
from tracker.database import get_db from tracker.database import get_db
from tracker.models import Adapter, Agent from tracker.models import Agent
router = APIRouter(prefix="/agents", tags=["agents"]) router = APIRouter(prefix="/agents", tags=["agents"])
# --- Adapters ---
class AdapterCreate(BaseModel):
name: str
provider: str
config: dict = {}
capabilities: list[str] = []
class AdapterOut(BaseModel):
id: uuid.UUID
name: str
provider: str
capabilities: list[str]
model_config = {"from_attributes": True}
@router.get("/adapters", response_model=list[AdapterOut])
async def list_adapters(db: AsyncSession = Depends(get_db)):
result = await db.execute(select(Adapter).order_by(Adapter.created_at))
return result.scalars().all()
@router.post("/adapters", response_model=AdapterOut, status_code=201)
async def create_adapter(data: AdapterCreate, db: AsyncSession = Depends(get_db)):
adapter = Adapter(**data.model_dump())
db.add(adapter)
await db.commit()
await db.refresh(adapter)
return adapter
# --- Agents ---
class AgentCreate(BaseModel): class AgentCreate(BaseModel):
name: str name: str
slug: str slug: str
adapter_id: uuid.UUID capabilities: list[str] = []
system_prompt: str | None = None subscription_mode: str = "assigned"
subscription_mode: str = "mentions"
max_concurrent: int = 1 max_concurrent: int = 1
timeout_seconds: int = 600 timeout_seconds: int = 600
class AgentUpdate(BaseModel): class AgentUpdate(BaseModel):
name: str | None = None name: str | None = None
system_prompt: str | None = None capabilities: list[str] | None = None
subscription_mode: str | None = None subscription_mode: str | None = None
max_concurrent: int | None = None max_concurrent: int | None = None
timeout_seconds: int | None = None timeout_seconds: int | None = None
@ -71,14 +36,12 @@ class AgentOut(BaseModel):
id: uuid.UUID id: uuid.UUID
name: str name: str
slug: str slug: str
adapter_id: uuid.UUID token: str
capabilities: list[str]
subscription_mode: str subscription_mode: str
max_concurrent: int max_concurrent: int
timeout_seconds: int timeout_seconds: int
status: str status: str
host: str | None
pid: int | None
restart_count: int
model_config = {"from_attributes": True} model_config = {"from_attributes": True}
@ -91,7 +54,9 @@ async def list_agents(db: AsyncSession = Depends(get_db)):
@router.post("/", response_model=AgentOut, status_code=201) @router.post("/", response_model=AgentOut, status_code=201)
async def create_agent(data: AgentCreate, db: AsyncSession = Depends(get_db)): async def create_agent(data: AgentCreate, db: AsyncSession = Depends(get_db)):
agent = Agent(**data.model_dump()) dump = data.model_dump()
dump["token"] = f"agent-{secrets.token_hex(16)}"
agent = Agent(**dump)
db.add(agent) db.add(agent)
await db.commit() await db.commit()
await db.refresh(agent) await db.refresh(agent)
@ -116,3 +81,12 @@ async def update_agent(agent_id: uuid.UUID, data: AgentUpdate, db: AsyncSession
await db.commit() await db.commit()
await db.refresh(agent) await db.refresh(agent)
return agent return agent
@router.delete("/{agent_id}", status_code=204)
async def delete_agent(agent_id: uuid.UUID, db: AsyncSession = Depends(get_db)):
agent = await db.get(Agent, agent_id)
if not agent:
raise HTTPException(404, "Agent not found")
await db.delete(agent)
await db.commit()

113
src/tracker/api/chats.py Normal file
View File

@ -0,0 +1,113 @@
"""Chats and messages API."""
import uuid
from fastapi import APIRouter, Depends, HTTPException, Query
from pydantic import BaseModel
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import selectinload
from tracker.database import get_db
from tracker.models import Chat, ChatMessage, Project
router = APIRouter(prefix="/chats", tags=["chats"])
class ChatOut(BaseModel):
id: uuid.UUID
project_id: uuid.UUID | None
task_id: uuid.UUID | None
kind: str
model_config = {"from_attributes": True}
class MessageCreate(BaseModel):
content: str
sender_type: str = "human" # human, agent, system
sender_id: uuid.UUID | None = None
sender_name: str | None = None
class MessageOut(BaseModel):
id: uuid.UUID
chat_id: uuid.UUID
sender_type: str
sender_id: uuid.UUID | None
sender_name: str | None
content: str
created_at: str
model_config = {"from_attributes": True}
@router.get("/lobby", response_model=ChatOut)
async def get_or_create_lobby(db: AsyncSession = Depends(get_db)):
"""Get or create the global lobby chat."""
result = await db.execute(select(Chat).where(Chat.kind == "lobby"))
chat = result.scalar_one_or_none()
if not chat:
chat = Chat(kind="lobby")
db.add(chat)
await db.commit()
await db.refresh(chat)
return chat
@router.get("/project/{project_id}", response_model=ChatOut)
async def get_or_create_project_chat(project_id: uuid.UUID, db: AsyncSession = Depends(get_db)):
"""Get or create a project chat."""
project = await db.get(Project, project_id)
if not project:
raise HTTPException(404, "Project not found")
result = await db.execute(
select(Chat).where(Chat.project_id == project_id, Chat.kind == "project")
)
chat = result.scalar_one_or_none()
if not chat:
chat = Chat(project_id=project_id, kind="project")
db.add(chat)
await db.commit()
await db.refresh(chat)
return chat
@router.get("/{chat_id}/messages", response_model=list[MessageOut])
async def list_messages(
chat_id: uuid.UUID,
limit: int = Query(50, ge=1, le=200),
before: uuid.UUID | None = None,
db: AsyncSession = Depends(get_db),
):
"""Get messages for a chat (newest first, paginated)."""
chat = await db.get(Chat, chat_id)
if not chat:
raise HTTPException(404, "Chat not found")
q = select(ChatMessage).where(ChatMessage.chat_id == chat_id)
if before:
ref = await db.get(ChatMessage, before)
if ref:
q = q.where(ChatMessage.created_at < ref.created_at)
q = q.order_by(ChatMessage.created_at.desc()).limit(limit)
result = await db.execute(q)
messages = list(result.scalars().all())
messages.reverse() # return oldest-first
return messages
@router.post("/{chat_id}/messages", response_model=MessageOut, status_code=201)
async def create_message(chat_id: uuid.UUID, data: MessageCreate, db: AsyncSession = Depends(get_db)):
"""Post a message to a chat (REST fallback — prefer WebSocket)."""
chat = await db.get(Chat, chat_id)
if not chat:
raise HTTPException(404, "Chat not found")
msg = ChatMessage(chat_id=chat_id, **data.model_dump())
db.add(msg)
await db.commit()
await db.refresh(msg)
return msg

View File

@ -8,7 +8,7 @@ from fastapi import FastAPI, Request
from fastapi.middleware.cors import CORSMiddleware from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse from fastapi.responses import JSONResponse
from tracker.api import agents, labels, projects, tasks from tracker.api import agents, chats, labels, projects, tasks
from tracker.config import settings from tracker.config import settings
from tracker.ws.handler import router as ws_router from tracker.ws.handler import router as ws_router
@ -61,6 +61,7 @@ app.include_router(projects.router, prefix="/api/v1")
app.include_router(tasks.router, prefix="/api/v1") app.include_router(tasks.router, prefix="/api/v1")
app.include_router(agents.router, prefix="/api/v1") app.include_router(agents.router, prefix="/api/v1")
app.include_router(labels.router, prefix="/api/v1") app.include_router(labels.router, prefix="/api/v1")
app.include_router(chats.router, prefix="/api/v1")
# WebSocket # WebSocket
app.include_router(ws_router) app.include_router(ws_router)

View File

@ -1,7 +1,7 @@
from tracker.models.base import Base from tracker.models.base import Base
from tracker.models.project import Project from tracker.models.project import Project
from tracker.models.task import Task, TaskDependency, TaskFile, TaskLabel from tracker.models.task import Task, TaskDependency, TaskFile, TaskLabel
from tracker.models.agent import Agent, Adapter from tracker.models.agent import Agent
from tracker.models.label import Label from tracker.models.label import Label
from tracker.models.chat import Chat, ChatMessage from tracker.models.chat import Chat, ChatMessage
@ -13,7 +13,6 @@ __all__ = [
"TaskFile", "TaskFile",
"TaskLabel", "TaskLabel",
"Agent", "Agent",
"Adapter",
"Label", "Label",
"Chat", "Chat",
"ChatMessage", "ChatMessage",

View File

@ -1,42 +1,22 @@
"""Agent and Adapter models.""" """Agent model."""
import uuid import uuid
from typing import TYPE_CHECKING
from sqlalchemy import ForeignKey, Integer, String, Text from sqlalchemy import Integer, String, Text
from sqlalchemy.dialects.postgresql import ARRAY, JSONB, UUID from sqlalchemy.dialects.postgresql import ARRAY
from sqlalchemy.orm import Mapped, mapped_column, relationship from sqlalchemy.orm import Mapped, mapped_column
from tracker.models.base import Base from tracker.models.base import Base
if TYPE_CHECKING:
pass
class Adapter(Base):
__tablename__ = "adapters"
name: Mapped[str] = mapped_column(String(255), nullable=False)
provider: Mapped[str] = mapped_column(String(50), nullable=False) # anthropic, openai, openclaw, cli, ...
config: Mapped[dict] = mapped_column(JSONB, nullable=False, default=dict)
capabilities: Mapped[list[str]] = mapped_column(ARRAY(String), nullable=False, default=list)
agents: Mapped[list["Agent"]] = relationship(back_populates="adapter")
class Agent(Base): class Agent(Base):
__tablename__ = "agents" __tablename__ = "agents"
name: Mapped[str] = mapped_column(String(255), nullable=False) name: Mapped[str] = mapped_column(String(255), nullable=False)
slug: Mapped[str] = mapped_column(String(255), unique=True, nullable=False) slug: Mapped[str] = mapped_column(String(255), unique=True, nullable=False)
adapter_id: Mapped[uuid.UUID] = mapped_column(UUID(as_uuid=True), ForeignKey("adapters.id"), nullable=False) token: Mapped[str] = mapped_column(String(255), unique=True, nullable=False) # for WS auth
system_prompt: Mapped[str | None] = mapped_column(Text) capabilities: Mapped[list[str]] = mapped_column(ARRAY(String), nullable=False, default=list)
subscription_mode: Mapped[str] = mapped_column(String(20), default="mentions") # all, mentions, assigned subscription_mode: Mapped[str] = mapped_column(String(20), default="assigned") # all, mentions, assigned
max_concurrent: Mapped[int] = mapped_column(Integer, default=1) max_concurrent: Mapped[int] = mapped_column(Integer, default=1)
timeout_seconds: Mapped[int] = mapped_column(Integer, default=600) timeout_seconds: Mapped[int] = mapped_column(Integer, default=600)
status: Mapped[str] = mapped_column(String(20), default="offline") # online, offline, busy, dead status: Mapped[str] = mapped_column(String(20), default="offline") # online, offline, busy
host: Mapped[str | None] = mapped_column(String(255), default="localhost")
pid: Mapped[int | None] = mapped_column(Integer)
restart_count: Mapped[int] = mapped_column(Integer, default=0)
adapter: Mapped["Adapter"] = relationship(back_populates="agents")

View File

@ -5,6 +5,8 @@ import logging
from fastapi import APIRouter, WebSocket, WebSocketDisconnect from fastapi import APIRouter, WebSocket, WebSocketDisconnect
from tracker.database import async_session
from tracker.models import Chat, ChatMessage
from tracker.ws.manager import manager from tracker.ws.manager import manager
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -25,18 +27,77 @@ async def websocket_endpoint(ws: WebSocket, client_type: str = "human", client_i
continue continue
event = msg.get("event") event = msg.get("event")
if event == "agent.heartbeat": if event == "agent.heartbeat":
await manager.send(ws_id, "agent.heartbeat.ack", {}) await manager.send(ws_id, "agent.heartbeat.ack", {})
elif event == "chat.send": elif event == "chat.send":
# Broadcast to all for now; will scope to project/task later await _handle_chat_send(ws_id, msg)
await manager.broadcast("chat.message", {
"sender": ws_id, elif event == "chat.subscribe":
"content": msg.get("content", ""), # Subscribe to a chat room
"chat_id": msg.get("chat_id"), chat_id = msg.get("chat_id")
}, exclude=ws_id) if chat_id:
await manager.subscribe(ws_id, f"chat:{chat_id}")
await manager.send(ws_id, "chat.subscribed", {"chat_id": chat_id})
elif event == "chat.unsubscribe":
chat_id = msg.get("chat_id")
if chat_id:
await manager.unsubscribe(ws_id, f"chat:{chat_id}")
else: else:
logger.debug("Unknown WS event: %s", event) logger.debug("Unknown WS event: %s", event)
except WebSocketDisconnect: except WebSocketDisconnect:
pass pass
finally: finally:
await manager.disconnect(ws_id) await manager.disconnect(ws_id)
async def _handle_chat_send(ws_id: str, msg: dict):
"""Save message to DB and broadcast to chat subscribers."""
chat_id = msg.get("chat_id")
content = msg.get("content", "").strip()
sender_type = msg.get("sender_type", "human")
sender_id = msg.get("sender_id")
sender_name = msg.get("sender_name")
if not chat_id or not content:
await manager.send(ws_id, "error", {"message": "chat_id and content required"})
return
# Save to DB
try:
async with async_session() as db:
chat = await db.get(Chat, chat_id)
if not chat:
await manager.send(ws_id, "error", {"message": "Chat not found"})
return
db_msg = ChatMessage(
chat_id=chat_id,
sender_type=sender_type,
sender_id=sender_id,
sender_name=sender_name,
content=content,
)
db.add(db_msg)
await db.commit()
await db.refresh(db_msg)
# Broadcast to all subscribers of this chat
payload = {
"id": str(db_msg.id),
"chat_id": str(chat_id),
"sender_type": sender_type,
"sender_id": str(sender_id) if sender_id else None,
"sender_name": sender_name,
"content": content,
"created_at": str(db_msg.created_at),
}
await manager.broadcast_to_room(f"chat:{chat_id}", "chat.message", payload)
except Exception as e:
logger.error("Error saving chat message: %s", e)
await manager.send(ws_id, "error", {"message": "Failed to save message"})

View File

@ -1,4 +1,4 @@
"""WebSocket connection manager.""" """WebSocket connection manager with room subscriptions."""
import asyncio import asyncio
import json import json
@ -15,12 +15,13 @@ class Connection:
ws: WebSocket ws: WebSocket
client_type: str = "unknown" # human, agent client_type: str = "unknown" # human, agent
client_id: str | None = None client_id: str | None = None
subscriptions: set[str] = field(default_factory=set) # project slugs rooms: set[str] = field(default_factory=set) # subscribed rooms: "chat:<id>"
class ConnectionManager: class ConnectionManager:
def __init__(self): def __init__(self):
self._connections: dict[str, Connection] = {} # ws id -> Connection self._connections: dict[str, Connection] = {}
self._rooms: dict[str, set[str]] = {} # room -> set of ws_ids
self._lock = asyncio.Lock() self._lock = asyncio.Lock()
async def connect(self, ws: WebSocket, client_type: str = "unknown", client_id: str | None = None) -> str: async def connect(self, ws: WebSocket, client_type: str = "unknown", client_id: str | None = None) -> str:
@ -34,9 +35,34 @@ class ConnectionManager:
async def disconnect(self, ws_id: str): async def disconnect(self, ws_id: str):
async with self._lock: async with self._lock:
self._connections.pop(ws_id, None) conn = self._connections.pop(ws_id, None)
if conn:
for room in conn.rooms:
s = self._rooms.get(room)
if s:
s.discard(ws_id)
if not s:
del self._rooms[room]
logger.info("WS disconnected: %s", ws_id) logger.info("WS disconnected: %s", ws_id)
async def subscribe(self, ws_id: str, room: str):
async with self._lock:
conn = self._connections.get(ws_id)
if conn:
conn.rooms.add(room)
self._rooms.setdefault(room, set()).add(ws_id)
async def unsubscribe(self, ws_id: str, room: str):
async with self._lock:
conn = self._connections.get(ws_id)
if conn:
conn.rooms.discard(room)
s = self._rooms.get(room)
if s:
s.discard(ws_id)
if not s:
del self._rooms[room]
async def send(self, ws_id: str, event: str, data: dict): async def send(self, ws_id: str, event: str, data: dict):
conn = self._connections.get(ws_id) conn = self._connections.get(ws_id)
if conn: if conn:
@ -58,6 +84,25 @@ class ConnectionManager:
for ws_id in dead: for ws_id in dead:
await self.disconnect(ws_id) await self.disconnect(ws_id)
async def broadcast_to_room(self, room: str, event: str, data: dict, exclude: str | None = None):
"""Send to all connections subscribed to a room."""
ws_ids = self._rooms.get(room, set())
msg = {"event": event, **data}
dead = []
for ws_id in list(ws_ids):
if ws_id == exclude:
continue
conn = self._connections.get(ws_id)
if not conn:
dead.append(ws_id)
continue
try:
await conn.ws.send_json(msg)
except Exception:
dead.append(ws_id)
for ws_id in dead:
await self.disconnect(ws_id)
@property @property
def active_count(self) -> int: def active_count(self) -> int:
return len(self._connections) return len(self._connections)