docs/tests/test_websocket.py
markov 7fcf4abed9 feat: isolated test infra — docker-compose.test.yml + test.sh
- Отдельная PostgreSQL в tmpfs (RAM)
- Отдельный Tracker на порту 8101
- Полная изоляция от прода
- ./test.sh для запуска
2026-03-14 10:28:11 +01:00

420 lines
16 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
Тесты WebSocket подключения и получения событий
"""
import pytest
import asyncio
import websockets
import json
import uuid
from conftest import assert_uuid
@pytest.mark.asyncio
async def test_websocket_auth_with_jwt(admin_token: str):
"""Test WebSocket authentication using JWT token"""
uri = f"ws://localhost:8101/ws?token={admin_token}"
try:
async with websockets.connect(uri, timeout=10) as websocket:
# Ожидаем сообщение auth.ok
# First message might be agent.status, drain until auth.ok
auth_response = None
for _ in range(5):
response = await asyncio.wait_for(websocket.recv(), timeout=5)
parsed = json.loads(response)
if parsed.get("type") == "auth.ok":
auth_response = parsed
break
assert auth_response is not None, "No auth.ok received"
assert "data" in auth_response
auth_data = auth_response["data"]
assert "member_id" in auth_data
assert "slug" in auth_data
assert "name" in auth_data
assert "lobby_chat_id" in auth_data
assert "projects" in auth_data
assert "online" in auth_data
assert_uuid(auth_data["member_id"])
assert auth_data["slug"] == "admin"
assert_uuid(auth_data["lobby_chat_id"])
assert isinstance(auth_data["projects"], list)
assert isinstance(auth_data["online"], list)
except Exception as e:
pytest.fail(f"WebSocket connection failed: {e}")
@pytest.mark.asyncio
async def test_websocket_auth_with_agent_token(agent_token: str):
"""Test WebSocket authentication using agent token"""
uri = f"ws://localhost:8101/ws?token={agent_token}"
try:
async with websockets.connect(uri, timeout=10) as websocket:
# Ожидаем сообщение auth.ok
response = await websocket.recv()
auth_response = json.loads(response)
assert auth_response["type"] == "auth.ok"
auth_data = auth_response["data"]
# Агент может иметь разный формат auth — проверяем базовые поля
assert "member_id" in auth_data or "id" in auth_data
assert "slug" in auth_data
agent_config = auth_data["agent_config"]
assert "chat_listen" in agent_config
assert "task_listen" in agent_config
assert "capabilities" in agent_config
assert "labels" in agent_config
assert isinstance(auth_data["assigned_tasks"], list)
except Exception as e:
pytest.fail(f"WebSocket connection failed: {e}")
@pytest.mark.asyncio
async def test_websocket_auth_first_message(admin_token: str):
"""Test WebSocket authentication by sending auth message first"""
uri = "ws://localhost:{TEST_PORT}/ws"
try:
async with websockets.connect(uri, timeout=10) as websocket:
# Отправляем сообщение аутентификации
auth_message = {
"type": "auth",
"token": admin_token
}
await websocket.send(json.dumps(auth_message))
# Ожидаем ответ
response = await websocket.recv()
auth_response = json.loads(response)
assert auth_response["type"] == "auth.ok"
assert "data" in auth_response
except Exception as e:
pytest.fail(f"WebSocket auth message failed: {e}")
@pytest.mark.asyncio
async def test_websocket_auth_invalid_token():
"""Test WebSocket authentication with invalid token"""
uri = "ws://localhost:{TEST_PORT}/ws?token=invalid_token"
try:
async with websockets.connect(uri, timeout=10) as websocket:
# Ожидаем сообщение об ошибке
response = await websocket.recv()
auth_response = json.loads(response)
assert auth_response["type"] == "auth.error"
assert "message" in auth_response
except websockets.exceptions.ConnectionClosedError:
# Соединение может быть закрыто сразу при неверном токене
pass
except Exception as e:
pytest.fail(f"Unexpected error: {e}")
@pytest.mark.asyncio
async def test_websocket_heartbeat(admin_token: str):
"""Test WebSocket heartbeat mechanism"""
uri = f"ws://localhost:8101/ws?token={admin_token}"
try:
async with websockets.connect(uri, timeout=10) as websocket:
# Ждём аутентификацию
await websocket.recv() # auth.ok
# Отправляем heartbeat
heartbeat = {"type": "heartbeat"}
await websocket.send(json.dumps(heartbeat))
# Heartbeat может не иметь ответа, но соединение должно остаться живым
await asyncio.sleep(0.5)
# Проверяем что соединение живо отправкой ещё одного heartbeat
heartbeat_with_status = {
"type": "heartbeat",
"status": "online"
}
await websocket.send(json.dumps(heartbeat_with_status))
except Exception as e:
pytest.fail(f"Heartbeat test failed: {e}")
@pytest.mark.asyncio
async def test_websocket_project_subscription(admin_token: str, test_project: dict):
"""Test subscribing to project events via WebSocket"""
uri = f"ws://localhost:8101/ws?token={admin_token}"
try:
async with websockets.connect(uri, timeout=10) as websocket:
# Ждём аутентификацию
await websocket.recv() # auth.ok
# Подписываемся на проект
subscribe = {
"type": "project.subscribe",
"project_id": test_project["id"]
}
await websocket.send(json.dumps(subscribe))
# ACK может прийти или не прийти, это зависит от реализации
await asyncio.sleep(0.5)
# Отписываемся от проекта
unsubscribe = {
"type": "project.unsubscribe",
"project_id": test_project["id"]
}
await websocket.send(json.dumps(unsubscribe))
except Exception as e:
pytest.fail(f"Project subscription test failed: {e}")
@pytest.mark.asyncio
@pytest.mark.xfail(reason="WS broadcast excludes sender - needs 2 clients")
async def test_websocket_send_chat_message(admin_token: str, test_project: dict):
"""Test sending chat message via WebSocket"""
uri = f"ws://localhost:8101/ws?token={admin_token}"
try:
async with websockets.connect(uri, timeout=10) as websocket:
# Ждём аутентификацию
await websocket.recv() # auth.ok
# Отправляем сообщение в чат проекта
message = {
"type": "chat.send",
"chat_id": test_project["chat_id"],
"content": "Test message via WebSocket"
}
await websocket.send(json.dumps(message))
# Ожидаем получить обратно событие message.new
await asyncio.wait_for(
receive_message_new_event(websocket),
timeout=5.0
)
except asyncio.TimeoutError:
pytest.fail("Timeout waiting for message.new event")
except Exception as e:
pytest.fail(f"Chat message test failed: {e}")
async def receive_message_new_event(websocket):
"""Helper function to receive message.new event"""
while True:
response = await websocket.recv()
event = json.loads(response)
if event["type"] == "message.new":
assert "data" in event
message_data = event["data"]
assert "id" in message_data
assert "content" in message_data
assert "author" in message_data
assert "created_at" in message_data
assert_uuid(message_data["id"])
return event
@pytest.mark.asyncio
@pytest.mark.xfail(reason="WS broadcast excludes sender - needs 2 clients")
async def test_websocket_send_task_comment(admin_token: str, test_task: dict):
"""Test sending task comment via WebSocket"""
uri = f"ws://localhost:8101/ws?token={admin_token}"
try:
async with websockets.connect(uri, timeout=10) as websocket:
# Ждём аутентификацию
await websocket.recv() # auth.ok
# Отправляем комментарий к задаче
message = {
"type": "chat.send",
"task_id": test_task["id"],
"content": "Task comment via WebSocket"
}
await websocket.send(json.dumps(message))
# Ожидаем получить обратно событие message.new
await asyncio.wait_for(
receive_message_new_event(websocket),
timeout=5.0
)
except asyncio.TimeoutError:
pytest.fail("Timeout waiting for task comment event")
except Exception as e:
pytest.fail(f"Task comment test failed: {e}")
@pytest.mark.asyncio
@pytest.mark.xfail(reason="WS broadcast excludes sender - needs 2 clients")
async def test_websocket_agent_with_thinking(agent_token: str, test_project: dict):
"""Test agent sending message with thinking via WebSocket"""
uri = f"ws://localhost:8101/ws?token={agent_token}"
try:
async with websockets.connect(uri, timeout=10) as websocket:
# Ждём аутентификацию
await websocket.recv() # auth.ok
# Отправляем сообщение с thinking (только агент может)
message = {
"type": "chat.send",
"chat_id": test_project["chat_id"],
"content": "Agent message with thinking",
"thinking": "Let me think about this problem..."
}
await websocket.send(json.dumps(message))
# Ожидаем получить обратно событие message.new
event = await asyncio.wait_for(
receive_message_new_event(websocket),
timeout=5.0
)
# Проверяем что thinking присутствует
message_data = event["data"]
assert "thinking" in message_data
assert message_data["thinking"] == "Let me think about this problem..."
assert message_data["author_type"] == "agent"
except asyncio.TimeoutError:
pytest.fail("Timeout waiting for agent message event")
except Exception as e:
pytest.fail(f"Agent message test failed: {e}")
@pytest.mark.asyncio
@pytest.mark.xfail(reason="WS broadcast excludes sender - needs 2 clients")
async def test_websocket_message_with_mentions(admin_token: str, test_project: dict, test_user: dict):
"""Test sending message with mentions via WebSocket"""
uri = f"ws://localhost:8101/ws?token={admin_token}"
try:
async with websockets.connect(uri, timeout=10) as websocket:
# Ждём аутентификацию
await websocket.recv() # auth.ok
# Отправляем сообщение с упоминанием
message = {
"type": "chat.send",
"chat_id": test_project["chat_id"],
"content": f"Hey @{test_user['slug']}, check this!",
"mentions": [test_user["id"]]
}
await websocket.send(json.dumps(message))
# Ожидаем получить обратно событие message.new
event = await asyncio.wait_for(
receive_message_new_event(websocket),
timeout=5.0
)
# Проверяем что mentions присутствуют
message_data = event["data"]
assert "mentions" in message_data
assert len(message_data["mentions"]) == 1
assert message_data["mentions"][0]["id"] == test_user["id"]
except asyncio.TimeoutError:
pytest.fail("Timeout waiting for message with mentions")
except Exception as e:
pytest.fail(f"Message with mentions test failed: {e}")
@pytest.mark.asyncio
async def test_websocket_invalid_message_format(admin_token: str):
"""Test sending invalid message format via WebSocket"""
uri = f"ws://localhost:8101/ws?token={admin_token}"
try:
async with websockets.connect(uri, timeout=10) as websocket:
# Ждём аутентификацию
await websocket.recv() # auth.ok
# Отправляем невалидное сообщение
await websocket.send("invalid json")
# Может прийти ошибка или соединение может закрыться
try:
response = await asyncio.wait_for(websocket.recv(), timeout=2.0)
error_event = json.loads(response)
assert error_event["type"] == "error"
except asyncio.TimeoutError:
# Таймаут тоже нормально - сервер может игнорировать невалидные сообщения
pass
except websockets.exceptions.ConnectionClosed:
# Соединение может быть закрыто при невалидном сообщении
pass
except Exception as e:
pytest.fail(f"Invalid message test failed: {e}")
@pytest.mark.asyncio
async def test_websocket_connection_without_auth():
"""Test WebSocket connection without authentication"""
uri = "ws://localhost:{TEST_PORT}/ws"
try:
async with websockets.connect(uri, timeout=10) as websocket:
# Отправляем сообщение без аутентификации
message = {
"type": "chat.send",
"content": "Unauthorized message"
}
await websocket.send(json.dumps(message))
# Должна прийти ошибка аутентификации
response = await asyncio.wait_for(websocket.recv(), timeout=5.0)
error_event = json.loads(response)
assert error_event["type"] == "auth.error" or error_event["type"] == "error"
except websockets.exceptions.ConnectionClosed:
# Соединение может быть закрыто если требуется немедленная аутентификация
pass
except Exception as e:
pytest.fail(f"Unauthenticated connection test failed: {e}")
@pytest.mark.asyncio
async def test_websocket_multiple_connections(admin_token: str):
"""Test multiple WebSocket connections for same user"""
uri = f"ws://localhost:8101/ws?token={admin_token}"
try:
# Открываем два соединения одновременно
async with websockets.connect(uri, timeout=10) as ws1, \
websockets.connect(uri, timeout=10) as ws2:
# Ждём аутентификацию на обоих соединениях
await ws1.recv() # auth.ok
await ws2.recv() # auth.ok
# Отправляем heartbeat в оба соединения
heartbeat = {"type": "heartbeat"}
await ws1.send(json.dumps(heartbeat))
await ws2.send(json.dumps(heartbeat))
# Оба соединения должны остаться живыми
await asyncio.sleep(1)
except Exception as e:
pytest.fail(f"Multiple connections test failed: {e}")