"""WebSocket client for Tracker.""" import asyncio import json import logging from typing import Callable, Awaitable import httpx import websockets from config import Config logger = logging.getLogger(__name__) class TrackerClient: """Connects to Tracker WS as a bridge member.""" def __init__(self, config: Config, on_message: Callable[[dict], Awaitable[None]]): self.config = config self.on_message = on_message self._ws = None self._running = False self._project_chat_ids: dict[str, str] = {} # project_uuid -> chat_id self._send_queue: asyncio.Queue = asyncio.Queue() async def connect(self): """Connect and authenticate.""" url = f"{self.config.tracker_ws_url}?token={self.config.bridge_token}" self._running = True while self._running: try: async with websockets.connect(url) as ws: self._ws = ws logger.info("Connected to Tracker WS") # Run reader, writer and heartbeat concurrently await asyncio.gather( self._reader(ws), self._writer(ws), self._heartbeat(ws), ) except websockets.ConnectionClosed: logger.warning("Tracker WS closed, reconnecting in 5s...") await asyncio.sleep(5) except Exception as e: logger.error("Tracker WS error: %s, reconnecting in 5s...", e) await asyncio.sleep(5) async def _heartbeat(self, ws): """Send periodic heartbeat to keep connection alive.""" while True: await asyncio.sleep(25) try: await ws.send(json.dumps({"type": "heartbeat", "status": "online"})) logger.debug("Heartbeat sent") except Exception: break async def _reader(self, ws): """Read events from Tracker WS.""" async for raw in ws: try: event = json.loads(raw) await self._handle_event(event) except json.JSONDecodeError: logger.warning("Invalid JSON from Tracker: %s", raw[:200]) async def _writer(self, ws): """Send queued messages to Tracker WS.""" while True: payload = await self._send_queue.get() try: await ws.send(json.dumps(payload)) logger.info("WS sent: %s", payload.get("type")) except Exception as e: logger.error("WS send error: %s", e) break async def _handle_event(self, event: dict): """Route Tracker events.""" event_type = event.get("type", "") if event_type == "auth.ok": logger.info("Authenticated as bridge") await self._cache_projects() return if event_type == "error": logger.error("Tracker error: %s", event.get("message")) return if event_type == "pong": return logger.info("WS event: %s", event_type) # Forward events to Telegram if event_type in ("message.new", "task.updated", "task.created", "project.created"): await self.on_message(event) async def _cache_projects(self): """Fetch all projects and cache their chat_ids.""" try: async with httpx.AsyncClient() as client: resp = await client.get( f"{self.config.tracker_url}/api/v1/projects", headers={"Authorization": f"Bearer {self.config.bridge_token}"}, ) if resp.status_code == 200: for p in resp.json(): pid = p.get("id") cid = p.get("chat_id") if pid and cid: self._project_chat_ids[pid] = cid logger.info("Cached %d project chat_ids", len(self._project_chat_ids)) except Exception as e: logger.error("Failed to cache projects: %s", e) async def get_chat_id(self, project_uuid: str) -> str | None: """Get chat_id for a project, fetching if not cached.""" if project_uuid in self._project_chat_ids: return self._project_chat_ids[project_uuid] # Try to fetch try: async with httpx.AsyncClient() as client: resp = await client.get( f"{self.config.tracker_url}/api/v1/projects/{project_uuid}", headers={"Authorization": f"Bearer {self.config.bridge_token}"}, ) if resp.status_code == 200: cid = resp.json().get("chat_id") if cid: self._project_chat_ids[project_uuid] = cid return cid except Exception as e: logger.error("Failed to fetch project %s: %s", project_uuid[:8], e) return None async def send_message(self, project_uuid: str, text: str): """Send a chat message to Tracker.""" if not self._ws: logger.warning("Not connected to Tracker") return chat_id = await self.get_chat_id(project_uuid) if not chat_id: logger.warning("No chat_id for project %s", project_uuid[:8]) return payload = { "type": "chat.send", "chat_id": chat_id, "content": text, } await self._send_queue.put(payload) logger.info("Queued to Tracker: project=%s chat=%s", project_uuid[:8], chat_id[:8]) def stop(self): self._running = False