229 lines
6.8 KiB
Python
229 lines
6.8 KiB
Python
"""
|
||
Упрощенный тест LangGraph без реального review
|
||
Проверяет только механизм стриминга событий
|
||
"""
|
||
|
||
import asyncio
|
||
from langgraph.graph import StateGraph, END
|
||
from typing import TypedDict, Annotated, Any
|
||
import operator
|
||
|
||
|
||
# Простое состояние для теста
|
||
class SimpleState(TypedDict):
|
||
counter: Annotated[int, operator.add]
|
||
messages: list[str]
|
||
step: str
|
||
|
||
|
||
# Простые ноды
|
||
async def node_1(state: SimpleState) -> SimpleState:
|
||
"""Первая нода"""
|
||
print(" [NODE 1] Выполняется...")
|
||
await asyncio.sleep(0.5)
|
||
return {
|
||
"counter": 1,
|
||
"messages": ["Node 1 completed"],
|
||
"step": "node_1"
|
||
}
|
||
|
||
|
||
async def node_2(state: SimpleState) -> SimpleState:
|
||
"""Вторая нода"""
|
||
print(" [NODE 2] Выполняется...")
|
||
await asyncio.sleep(0.5)
|
||
return {
|
||
"counter": 1,
|
||
"messages": ["Node 2 completed"],
|
||
"step": "node_2"
|
||
}
|
||
|
||
|
||
async def node_3(state: SimpleState) -> SimpleState:
|
||
"""Третья нода"""
|
||
print(" [NODE 3] Выполняется...")
|
||
await asyncio.sleep(0.5)
|
||
return {
|
||
"counter": 1,
|
||
"messages": ["Node 3 completed"],
|
||
"step": "node_3"
|
||
}
|
||
|
||
|
||
def create_test_graph():
|
||
"""Создает тестовый граф"""
|
||
workflow = StateGraph(SimpleState)
|
||
|
||
# Добавляем ноды
|
||
workflow.add_node("node_1", node_1)
|
||
workflow.add_node("node_2", node_2)
|
||
workflow.add_node("node_3", node_3)
|
||
|
||
# Определяем связи
|
||
workflow.set_entry_point("node_1")
|
||
workflow.add_edge("node_1", "node_2")
|
||
workflow.add_edge("node_2", "node_3")
|
||
workflow.add_edge("node_3", END)
|
||
|
||
return workflow.compile()
|
||
|
||
|
||
async def test_stream_modes():
|
||
"""Тестирует разные режимы стриминга"""
|
||
graph = create_test_graph()
|
||
|
||
initial_state: SimpleState = {
|
||
"counter": 0,
|
||
"messages": [],
|
||
"step": "start"
|
||
}
|
||
|
||
# Тест 1: updates
|
||
print("\n" + "="*80)
|
||
print("ТЕСТ 1: stream_mode=['updates']")
|
||
print("="*80)
|
||
|
||
event_count = 0
|
||
async for event in graph.astream(initial_state, stream_mode=["updates"]):
|
||
event_count += 1
|
||
print(f"\n📨 Event #{event_count}")
|
||
print(f" Type: {type(event)}")
|
||
print(f" Content: {event}")
|
||
|
||
print(f"\n✅ Получено событий: {event_count}")
|
||
|
||
# Тест 2: messages
|
||
print("\n" + "="*80)
|
||
print("ТЕСТ 2: stream_mode=['messages']")
|
||
print("="*80)
|
||
|
||
event_count = 0
|
||
async for event in graph.astream(initial_state, stream_mode=["messages"]):
|
||
event_count += 1
|
||
print(f"\n📨 Event #{event_count}")
|
||
print(f" Type: {type(event)}")
|
||
print(f" Content: {event}")
|
||
|
||
print(f"\n✅ Получено событий: {event_count}")
|
||
|
||
# Тест 3: updates + messages
|
||
print("\n" + "="*80)
|
||
print("ТЕСТ 3: stream_mode=['updates', 'messages']")
|
||
print("="*80)
|
||
|
||
event_count = 0
|
||
async for event in graph.astream(initial_state, stream_mode=["updates", "messages"]):
|
||
event_count += 1
|
||
print(f"\n📨 Event #{event_count}")
|
||
print(f" Type: {type(event)}")
|
||
|
||
if isinstance(event, dict):
|
||
print(f" Keys: {list(event.keys())}")
|
||
elif isinstance(event, tuple):
|
||
print(f" Tuple[0] type: {type(event[0])}")
|
||
print(f" Tuple[1] type: {type(event[1])}")
|
||
|
||
print(f" Content: {event}")
|
||
|
||
print(f"\n✅ Получено событий: {event_count}")
|
||
|
||
# Тест 4: values
|
||
print("\n" + "="*80)
|
||
print("ТЕСТ 4: stream_mode=['values']")
|
||
print("="*80)
|
||
|
||
event_count = 0
|
||
async for event in graph.astream(initial_state, stream_mode=["values"]):
|
||
event_count += 1
|
||
print(f"\n📨 Event #{event_count}")
|
||
print(f" Type: {type(event)}")
|
||
print(f" Content: {event}")
|
||
|
||
print(f"\n✅ Получено событий: {event_count}")
|
||
|
||
# Тест 5: debug (все режимы)
|
||
print("\n" + "="*80)
|
||
print("ТЕСТ 5: stream_mode=['updates', 'messages', 'values', 'debug']")
|
||
print("="*80)
|
||
|
||
event_count = 0
|
||
async for event in graph.astream(
|
||
initial_state,
|
||
stream_mode=["updates", "messages", "values", "debug"]
|
||
):
|
||
event_count += 1
|
||
print(f"\n📨 Event #{event_count}")
|
||
print(f" Type: {type(event)}")
|
||
|
||
if isinstance(event, tuple) and len(event) >= 2:
|
||
print(f" Event type (tuple[0]): {event[0]}")
|
||
print(f" Event data (tuple[1]): {event[1]}")
|
||
else:
|
||
print(f" Content: {event}")
|
||
|
||
print(f"\n✅ Получено событий: {event_count}")
|
||
|
||
|
||
async def test_with_callback():
|
||
"""Тест с использованием callback для обработки событий"""
|
||
print("\n" + "="*80)
|
||
print("ТЕСТ 6: Callback обработка событий")
|
||
print("="*80)
|
||
|
||
graph = create_test_graph()
|
||
|
||
initial_state: SimpleState = {
|
||
"counter": 0,
|
||
"messages": [],
|
||
"step": "start"
|
||
}
|
||
|
||
collected_events = []
|
||
|
||
async def event_callback(event_type: str, event_data: Any):
|
||
"""Callback для обработки событий"""
|
||
collected_events.append({
|
||
"type": event_type,
|
||
"data": event_data
|
||
})
|
||
print(f" 🔔 Callback: {event_type}")
|
||
|
||
# Симуляция обработки событий с callback
|
||
event_count = 0
|
||
async for event in graph.astream(initial_state, stream_mode=["updates", "messages"]):
|
||
event_count += 1
|
||
print(f"\n📨 Event #{event_count}: {type(event)}")
|
||
|
||
# Обрабатываем событие
|
||
if isinstance(event, tuple) and len(event) >= 2:
|
||
await event_callback(str(event[0]), event[1])
|
||
elif isinstance(event, dict):
|
||
for node_name, node_data in event.items():
|
||
await event_callback(f"node_update_{node_name}", node_data)
|
||
else:
|
||
await event_callback("unknown", event)
|
||
|
||
print(f"\n✅ Всего событий: {event_count}")
|
||
print(f"✅ Callback вызовов: {len(collected_events)}")
|
||
print(f"\n📋 Собранные события:")
|
||
for i, evt in enumerate(collected_events, 1):
|
||
print(f" {i}. {evt['type']}")
|
||
|
||
|
||
async def main():
|
||
print("\n" + "="*80)
|
||
print("ПРОСТОЙ ТЕСТ LANGGRAPH STREAMING")
|
||
print("="*80 + "\n")
|
||
|
||
await test_stream_modes()
|
||
await test_with_callback()
|
||
|
||
print("\n" + "="*80)
|
||
print("ТЕСТИРОВАНИЕ ЗАВЕРШЕНО")
|
||
print("="*80 + "\n")
|
||
|
||
|
||
if __name__ == "__main__":
|
||
asyncio.run(main())
|
||
|