""" Тесты агентного стриминга - WebSocket события stream.start/delta/tool/end """ import pytest import asyncio import websockets import json import uuid from conftest import assert_uuid @pytest.mark.asyncio @pytest.mark.xfail(reason="Streaming tests need running agent + dual WS") async def test_agent_stream_events_structure(agent_token: str, test_project: dict): """Test structure of agent streaming events""" uri = f"ws://localhost:8101/ws?token={agent_token}" try: async with websockets.connect(uri, timeout=10) as websocket: # Ждём аутентификацию auth_response = await websocket.recv() auth_data = json.loads(auth_response) assert auth_data["type"] == "auth.ok" agent_id = auth_data["data"]["member_id"] agent_slug = auth_data["data"]["slug"] # Симулируем события стриминга (обычно отправляются сервером, не клиентом) # Но проверяем структуру событий которые должны прийти stream_id = str(uuid.uuid4()) # 1. agent.stream.start start_event = { "type": "agent.stream.start", "data": { "stream_id": stream_id, "project_id": test_project["id"], "chat_id": test_project["chat_id"], "task_id": None, "agent_id": agent_id, "agent_slug": agent_slug } } # 2. agent.stream.delta delta_event = { "type": "agent.stream.delta", "data": { "stream_id": stream_id, "delta": "Hello, I'm thinking about this task...", "agent_id": agent_id, "agent_slug": agent_slug } } # 3. agent.stream.tool tool_event = { "type": "agent.stream.tool", "data": { "stream_id": stream_id, "tool_name": "web_search", "tool_args": {"query": "python testing best practices"}, "tool_result": {"results": ["result1", "result2"]}, "agent_id": agent_id, "agent_slug": agent_slug } } # 4. agent.stream.end end_event = { "type": "agent.stream.end", "data": { "stream_id": stream_id, "final_message": "I've completed the analysis and here are my findings...", "tool_log": [ { "name": "web_search", "args": {"query": "python testing best practices"}, "result": {"results": ["result1", "result2"]}, "error": None } ], "agent_id": agent_id, "agent_slug": agent_slug } } # Проверяем структуру событий assert_stream_start_structure(start_event) assert_stream_delta_structure(delta_event) assert_stream_tool_structure(tool_event) assert_stream_end_structure(end_event) except Exception as e: pytest.fail(f"Stream events structure test failed: {e}") def assert_stream_start_structure(event): """Validate agent.stream.start event structure""" assert event["type"] == "agent.stream.start" assert "data" in event data = event["data"] assert "stream_id" in data assert "project_id" in data assert "chat_id" in data assert "task_id" in data # может быть None assert "agent_id" in data assert "agent_slug" in data assert_uuid(data["stream_id"]) assert_uuid(data["project_id"]) assert_uuid(data["agent_id"]) assert isinstance(data["agent_slug"], str) if data["chat_id"]: assert_uuid(data["chat_id"]) if data["task_id"]: assert_uuid(data["task_id"]) def assert_stream_delta_structure(event): """Validate agent.stream.delta event structure""" assert event["type"] == "agent.stream.delta" assert "data" in event data = event["data"] assert "stream_id" in data assert "delta" in data assert "agent_id" in data assert "agent_slug" in data assert_uuid(data["stream_id"]) assert_uuid(data["agent_id"]) assert isinstance(data["delta"], str) assert isinstance(data["agent_slug"], str) def assert_stream_tool_structure(event): """Validate agent.stream.tool event structure""" assert event["type"] == "agent.stream.tool" assert "data" in event data = event["data"] assert "stream_id" in data assert "tool_name" in data assert "tool_args" in data assert "tool_result" in data assert "agent_id" in data assert "agent_slug" in data assert_uuid(data["stream_id"]) assert_uuid(data["agent_id"]) assert isinstance(data["tool_name"], str) assert isinstance(data["agent_slug"], str) # tool_args и tool_result могут быть любыми JSON объектами assert data["tool_args"] is not None assert data["tool_result"] is not None def assert_stream_end_structure(event): """Validate agent.stream.end event structure""" assert event["type"] == "agent.stream.end" assert "data" in event data = event["data"] assert "stream_id" in data assert "final_message" in data assert "tool_log" in data assert "agent_id" in data assert "agent_slug" in data assert_uuid(data["stream_id"]) assert_uuid(data["agent_id"]) assert isinstance(data["final_message"], str) assert isinstance(data["tool_log"], list) assert isinstance(data["agent_slug"], str) # Проверяем структуру tool_log for tool_entry in data["tool_log"]: assert "name" in tool_entry assert "args" in tool_entry assert "result" in tool_entry # "error" может отсутствовать или быть None @pytest.mark.asyncio @pytest.mark.xfail(reason="Streaming tests need running agent + dual WS") async def test_agent_stream_task_context(agent_token: str, test_task: dict): """Test agent streaming in context of specific task""" uri = f"ws://localhost:8101/ws?token={agent_token}" try: async with websockets.connect(uri, timeout=10) as websocket: # Ждём аутентификацию auth_response = await websocket.recv() auth_data = json.loads(auth_response) assert auth_data["type"] == "auth.ok" agent_id = auth_data["data"]["member_id"] agent_slug = auth_data["data"]["slug"] stream_id = str(uuid.uuid4()) # Стриминг в контексте задачи start_event = { "type": "agent.stream.start", "data": { "stream_id": stream_id, "project_id": test_task["project"]["id"], "chat_id": None, # Комментарий к задаче, не чат "task_id": test_task["id"], "agent_id": agent_id, "agent_slug": agent_slug } } assert_stream_start_structure(start_event) # Проверяем что task_id корректно указан assert start_event["data"]["task_id"] == test_task["id"] assert start_event["data"]["chat_id"] is None except Exception as e: pytest.fail(f"Stream task context test failed: {e}") @pytest.mark.asyncio @pytest.mark.xfail(reason="Streaming tests need running agent + dual WS") async def test_agent_stream_with_multiple_tools(agent_token: str, test_project: dict): """Test agent streaming with multiple tool calls""" uri = f"ws://localhost:8101/ws?token={agent_token}" try: async with websockets.connect(uri, timeout=10) as websocket: # Ждём аутентификацию auth_response = await websocket.recv() auth_data = json.loads(auth_response) agent_id = auth_data["data"]["member_id"] agent_slug = auth_data["data"]["slug"] stream_id = str(uuid.uuid4()) # Симулируем последовательность вызовов инструментов tools_sequence = [ { "name": "web_search", "args": {"query": "python best practices"}, "result": {"results": ["doc1", "doc2"]} }, { "name": "read_file", "args": {"filename": "requirements.txt"}, "result": {"content": "pytest==7.0.0\nhttpx==0.24.0"} }, { "name": "execute_command", "args": {"command": "pytest --version"}, "result": {"output": "pytest 7.0.0", "exit_code": 0} } ] # Создаём события для каждого инструмента for i, tool in enumerate(tools_sequence): tool_event = { "type": "agent.stream.tool", "data": { "stream_id": stream_id, "tool_name": tool["name"], "tool_args": tool["args"], "tool_result": tool["result"], "agent_id": agent_id, "agent_slug": agent_slug } } assert_stream_tool_structure(tool_event) # Финальное событие с полным tool_log end_event = { "type": "agent.stream.end", "data": { "stream_id": stream_id, "final_message": "I've completed the multi-step analysis.", "tool_log": [ { "name": tool["name"], "args": tool["args"], "result": tool["result"], "error": None } for tool in tools_sequence ], "agent_id": agent_id, "agent_slug": agent_slug } } assert_stream_end_structure(end_event) assert len(end_event["data"]["tool_log"]) == 3 except Exception as e: pytest.fail(f"Multiple tools stream test failed: {e}") @pytest.mark.asyncio @pytest.mark.xfail(reason="Streaming tests need running agent + dual WS") async def test_agent_stream_with_tool_error(agent_token: str, test_project: dict): """Test agent streaming when tool call fails""" uri = f"ws://localhost:8101/ws?token={agent_token}" try: async with websockets.connect(uri, timeout=10) as websocket: # Ждём аутентификацию auth_response = await websocket.recv() auth_data = json.loads(auth_response) agent_id = auth_data["data"]["member_id"] agent_slug = auth_data["data"]["slug"] stream_id = str(uuid.uuid4()) # Инструмент с ошибкой tool_event = { "type": "agent.stream.tool", "data": { "stream_id": stream_id, "tool_name": "execute_command", "tool_args": {"command": "nonexistent_command"}, "tool_result": None, # Нет результата при ошибке "agent_id": agent_id, "agent_slug": agent_slug } } # Финальное событие с ошибкой в tool_log end_event = { "type": "agent.stream.end", "data": { "stream_id": stream_id, "final_message": "I encountered an error while executing the command.", "tool_log": [ { "name": "execute_command", "args": {"command": "nonexistent_command"}, "result": None, "error": "Command not found: nonexistent_command" } ], "agent_id": agent_id, "agent_slug": agent_slug } } assert_stream_tool_structure(tool_event) assert_stream_end_structure(end_event) # Проверяем что ошибка корректно записана error_log = end_event["data"]["tool_log"][0] assert error_log["error"] is not None assert error_log["result"] is None except Exception as e: pytest.fail(f"Tool error stream test failed: {e}") @pytest.mark.asyncio @pytest.mark.xfail(reason="Streaming tests need running agent + dual WS") async def test_agent_stream_incremental_delta(agent_token: str, test_project: dict): """Test agent streaming with incremental text deltas""" uri = f"ws://localhost:8101/ws?token={agent_token}" try: async with websockets.connect(uri, timeout=10) as websocket: # Ждём аутентификацию auth_response = await websocket.recv() auth_data = json.loads(auth_response) agent_id = auth_data["data"]["member_id"] agent_slug = auth_data["data"]["slug"] stream_id = str(uuid.uuid4()) # Симулируем инкрементальное написание текста text_deltas = [ "I need to ", "analyze this ", "problem carefully. ", "Let me start by ", "examining the requirements..." ] full_text = "" for delta in text_deltas: full_text += delta delta_event = { "type": "agent.stream.delta", "data": { "stream_id": stream_id, "delta": delta, "agent_id": agent_id, "agent_slug": agent_slug } } assert_stream_delta_structure(delta_event) assert delta_event["data"]["delta"] == delta # Финальное сообщение должно содержать полный текст end_event = { "type": "agent.stream.end", "data": { "stream_id": stream_id, "final_message": full_text, "tool_log": [], "agent_id": agent_id, "agent_slug": agent_slug } } assert_stream_end_structure(end_event) assert end_event["data"]["final_message"] == full_text except Exception as e: pytest.fail(f"Incremental delta stream test failed: {e}") @pytest.mark.asyncio @pytest.mark.xfail(reason="Streaming tests need running agent + dual WS") async def test_agent_stream_ids_consistency(agent_token: str, test_project: dict): """Test that stream_id is consistent across all events in one stream""" uri = f"ws://localhost:8101/ws?token={agent_token}" try: async with websockets.connect(uri, timeout=10) as websocket: # Ждём аутентификацию auth_response = await websocket.recv() auth_data = json.loads(auth_response) agent_id = auth_data["data"]["member_id"] agent_slug = auth_data["data"]["slug"] stream_id = str(uuid.uuid4()) # Все события должны иметь одинаковый stream_id events = [ { "type": "agent.stream.start", "data": { "stream_id": stream_id, "project_id": test_project["id"], "chat_id": test_project["chat_id"], "task_id": None, "agent_id": agent_id, "agent_slug": agent_slug } }, { "type": "agent.stream.delta", "data": { "stream_id": stream_id, "delta": "Processing...", "agent_id": agent_id, "agent_slug": agent_slug } }, { "type": "agent.stream.tool", "data": { "stream_id": stream_id, "tool_name": "test_tool", "tool_args": {}, "tool_result": {"status": "ok"}, "agent_id": agent_id, "agent_slug": agent_slug } }, { "type": "agent.stream.end", "data": { "stream_id": stream_id, "final_message": "Complete", "tool_log": [], "agent_id": agent_id, "agent_slug": agent_slug } } ] # Проверяем что все события имеют одинаковый stream_id for event in events: assert event["data"]["stream_id"] == stream_id assert event["data"]["agent_id"] == agent_id assert event["data"]["agent_slug"] == agent_slug except Exception as e: pytest.fail(f"Stream ID consistency test failed: {e}") @pytest.mark.asyncio async def test_agent_config_updated_event(agent_token: str): """Test config.updated event structure""" uri = f"ws://localhost:8101/ws?token={agent_token}" try: async with websockets.connect(uri, timeout=10) as websocket: # Ждём аутентификацию auth_response = await websocket.recv() auth_data = json.loads(auth_response) # Симулируем событие обновления конфигурации config_event = { "type": "config.updated", "data": { "model": "gpt-4", "provider": "openai", "prompt": "Updated system prompt", "chat_listen": "mentions", "task_listen": "assigned", "max_concurrent_tasks": 3, "capabilities": ["coding", "testing", "documentation"], "labels": ["backend", "python", "api", "database"] } } # Проверяем структуру события assert config_event["type"] == "config.updated" assert "data" in config_event config_data = config_event["data"] assert "chat_listen" in config_data assert "task_listen" in config_data assert "capabilities" in config_data assert "labels" in config_data assert "max_concurrent_tasks" in config_data assert config_data["chat_listen"] in ["all", "mentions", "none"] assert config_data["task_listen"] in ["all", "mentions", "assigned", "none"] assert isinstance(config_data["capabilities"], list) assert isinstance(config_data["labels"], list) assert isinstance(config_data["max_concurrent_tasks"], int) assert config_data["max_concurrent_tasks"] > 0 except Exception as e: pytest.fail(f"Config updated event test failed: {e}")