docs/tests/test_streaming.py

544 lines
21 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
Тесты агентного стриминга - WebSocket события stream.start/delta/tool/end
"""
import pytest
import asyncio
import websockets
import json
import uuid
from conftest import assert_uuid
@pytest.mark.asyncio
@pytest.mark.xfail(reason="Streaming tests need running agent + dual WS")
async def test_agent_stream_events_structure(agent_token: str, test_project: dict):
"""Test structure of agent streaming events"""
uri = f"ws://localhost:8101/ws?token={agent_token}"
try:
async with websockets.connect(uri, timeout=10) as websocket:
# Ждём аутентификацию
auth_response = await websocket.recv()
auth_data = json.loads(auth_response)
assert auth_data["type"] == "auth.ok"
agent_id = auth_data["data"]["member_id"]
agent_slug = auth_data["data"]["slug"]
# Симулируем события стриминга (обычно отправляются сервером, не клиентом)
# Но проверяем структуру событий которые должны прийти
stream_id = str(uuid.uuid4())
# 1. agent.stream.start
start_event = {
"type": "agent.stream.start",
"data": {
"stream_id": stream_id,
"project_id": test_project["id"],
"chat_id": test_project["chat_id"],
"task_id": None,
"agent_id": agent_id,
"agent_slug": agent_slug
}
}
# 2. agent.stream.delta
delta_event = {
"type": "agent.stream.delta",
"data": {
"stream_id": stream_id,
"delta": "Hello, I'm thinking about this task...",
"agent_id": agent_id,
"agent_slug": agent_slug
}
}
# 3. agent.stream.tool
tool_event = {
"type": "agent.stream.tool",
"data": {
"stream_id": stream_id,
"tool_name": "web_search",
"tool_args": {"query": "python testing best practices"},
"tool_result": {"results": ["result1", "result2"]},
"agent_id": agent_id,
"agent_slug": agent_slug
}
}
# 4. agent.stream.end
end_event = {
"type": "agent.stream.end",
"data": {
"stream_id": stream_id,
"final_message": "I've completed the analysis and here are my findings...",
"tool_log": [
{
"name": "web_search",
"args": {"query": "python testing best practices"},
"result": {"results": ["result1", "result2"]},
"error": None
}
],
"agent_id": agent_id,
"agent_slug": agent_slug
}
}
# Проверяем структуру событий
assert_stream_start_structure(start_event)
assert_stream_delta_structure(delta_event)
assert_stream_tool_structure(tool_event)
assert_stream_end_structure(end_event)
except Exception as e:
pytest.fail(f"Stream events structure test failed: {e}")
def assert_stream_start_structure(event):
"""Validate agent.stream.start event structure"""
assert event["type"] == "agent.stream.start"
assert "data" in event
data = event["data"]
assert "stream_id" in data
assert "project_id" in data
assert "chat_id" in data
assert "task_id" in data # может быть None
assert "agent_id" in data
assert "agent_slug" in data
assert_uuid(data["stream_id"])
assert_uuid(data["project_id"])
assert_uuid(data["agent_id"])
assert isinstance(data["agent_slug"], str)
if data["chat_id"]:
assert_uuid(data["chat_id"])
if data["task_id"]:
assert_uuid(data["task_id"])
def assert_stream_delta_structure(event):
"""Validate agent.stream.delta event structure"""
assert event["type"] == "agent.stream.delta"
assert "data" in event
data = event["data"]
assert "stream_id" in data
assert "delta" in data
assert "agent_id" in data
assert "agent_slug" in data
assert_uuid(data["stream_id"])
assert_uuid(data["agent_id"])
assert isinstance(data["delta"], str)
assert isinstance(data["agent_slug"], str)
def assert_stream_tool_structure(event):
"""Validate agent.stream.tool event structure"""
assert event["type"] == "agent.stream.tool"
assert "data" in event
data = event["data"]
assert "stream_id" in data
assert "tool_name" in data
assert "tool_args" in data
assert "tool_result" in data
assert "agent_id" in data
assert "agent_slug" in data
assert_uuid(data["stream_id"])
assert_uuid(data["agent_id"])
assert isinstance(data["tool_name"], str)
assert isinstance(data["agent_slug"], str)
# tool_args и tool_result могут быть любыми JSON объектами
assert data["tool_args"] is not None
assert data["tool_result"] is not None
def assert_stream_end_structure(event):
"""Validate agent.stream.end event structure"""
assert event["type"] == "agent.stream.end"
assert "data" in event
data = event["data"]
assert "stream_id" in data
assert "final_message" in data
assert "tool_log" in data
assert "agent_id" in data
assert "agent_slug" in data
assert_uuid(data["stream_id"])
assert_uuid(data["agent_id"])
assert isinstance(data["final_message"], str)
assert isinstance(data["tool_log"], list)
assert isinstance(data["agent_slug"], str)
# Проверяем структуру tool_log
for tool_entry in data["tool_log"]:
assert "name" in tool_entry
assert "args" in tool_entry
assert "result" in tool_entry
# "error" может отсутствовать или быть None
@pytest.mark.asyncio
@pytest.mark.xfail(reason="Streaming tests need running agent + dual WS")
async def test_agent_stream_task_context(agent_token: str, test_task: dict):
"""Test agent streaming in context of specific task"""
uri = f"ws://localhost:8101/ws?token={agent_token}"
try:
async with websockets.connect(uri, timeout=10) as websocket:
# Ждём аутентификацию
auth_response = await websocket.recv()
auth_data = json.loads(auth_response)
assert auth_data["type"] == "auth.ok"
agent_id = auth_data["data"]["member_id"]
agent_slug = auth_data["data"]["slug"]
stream_id = str(uuid.uuid4())
# Стриминг в контексте задачи
start_event = {
"type": "agent.stream.start",
"data": {
"stream_id": stream_id,
"project_id": test_task["project"]["id"],
"chat_id": None, # Комментарий к задаче, не чат
"task_id": test_task["id"],
"agent_id": agent_id,
"agent_slug": agent_slug
}
}
assert_stream_start_structure(start_event)
# Проверяем что task_id корректно указан
assert start_event["data"]["task_id"] == test_task["id"]
assert start_event["data"]["chat_id"] is None
except Exception as e:
pytest.fail(f"Stream task context test failed: {e}")
@pytest.mark.asyncio
@pytest.mark.xfail(reason="Streaming tests need running agent + dual WS")
async def test_agent_stream_with_multiple_tools(agent_token: str, test_project: dict):
"""Test agent streaming with multiple tool calls"""
uri = f"ws://localhost:8101/ws?token={agent_token}"
try:
async with websockets.connect(uri, timeout=10) as websocket:
# Ждём аутентификацию
auth_response = await websocket.recv()
auth_data = json.loads(auth_response)
agent_id = auth_data["data"]["member_id"]
agent_slug = auth_data["data"]["slug"]
stream_id = str(uuid.uuid4())
# Симулируем последовательность вызовов инструментов
tools_sequence = [
{
"name": "web_search",
"args": {"query": "python best practices"},
"result": {"results": ["doc1", "doc2"]}
},
{
"name": "read_file",
"args": {"filename": "requirements.txt"},
"result": {"content": "pytest==7.0.0\nhttpx==0.24.0"}
},
{
"name": "execute_command",
"args": {"command": "pytest --version"},
"result": {"output": "pytest 7.0.0", "exit_code": 0}
}
]
# Создаём события для каждого инструмента
for i, tool in enumerate(tools_sequence):
tool_event = {
"type": "agent.stream.tool",
"data": {
"stream_id": stream_id,
"tool_name": tool["name"],
"tool_args": tool["args"],
"tool_result": tool["result"],
"agent_id": agent_id,
"agent_slug": agent_slug
}
}
assert_stream_tool_structure(tool_event)
# Финальное событие с полным tool_log
end_event = {
"type": "agent.stream.end",
"data": {
"stream_id": stream_id,
"final_message": "I've completed the multi-step analysis.",
"tool_log": [
{
"name": tool["name"],
"args": tool["args"],
"result": tool["result"],
"error": None
} for tool in tools_sequence
],
"agent_id": agent_id,
"agent_slug": agent_slug
}
}
assert_stream_end_structure(end_event)
assert len(end_event["data"]["tool_log"]) == 3
except Exception as e:
pytest.fail(f"Multiple tools stream test failed: {e}")
@pytest.mark.asyncio
@pytest.mark.xfail(reason="Streaming tests need running agent + dual WS")
async def test_agent_stream_with_tool_error(agent_token: str, test_project: dict):
"""Test agent streaming when tool call fails"""
uri = f"ws://localhost:8101/ws?token={agent_token}"
try:
async with websockets.connect(uri, timeout=10) as websocket:
# Ждём аутентификацию
auth_response = await websocket.recv()
auth_data = json.loads(auth_response)
agent_id = auth_data["data"]["member_id"]
agent_slug = auth_data["data"]["slug"]
stream_id = str(uuid.uuid4())
# Инструмент с ошибкой
tool_event = {
"type": "agent.stream.tool",
"data": {
"stream_id": stream_id,
"tool_name": "execute_command",
"tool_args": {"command": "nonexistent_command"},
"tool_result": None, # Нет результата при ошибке
"agent_id": agent_id,
"agent_slug": agent_slug
}
}
# Финальное событие с ошибкой в tool_log
end_event = {
"type": "agent.stream.end",
"data": {
"stream_id": stream_id,
"final_message": "I encountered an error while executing the command.",
"tool_log": [
{
"name": "execute_command",
"args": {"command": "nonexistent_command"},
"result": None,
"error": "Command not found: nonexistent_command"
}
],
"agent_id": agent_id,
"agent_slug": agent_slug
}
}
assert_stream_tool_structure(tool_event)
assert_stream_end_structure(end_event)
# Проверяем что ошибка корректно записана
error_log = end_event["data"]["tool_log"][0]
assert error_log["error"] is not None
assert error_log["result"] is None
except Exception as e:
pytest.fail(f"Tool error stream test failed: {e}")
@pytest.mark.asyncio
@pytest.mark.xfail(reason="Streaming tests need running agent + dual WS")
async def test_agent_stream_incremental_delta(agent_token: str, test_project: dict):
"""Test agent streaming with incremental text deltas"""
uri = f"ws://localhost:8101/ws?token={agent_token}"
try:
async with websockets.connect(uri, timeout=10) as websocket:
# Ждём аутентификацию
auth_response = await websocket.recv()
auth_data = json.loads(auth_response)
agent_id = auth_data["data"]["member_id"]
agent_slug = auth_data["data"]["slug"]
stream_id = str(uuid.uuid4())
# Симулируем инкрементальное написание текста
text_deltas = [
"I need to ",
"analyze this ",
"problem carefully. ",
"Let me start by ",
"examining the requirements..."
]
full_text = ""
for delta in text_deltas:
full_text += delta
delta_event = {
"type": "agent.stream.delta",
"data": {
"stream_id": stream_id,
"delta": delta,
"agent_id": agent_id,
"agent_slug": agent_slug
}
}
assert_stream_delta_structure(delta_event)
assert delta_event["data"]["delta"] == delta
# Финальное сообщение должно содержать полный текст
end_event = {
"type": "agent.stream.end",
"data": {
"stream_id": stream_id,
"final_message": full_text,
"tool_log": [],
"agent_id": agent_id,
"agent_slug": agent_slug
}
}
assert_stream_end_structure(end_event)
assert end_event["data"]["final_message"] == full_text
except Exception as e:
pytest.fail(f"Incremental delta stream test failed: {e}")
@pytest.mark.asyncio
@pytest.mark.xfail(reason="Streaming tests need running agent + dual WS")
async def test_agent_stream_ids_consistency(agent_token: str, test_project: dict):
"""Test that stream_id is consistent across all events in one stream"""
uri = f"ws://localhost:8101/ws?token={agent_token}"
try:
async with websockets.connect(uri, timeout=10) as websocket:
# Ждём аутентификацию
auth_response = await websocket.recv()
auth_data = json.loads(auth_response)
agent_id = auth_data["data"]["member_id"]
agent_slug = auth_data["data"]["slug"]
stream_id = str(uuid.uuid4())
# Все события должны иметь одинаковый stream_id
events = [
{
"type": "agent.stream.start",
"data": {
"stream_id": stream_id,
"project_id": test_project["id"],
"chat_id": test_project["chat_id"],
"task_id": None,
"agent_id": agent_id,
"agent_slug": agent_slug
}
},
{
"type": "agent.stream.delta",
"data": {
"stream_id": stream_id,
"delta": "Processing...",
"agent_id": agent_id,
"agent_slug": agent_slug
}
},
{
"type": "agent.stream.tool",
"data": {
"stream_id": stream_id,
"tool_name": "test_tool",
"tool_args": {},
"tool_result": {"status": "ok"},
"agent_id": agent_id,
"agent_slug": agent_slug
}
},
{
"type": "agent.stream.end",
"data": {
"stream_id": stream_id,
"final_message": "Complete",
"tool_log": [],
"agent_id": agent_id,
"agent_slug": agent_slug
}
}
]
# Проверяем что все события имеют одинаковый stream_id
for event in events:
assert event["data"]["stream_id"] == stream_id
assert event["data"]["agent_id"] == agent_id
assert event["data"]["agent_slug"] == agent_slug
except Exception as e:
pytest.fail(f"Stream ID consistency test failed: {e}")
@pytest.mark.asyncio
async def test_agent_config_updated_event(agent_token: str):
"""Test config.updated event structure"""
uri = f"ws://localhost:8101/ws?token={agent_token}"
try:
async with websockets.connect(uri, timeout=10) as websocket:
# Ждём аутентификацию
auth_response = await websocket.recv()
auth_data = json.loads(auth_response)
# Симулируем событие обновления конфигурации
config_event = {
"type": "config.updated",
"data": {
"model": "gpt-4",
"provider": "openai",
"prompt": "Updated system prompt",
"chat_listen": "mentions",
"task_listen": "assigned",
"max_concurrent_tasks": 3,
"capabilities": ["coding", "testing", "documentation"],
"labels": ["backend", "python", "api", "database"]
}
}
# Проверяем структуру события
assert config_event["type"] == "config.updated"
assert "data" in config_event
config_data = config_event["data"]
assert "chat_listen" in config_data
assert "task_listen" in config_data
assert "capabilities" in config_data
assert "labels" in config_data
assert "max_concurrent_tasks" in config_data
assert config_data["chat_listen"] in ["all", "mentions", "none"]
assert config_data["task_listen"] in ["all", "mentions", "assigned", "none"]
assert isinstance(config_data["capabilities"], list)
assert isinstance(config_data["labels"], list)
assert isinstance(config_data["max_concurrent_tasks"], int)
assert config_data["max_concurrent_tasks"] > 0
except Exception as e:
pytest.fail(f"Config updated event test failed: {e}")