Fix: heartbeat, send queue, disable_notification, debug logging

This commit is contained in:
Markov 2026-02-28 18:31:48 +01:00
parent 4da4ea944f
commit 4b3f7f0c36
3 changed files with 163 additions and 18 deletions

View File

@ -34,7 +34,7 @@ async def on_tracker_event(event: dict):
event_type = event.get("type", "") event_type = event.get("type", "")
if event_type == "message.new": if event_type == "message.new":
msg = event.get("message", {}) msg = event.get("data", {})
author = msg.get("author", {}) author = msg.get("author", {})
# Skip messages from bridge itself (avoid echo) # Skip messages from bridge itself (avoid echo)
@ -50,7 +50,7 @@ async def on_tracker_event(event: dict):
return return
author_name = author.get("name", author.get("slug", "???")) author_name = author.get("name", author.get("slug", "???"))
text = msg.get("text", "") text = msg.get("content", "")
if not text: if not text:
return return
@ -59,7 +59,7 @@ async def on_tracker_event(event: dict):
await _send_to_topic(topic_id, tg_text) await _send_to_topic(topic_id, tg_text)
elif event_type == "task.created": elif event_type == "task.created":
task = event.get("task", {}) task = event.get("data", {})
project_id = task.get("project_id") project_id = task.get("project_id")
topic_id = topic_map.get_topic(project_id) if project_id else None topic_id = topic_map.get_topic(project_id) if project_id else None
if not topic_id: if not topic_id:
@ -70,6 +70,25 @@ async def on_tracker_event(event: dict):
tg_text = f"📋 Новая задача <b>{_escape_html(key)}</b>: {_escape_html(title)}" tg_text = f"📋 Новая задача <b>{_escape_html(key)}</b>: {_escape_html(title)}"
await _send_to_topic(topic_id, tg_text) await _send_to_topic(topic_id, tg_text)
elif event_type == "project.created":
project = event.get("project", {})
project_id = project.get("id")
project_name = project.get("name", project.get("slug", "???"))
if not project_id:
return
# Don't create if already mapped
if topic_map.get_topic(project_id):
return
# Auto-create topic in Telegram
topic_id = await _create_topic(project_name)
if topic_id:
topic_map.set(topic_id, project_id)
await _send_to_topic(topic_id, f"🔗 Топик привязан к проекту <b>{_escape_html(project_name)}</b>")
logger.info("Auto-created topic %d for project %s (%s)", topic_id, project_name, project_id[:8])
tracker = TrackerClient(config, on_message=on_tracker_event) tracker = TrackerClient(config, on_message=on_tracker_event)
@ -169,6 +188,24 @@ async def cmd_status(update: Update, context: ContextTypes.DEFAULT_TYPE):
_bot: Bot | None = None _bot: Bot | None = None
async def _create_topic(name: str) -> int | None:
"""Create a new forum topic in the Telegram group."""
global _bot
if _bot is None:
_bot = Bot(config.bot_token)
try:
topic = await _bot.create_forum_topic(
chat_id=config.group_id,
name=name,
)
logger.info("Created Telegram topic: id=%d name=%s", topic.message_thread_id, name)
return topic.message_thread_id
except Exception as e:
logger.error("Failed to create topic '%s': %s", name, e)
return None
async def _send_to_topic(topic_id: int, text: str): async def _send_to_topic(topic_id: int, text: str):
"""Send a message to a specific Telegram topic.""" """Send a message to a specific Telegram topic."""
global _bot global _bot
@ -181,6 +218,7 @@ async def _send_to_topic(topic_id: int, text: str):
message_thread_id=topic_id, message_thread_id=topic_id,
text=text, text=text,
parse_mode="HTML", parse_mode="HTML",
disable_notification=True,
) )
except Exception as e: except Exception as e:
logger.error("Failed to send to topic %d: %s", topic_id, e) logger.error("Failed to send to topic %d: %s", topic_id, e)

25
detect_group.py Normal file
View File

@ -0,0 +1,25 @@
"""Temporary script to detect group_id from incoming messages."""
import asyncio
import os
from dotenv import load_dotenv
from telegram import Update
from telegram.ext import Application, MessageHandler, filters, ContextTypes
load_dotenv()
async def on_any(update: Update, ctx: ContextTypes.DEFAULT_TYPE):
msg = update.effective_message
if not msg:
return
print(f"chat_id={msg.chat_id} type={msg.chat.type} thread={msg.message_thread_id} from={msg.from_user.full_name if msg.from_user else '?'} text={msg.text}")
async def main():
app = Application.builder().token(os.getenv("TELEGRAM_BOT_TOKEN")).build()
app.add_handler(MessageHandler(filters.ALL, on_any))
print("Listening for messages... Send something in the group.")
await app.initialize()
await app.start()
await app.updater.start_polling(drop_pending_updates=True)
await asyncio.Event().wait()
asyncio.run(main())

View File

@ -5,8 +5,8 @@ import json
import logging import logging
from typing import Callable, Awaitable from typing import Callable, Awaitable
import httpx
import websockets import websockets
from websockets.asyncio.client import connect
from config import Config from config import Config
@ -20,8 +20,9 @@ class TrackerClient:
self.config = config self.config = config
self.on_message = on_message self.on_message = on_message
self._ws = None self._ws = None
self._topic_map: dict[str, int] = {} # project_uuid -> topic_id
self._running = False self._running = False
self._project_chat_ids: dict[str, str] = {} # project_uuid -> chat_id
self._send_queue: asyncio.Queue = asyncio.Queue()
async def connect(self): async def connect(self):
"""Connect and authenticate.""" """Connect and authenticate."""
@ -30,16 +31,16 @@ class TrackerClient:
while self._running: while self._running:
try: try:
async with connect(url) as ws: async with websockets.connect(url) as ws:
self._ws = ws self._ws = ws
logger.info("Connected to Tracker WS") logger.info("Connected to Tracker WS")
async for raw in ws: # Run reader, writer and heartbeat concurrently
try: await asyncio.gather(
event = json.loads(raw) self._reader(ws),
await self._handle_event(event) self._writer(ws),
except json.JSONDecodeError: self._heartbeat(ws),
logger.warning("Invalid JSON from Tracker: %s", raw[:200]) )
except websockets.ConnectionClosed: except websockets.ConnectionClosed:
logger.warning("Tracker WS closed, reconnecting in 5s...") logger.warning("Tracker WS closed, reconnecting in 5s...")
await asyncio.sleep(5) await asyncio.sleep(5)
@ -47,34 +48,115 @@ class TrackerClient:
logger.error("Tracker WS error: %s, reconnecting in 5s...", e) logger.error("Tracker WS error: %s, reconnecting in 5s...", e)
await asyncio.sleep(5) await asyncio.sleep(5)
async def _heartbeat(self, ws):
"""Send periodic heartbeat to keep connection alive."""
while True:
await asyncio.sleep(25)
try:
await ws.send(json.dumps({"type": "heartbeat", "status": "online"}))
logger.debug("Heartbeat sent")
except Exception:
break
async def _reader(self, ws):
"""Read events from Tracker WS."""
async for raw in ws:
try:
event = json.loads(raw)
await self._handle_event(event)
except json.JSONDecodeError:
logger.warning("Invalid JSON from Tracker: %s", raw[:200])
async def _writer(self, ws):
"""Send queued messages to Tracker WS."""
while True:
payload = await self._send_queue.get()
try:
await ws.send(json.dumps(payload))
logger.info("WS sent: %s", payload.get("type"))
except Exception as e:
logger.error("WS send error: %s", e)
break
async def _handle_event(self, event: dict): async def _handle_event(self, event: dict):
"""Route Tracker events.""" """Route Tracker events."""
event_type = event.get("type", "") event_type = event.get("type", "")
if event_type == "auth.ok": if event_type == "auth.ok":
logger.info("Authenticated as bridge") logger.info("Authenticated as bridge")
await self._cache_projects()
return return
if event_type == "error": if event_type == "error":
logger.error("Tracker error: %s", event.get("message")) logger.error("Tracker error: %s", event.get("message"))
return return
# Forward message events to Telegram if event_type == "pong":
if event_type in ("message.new", "task.updated", "task.created"): return
logger.info("WS event: %s", event_type)
# Forward events to Telegram
if event_type in ("message.new", "task.updated", "task.created", "project.created"):
await self.on_message(event) await self.on_message(event)
async def send_message(self, project_uuid: str, text: str, author_name: str = None): async def _cache_projects(self):
"""Fetch all projects and cache their chat_ids."""
try:
async with httpx.AsyncClient() as client:
resp = await client.get(
f"{self.config.tracker_url}/api/v1/projects",
headers={"Authorization": f"Bearer {self.config.bridge_token}"},
)
if resp.status_code == 200:
for p in resp.json():
pid = p.get("id")
cid = p.get("chat_id")
if pid and cid:
self._project_chat_ids[pid] = cid
logger.info("Cached %d project chat_ids", len(self._project_chat_ids))
except Exception as e:
logger.error("Failed to cache projects: %s", e)
async def get_chat_id(self, project_uuid: str) -> str | None:
"""Get chat_id for a project, fetching if not cached."""
if project_uuid in self._project_chat_ids:
return self._project_chat_ids[project_uuid]
# Try to fetch
try:
async with httpx.AsyncClient() as client:
resp = await client.get(
f"{self.config.tracker_url}/api/v1/projects/{project_uuid}",
headers={"Authorization": f"Bearer {self.config.bridge_token}"},
)
if resp.status_code == 200:
cid = resp.json().get("chat_id")
if cid:
self._project_chat_ids[project_uuid] = cid
return cid
except Exception as e:
logger.error("Failed to fetch project %s: %s", project_uuid[:8], e)
return None
async def send_message(self, project_uuid: str, text: str):
"""Send a chat message to Tracker.""" """Send a chat message to Tracker."""
if not self._ws: if not self._ws:
logger.warning("Not connected to Tracker") logger.warning("Not connected to Tracker")
return return
chat_id = await self.get_chat_id(project_uuid)
if not chat_id:
logger.warning("No chat_id for project %s", project_uuid[:8])
return
payload = { payload = {
"type": "chat.send", "type": "chat.send",
"project_id": project_uuid, "chat_id": chat_id,
"text": text, "content": text,
} }
await self._ws.send(json.dumps(payload)) await self._send_queue.put(payload)
logger.info("Queued to Tracker: project=%s chat=%s", project_uuid[:8], chat_id[:8])
def stop(self): def stop(self):
self._running = False self._running = False