diff --git a/bridge.py b/bridge.py index 080aa65..2e17a60 100644 --- a/bridge.py +++ b/bridge.py @@ -34,7 +34,7 @@ async def on_tracker_event(event: dict): event_type = event.get("type", "") if event_type == "message.new": - msg = event.get("message", {}) + msg = event.get("data", {}) author = msg.get("author", {}) # Skip messages from bridge itself (avoid echo) @@ -50,7 +50,7 @@ async def on_tracker_event(event: dict): return author_name = author.get("name", author.get("slug", "???")) - text = msg.get("text", "") + text = msg.get("content", "") if not text: return @@ -59,7 +59,7 @@ async def on_tracker_event(event: dict): await _send_to_topic(topic_id, tg_text) elif event_type == "task.created": - task = event.get("task", {}) + task = event.get("data", {}) project_id = task.get("project_id") topic_id = topic_map.get_topic(project_id) if project_id else None if not topic_id: @@ -70,6 +70,25 @@ async def on_tracker_event(event: dict): tg_text = f"📋 Новая задача {_escape_html(key)}: {_escape_html(title)}" await _send_to_topic(topic_id, tg_text) + elif event_type == "project.created": + project = event.get("project", {}) + project_id = project.get("id") + project_name = project.get("name", project.get("slug", "???")) + + if not project_id: + return + + # Don't create if already mapped + if topic_map.get_topic(project_id): + return + + # Auto-create topic in Telegram + topic_id = await _create_topic(project_name) + if topic_id: + topic_map.set(topic_id, project_id) + await _send_to_topic(topic_id, f"🔗 Топик привязан к проекту {_escape_html(project_name)}") + logger.info("Auto-created topic %d for project %s (%s)", topic_id, project_name, project_id[:8]) + tracker = TrackerClient(config, on_message=on_tracker_event) @@ -169,6 +188,24 @@ async def cmd_status(update: Update, context: ContextTypes.DEFAULT_TYPE): _bot: Bot | None = None +async def _create_topic(name: str) -> int | None: + """Create a new forum topic in the Telegram group.""" + global _bot + if _bot is None: + _bot = Bot(config.bot_token) + + try: + topic = await _bot.create_forum_topic( + chat_id=config.group_id, + name=name, + ) + logger.info("Created Telegram topic: id=%d name=%s", topic.message_thread_id, name) + return topic.message_thread_id + except Exception as e: + logger.error("Failed to create topic '%s': %s", name, e) + return None + + async def _send_to_topic(topic_id: int, text: str): """Send a message to a specific Telegram topic.""" global _bot @@ -181,6 +218,7 @@ async def _send_to_topic(topic_id: int, text: str): message_thread_id=topic_id, text=text, parse_mode="HTML", + disable_notification=True, ) except Exception as e: logger.error("Failed to send to topic %d: %s", topic_id, e) diff --git a/detect_group.py b/detect_group.py new file mode 100644 index 0000000..21776a9 --- /dev/null +++ b/detect_group.py @@ -0,0 +1,25 @@ +"""Temporary script to detect group_id from incoming messages.""" +import asyncio +import os +from dotenv import load_dotenv +from telegram import Update +from telegram.ext import Application, MessageHandler, filters, ContextTypes + +load_dotenv() + +async def on_any(update: Update, ctx: ContextTypes.DEFAULT_TYPE): + msg = update.effective_message + if not msg: + return + print(f"chat_id={msg.chat_id} type={msg.chat.type} thread={msg.message_thread_id} from={msg.from_user.full_name if msg.from_user else '?'} text={msg.text}") + +async def main(): + app = Application.builder().token(os.getenv("TELEGRAM_BOT_TOKEN")).build() + app.add_handler(MessageHandler(filters.ALL, on_any)) + print("Listening for messages... Send something in the group.") + await app.initialize() + await app.start() + await app.updater.start_polling(drop_pending_updates=True) + await asyncio.Event().wait() + +asyncio.run(main()) diff --git a/tracker_client.py b/tracker_client.py index ac388d2..27dd099 100644 --- a/tracker_client.py +++ b/tracker_client.py @@ -5,8 +5,8 @@ import json import logging from typing import Callable, Awaitable +import httpx import websockets -from websockets.asyncio.client import connect from config import Config @@ -20,8 +20,9 @@ class TrackerClient: self.config = config self.on_message = on_message self._ws = None - self._topic_map: dict[str, int] = {} # project_uuid -> topic_id self._running = False + self._project_chat_ids: dict[str, str] = {} # project_uuid -> chat_id + self._send_queue: asyncio.Queue = asyncio.Queue() async def connect(self): """Connect and authenticate.""" @@ -30,16 +31,16 @@ class TrackerClient: while self._running: try: - async with connect(url) as ws: + async with websockets.connect(url) as ws: self._ws = ws logger.info("Connected to Tracker WS") - async for raw in ws: - try: - event = json.loads(raw) - await self._handle_event(event) - except json.JSONDecodeError: - logger.warning("Invalid JSON from Tracker: %s", raw[:200]) + # Run reader, writer and heartbeat concurrently + await asyncio.gather( + self._reader(ws), + self._writer(ws), + self._heartbeat(ws), + ) except websockets.ConnectionClosed: logger.warning("Tracker WS closed, reconnecting in 5s...") await asyncio.sleep(5) @@ -47,34 +48,115 @@ class TrackerClient: logger.error("Tracker WS error: %s, reconnecting in 5s...", e) await asyncio.sleep(5) + async def _heartbeat(self, ws): + """Send periodic heartbeat to keep connection alive.""" + while True: + await asyncio.sleep(25) + try: + await ws.send(json.dumps({"type": "heartbeat", "status": "online"})) + logger.debug("Heartbeat sent") + except Exception: + break + + async def _reader(self, ws): + """Read events from Tracker WS.""" + async for raw in ws: + try: + event = json.loads(raw) + await self._handle_event(event) + except json.JSONDecodeError: + logger.warning("Invalid JSON from Tracker: %s", raw[:200]) + + async def _writer(self, ws): + """Send queued messages to Tracker WS.""" + while True: + payload = await self._send_queue.get() + try: + await ws.send(json.dumps(payload)) + logger.info("WS sent: %s", payload.get("type")) + except Exception as e: + logger.error("WS send error: %s", e) + break + async def _handle_event(self, event: dict): """Route Tracker events.""" event_type = event.get("type", "") if event_type == "auth.ok": logger.info("Authenticated as bridge") + await self._cache_projects() return if event_type == "error": logger.error("Tracker error: %s", event.get("message")) return - # Forward message events to Telegram - if event_type in ("message.new", "task.updated", "task.created"): + if event_type == "pong": + return + + logger.info("WS event: %s", event_type) + + # Forward events to Telegram + if event_type in ("message.new", "task.updated", "task.created", "project.created"): await self.on_message(event) - async def send_message(self, project_uuid: str, text: str, author_name: str = None): + async def _cache_projects(self): + """Fetch all projects and cache their chat_ids.""" + try: + async with httpx.AsyncClient() as client: + resp = await client.get( + f"{self.config.tracker_url}/api/v1/projects", + headers={"Authorization": f"Bearer {self.config.bridge_token}"}, + ) + if resp.status_code == 200: + for p in resp.json(): + pid = p.get("id") + cid = p.get("chat_id") + if pid and cid: + self._project_chat_ids[pid] = cid + logger.info("Cached %d project chat_ids", len(self._project_chat_ids)) + except Exception as e: + logger.error("Failed to cache projects: %s", e) + + async def get_chat_id(self, project_uuid: str) -> str | None: + """Get chat_id for a project, fetching if not cached.""" + if project_uuid in self._project_chat_ids: + return self._project_chat_ids[project_uuid] + + # Try to fetch + try: + async with httpx.AsyncClient() as client: + resp = await client.get( + f"{self.config.tracker_url}/api/v1/projects/{project_uuid}", + headers={"Authorization": f"Bearer {self.config.bridge_token}"}, + ) + if resp.status_code == 200: + cid = resp.json().get("chat_id") + if cid: + self._project_chat_ids[project_uuid] = cid + return cid + except Exception as e: + logger.error("Failed to fetch project %s: %s", project_uuid[:8], e) + return None + + async def send_message(self, project_uuid: str, text: str): """Send a chat message to Tracker.""" if not self._ws: logger.warning("Not connected to Tracker") return + chat_id = await self.get_chat_id(project_uuid) + if not chat_id: + logger.warning("No chat_id for project %s", project_uuid[:8]) + return + payload = { "type": "chat.send", - "project_id": project_uuid, - "text": text, + "chat_id": chat_id, + "content": text, } - await self._ws.send(json.dumps(payload)) + await self._send_queue.put(payload) + logger.info("Queued to Tracker: project=%s chat=%s", project_uuid[:8], chat_id[:8]) def stop(self): self._running = False