115 lines
3.6 KiB
Python
115 lines
3.6 KiB
Python
"""WebSocket client for Tracker."""
|
|
|
|
import asyncio
|
|
import json
|
|
import logging
|
|
from typing import Callable, Awaitable
|
|
|
|
import websockets
|
|
|
|
from config import Config
|
|
|
|
logger = logging.getLogger(__name__)


class TrackerClient:
    """Bridge-side WebSocket client for Tracker.

    ``connect()`` keeps a connection open (reconnecting after failures) and
    runs three coroutines per connection: a reader that routes incoming
    events, a writer that drains the outgoing queue, and a heartbeat.
    Selected events are handed to the ``on_message`` callback.
    """

    # Seconds to wait before reconnecting after a session ends.
    RECONNECT_DELAY = 5
    # Seconds between heartbeat frames.
    HEARTBEAT_INTERVAL = 25
    # Event types that are passed on to the ``on_message`` callback.
    FORWARDED_EVENTS = (
        "message.new",
        "task.updated",
        "task.created",
        "project.created",
    )

    def __init__(self, config: "Config", on_message: Callable[[dict], Awaitable[None]]):
        """Store the configuration and the event callback.

        Args:
            config: Provides ``tracker_ws_url`` and ``bridge_token``.
            on_message: Awaited with the decoded event dict for every
                forwarded event type.
        """
        self.config = config
        self.on_message = on_message
        # Most recent connection; stays None until the first successful connect.
        self._ws = None
        self._running = False
        # Outgoing payloads; drained by the writer of the current session.
        self._send_queue: asyncio.Queue = asyncio.Queue()

    async def connect(self) -> None:
        """Connect to Tracker and keep reconnecting until ``stop()`` is called.

        The token is passed as the ``token`` query parameter. After a session
        ends (cleanly or with an error) the loop waits ``RECONNECT_DELAY``
        seconds and connects again, unless the client has been stopped.
        """
        url = f"{self.config.tracker_ws_url}?token={self.config.bridge_token}"
        delay = self.RECONNECT_DELAY
        self._running = True

        while self._running:
            try:
                async with websockets.connect(url) as ws:
                    self._ws = ws
                    logger.info("Connected to Tracker WS")
                    await self._run_session(ws)
            except websockets.ConnectionClosed:
                logger.warning("Tracker WS closed, reconnecting in %ss...", delay)
            except Exception as exc:
                logger.error("Tracker WS error: %s, reconnecting in %ss...", exc, delay)
            else:
                # The session ended without raising (e.g. the server closed
                # the connection normally); treat it like any other drop.
                logger.warning("Tracker WS closed, reconnecting in %ss...", delay)

            # Do not hold the caller for a full delay once stop() was requested.
            if self._running:
                await asyncio.sleep(delay)

    async def _run_session(self, ws) -> None:
        """Run reader, writer and heartbeat until the first of them finishes.

        The remaining tasks are always cancelled and awaited before returning,
        so no task from an old connection survives to consume messages from
        the send queue. If a finished task failed, its exception is re-raised.
        """
        tasks = [
            asyncio.ensure_future(coro)
            for coro in (self._reader(ws), self._writer(ws), self._heartbeat(ws))
        ]
        try:
            finished, _ = await asyncio.wait(
                tasks, return_when=asyncio.FIRST_COMPLETED
            )
        finally:
            for task in tasks:
                task.cancel()  # no-op for tasks that are already done
            # Wait for the cancellations to land and retrieve every result so
            # nothing is reported as "exception was never retrieved".
            await asyncio.gather(*tasks, return_exceptions=True)

        for task in finished:
            if not task.cancelled() and task.exception() is not None:
                raise task.exception()

    async def _heartbeat(self, ws) -> None:
        """Send a heartbeat frame every ``HEARTBEAT_INTERVAL`` seconds.

        Returns when a send fails, which ends the current session.
        """
        frame = json.dumps({"type": "heartbeat", "status": "online"})
        while True:
            await asyncio.sleep(self.HEARTBEAT_INTERVAL)
            try:
                await ws.send(frame)
            except Exception as exc:
                logger.warning("Heartbeat failed: %s", exc)
                return
            logger.debug("Heartbeat sent")

    async def _reader(self, ws) -> None:
        """Decode each incoming frame and route it through ``_handle_event``.

        Frames that are not valid JSON objects are logged and skipped. An
        exception raised while handling one event is logged and does not stop
        the reader.
        """
        async for raw in ws:
            try:
                event = json.loads(raw)
            except ValueError:
                # Covers JSONDecodeError and undecodable bytes (UnicodeDecodeError).
                logger.warning("Invalid JSON from Tracker: %s", raw[:200])
                continue

            if not isinstance(event, dict):
                logger.warning("Unexpected non-object event from Tracker: %s", raw[:200])
                continue

            try:
                await self._handle_event(event)
            except Exception:
                # A failing handler must not tear down the connection.
                logger.exception("Error while handling Tracker event")

    async def _writer(self, ws) -> None:
        """Send queued payloads over ``ws`` as JSON text.

        A payload that cannot be serialised is logged and skipped. Returns
        when a send fails, which ends the current session; the payload that
        was being sent is not retried.
        """
        while True:
            payload = await self._send_queue.get()
            try:
                frame = json.dumps(payload)
            except (TypeError, ValueError) as exc:
                logger.error("WS send error: %s", exc)
                continue

            try:
                await ws.send(frame)
            except Exception as exc:
                logger.error("WS send error: %s", exc)
                return
            logger.info("WS sent: %s", payload.get("type"))

    async def _handle_event(self, event: dict) -> None:
        """Route one decoded Tracker event.

        ``auth.ok`` and ``error`` are logged, ``pong`` is ignored, and every
        other event is logged; those listed in ``FORWARDED_EVENTS`` are also
        passed to ``on_message``.
        """
        event_type = event.get("type", "")

        if event_type == "auth.ok":
            logger.info("Authenticated as bridge")
            return

        if event_type == "error":
            logger.error("Tracker error: %s", event.get("message"))
            return

        if event_type == "pong":
            return

        logger.info("WS event: %s", event_type)

        # Forward events to Telegram
        if event_type in self.FORWARDED_EVENTS:
            await self.on_message(event)

    async def send_message(self, project_id: str, text: str) -> None:
        """Queue a ``chat.send`` message for the given project.

        The message is dropped with a warning if no connection has been
        established yet. Once a connection has existed, messages are queued
        and sent by the writer of the current or next session.
        """
        if self._ws is None:
            logger.warning("Not connected to Tracker")
            return

        payload = {
            "type": "chat.send",
            "project_id": project_id,
            "content": text,
        }
        await self._send_queue.put(payload)
        # str() keeps the log line from failing on a non-string id.
        logger.info("Queued to Tracker: project=%s", str(project_id)[:8])

    def stop(self) -> None:
        """Ask ``connect()`` to stop reconnecting.

        Only clears the running flag: an open session keeps running until it
        ends, after which ``connect()`` returns instead of reconnecting.
        """
        self._running = False
|