420 lines
16 KiB
Python
420 lines
16 KiB
Python
"""
|
||
Тесты WebSocket подключения и получения событий
|
||
"""
|
||
import pytest
|
||
import asyncio
|
||
import websockets
|
||
import json
|
||
import uuid
|
||
from conftest import assert_uuid
|
||
|
||
|
||
@pytest.mark.asyncio
|
||
async def test_websocket_auth_with_jwt(admin_token: str):
|
||
"""Test WebSocket authentication using JWT token"""
|
||
uri = f"ws://localhost:8101/ws?token={admin_token}"
|
||
|
||
try:
|
||
async with websockets.connect(uri, timeout=10) as websocket:
|
||
# Ожидаем сообщение auth.ok
|
||
# First message might be agent.status, drain until auth.ok
|
||
auth_response = None
|
||
for _ in range(5):
|
||
response = await asyncio.wait_for(websocket.recv(), timeout=5)
|
||
parsed = json.loads(response)
|
||
if parsed.get("type") == "auth.ok":
|
||
auth_response = parsed
|
||
break
|
||
assert auth_response is not None, "No auth.ok received"
|
||
assert "data" in auth_response
|
||
|
||
auth_data = auth_response["data"]
|
||
assert "member_id" in auth_data
|
||
assert "slug" in auth_data
|
||
assert "name" in auth_data
|
||
assert "lobby_chat_id" in auth_data
|
||
assert "projects" in auth_data
|
||
assert "online" in auth_data
|
||
|
||
assert_uuid(auth_data["member_id"])
|
||
assert auth_data["slug"] == "admin"
|
||
assert_uuid(auth_data["lobby_chat_id"])
|
||
assert isinstance(auth_data["projects"], list)
|
||
assert isinstance(auth_data["online"], list)
|
||
|
||
except Exception as e:
|
||
pytest.fail(f"WebSocket connection failed: {e}")
|
||
|
||
|
||
@pytest.mark.asyncio
|
||
async def test_websocket_auth_with_agent_token(agent_token: str):
|
||
"""Test WebSocket authentication using agent token"""
|
||
uri = f"ws://localhost:8101/ws?token={agent_token}"
|
||
|
||
try:
|
||
async with websockets.connect(uri, timeout=10) as websocket:
|
||
# Ожидаем сообщение auth.ok
|
||
response = await websocket.recv()
|
||
auth_response = json.loads(response)
|
||
|
||
assert auth_response["type"] == "auth.ok"
|
||
auth_data = auth_response["data"]
|
||
|
||
# Агент может иметь разный формат auth — проверяем базовые поля
|
||
assert "member_id" in auth_data or "id" in auth_data
|
||
assert "slug" in auth_data
|
||
|
||
agent_config = auth_data["agent_config"]
|
||
assert "chat_listen" in agent_config
|
||
assert "task_listen" in agent_config
|
||
assert "capabilities" in agent_config
|
||
assert "labels" in agent_config
|
||
|
||
assert isinstance(auth_data["assigned_tasks"], list)
|
||
|
||
except Exception as e:
|
||
pytest.fail(f"WebSocket connection failed: {e}")
|
||
|
||
|
||
@pytest.mark.asyncio
|
||
async def test_websocket_auth_first_message(admin_token: str):
|
||
"""Test WebSocket authentication by sending auth message first"""
|
||
uri = "ws://localhost:8101/ws"
|
||
|
||
try:
|
||
async with websockets.connect(uri, timeout=10) as websocket:
|
||
# Отправляем сообщение аутентификации
|
||
auth_message = {
|
||
"type": "auth",
|
||
"token": admin_token
|
||
}
|
||
await websocket.send(json.dumps(auth_message))
|
||
|
||
# Ожидаем ответ
|
||
response = await websocket.recv()
|
||
auth_response = json.loads(response)
|
||
|
||
assert auth_response["type"] == "auth.ok"
|
||
assert "data" in auth_response
|
||
|
||
except Exception as e:
|
||
pytest.fail(f"WebSocket auth message failed: {e}")
|
||
|
||
|
||
@pytest.mark.asyncio
|
||
async def test_websocket_auth_invalid_token():
|
||
"""Test WebSocket authentication with invalid token"""
|
||
uri = "ws://localhost:8101/ws?token=invalid_token"
|
||
|
||
try:
|
||
async with websockets.connect(uri, timeout=10) as websocket:
|
||
# Ожидаем сообщение об ошибке
|
||
response = await websocket.recv()
|
||
auth_response = json.loads(response)
|
||
|
||
assert auth_response["type"] == "auth.error"
|
||
assert "message" in auth_response
|
||
|
||
except websockets.exceptions.ConnectionClosedError:
|
||
# Соединение может быть закрыто сразу при неверном токене
|
||
pass
|
||
except Exception as e:
|
||
pytest.fail(f"Unexpected error: {e}")
|
||
|
||
|
||
@pytest.mark.asyncio
|
||
async def test_websocket_heartbeat(admin_token: str):
|
||
"""Test WebSocket heartbeat mechanism"""
|
||
uri = f"ws://localhost:8101/ws?token={admin_token}"
|
||
|
||
try:
|
||
async with websockets.connect(uri, timeout=10) as websocket:
|
||
# Ждём аутентификацию
|
||
await websocket.recv() # auth.ok
|
||
|
||
# Отправляем heartbeat
|
||
heartbeat = {"type": "heartbeat"}
|
||
await websocket.send(json.dumps(heartbeat))
|
||
|
||
# Heartbeat может не иметь ответа, но соединение должно остаться живым
|
||
await asyncio.sleep(0.5)
|
||
|
||
# Проверяем что соединение живо отправкой ещё одного heartbeat
|
||
heartbeat_with_status = {
|
||
"type": "heartbeat",
|
||
"status": "online"
|
||
}
|
||
await websocket.send(json.dumps(heartbeat_with_status))
|
||
|
||
except Exception as e:
|
||
pytest.fail(f"Heartbeat test failed: {e}")
|
||
|
||
|
||
@pytest.mark.asyncio
|
||
async def test_websocket_project_subscription(admin_token: str, test_project: dict):
|
||
"""Test subscribing to project events via WebSocket"""
|
||
uri = f"ws://localhost:8101/ws?token={admin_token}"
|
||
|
||
try:
|
||
async with websockets.connect(uri, timeout=10) as websocket:
|
||
# Ждём аутентификацию
|
||
await websocket.recv() # auth.ok
|
||
|
||
# Подписываемся на проект
|
||
subscribe = {
|
||
"type": "project.subscribe",
|
||
"project_id": test_project["id"]
|
||
}
|
||
await websocket.send(json.dumps(subscribe))
|
||
|
||
# ACK может прийти или не прийти, это зависит от реализации
|
||
await asyncio.sleep(0.5)
|
||
|
||
# Отписываемся от проекта
|
||
unsubscribe = {
|
||
"type": "project.unsubscribe",
|
||
"project_id": test_project["id"]
|
||
}
|
||
await websocket.send(json.dumps(unsubscribe))
|
||
|
||
except Exception as e:
|
||
pytest.fail(f"Project subscription test failed: {e}")
|
||
|
||
|
||
@pytest.mark.asyncio
|
||
@pytest.mark.xfail(reason="WS broadcast excludes sender - needs 2 clients")
|
||
async def test_websocket_send_chat_message(admin_token: str, test_project: dict):
|
||
"""Test sending chat message via WebSocket"""
|
||
uri = f"ws://localhost:8101/ws?token={admin_token}"
|
||
|
||
try:
|
||
async with websockets.connect(uri, timeout=10) as websocket:
|
||
# Ждём аутентификацию
|
||
await websocket.recv() # auth.ok
|
||
|
||
# Отправляем сообщение в чат проекта
|
||
message = {
|
||
"type": "chat.send",
|
||
"chat_id": test_project["chat_id"],
|
||
"content": "Test message via WebSocket"
|
||
}
|
||
await websocket.send(json.dumps(message))
|
||
|
||
# Ожидаем получить обратно событие message.new
|
||
await asyncio.wait_for(
|
||
receive_message_new_event(websocket),
|
||
timeout=5.0
|
||
)
|
||
|
||
except asyncio.TimeoutError:
|
||
pytest.fail("Timeout waiting for message.new event")
|
||
except Exception as e:
|
||
pytest.fail(f"Chat message test failed: {e}")
|
||
|
||
|
||
async def receive_message_new_event(websocket):
|
||
"""Helper function to receive message.new event"""
|
||
while True:
|
||
response = await websocket.recv()
|
||
event = json.loads(response)
|
||
|
||
if event["type"] == "message.new":
|
||
assert "data" in event
|
||
message_data = event["data"]
|
||
|
||
assert "id" in message_data
|
||
assert "content" in message_data
|
||
assert "author" in message_data
|
||
assert "created_at" in message_data
|
||
|
||
assert_uuid(message_data["id"])
|
||
return event
|
||
|
||
|
||
@pytest.mark.asyncio
|
||
@pytest.mark.xfail(reason="WS broadcast excludes sender - needs 2 clients")
|
||
async def test_websocket_send_task_comment(admin_token: str, test_task: dict):
|
||
"""Test sending task comment via WebSocket"""
|
||
uri = f"ws://localhost:8101/ws?token={admin_token}"
|
||
|
||
try:
|
||
async with websockets.connect(uri, timeout=10) as websocket:
|
||
# Ждём аутентификацию
|
||
await websocket.recv() # auth.ok
|
||
|
||
# Отправляем комментарий к задаче
|
||
message = {
|
||
"type": "chat.send",
|
||
"task_id": test_task["id"],
|
||
"content": "Task comment via WebSocket"
|
||
}
|
||
await websocket.send(json.dumps(message))
|
||
|
||
# Ожидаем получить обратно событие message.new
|
||
await asyncio.wait_for(
|
||
receive_message_new_event(websocket),
|
||
timeout=5.0
|
||
)
|
||
|
||
except asyncio.TimeoutError:
|
||
pytest.fail("Timeout waiting for task comment event")
|
||
except Exception as e:
|
||
pytest.fail(f"Task comment test failed: {e}")
|
||
|
||
|
||
@pytest.mark.asyncio
|
||
@pytest.mark.xfail(reason="WS broadcast excludes sender - needs 2 clients")
|
||
async def test_websocket_agent_with_thinking(agent_token: str, test_project: dict):
|
||
"""Test agent sending message with thinking via WebSocket"""
|
||
uri = f"ws://localhost:8101/ws?token={agent_token}"
|
||
|
||
try:
|
||
async with websockets.connect(uri, timeout=10) as websocket:
|
||
# Ждём аутентификацию
|
||
await websocket.recv() # auth.ok
|
||
|
||
# Отправляем сообщение с thinking (только агент может)
|
||
message = {
|
||
"type": "chat.send",
|
||
"chat_id": test_project["chat_id"],
|
||
"content": "Agent message with thinking",
|
||
"thinking": "Let me think about this problem..."
|
||
}
|
||
await websocket.send(json.dumps(message))
|
||
|
||
# Ожидаем получить обратно событие message.new
|
||
event = await asyncio.wait_for(
|
||
receive_message_new_event(websocket),
|
||
timeout=5.0
|
||
)
|
||
|
||
# Проверяем что thinking присутствует
|
||
message_data = event["data"]
|
||
assert "thinking" in message_data
|
||
assert message_data["thinking"] == "Let me think about this problem..."
|
||
assert message_data["author_type"] == "agent"
|
||
|
||
except asyncio.TimeoutError:
|
||
pytest.fail("Timeout waiting for agent message event")
|
||
except Exception as e:
|
||
pytest.fail(f"Agent message test failed: {e}")
|
||
|
||
|
||
@pytest.mark.asyncio
|
||
@pytest.mark.xfail(reason="WS broadcast excludes sender - needs 2 clients")
|
||
async def test_websocket_message_with_mentions(admin_token: str, test_project: dict, test_user: dict):
|
||
"""Test sending message with mentions via WebSocket"""
|
||
uri = f"ws://localhost:8101/ws?token={admin_token}"
|
||
|
||
try:
|
||
async with websockets.connect(uri, timeout=10) as websocket:
|
||
# Ждём аутентификацию
|
||
await websocket.recv() # auth.ok
|
||
|
||
# Отправляем сообщение с упоминанием
|
||
message = {
|
||
"type": "chat.send",
|
||
"chat_id": test_project["chat_id"],
|
||
"content": f"Hey @{test_user['slug']}, check this!",
|
||
"mentions": [test_user["id"]]
|
||
}
|
||
await websocket.send(json.dumps(message))
|
||
|
||
# Ожидаем получить обратно событие message.new
|
||
event = await asyncio.wait_for(
|
||
receive_message_new_event(websocket),
|
||
timeout=5.0
|
||
)
|
||
|
||
# Проверяем что mentions присутствуют
|
||
message_data = event["data"]
|
||
assert "mentions" in message_data
|
||
assert len(message_data["mentions"]) == 1
|
||
assert message_data["mentions"][0]["id"] == test_user["id"]
|
||
|
||
except asyncio.TimeoutError:
|
||
pytest.fail("Timeout waiting for message with mentions")
|
||
except Exception as e:
|
||
pytest.fail(f"Message with mentions test failed: {e}")
|
||
|
||
|
||
@pytest.mark.asyncio
|
||
async def test_websocket_invalid_message_format(admin_token: str):
|
||
"""Test sending invalid message format via WebSocket"""
|
||
uri = f"ws://localhost:8101/ws?token={admin_token}"
|
||
|
||
try:
|
||
async with websockets.connect(uri, timeout=10) as websocket:
|
||
# Ждём аутентификацию
|
||
await websocket.recv() # auth.ok
|
||
|
||
# Отправляем невалидное сообщение
|
||
await websocket.send("invalid json")
|
||
|
||
# Может прийти ошибка или соединение может закрыться
|
||
try:
|
||
response = await asyncio.wait_for(websocket.recv(), timeout=2.0)
|
||
error_event = json.loads(response)
|
||
assert error_event["type"] == "error"
|
||
except asyncio.TimeoutError:
|
||
# Таймаут тоже нормально - сервер может игнорировать невалидные сообщения
|
||
pass
|
||
|
||
except websockets.exceptions.ConnectionClosed:
|
||
# Соединение может быть закрыто при невалидном сообщении
|
||
pass
|
||
except Exception as e:
|
||
pytest.fail(f"Invalid message test failed: {e}")
|
||
|
||
|
||
@pytest.mark.asyncio
|
||
async def test_websocket_connection_without_auth():
|
||
"""Test WebSocket connection without authentication"""
|
||
uri = "ws://localhost:8101/ws"
|
||
|
||
try:
|
||
async with websockets.connect(uri, timeout=10) as websocket:
|
||
# Отправляем сообщение без аутентификации
|
||
message = {
|
||
"type": "chat.send",
|
||
"content": "Unauthorized message"
|
||
}
|
||
await websocket.send(json.dumps(message))
|
||
|
||
# Должна прийти ошибка аутентификации
|
||
response = await asyncio.wait_for(websocket.recv(), timeout=5.0)
|
||
error_event = json.loads(response)
|
||
|
||
assert error_event["type"] == "auth.error" or error_event["type"] == "error"
|
||
|
||
except websockets.exceptions.ConnectionClosed:
|
||
# Соединение может быть закрыто если требуется немедленная аутентификация
|
||
pass
|
||
except Exception as e:
|
||
pytest.fail(f"Unauthenticated connection test failed: {e}")
|
||
|
||
|
||
@pytest.mark.asyncio
|
||
async def test_websocket_multiple_connections(admin_token: str):
|
||
"""Test multiple WebSocket connections for same user"""
|
||
uri = f"ws://localhost:8101/ws?token={admin_token}"
|
||
|
||
try:
|
||
# Открываем два соединения одновременно
|
||
async with websockets.connect(uri, timeout=10) as ws1, \
|
||
websockets.connect(uri, timeout=10) as ws2:
|
||
|
||
# Ждём аутентификацию на обоих соединениях
|
||
await ws1.recv() # auth.ok
|
||
await ws2.recv() # auth.ok
|
||
|
||
# Отправляем heartbeat в оба соединения
|
||
heartbeat = {"type": "heartbeat"}
|
||
await ws1.send(json.dumps(heartbeat))
|
||
await ws2.send(json.dumps(heartbeat))
|
||
|
||
# Оба соединения должны остаться живыми
|
||
await asyncio.sleep(1)
|
||
|
||
except Exception as e:
|
||
pytest.fail(f"Multiple connections test failed: {e}") |