bridge/tracker_client.py

163 lines
5.6 KiB
Python

"""WebSocket client for Tracker."""
import asyncio
import json
import logging
from typing import Callable, Awaitable
import httpx
import websockets
from config import Config
logger = logging.getLogger(__name__)
class TrackerClient:
"""Connects to Tracker WS as a bridge member."""
def __init__(self, config: Config, on_message: Callable[[dict], Awaitable[None]]):
self.config = config
self.on_message = on_message
self._ws = None
self._running = False
self._project_chat_ids: dict[str, str] = {} # project_uuid -> chat_id
self._send_queue: asyncio.Queue = asyncio.Queue()
async def connect(self):
"""Connect and authenticate."""
url = f"{self.config.tracker_ws_url}?token={self.config.bridge_token}"
self._running = True
while self._running:
try:
async with websockets.connect(url) as ws:
self._ws = ws
logger.info("Connected to Tracker WS")
# Run reader, writer and heartbeat concurrently
await asyncio.gather(
self._reader(ws),
self._writer(ws),
self._heartbeat(ws),
)
except websockets.ConnectionClosed:
logger.warning("Tracker WS closed, reconnecting in 5s...")
await asyncio.sleep(5)
except Exception as e:
logger.error("Tracker WS error: %s, reconnecting in 5s...", e)
await asyncio.sleep(5)
async def _heartbeat(self, ws):
"""Send periodic heartbeat to keep connection alive."""
while True:
await asyncio.sleep(25)
try:
await ws.send(json.dumps({"type": "heartbeat", "status": "online"}))
logger.debug("Heartbeat sent")
except Exception:
break
async def _reader(self, ws):
"""Read events from Tracker WS."""
async for raw in ws:
try:
event = json.loads(raw)
await self._handle_event(event)
except json.JSONDecodeError:
logger.warning("Invalid JSON from Tracker: %s", raw[:200])
async def _writer(self, ws):
"""Send queued messages to Tracker WS."""
while True:
payload = await self._send_queue.get()
try:
await ws.send(json.dumps(payload))
logger.info("WS sent: %s", payload.get("type"))
except Exception as e:
logger.error("WS send error: %s", e)
break
async def _handle_event(self, event: dict):
"""Route Tracker events."""
event_type = event.get("type", "")
if event_type == "auth.ok":
logger.info("Authenticated as bridge")
await self._cache_projects()
return
if event_type == "error":
logger.error("Tracker error: %s", event.get("message"))
return
if event_type == "pong":
return
logger.info("WS event: %s", event_type)
# Forward events to Telegram
if event_type in ("message.new", "task.updated", "task.created", "project.created"):
await self.on_message(event)
async def _cache_projects(self):
"""Fetch all projects and cache their chat_ids."""
try:
async with httpx.AsyncClient() as client:
resp = await client.get(
f"{self.config.tracker_url}/api/v1/projects",
headers={"Authorization": f"Bearer {self.config.bridge_token}"},
)
if resp.status_code == 200:
for p in resp.json():
pid = p.get("id")
cid = p.get("chat_id")
if pid and cid:
self._project_chat_ids[pid] = cid
logger.info("Cached %d project chat_ids", len(self._project_chat_ids))
except Exception as e:
logger.error("Failed to cache projects: %s", e)
async def get_chat_id(self, project_uuid: str) -> str | None:
"""Get chat_id for a project, fetching if not cached."""
if project_uuid in self._project_chat_ids:
return self._project_chat_ids[project_uuid]
# Try to fetch
try:
async with httpx.AsyncClient() as client:
resp = await client.get(
f"{self.config.tracker_url}/api/v1/projects/{project_uuid}",
headers={"Authorization": f"Bearer {self.config.bridge_token}"},
)
if resp.status_code == 200:
cid = resp.json().get("chat_id")
if cid:
self._project_chat_ids[project_uuid] = cid
return cid
except Exception as e:
logger.error("Failed to fetch project %s: %s", project_uuid[:8], e)
return None
async def send_message(self, project_uuid: str, text: str):
"""Send a chat message to Tracker."""
if not self._ws:
logger.warning("Not connected to Tracker")
return
chat_id = await self.get_chat_id(project_uuid)
if not chat_id:
logger.warning("No chat_id for project %s", project_uuid[:8])
return
payload = {
"type": "chat.send",
"chat_id": chat_id,
"content": text,
}
await self._send_queue.put(payload)
logger.info("Queued to Tracker: project=%s chat=%s", project_uuid[:8], chat_id[:8])
def stop(self):
self._running = False