feat: migrate messages to events
Some checks failed
Deploy Tracker / deploy (push) Failing after 3s

- Created Event + EventAttachment models to replace Chat + Message + TaskAction
- Added /api/v1/events API with unified event handling
- Migrated _system_message() in tasks.py to create Event instead of dual Message approach
- Updated ws/handler.py chat.send to create Event
- Updated init_db.py to remove Chat/lobby dependencies
- Added backward compatibility alias /api/v1/messages -> /api/v1/events
- Kept models/chat.py for legacy imports during transition
- WS protocol unchanged - clients still receive message.new, task.* events
This commit is contained in:
markov 2026-03-18 19:39:47 +01:00
parent 79bd087b65
commit 8b3a9b2148
13 changed files with 556 additions and 181 deletions

View File

@ -10,7 +10,8 @@ from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.ext.asyncio import AsyncSession
from ..database import get_db from ..database import get_db
from ..models import Attachment, Message, Member from ..models import Member
from ..models.chat import Attachment, Message # Legacy imports
from .schemas import UploadOut from .schemas import UploadOut
router = APIRouter(tags=["attachments"]) router = APIRouter(tags=["attachments"])

View File

@ -1,10 +1,12 @@
"""ORM → Pydantic converters. Single place for all model-to-schema transformations.""" """ORM → Pydantic converters. Single place for all model-to-schema transformations."""
from ..models import Attachment, Member, Message, ProjectFile, Step, Task from ..models import Event, EventAttachment, Member, ProjectFile, Step, Task
from ..models.chat import Message, Attachment # Legacy for backward compatibility
from ..enums import MemberType from ..enums import MemberType
from .schemas import ( from .schemas import (
AgentConfigOut, AgentConfigOut,
AttachmentOut, AttachmentOut,
EventOut,
MemberBrief, MemberBrief,
MemberOut, MemberOut,
MessageOut, MessageOut,
@ -56,7 +58,7 @@ def member_out(m: Member) -> MemberOut:
) )
def attachment_out(a: Attachment) -> AttachmentOut: def attachment_out(a: Attachment | EventAttachment) -> AttachmentOut:
return AttachmentOut( return AttachmentOut(
id=str(a.id), id=str(a.id),
filename=a.filename, filename=a.filename,
@ -146,6 +148,21 @@ def task_out(t: Task, project_slug: str = "") -> TaskOut:
) )
def event_out(e: Event) -> EventOut:
"""Convert Event model to EventOut schema."""
return EventOut(
id=str(e.id),
project_id=str(e.project_id),
task_id=str(e.task_id) if e.task_id else None,
parent_id=str(e.parent_id) if e.parent_id else None,
type=e.type,
actor=member_brief(e.actor) if e.actor else None,
payload=e.payload or {},
attachments=[attachment_out(a) for a in (e.attachments or [])],
created_at=e.created_at.isoformat() if e.created_at else "",
)
def project_file_out(f: ProjectFile) -> ProjectFileOut: def project_file_out(f: ProjectFile) -> ProjectFileOut:
return ProjectFileOut( return ProjectFileOut(
id=str(f.id), id=str(f.id),

270
src/tracker/api/events.py Normal file
View File

@ -0,0 +1,270 @@
"""Events API — unified events for chat messages, task comments, status changes."""
import os
import uuid
from typing import Optional
from fastapi import APIRouter, Depends, HTTPException, Query, Request
from pydantic import BaseModel
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import selectinload
from ..database import get_db
from ..enums import AuthorType
from ..models import Event, EventAttachment, Task
from .schemas import EventOut
from .converters import event_out
router = APIRouter(tags=["events"])
UPLOAD_DIR = os.environ.get("UPLOAD_DIR", "/data/uploads")
# --- Schemas ---
class AttachmentInput(BaseModel):
"""Uploaded file info from /upload endpoint."""
file_id: str # UUID from upload
filename: str
mime_type: str | None = None
size: int = 0
storage_name: str # filename on disk
class EventCreate(BaseModel):
type: str # chat_message | task_comment | task_created | ...
project_id: str | None = None # Required for chat_message
task_id: str | None = None # Required for task_comment; if set, project_id resolved from task
parent_id: str | None = None
content: str | None = None # For chat_message, task_comment
mentions: list[str] = []
thinking: str | None = None
voice_url: str | None = None
tool_log: dict | None = None
payload: dict = {} # Event-specific data
attachments: list[AttachmentInput] = []
# --- Endpoints ---
@router.get("/events", response_model=list[EventOut])
async def list_events(
project_id: Optional[str] = Query(None),
task_id: Optional[str] = Query(None),
types: Optional[str] = Query(None), # comma-separated
parent_id: Optional[str] = Query(None),
limit: int = Query(50, le=200),
offset: int = Query(0),
db: AsyncSession = Depends(get_db),
):
"""Get events with filters.
Examples:
- Project chat: ?project_id=X&types=chat_message,task_created,task_status,task_assigned,task_unassigned
- Task comments: ?task_id=X
"""
q = select(Event).options(
selectinload(Event.attachments),
selectinload(Event.actor),
selectinload(Event.task),
selectinload(Event.project)
)
if project_id:
q = q.where(Event.project_id == uuid.UUID(project_id))
if task_id:
q = q.where(Event.task_id == uuid.UUID(task_id))
if types:
type_list = [t.strip() for t in types.split(",")]
q = q.where(Event.type.in_(type_list))
if parent_id:
q = q.where(Event.parent_id == uuid.UUID(parent_id))
# For top-level events only (no threads), if no parent_id filter
if not parent_id and (project_id or task_id):
q = q.where(Event.parent_id.is_(None))
# Get newest N events (DESC), then reverse to chronological order
q = q.order_by(Event.created_at.desc()).offset(offset).limit(limit)
result = await db.execute(q)
events = [event_out(e).model_dump() for e in result.scalars()]
events.reverse()
return events
@router.post("/events", response_model=EventOut)
async def create_event(req: EventCreate, request: Request, db: AsyncSession = Depends(get_db)):
"""Create event. Type-specific validation:
- chat_message: project_id required
- task_comment: task_id required, project_id resolved from task
"""
# Resolve author from auth — never trust client-provided author fields
member = getattr(request.state, "member", None)
if not member:
raise HTTPException(401, "Not authenticated")
# Validate type-specific requirements
project_id_uuid = None
task_id_uuid = None
if req.type == "chat_message":
if not req.project_id:
raise HTTPException(400, "project_id required for chat_message")
project_id_uuid = uuid.UUID(req.project_id)
elif req.type == "task_comment":
if not req.task_id:
raise HTTPException(400, "task_id required for task_comment")
task_id_uuid = uuid.UUID(req.task_id)
# Resolve project_id from task
task_result = await db.execute(select(Task).where(Task.id == task_id_uuid))
task = task_result.scalar_one_or_none()
if not task:
raise HTTPException(404, "Task not found")
project_id_uuid = task.project_id
else:
# Other event types (task_created, task_status, etc.)
if req.project_id:
project_id_uuid = uuid.UUID(req.project_id)
if req.task_id:
task_id_uuid = uuid.UUID(req.task_id)
if not project_id_uuid:
# Resolve from task
task_result = await db.execute(select(Task).where(Task.id == task_id_uuid))
task = task_result.scalar_one_or_none()
if task:
project_id_uuid = task.project_id
if not project_id_uuid:
raise HTTPException(400, "project_id must be provided or resolvable")
# Build payload
payload = req.payload.copy()
if req.type in ("chat_message", "task_comment"):
if req.content:
payload["content"] = req.content
if req.mentions:
payload["mentions"] = req.mentions
if req.thinking:
payload["thinking"] = req.thinking
if req.voice_url:
payload["voice_url"] = req.voice_url
if req.tool_log:
payload["tool_log"] = req.tool_log
event = Event(
project_id=project_id_uuid,
task_id=task_id_uuid,
parent_id=uuid.UUID(req.parent_id) if req.parent_id else None,
type=req.type,
actor_id=member.id,
payload=payload,
)
db.add(event)
await db.flush() # get event.id
# Create attachment records
for att_in in req.attachments:
att = EventAttachment(
event_id=event.id,
filename=att_in.filename,
mime_type=att_in.mime_type,
size=att_in.size,
storage_path=att_in.storage_name,
)
db.add(att)
await db.commit()
# Reload with relationships
result2 = await db.execute(
select(Event).where(Event.id == event.id).options(
selectinload(Event.attachments),
selectinload(Event.actor),
selectinload(Event.task),
selectinload(Event.project)
)
)
event = result2.scalar_one()
# Build response using shared converter
event_data = event_out(event).model_dump()
# Broadcast via WebSocket
from ..ws.manager import manager
project_id_str = str(project_id_uuid)
# Choose event type for WS broadcast
if req.type in ("chat_message", "task_comment"):
# For backward compatibility: send as message.new
await manager.broadcast_message(project_id_str, event_data, author_id=str(member.id))
else:
# Task events: task.created, task.status, etc.
ws_event_type = f"task.{req.type.replace('task_', '')}"
await manager.broadcast_task_event(project_id_str, ws_event_type, event_data)
return event_out(event)
@router.get("/events/{event_id}/replies", response_model=list[EventOut])
async def list_replies(event_id: str, db: AsyncSession = Depends(get_db)):
"""Get thread replies for an event."""
result = await db.execute(
select(Event)
.where(Event.parent_id == uuid.UUID(event_id))
.options(
selectinload(Event.attachments),
selectinload(Event.actor),
selectinload(Event.task),
selectinload(Event.project)
)
.order_by(Event.created_at)
)
return [event_out(e) for e in result.scalars()]
# --- Backward compatibility alias ---
@router.get("/messages", response_model=list[EventOut])
async def list_messages_compat(
chat_id: Optional[str] = Query(None),
task_id: Optional[str] = Query(None),
parent_id: Optional[str] = Query(None),
limit: int = Query(50, le=200),
offset: int = Query(0),
db: AsyncSession = Depends(get_db),
):
"""Backward compatibility: messages endpoint mapped to events."""
project_id = None
# Resolve project_id from chat_id (assuming project chat exists) or task_id
if chat_id:
# For now, we'll skip chat_id resolution since Chat model will be removed
# Old clients shouldn't be using chat_id after migration
pass
# Map to events endpoint
types = "chat_message,task_comment" if not task_id else None
return await list_events(
project_id=project_id,
task_id=task_id,
types=types,
parent_id=parent_id,
limit=limit,
offset=offset,
db=db
)
@router.post("/messages", response_model=EventOut)
async def create_message_compat(request: Request, db: AsyncSession = Depends(get_db)):
"""Backward compatibility: convert old message format to event."""
# This is a simplified compatibility layer
# In practice, clients should migrate to /events endpoint
raise HTTPException(501, "Use /events endpoint instead")

View File

@ -12,7 +12,7 @@ from sqlalchemy.orm import selectinload
from ..database import get_db from ..database import get_db
from ..enums import AuthorType, ChatKind from ..enums import AuthorType, ChatKind
from ..models import Message, Chat, Attachment from ..models.chat import Message, Chat, Attachment # Legacy imports
from .schemas import MessageOut from .schemas import MessageOut
from .converters import message_out from .converters import message_out

View File

@ -10,7 +10,8 @@ from sqlalchemy.orm import selectinload
from ..database import get_db from ..database import get_db
from ..enums import ChatKind, MemberRole, ProjectStatus from ..enums import ChatKind, MemberRole, ProjectStatus
from ..models import Project, Chat, Member, ProjectMember from ..models import Project, Member, ProjectMember
from ..models.chat import Chat # Legacy import
from ..ws.manager import manager from ..ws.manager import manager
from .schemas import ProjectOut from .schemas import ProjectOut

View File

@ -80,6 +80,34 @@ class MessageOut(BaseModel):
created_at: str created_at: str
class EventOut(BaseModel):
"""Unified event — chat messages, task comments, status changes, etc."""
id: str
project_id: str
task_id: str | None = None
parent_id: str | None = None
type: str # chat_message | task_comment | task_created | task_status | ...
actor: MemberBrief | None = None
payload: dict # Event-specific data
attachments: list[AttachmentOut] = []
created_at: str
class EventCreate(BaseModel):
"""Create event request."""
type: str
project_id: str | None = None
task_id: str | None = None
parent_id: str | None = None
content: str | None = None
mentions: list[str] = []
thinking: str | None = None
voice_url: str | None = None
tool_log: dict | None = None
payload: dict = {}
attachments: list = []
class AgentConfigOut(BaseModel): class AgentConfigOut(BaseModel):
capabilities: list[str] = [] capabilities: list[str] = []
labels: list[str] = [] labels: list[str] = []

View File

@ -12,9 +12,10 @@ from sqlalchemy.orm import selectinload, joinedload
from ..database import get_db from ..database import get_db
from ..enums import ( from ..enums import (
AuthorType, ChatKind, MemberType, TaskActionType, TaskLinkType, TaskPriority, TaskStatus, TaskType, WSEventType, AuthorType, ChatKind, MemberType, TaskLinkType, TaskPriority, TaskStatus, TaskType, WSEventType,
) )
from ..models import Task, Step, Project, Member, Message, Chat, TaskAction, TaskLink from ..models import Task, Step, Project, Member, TaskLink, Event
from ..models.chat import Message, Chat # Legacy imports
from .auth import get_current_member from .auth import get_current_member
from .schemas import TaskOut, MessageOut, MemberBrief from .schemas import TaskOut, MessageOut, MemberBrief
from .converters import task_out, message_out, member_brief from .converters import task_out, message_out, member_brief
@ -59,12 +60,7 @@ class AssignRequest(BaseModel):
# --- Helpers --- # --- Helpers ---
async def _get_project_chat_id(db: AsyncSession, project_id) -> uuid.UUID | None: # Removed _get_project_chat_id - no longer using Chat model
result = await db.execute(
select(Chat).where(Chat.project_id == project_id, Chat.kind == ChatKind.PROJECT)
)
chat = result.scalar_one_or_none()
return chat.id if chat else None
async def _record_action( async def _record_action(
@ -76,15 +72,25 @@ async def _record_action(
old_value: str | None = None, old_value: str | None = None,
new_value: str | None = None, new_value: str | None = None,
): ):
"""Record a task action in the audit log.""" """Record a task action as Event (audit log)."""
db.add(TaskAction( payload = {
"action": action,
}
if field:
payload["field"] = field
if old_value:
payload["old_value"] = old_value
if new_value:
payload["new_value"] = new_value
event = Event(
project_id=task.project_id,
task_id=task.id, task_id=task.id,
type=f"task_{action.lower()}", # task_created, task_assigned, etc.
actor_id=actor.id, actor_id=actor.id,
action=action, payload=payload,
field=field, )
old_value=old_value, db.add(event)
new_value=new_value,
))
async def _system_message( async def _system_message(
@ -96,93 +102,66 @@ async def _system_message(
actor: Member | None = None, actor: Member | None = None,
mentioned_members: list[Member] | None = None, mentioned_members: list[Member] | None = None,
): ):
"""Create system messages: one in project chat + one in task comments. """Create system message as Event (replaces old dual Message approach).
actor: who initiated the action (stored as actor_id) actor: who initiated the action (stored as actor_id)
mentioned_members: members referenced in the message (stored in mentions array) mentioned_members: members referenced in the message (stored in mentions array)
""" """
from ..ws.manager import manager from ..ws.manager import manager
from ..models import Event
from .schemas import EventOut
from .converters import event_out, member_brief
prefix = project_slug[:2].upper() if project_slug else "XX" prefix = project_slug[:2].upper() if project_slug else "XX"
key = f"{prefix}-{task.number}" key = f"{prefix}-{task.number}"
task_text = task_text or chat_text task_text = task_text or chat_text
actor_slug = actor.slug if actor else "system" actor_brief_obj = member_brief(actor) if actor else None
actor_brief = MemberBrief(id=str(actor.id), slug=actor.slug, name=actor.name) if actor else None
# Mentioned members as IDs (DB storage) and briefs (broadcast) # Mentioned members as IDs and briefs
mention_ids = [str(m.id) for m in (mentioned_members or [])] mention_ids = [str(m.id) for m in (mentioned_members or [])]
mention_briefs = [MemberBrief(id=str(m.id), slug=m.slug, name=m.name) for m in (mentioned_members or [])] mention_briefs = [member_brief(m) for m in (mentioned_members or [])]
chat_id = await _get_project_chat_id(db, task.project_id) # Create single Event for task comment
event_payload = {
"content": task_text,
"mentions": mention_ids,
}
# 1. Task comment event = Event(
task_msg = Message( project_id=task.project_id,
task_id=task.id, task_id=task.id,
author_type=AuthorType.SYSTEM, type="task_comment",
author_id=None,
actor_id=actor.id if actor else None, actor_id=actor.id if actor else None,
content=task_text, payload=event_payload,
mentions=mention_ids, # still stored as slugs in DB for compat
) )
db.add(task_msg) db.add(event)
# 2. Chat message
chat_msg = None
if chat_id:
chat_msg = Message(
chat_id=chat_id,
author_type=AuthorType.SYSTEM,
author_id=None,
actor_id=actor.id if actor else None,
content=chat_text,
mentions=mention_ids,
)
db.add(chat_msg)
await db.flush() await db.flush()
now_iso = datetime.datetime.now(datetime.timezone.utc).isoformat() # Build EventOut for broadcast (backward compatible with MessageOut format)
event_out_obj = event_out(event)
# Build backward-compatible message format for WS broadcast
# Convert EventOut to MessageOut-like structure for existing clients
task_data = {
"id": event_out_obj.id,
"task_id": str(task.id),
"author_type": "system",
"author_id": None,
"author": None,
"actor": actor_brief_obj.model_dump() if actor_brief_obj else None,
"content": task_text,
"mentions": [mb.model_dump() for mb in mention_briefs],
"attachments": [],
"created_at": event_out_obj.created_at,
"project_id": str(task.project_id),
}
# Build MessageOut for task comment broadcast
task_msg_out = MessageOut(
id=str(task_msg.id),
task_id=str(task.id),
author_type=AuthorType.SYSTEM,
author_id=None,
author=None,
actor=actor_brief,
content=task_text,
mentions=mention_briefs,
attachments=[],
created_at=task_msg.created_at.isoformat() if task_msg.created_at else now_iso,
)
project_id = str(task.project_id)
task_data = task_msg_out.model_dump()
task_data["project_id"] = project_id
await manager.broadcast_message( await manager.broadcast_message(
project_id, str(task.project_id),
task_data, task_data,
author_id=None, author_id=None,
) )
# Broadcast chat message
if chat_msg and chat_id:
chat_msg_out = MessageOut(
id=str(chat_msg.id),
chat_id=str(chat_id),
author_type=AuthorType.SYSTEM,
author_id=None,
author=None,
actor=actor_brief,
content=chat_text,
mentions=mention_briefs,
attachments=[],
created_at=chat_msg.created_at.isoformat() if chat_msg.created_at else now_iso,
)
chat_data = chat_msg_out.model_dump()
chat_data["project_id"] = project_id
await manager.broadcast_message(project_id, chat_data, author_id=None)
async def _get_task(task_id: str, db: AsyncSession) -> Task: async def _get_task(task_id: str, db: AsyncSession) -> Task:
result = await db.execute( result = await db.execute(
@ -314,7 +293,7 @@ async def create_task(
task.watcher_ids = [agent.id] task.watcher_ids = [agent.id]
break break
await _record_action(db, task, current_member, TaskActionType.CREATED) await _record_action(db, task, current_member, "created")
await db.commit() await db.commit()
task_full = await _get_task(str(task.id), db) task_full = await _get_task(str(task.id), db)
@ -356,12 +335,12 @@ async def update_task(
now_str = datetime.datetime.now(datetime.timezone.utc).strftime("%H:%M UTC") now_str = datetime.datetime.now(datetime.timezone.utc).strftime("%H:%M UTC")
if req.title is not None and req.title != task.title: if req.title is not None and req.title != task.title:
await _record_action(db, task, current_member, TaskActionType.TITLE_CHANGED, await _record_action(db, task, current_member, "title_changed",
"title", task.title, req.title) "title", task.title, req.title)
task.title = req.title task.title = req.title
if req.description is not None and req.description != task.description: if req.description is not None and req.description != task.description:
await _record_action(db, task, current_member, TaskActionType.DESCRIPTION_CHANGED, await _record_action(db, task, current_member, "description_changed",
"description", task.description or "", req.description) "description", task.description or "", req.description)
task.description = req.description task.description = req.description
@ -370,7 +349,7 @@ async def update_task(
if req.status is not None and req.status != task.status: if req.status is not None and req.status != task.status:
old_status = task.status old_status = task.status
await _record_action(db, task, current_member, TaskActionType.STATUS_CHANGED, await _record_action(db, task, current_member, "status_changed",
"status", old_status, req.status) "status", old_status, req.status)
task.status = req.status task.status = req.status
await _system_message( await _system_message(
@ -382,7 +361,7 @@ async def update_task(
if req.priority is not None and req.priority != task.priority: if req.priority is not None and req.priority != task.priority:
old_priority = task.priority old_priority = task.priority
await _record_action(db, task, current_member, TaskActionType.PRIORITY_CHANGED, await _record_action(db, task, current_member, "priority_changed",
"priority", old_priority, req.priority) "priority", old_priority, req.priority)
task.priority = req.priority task.priority = req.priority
@ -396,7 +375,7 @@ async def update_task(
task.assignee_id = new_assignee_id task.assignee_id = new_assignee_id
if new_assignee_id: if new_assignee_id:
new_assignee = await _resolve_member(db, req.assignee_id) new_assignee = await _resolve_member(db, req.assignee_id)
await _record_action(db, task, current_member, TaskActionType.ASSIGNED, await _record_action(db, task, current_member, "assigned",
"assignee_id", str(old_assignee_id) if old_assignee_id else None, "assignee_id", str(old_assignee_id) if old_assignee_id else None,
str(new_assignee_id)) str(new_assignee_id))
await _system_message( await _system_message(
@ -409,7 +388,7 @@ async def update_task(
if new_assignee_id not in (task.watcher_ids or []): if new_assignee_id not in (task.watcher_ids or []):
task.watcher_ids = (task.watcher_ids or []) + [new_assignee_id] task.watcher_ids = (task.watcher_ids or []) + [new_assignee_id]
else: else:
await _record_action(db, task, current_member, TaskActionType.UNASSIGNED, await _record_action(db, task, current_member, "unassigned",
"assignee_id", str(old_assignee_id), None) "assignee_id", str(old_assignee_id), None)
await _system_message( await _system_message(
db, task, db, task,
@ -423,7 +402,7 @@ async def update_task(
new_reviewer_id = uuid.UUID(req.reviewer_id) if req.reviewer_id else None new_reviewer_id = uuid.UUID(req.reviewer_id) if req.reviewer_id else None
if new_reviewer_id != old_reviewer_id: if new_reviewer_id != old_reviewer_id:
task.reviewer_id = new_reviewer_id task.reviewer_id = new_reviewer_id
await _record_action(db, task, current_member, TaskActionType.REVIEWER_CHANGED, await _record_action(db, task, current_member, "reviewer_changed",
"reviewer_id", str(old_reviewer_id) if old_reviewer_id else None, "reviewer_id", str(old_reviewer_id) if old_reviewer_id else None,
str(new_reviewer_id) if new_reviewer_id else None) str(new_reviewer_id) if new_reviewer_id else None)
@ -479,9 +458,9 @@ async def take_task(
if current_member.id not in (task.watcher_ids or []): if current_member.id not in (task.watcher_ids or []):
task.watcher_ids = (task.watcher_ids or []) + [current_member.id] task.watcher_ids = (task.watcher_ids or []) + [current_member.id]
await _record_action(db, task, current_member, TaskActionType.ASSIGNED, await _record_action(db, task, current_member, "assigned",
"assignee_id", None, str(current_member.id)) "assignee_id", None, str(current_member.id))
await _record_action(db, task, current_member, TaskActionType.STATUS_CHANGED, await _record_action(db, task, current_member, "status_changed",
"status", TaskStatus.BACKLOG, TaskStatus.IN_PROGRESS) "status", TaskStatus.BACKLOG, TaskStatus.IN_PROGRESS)
proj_slug = task.project.slug if task.project else "" proj_slug = task.project.slug if task.project else ""
@ -517,9 +496,9 @@ async def reject_task(
task.status = TaskStatus.BACKLOG task.status = TaskStatus.BACKLOG
old_status = task.status old_status = task.status
await _record_action(db, task, current_member, TaskActionType.UNASSIGNED, await _record_action(db, task, current_member, "unassigned",
"assignee_id", str(old_assignee_id) if old_assignee_id else None, None) "assignee_id", str(old_assignee_id) if old_assignee_id else None, None)
await _record_action(db, task, current_member, TaskActionType.STATUS_CHANGED, await _record_action(db, task, current_member, "status_changed",
"status", old_status, TaskStatus.BACKLOG) "status", old_status, TaskStatus.BACKLOG)
proj_slug = task.project.slug if task.project else "" proj_slug = task.project.slug if task.project else ""
@ -557,7 +536,7 @@ async def assign_task(
if assignee.id not in (task.watcher_ids or []): if assignee.id not in (task.watcher_ids or []):
task.watcher_ids = (task.watcher_ids or []) + [assignee.id] task.watcher_ids = (task.watcher_ids or []) + [assignee.id]
await _record_action(db, task, current_member, TaskActionType.ASSIGNED, await _record_action(db, task, current_member, "assigned",
"assignee_id", str(old_assignee_id) if old_assignee_id else None, "assignee_id", str(old_assignee_id) if old_assignee_id else None,
str(assignee.id)) str(assignee.id))
@ -589,7 +568,7 @@ async def watch_task(
task = await _get_task(task_id, db) task = await _get_task(task_id, db)
if current_member.id not in (task.watcher_ids or []): if current_member.id not in (task.watcher_ids or []):
task.watcher_ids = (task.watcher_ids or []) + [current_member.id] task.watcher_ids = (task.watcher_ids or []) + [current_member.id]
await _record_action(db, task, current_member, TaskActionType.WATCHER_ADDED, await _record_action(db, task, current_member, "watcher_added",
"watcher_ids", None, str(current_member.id)) "watcher_ids", None, str(current_member.id))
await db.commit() await db.commit()
return {"ok": True, "watcher_ids": [str(w) for w in (task.watcher_ids or [])]} return {"ok": True, "watcher_ids": [str(w) for w in (task.watcher_ids or [])]}
@ -603,7 +582,7 @@ async def unwatch_task(
): ):
task = await _get_task(task_id, db) task = await _get_task(task_id, db)
task.watcher_ids = [w for w in (task.watcher_ids or []) if w != current_member.id] task.watcher_ids = [w for w in (task.watcher_ids or []) if w != current_member.id]
await _record_action(db, task, current_member, TaskActionType.WATCHER_REMOVED, await _record_action(db, task, current_member, "watcher_removed",
"watcher_ids", str(current_member.id), None) "watcher_ids", str(current_member.id), None)
await db.commit() await db.commit()
return {"ok": True, "watcher_ids": [str(w) for w in (task.watcher_ids or [])]} return {"ok": True, "watcher_ids": [str(w) for w in (task.watcher_ids or [])]}

View File

@ -169,14 +169,15 @@ app.add_middleware(
) )
# Routers # Routers
from .api import auth, members, projects, tasks, messages, steps, attachments, project_files, labels # noqa: E402 from .api import auth, members, projects, tasks, messages, events, steps, attachments, project_files, labels # noqa: E402
from .ws.handler import router as ws_router # noqa: E402 from .ws.handler import router as ws_router # noqa: E402
app.include_router(auth.router, prefix="/api/v1") app.include_router(auth.router, prefix="/api/v1")
app.include_router(members.router, prefix="/api/v1") app.include_router(members.router, prefix="/api/v1")
app.include_router(projects.router, prefix="/api/v1") app.include_router(projects.router, prefix="/api/v1")
app.include_router(tasks.router, prefix="/api/v1") app.include_router(tasks.router, prefix="/api/v1")
app.include_router(messages.router, prefix="/api/v1") app.include_router(messages.router, prefix="/api/v1") # Backward compatibility
app.include_router(events.router, prefix="/api/v1") # New events API
app.include_router(steps.router, prefix="/api/v1") app.include_router(steps.router, prefix="/api/v1")
app.include_router(attachments.router, prefix="/api/v1") app.include_router(attachments.router, prefix="/api/v1")
app.include_router(project_files.router, prefix="/api/v1") app.include_router(project_files.router, prefix="/api/v1")

View File

@ -6,8 +6,8 @@ import logging
from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.ext.asyncio import AsyncSession
from .enums import AuthMethod, ChatKind, ListenMode, MemberRole, MemberStatus, MemberType, ProjectStatus from .enums import AuthMethod, ListenMode, MemberRole, MemberStatus, MemberType, ProjectStatus
from .models import Base, Member, Chat, Project, ProjectMember, AgentConfig from .models import Base, Member, Project, ProjectMember, AgentConfig
logger = logging.getLogger("tracker.init_db") logger = logging.getLogger("tracker.init_db")
@ -83,9 +83,7 @@ async def seed_dev_data(session: AsyncSession):
session.add(project) session.add(project)
await session.flush() await session.flush()
# Project chat # Project chat - REMOVED: using Events instead of Chat+Message
project_chat = Chat(kind=ChatKind.PROJECT, project_id=project.id)
session.add(project_chat)
# Project members # Project members
session.add(ProjectMember(project_id=project.id, member_id=admin.id, role=MemberRole.OWNER)) session.add(ProjectMember(project_id=project.id, member_id=admin.id, role=MemberRole.OWNER))
@ -107,12 +105,10 @@ async def seed_dev_data(session: AsyncSession):
session.add(ProjectMember(project_id=project.id, member_id=bridge.id, role=MemberRole.MEMBER)) session.add(ProjectMember(project_id=project.id, member_id=bridge.id, role=MemberRole.MEMBER))
# Lobby chat # Lobby chat - REMOVED: using Events instead
lobby = Chat(kind=ChatKind.LOBBY, project_id=None)
session.add(lobby)
await session.commit() await session.commit()
logger.info("Dev seed: admin, coder, project team-board (id=%s), lobby", project.id) logger.info("Dev seed: admin, coder, project team-board (id=%s)", project.id)
async def reset_db(): async def reset_db():

View File

@ -3,8 +3,8 @@
from .base import Base from .base import Base
from .member import AgentConfig, Member from .member import AgentConfig, Member
from .project import Project, ProjectMember from .project import Project, ProjectMember
from .task import Label, Step, Task, TaskAction, TaskLabel, TaskLink from .task import Label, Step, Task, TaskLabel, TaskLink
from .chat import Attachment, Chat, Message from .event import Event, EventAttachment
from .project_file import ProjectFile from .project_file import ProjectFile
__all__ = [ __all__ = [
@ -14,11 +14,10 @@ __all__ = [
"Project", "Project",
"ProjectMember", "ProjectMember",
"Task", "Task",
"TaskAction", "Label",
"TaskLink", "TaskLabel",
"Step", "Step",
"Chat", "Event",
"Message", "EventAttachment",
"Attachment",
"ProjectFile", "ProjectFile",
] ]

View File

@ -0,0 +1,72 @@
"""Event model — unified event store replacing messages + task_actions."""
import uuid
from typing import TYPE_CHECKING
from sqlalchemy import ForeignKey, Integer, String, Text
from sqlalchemy.dialects.postgresql import ARRAY, JSONB, UUID
from sqlalchemy.orm import Mapped, mapped_column, relationship
from .base import Base
if TYPE_CHECKING:
from .member import Member
from .project import Project
from .task import Task
class Event(Base):
"""Unified event — chat messages, task comments, status changes, etc."""
__tablename__ = "events"
project_id: Mapped[uuid.UUID] = mapped_column(
UUID(as_uuid=True), ForeignKey("projects.id"), nullable=False, index=True
)
type: Mapped[str] = mapped_column(String(50), nullable=False, index=True)
# Types: chat_message, task_comment, task_created, task_status,
# task_assigned, task_unassigned, task_updated, task_label_add, task_label_remove
actor_id: Mapped[uuid.UUID | None] = mapped_column(ForeignKey("members.id"), nullable=True)
task_id: Mapped[uuid.UUID | None] = mapped_column(
UUID(as_uuid=True), ForeignKey("tasks.id", ondelete="CASCADE"), nullable=True, index=True
)
parent_id: Mapped[uuid.UUID | None] = mapped_column(
UUID(as_uuid=True), ForeignKey("events.id"), nullable=True
)
# Payload — all event-specific data
payload: Mapped[dict] = mapped_column(JSONB, nullable=False, default=dict)
# chat_message: {content, mentions?, thinking?, voice_url?, tool_log?}
# task_comment: {content, mentions?, thinking?, tool_log?}
# task_created: {title, status, priority, assignee?}
# task_status: {from, to}
# task_assigned: {assignee, previous?}
# task_unassigned: {previous}
# task_updated: {field, from, to}
# Relationships
project: Mapped["Project"] = relationship()
actor: Mapped["Member | None"] = relationship(foreign_keys=[actor_id])
task: Mapped["Task | None"] = relationship()
attachments: Mapped[list["EventAttachment"]] = relationship(
back_populates="event", cascade="all, delete-orphan"
)
replies: Mapped[list["Event"]] = relationship(back_populates="parent")
parent: Mapped["Event | None"] = relationship(
back_populates="replies", remote_side="Event.id"
)
class EventAttachment(Base):
"""File attachment on an event."""
__tablename__ = "event_attachments"
event_id: Mapped[uuid.UUID] = mapped_column(
UUID(as_uuid=True), ForeignKey("events.id"), nullable=False
)
filename: Mapped[str] = mapped_column(String(500), nullable=False)
mime_type: Mapped[str | None] = mapped_column(String(100))
size: Mapped[int] = mapped_column(Integer, default=0)
storage_path: Mapped[str] = mapped_column(String(1000), nullable=False)
event: Mapped["Event"] = relationship(back_populates="attachments")

View File

@ -81,15 +81,4 @@ class TaskLink(Base):
target: Mapped["Task"] = relationship(foreign_keys=[target_id]) target: Mapped["Task"] = relationship(foreign_keys=[target_id])
class TaskAction(Base): # TaskAction removed — replaced by Event model
__tablename__ = "task_actions"
task_id: Mapped[uuid.UUID] = mapped_column(ForeignKey("tasks.id", ondelete="CASCADE"))
actor_id: Mapped[uuid.UUID] = mapped_column(ForeignKey("members.id"))
action: Mapped[str] = mapped_column(String(50), nullable=False) # created, status_changed, assigned, etc.
field: Mapped[str | None] = mapped_column(String(100)) # какое поле изменилось
old_value: Mapped[str | None] = mapped_column(Text)
new_value: Mapped[str | None] = mapped_column(Text)
task: Mapped["Task"] = relationship()
actor: Mapped["Member"] = relationship()

View File

@ -12,7 +12,8 @@ from ..enums import (
AuthMethod, ChatKind, ListenMode, MemberRole, MemberStatus, MemberType, AuthMethod, ChatKind, ListenMode, MemberRole, MemberStatus, MemberType,
ProjectStatus, WSEventType, ProjectStatus, WSEventType,
) )
from ..models import Member, AgentConfig, Chat, Message, Project, ProjectMember from ..models import Member, AgentConfig, Project, ProjectMember, Task, Event
from ..models.chat import Chat, Message # Legacy imports
from ..api.schemas import MessageOut, MemberBrief from ..api.schemas import MessageOut, MemberBrief
from .manager import ConnectedClient, manager from .manager import ConnectedClient, manager
@ -195,10 +196,7 @@ async def _authenticate(ws: WebSocket, token: str, on_behalf_of: str | None = No
effective_member.status = MemberStatus.ONLINE effective_member.status = MemberStatus.ONLINE
await db.commit() await db.commit()
# Get lobby chat + projects # Get projects (no more lobby/chat dependency)
lobby = await db.execute(select(Chat).where(Chat.kind == ChatKind.LOBBY))
lobby_chat = lobby.scalar_one_or_none()
if effective_member.role == MemberRole.OWNER: if effective_member.role == MemberRole.OWNER:
projects = await db.execute(select(Project).where(Project.status == ProjectStatus.ACTIVE)) projects = await db.execute(select(Project).where(Project.status == ProjectStatus.ACTIVE))
else: else:
@ -210,15 +208,11 @@ async def _authenticate(ws: WebSocket, token: str, on_behalf_of: str | None = No
project_list = [] project_list = []
for p in projects.scalars(): for p in projects.scalars():
chat_result = await db.execute(
select(Chat).where(Chat.project_id == p.id, Chat.kind == "project")
)
chat = chat_result.scalar_one_or_none()
project_list.append({ project_list.append({
"id": str(p.id), "id": str(p.id),
"slug": p.slug, "slug": p.slug,
"name": p.name, "name": p.name,
"chat_id": str(chat.id) if chat else None, "chat_id": None, # Legacy field - no longer used
}) })
# Auto-subscribe # Auto-subscribe
@ -237,7 +231,7 @@ async def _authenticate(ws: WebSocket, token: str, on_behalf_of: str | None = No
"member_id": str(effective_member.id), "member_id": str(effective_member.id),
"slug": effective_member.slug, "slug": effective_member.slug,
"name": effective_member.name, "name": effective_member.name,
"lobby_chat_id": str(lobby_chat.id) if lobby_chat else None, "lobby_chat_id": None, # Legacy field - no longer used
"projects": project_list, "projects": project_list,
"online": online_list, "online": online_list,
} }
@ -315,7 +309,8 @@ async def _handle_chat_send(session_id: str, data: dict):
if not client: if not client:
return return
chat_id = data.get("chat_id") chat_id = data.get("chat_id") # legacy - will map to project_id
project_id = data.get("project_id") # preferred
task_id = data.get("task_id") task_id = data.get("task_id")
content = data.get("content", "") content = data.get("content", "")
thinking = data.get("thinking") thinking = data.get("thinking")
@ -332,56 +327,83 @@ async def _handle_chat_send(session_id: str, data: dict):
await client.ws.send_json({"type": WSEventType.ERROR, "message": "Member not found or inactive"}) await client.ws.send_json({"type": WSEventType.ERROR, "message": "Member not found or inactive"})
return return
msg = Message( # Resolve project_id
chat_id=uuid.UUID(chat_id) if chat_id else None, resolved_project_id = None
task_id=uuid.UUID(task_id) if task_id else None, event_type = "chat_message"
author_type=member.type,
author_id=member.id,
content=content,
thinking=thinking,
tool_log=tool_log,
mentions=mentions,
)
db.add(msg)
await db.commit()
await db.refresh(msg)
msg_data = _to_message_out(msg, member).model_dump() if project_id:
resolved_project_id = uuid.UUID(project_id)
# Resolve project_id and inject into message data elif chat_id:
project_id = None # Legacy: resolve project_id from chat_id
if chat_id:
chat_result = await db.execute(select(Chat).where(Chat.id == uuid.UUID(chat_id))) chat_result = await db.execute(select(Chat).where(Chat.id == uuid.UUID(chat_id)))
chat = chat_result.scalar_one_or_none() chat = chat_result.scalar_one_or_none()
if chat and chat.project_id: if chat and chat.project_id:
project_id = str(chat.project_id) resolved_project_id = chat.project_id
elif chat and chat.kind == ChatKind.LOBBY: elif chat and chat.kind == ChatKind.LOBBY:
await manager.broadcast_all( # Skip lobby for now - Events need project_id
{"type": WSEventType.MESSAGE_NEW, "data": msg_data}, await client.ws.send_json({"type": WSEventType.ERROR, "message": "Lobby chat not supported yet"})
exclude_member_id=client.member_id,
)
return return
elif task_id: elif task_id:
from ..models import Task as TaskModel # Task comment
task_result = await db.execute(select(TaskModel).where(TaskModel.id == uuid.UUID(task_id))) event_type = "task_comment"
task_result = await db.execute(select(Task).where(Task.id == uuid.UUID(task_id)))
task_obj = task_result.scalar_one_or_none() task_obj = task_result.scalar_one_or_none()
if task_obj: if task_obj:
project_id = str(task_obj.project_id) resolved_project_id = task_obj.project_id
if project_id: if not resolved_project_id:
msg_data["project_id"] = project_id await client.ws.send_json({"type": WSEventType.ERROR, "message": "project_id required"})
return
if project_id: # Create Event
await manager.broadcast_message( from ..models import Event
project_id, msg_data,
author_id=client.member_id, event_payload = {
author_session_id=session_id, "content": content,
) "mentions": mentions,
else: }
await manager.broadcast_all( if thinking:
{"type": WSEventType.MESSAGE_NEW, "data": msg_data}, event_payload["thinking"] = thinking
exclude_member_id=client.member_id, if tool_log:
) event_payload["tool_log"] = tool_log
event = Event(
project_id=resolved_project_id,
task_id=uuid.UUID(task_id) if task_id else None,
type=event_type,
actor_id=member.id,
payload=event_payload,
)
db.add(event)
await db.commit()
await db.refresh(event)
# Build backward-compatible message format for WS broadcast
from ..api.converters import member_brief
actor_brief_obj = member_brief(member)
event_data = {
"id": str(event.id),
"task_id": str(event.task_id) if event.task_id else None,
"chat_id": chat_id, # Keep for backward compatibility
"author_type": member.type,
"author_id": str(member.id),
"author": actor_brief_obj.model_dump() if actor_brief_obj else None,
"actor": actor_brief_obj.model_dump() if actor_brief_obj else None,
"content": content,
"thinking": thinking,
"tool_log": tool_log,
"mentions": [{"id": mid, "slug": "", "name": ""} for mid in mentions], # simplified mentions
"attachments": [],
"created_at": event.created_at.isoformat() if event.created_at else "",
"project_id": str(resolved_project_id),
}
await manager.broadcast_message(
str(resolved_project_id), event_data,
author_id=client.member_id,
author_session_id=session_id,
)
async def _handle_agent_stream(session_id: str, event_type: str, data: dict): async def _handle_agent_stream(session_id: str, event_type: str, data: dict):