#!/usr/bin/env python3 """ Direct WebSocket test for Team Board Tracker. Usage: python ws_test.py [--url ws://localhost:8100/ws] [--token ] Without --token: creates a test agent via REST API first. Uses only stdlib + websockets (already in tracker venv). """ import asyncio import json import argparse import urllib.request import urllib.error from datetime import datetime import websockets BASE_URL = "http://localhost:8100" WS_URL = "ws://localhost:8100/ws" TEST_SLUG = "ws-test-bot" TEST_NAME = "WS Test Bot" def ts(): return datetime.now().strftime("%H:%M:%S.%f")[:-3] def log(tag, msg, data=None): extra = f" {json.dumps(data, ensure_ascii=False, indent=None)}" if data else "" print(f"[{ts()}] {tag:<12} {msg}{extra}") # ── REST helpers ────────────────────────────────────────────────────────────── API = "/api/v1" def http_get(path: str) -> tuple[int, dict]: try: with urllib.request.urlopen(f"{BASE_URL}{path}") as r: return r.status, json.loads(r.read()) except urllib.error.HTTPError as e: return e.code, {} def http_post(path: str, body: dict) -> tuple[int, dict]: data = json.dumps(body).encode() req = urllib.request.Request( f"{BASE_URL}{path}", data=data, headers={"Content-Type": "application/json"}, method="POST", ) try: with urllib.request.urlopen(req) as r: return r.status, json.loads(r.read()) except urllib.error.HTTPError as e: return e.code, json.loads(e.read()) if e.fp else {} def ensure_test_agent() -> str: """Create or get test agent, return token.""" code, data = http_get(f"{API}/members/{TEST_SLUG}") if code == 200: token = data.get("token") if token: log("SETUP", f"Found existing agent '{TEST_SLUG}'", {"token": token[:20] + "..."}) return token # Regenerate code2, d2 = http_post(f"{API}/members/{TEST_SLUG}/regenerate-token", {}) if code2 == 200: token = d2["token"] log("SETUP", "Regenerated token", {"token": token[:20] + "..."}) return token raise RuntimeError(f"Failed to regenerate token: {code2} {d2}") # Create payload = { "name": TEST_NAME, "slug": TEST_SLUG, "type": "agent", "role": "member", "agent_config": {"chat_listen": "all", "task_listen": "all"}, } code, data = http_post(f"{API}/members", payload) if code not in (200, 201): raise RuntimeError(f"Failed to create agent: {code} {data}") token = data["token"] log("SETUP", f"Created agent '{TEST_SLUG}'", {"token": token[:20] + "..."}) return token # ── WS test ─────────────────────────────────────────────────────────────────── async def run_test(ws_url: str, token: str): log("WS", f"Connecting → {ws_url}") async with websockets.connect(ws_url) as ws: log("WS", "Connected ✅") # ── 1. Auth ── auth = {"type": "auth", "token": token} log("SEND", "auth", auth) await ws.send(json.dumps(auth)) raw = await asyncio.wait_for(ws.recv(), timeout=5.0) msg = json.loads(raw) log("RECV", msg.get("type", "?"), msg) if msg.get("type") != "auth.ok": log("ERROR", f"Expected auth.ok, got '{msg.get('type')}'") return info = msg.get("data", {}) lobby_id = info.get("lobby_chat_id") projects = info.get("projects", []) online = info.get("online", []) log("INFO", f"online={online}, lobby={lobby_id}, projects={[p['name'] for p in projects]}") # ── 2. Heartbeat ── hb = {"type": "heartbeat", "status": "idle"} log("SEND", "heartbeat", hb) await ws.send(json.dumps(hb)) await asyncio.sleep(0.3) # ── 3. Subscribe to project ── if projects: p = projects[0] sub = {"type": "project.subscribe", "project_id": p["id"]} log("SEND", "project.subscribe", sub) await ws.send(json.dumps(sub)) await asyncio.sleep(0.3) # ── 4. Send chat message ── if lobby_id: chat_msg = { "type": "chat.send", "chat_id": lobby_id, "content": f"[ws_test.py] ping @ {ts()}", "mentions": [], } log("SEND", "chat.send", chat_msg) await ws.send(json.dumps(chat_msg)) else: log("WARN", "No lobby_chat_id — skipping chat.send") # ── 5. Drain incoming (3 s) ── log("INFO", "Listening 3 s for incoming events...") deadline = asyncio.get_event_loop().time() + 3.0 while asyncio.get_event_loop().time() < deadline: try: raw = await asyncio.wait_for(ws.recv(), timeout=0.4) incoming = json.loads(raw) log("RECV", incoming.get("type", "?"), incoming) except asyncio.TimeoutError: pass # ── 6. Unknown event → expect error ── bad = {"type": "totally.unknown", "foo": "bar"} log("SEND", "bad event", bad) await ws.send(json.dumps(bad)) try: raw = await asyncio.wait_for(ws.recv(), timeout=2.0) err = json.loads(raw) log("RECV", err.get("type", "?"), err) except asyncio.TimeoutError: log("WARN", "No response to unknown event") log("WS", "Test complete ✅") # ── main ───────────────────────────────────────────────────────────────────── def main(): parser = argparse.ArgumentParser() parser.add_argument("--url", default=WS_URL) parser.add_argument("--token", default=None, help="Skip agent creation, use this token") args = parser.parse_args() if args.token: token = args.token log("SETUP", f"Using provided token: {token[:20]}...") else: log("SETUP", f"No token — creating/fetching test agent via {BASE_URL}") token = ensure_test_agent() asyncio.run(run_test(args.url, token)) if __name__ == "__main__": main()