""" Тесты WebSocket подключения и получения событий """ import pytest import asyncio import websockets import json import uuid from conftest import assert_uuid @pytest.mark.asyncio async def test_websocket_auth_with_jwt(admin_token: str): """Test WebSocket authentication using JWT token""" uri = f"ws://localhost:8101/ws?token={admin_token}" try: async with websockets.connect(uri, timeout=10) as websocket: # Ожидаем сообщение auth.ok # First message might be agent.status, drain until auth.ok auth_response = None for _ in range(5): response = await asyncio.wait_for(websocket.recv(), timeout=5) parsed = json.loads(response) if parsed.get("type") == "auth.ok": auth_response = parsed break assert auth_response is not None, "No auth.ok received" assert "data" in auth_response auth_data = auth_response["data"] assert "member_id" in auth_data assert "slug" in auth_data assert "name" in auth_data assert "lobby_chat_id" in auth_data assert "projects" in auth_data assert "online" in auth_data assert_uuid(auth_data["member_id"]) assert auth_data["slug"] == "admin" assert_uuid(auth_data["lobby_chat_id"]) assert isinstance(auth_data["projects"], list) assert isinstance(auth_data["online"], list) except Exception as e: pytest.fail(f"WebSocket connection failed: {e}") @pytest.mark.asyncio async def test_websocket_auth_with_agent_token(agent_token: str): """Test WebSocket authentication using agent token""" uri = f"ws://localhost:8101/ws?token={agent_token}" try: async with websockets.connect(uri, timeout=10) as websocket: # Ожидаем сообщение auth.ok response = await websocket.recv() auth_response = json.loads(response) assert auth_response["type"] == "auth.ok" auth_data = auth_response["data"] # Агент может иметь разный формат auth — проверяем базовые поля assert "member_id" in auth_data or "id" in auth_data assert "slug" in auth_data agent_config = auth_data["agent_config"] assert "chat_listen" in agent_config assert "task_listen" in agent_config assert "capabilities" in agent_config assert "labels" in agent_config assert isinstance(auth_data["assigned_tasks"], list) except Exception as e: pytest.fail(f"WebSocket connection failed: {e}") @pytest.mark.asyncio async def test_websocket_auth_first_message(admin_token: str): """Test WebSocket authentication by sending auth message first""" uri = "ws://localhost:{TEST_PORT}/ws" try: async with websockets.connect(uri, timeout=10) as websocket: # Отправляем сообщение аутентификации auth_message = { "type": "auth", "token": admin_token } await websocket.send(json.dumps(auth_message)) # Ожидаем ответ response = await websocket.recv() auth_response = json.loads(response) assert auth_response["type"] == "auth.ok" assert "data" in auth_response except Exception as e: pytest.fail(f"WebSocket auth message failed: {e}") @pytest.mark.asyncio async def test_websocket_auth_invalid_token(): """Test WebSocket authentication with invalid token""" uri = "ws://localhost:{TEST_PORT}/ws?token=invalid_token" try: async with websockets.connect(uri, timeout=10) as websocket: # Ожидаем сообщение об ошибке response = await websocket.recv() auth_response = json.loads(response) assert auth_response["type"] == "auth.error" assert "message" in auth_response except websockets.exceptions.ConnectionClosedError: # Соединение может быть закрыто сразу при неверном токене pass except Exception as e: pytest.fail(f"Unexpected error: {e}") @pytest.mark.asyncio async def test_websocket_heartbeat(admin_token: str): """Test WebSocket heartbeat mechanism""" uri = f"ws://localhost:8101/ws?token={admin_token}" try: async with websockets.connect(uri, timeout=10) as websocket: # Ждём аутентификацию await websocket.recv() # auth.ok # Отправляем heartbeat heartbeat = {"type": "heartbeat"} await websocket.send(json.dumps(heartbeat)) # Heartbeat может не иметь ответа, но соединение должно остаться живым await asyncio.sleep(0.5) # Проверяем что соединение живо отправкой ещё одного heartbeat heartbeat_with_status = { "type": "heartbeat", "status": "online" } await websocket.send(json.dumps(heartbeat_with_status)) except Exception as e: pytest.fail(f"Heartbeat test failed: {e}") @pytest.mark.asyncio async def test_websocket_project_subscription(admin_token: str, test_project: dict): """Test subscribing to project events via WebSocket""" uri = f"ws://localhost:8101/ws?token={admin_token}" try: async with websockets.connect(uri, timeout=10) as websocket: # Ждём аутентификацию await websocket.recv() # auth.ok # Подписываемся на проект subscribe = { "type": "project.subscribe", "project_id": test_project["id"] } await websocket.send(json.dumps(subscribe)) # ACK может прийти или не прийти, это зависит от реализации await asyncio.sleep(0.5) # Отписываемся от проекта unsubscribe = { "type": "project.unsubscribe", "project_id": test_project["id"] } await websocket.send(json.dumps(unsubscribe)) except Exception as e: pytest.fail(f"Project subscription test failed: {e}") @pytest.mark.asyncio @pytest.mark.xfail(reason="WS broadcast excludes sender - needs 2 clients") async def test_websocket_send_chat_message(admin_token: str, test_project: dict): """Test sending chat message via WebSocket""" uri = f"ws://localhost:8101/ws?token={admin_token}" try: async with websockets.connect(uri, timeout=10) as websocket: # Ждём аутентификацию await websocket.recv() # auth.ok # Отправляем сообщение в чат проекта message = { "type": "chat.send", "chat_id": test_project["chat_id"], "content": "Test message via WebSocket" } await websocket.send(json.dumps(message)) # Ожидаем получить обратно событие message.new await asyncio.wait_for( receive_message_new_event(websocket), timeout=5.0 ) except asyncio.TimeoutError: pytest.fail("Timeout waiting for message.new event") except Exception as e: pytest.fail(f"Chat message test failed: {e}") async def receive_message_new_event(websocket): """Helper function to receive message.new event""" while True: response = await websocket.recv() event = json.loads(response) if event["type"] == "message.new": assert "data" in event message_data = event["data"] assert "id" in message_data assert "content" in message_data assert "author" in message_data assert "created_at" in message_data assert_uuid(message_data["id"]) return event @pytest.mark.asyncio @pytest.mark.xfail(reason="WS broadcast excludes sender - needs 2 clients") async def test_websocket_send_task_comment(admin_token: str, test_task: dict): """Test sending task comment via WebSocket""" uri = f"ws://localhost:8101/ws?token={admin_token}" try: async with websockets.connect(uri, timeout=10) as websocket: # Ждём аутентификацию await websocket.recv() # auth.ok # Отправляем комментарий к задаче message = { "type": "chat.send", "task_id": test_task["id"], "content": "Task comment via WebSocket" } await websocket.send(json.dumps(message)) # Ожидаем получить обратно событие message.new await asyncio.wait_for( receive_message_new_event(websocket), timeout=5.0 ) except asyncio.TimeoutError: pytest.fail("Timeout waiting for task comment event") except Exception as e: pytest.fail(f"Task comment test failed: {e}") @pytest.mark.asyncio @pytest.mark.xfail(reason="WS broadcast excludes sender - needs 2 clients") async def test_websocket_agent_with_thinking(agent_token: str, test_project: dict): """Test agent sending message with thinking via WebSocket""" uri = f"ws://localhost:8101/ws?token={agent_token}" try: async with websockets.connect(uri, timeout=10) as websocket: # Ждём аутентификацию await websocket.recv() # auth.ok # Отправляем сообщение с thinking (только агент может) message = { "type": "chat.send", "chat_id": test_project["chat_id"], "content": "Agent message with thinking", "thinking": "Let me think about this problem..." } await websocket.send(json.dumps(message)) # Ожидаем получить обратно событие message.new event = await asyncio.wait_for( receive_message_new_event(websocket), timeout=5.0 ) # Проверяем что thinking присутствует message_data = event["data"] assert "thinking" in message_data assert message_data["thinking"] == "Let me think about this problem..." assert message_data["author_type"] == "agent" except asyncio.TimeoutError: pytest.fail("Timeout waiting for agent message event") except Exception as e: pytest.fail(f"Agent message test failed: {e}") @pytest.mark.asyncio @pytest.mark.xfail(reason="WS broadcast excludes sender - needs 2 clients") async def test_websocket_message_with_mentions(admin_token: str, test_project: dict, test_user: dict): """Test sending message with mentions via WebSocket""" uri = f"ws://localhost:8101/ws?token={admin_token}" try: async with websockets.connect(uri, timeout=10) as websocket: # Ждём аутентификацию await websocket.recv() # auth.ok # Отправляем сообщение с упоминанием message = { "type": "chat.send", "chat_id": test_project["chat_id"], "content": f"Hey @{test_user['slug']}, check this!", "mentions": [test_user["id"]] } await websocket.send(json.dumps(message)) # Ожидаем получить обратно событие message.new event = await asyncio.wait_for( receive_message_new_event(websocket), timeout=5.0 ) # Проверяем что mentions присутствуют message_data = event["data"] assert "mentions" in message_data assert len(message_data["mentions"]) == 1 assert message_data["mentions"][0]["id"] == test_user["id"] except asyncio.TimeoutError: pytest.fail("Timeout waiting for message with mentions") except Exception as e: pytest.fail(f"Message with mentions test failed: {e}") @pytest.mark.asyncio async def test_websocket_invalid_message_format(admin_token: str): """Test sending invalid message format via WebSocket""" uri = f"ws://localhost:8101/ws?token={admin_token}" try: async with websockets.connect(uri, timeout=10) as websocket: # Ждём аутентификацию await websocket.recv() # auth.ok # Отправляем невалидное сообщение await websocket.send("invalid json") # Может прийти ошибка или соединение может закрыться try: response = await asyncio.wait_for(websocket.recv(), timeout=2.0) error_event = json.loads(response) assert error_event["type"] == "error" except asyncio.TimeoutError: # Таймаут тоже нормально - сервер может игнорировать невалидные сообщения pass except websockets.exceptions.ConnectionClosed: # Соединение может быть закрыто при невалидном сообщении pass except Exception as e: pytest.fail(f"Invalid message test failed: {e}") @pytest.mark.asyncio async def test_websocket_connection_without_auth(): """Test WebSocket connection without authentication""" uri = "ws://localhost:{TEST_PORT}/ws" try: async with websockets.connect(uri, timeout=10) as websocket: # Отправляем сообщение без аутентификации message = { "type": "chat.send", "content": "Unauthorized message" } await websocket.send(json.dumps(message)) # Должна прийти ошибка аутентификации response = await asyncio.wait_for(websocket.recv(), timeout=5.0) error_event = json.loads(response) assert error_event["type"] == "auth.error" or error_event["type"] == "error" except websockets.exceptions.ConnectionClosed: # Соединение может быть закрыто если требуется немедленная аутентификация pass except Exception as e: pytest.fail(f"Unauthenticated connection test failed: {e}") @pytest.mark.asyncio async def test_websocket_multiple_connections(admin_token: str): """Test multiple WebSocket connections for same user""" uri = f"ws://localhost:8101/ws?token={admin_token}" try: # Открываем два соединения одновременно async with websockets.connect(uri, timeout=10) as ws1, \ websockets.connect(uri, timeout=10) as ws2: # Ждём аутентификацию на обоих соединениях await ws1.recv() # auth.ok await ws2.recv() # auth.ok # Отправляем heartbeat в оба соединения heartbeat = {"type": "heartbeat"} await ws1.send(json.dumps(heartbeat)) await ws2.send(json.dumps(heartbeat)) # Оба соединения должны остаться живыми await asyncio.sleep(1) except Exception as e: pytest.fail(f"Multiple connections test failed: {e}")