code-review-agent/tests/test_simple_graph.py

229 lines
6.8 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
Упрощенный тест LangGraph без реального review
Проверяет только механизм стриминга событий
"""
import asyncio
from langgraph.graph import StateGraph, END
from typing import TypedDict, Annotated, Any
import operator
# Простое состояние для теста
class SimpleState(TypedDict):
counter: Annotated[int, operator.add]
messages: list[str]
step: str
# Простые ноды
async def node_1(state: SimpleState) -> SimpleState:
"""Первая нода"""
print(" [NODE 1] Выполняется...")
await asyncio.sleep(0.5)
return {
"counter": 1,
"messages": ["Node 1 completed"],
"step": "node_1"
}
async def node_2(state: SimpleState) -> SimpleState:
"""Вторая нода"""
print(" [NODE 2] Выполняется...")
await asyncio.sleep(0.5)
return {
"counter": 1,
"messages": ["Node 2 completed"],
"step": "node_2"
}
async def node_3(state: SimpleState) -> SimpleState:
"""Третья нода"""
print(" [NODE 3] Выполняется...")
await asyncio.sleep(0.5)
return {
"counter": 1,
"messages": ["Node 3 completed"],
"step": "node_3"
}
def create_test_graph():
"""Создает тестовый граф"""
workflow = StateGraph(SimpleState)
# Добавляем ноды
workflow.add_node("node_1", node_1)
workflow.add_node("node_2", node_2)
workflow.add_node("node_3", node_3)
# Определяем связи
workflow.set_entry_point("node_1")
workflow.add_edge("node_1", "node_2")
workflow.add_edge("node_2", "node_3")
workflow.add_edge("node_3", END)
return workflow.compile()
async def test_stream_modes():
"""Тестирует разные режимы стриминга"""
graph = create_test_graph()
initial_state: SimpleState = {
"counter": 0,
"messages": [],
"step": "start"
}
# Тест 1: updates
print("\n" + "="*80)
print("ТЕСТ 1: stream_mode=['updates']")
print("="*80)
event_count = 0
async for event in graph.astream(initial_state, stream_mode=["updates"]):
event_count += 1
print(f"\n📨 Event #{event_count}")
print(f" Type: {type(event)}")
print(f" Content: {event}")
print(f"\n✅ Получено событий: {event_count}")
# Тест 2: messages
print("\n" + "="*80)
print("ТЕСТ 2: stream_mode=['messages']")
print("="*80)
event_count = 0
async for event in graph.astream(initial_state, stream_mode=["messages"]):
event_count += 1
print(f"\n📨 Event #{event_count}")
print(f" Type: {type(event)}")
print(f" Content: {event}")
print(f"\n✅ Получено событий: {event_count}")
# Тест 3: updates + messages
print("\n" + "="*80)
print("ТЕСТ 3: stream_mode=['updates', 'messages']")
print("="*80)
event_count = 0
async for event in graph.astream(initial_state, stream_mode=["updates", "messages"]):
event_count += 1
print(f"\n📨 Event #{event_count}")
print(f" Type: {type(event)}")
if isinstance(event, dict):
print(f" Keys: {list(event.keys())}")
elif isinstance(event, tuple):
print(f" Tuple[0] type: {type(event[0])}")
print(f" Tuple[1] type: {type(event[1])}")
print(f" Content: {event}")
print(f"\n✅ Получено событий: {event_count}")
# Тест 4: values
print("\n" + "="*80)
print("ТЕСТ 4: stream_mode=['values']")
print("="*80)
event_count = 0
async for event in graph.astream(initial_state, stream_mode=["values"]):
event_count += 1
print(f"\n📨 Event #{event_count}")
print(f" Type: {type(event)}")
print(f" Content: {event}")
print(f"\n✅ Получено событий: {event_count}")
# Тест 5: debug (все режимы)
print("\n" + "="*80)
print("ТЕСТ 5: stream_mode=['updates', 'messages', 'values', 'debug']")
print("="*80)
event_count = 0
async for event in graph.astream(
initial_state,
stream_mode=["updates", "messages", "values", "debug"]
):
event_count += 1
print(f"\n📨 Event #{event_count}")
print(f" Type: {type(event)}")
if isinstance(event, tuple) and len(event) >= 2:
print(f" Event type (tuple[0]): {event[0]}")
print(f" Event data (tuple[1]): {event[1]}")
else:
print(f" Content: {event}")
print(f"\n✅ Получено событий: {event_count}")
async def test_with_callback():
"""Тест с использованием callback для обработки событий"""
print("\n" + "="*80)
print("ТЕСТ 6: Callback обработка событий")
print("="*80)
graph = create_test_graph()
initial_state: SimpleState = {
"counter": 0,
"messages": [],
"step": "start"
}
collected_events = []
async def event_callback(event_type: str, event_data: Any):
"""Callback для обработки событий"""
collected_events.append({
"type": event_type,
"data": event_data
})
print(f" 🔔 Callback: {event_type}")
# Симуляция обработки событий с callback
event_count = 0
async for event in graph.astream(initial_state, stream_mode=["updates", "messages"]):
event_count += 1
print(f"\n📨 Event #{event_count}: {type(event)}")
# Обрабатываем событие
if isinstance(event, tuple) and len(event) >= 2:
await event_callback(str(event[0]), event[1])
elif isinstance(event, dict):
for node_name, node_data in event.items():
await event_callback(f"node_update_{node_name}", node_data)
else:
await event_callback("unknown", event)
print(f"\nВсего событий: {event_count}")
print(f"✅ Callback вызовов: {len(collected_events)}")
print(f"\n📋 Собранные события:")
for i, evt in enumerate(collected_events, 1):
print(f" {i}. {evt['type']}")
async def main():
print("\n" + "="*80)
print("ПРОСТОЙ ТЕСТ LANGGRAPH STREAMING")
print("="*80 + "\n")
await test_stream_modes()
await test_with_callback()
print("\n" + "="*80)
print("ТЕСТИРОВАНИЕ ЗАВЕРШЕНО")
print("="*80 + "\n")
if __name__ == "__main__":
asyncio.run(main())