"""WebSocket client for Tracker.""" import asyncio import json import logging from typing import Callable, Awaitable import websockets from config import Config logger = logging.getLogger(__name__) class TrackerClient: """Connects to Tracker WS as a bridge member.""" def __init__(self, config: Config, on_message: Callable[[dict], Awaitable[None]]): self.config = config self.on_message = on_message self._ws = None self._running = False self._send_queue: asyncio.Queue = asyncio.Queue() async def connect(self): """Connect and authenticate.""" url = f"{self.config.tracker_ws_url}?token={self.config.bridge_token}" self._running = True while self._running: try: async with websockets.connect(url) as ws: self._ws = ws logger.info("Connected to Tracker WS") await asyncio.gather( self._reader(ws), self._writer(ws), self._heartbeat(ws), ) except websockets.ConnectionClosed: logger.warning("Tracker WS closed, reconnecting in 5s...") await asyncio.sleep(5) except Exception as e: logger.error("Tracker WS error: %s, reconnecting in 5s...", e) await asyncio.sleep(5) async def _heartbeat(self, ws): """Send periodic heartbeat to keep connection alive.""" while True: await asyncio.sleep(25) try: await ws.send(json.dumps({"type": "heartbeat", "status": "online"})) logger.debug("Heartbeat sent") except Exception: break async def _reader(self, ws): """Read events from Tracker WS.""" async for raw in ws: try: event = json.loads(raw) await self._handle_event(event) except json.JSONDecodeError: logger.warning("Invalid JSON from Tracker: %s", raw[:200]) async def _writer(self, ws): """Send queued messages to Tracker WS.""" while True: payload = await self._send_queue.get() try: await ws.send(json.dumps(payload)) logger.info("WS sent: %s", payload.get("type")) except Exception as e: logger.error("WS send error: %s", e) break async def _handle_event(self, event: dict): """Route Tracker events.""" event_type = event.get("type", "") if event_type == "auth.ok": logger.info("Authenticated as bridge") return if event_type == "error": logger.error("Tracker error: %s", event.get("message")) return if event_type == "pong": return logger.info("WS event: %s", event_type) # Forward events to Telegram if event_type in ("message.new", "task.updated", "task.created", "project.created"): await self.on_message(event) async def send_message(self, project_id: str, text: str): """Send a chat message to Tracker via WS chat.send.""" if not self._ws: logger.warning("Not connected to Tracker") return payload = { "type": "chat.send", "project_id": project_id, "content": text, } await self._send_queue.put(payload) logger.info("Queued to Tracker: project=%s", project_id[:8]) def stop(self): self._running = False