""" Упрощенный тест LangGraph без реального review Проверяет только механизм стриминга событий """ import asyncio from langgraph.graph import StateGraph, END from typing import TypedDict, Annotated, Any import operator # Простое состояние для теста class SimpleState(TypedDict): counter: Annotated[int, operator.add] messages: list[str] step: str # Простые ноды async def node_1(state: SimpleState) -> SimpleState: """Первая нода""" print(" [NODE 1] Выполняется...") await asyncio.sleep(0.5) return { "counter": 1, "messages": ["Node 1 completed"], "step": "node_1" } async def node_2(state: SimpleState) -> SimpleState: """Вторая нода""" print(" [NODE 2] Выполняется...") await asyncio.sleep(0.5) return { "counter": 1, "messages": ["Node 2 completed"], "step": "node_2" } async def node_3(state: SimpleState) -> SimpleState: """Третья нода""" print(" [NODE 3] Выполняется...") await asyncio.sleep(0.5) return { "counter": 1, "messages": ["Node 3 completed"], "step": "node_3" } def create_test_graph(): """Создает тестовый граф""" workflow = StateGraph(SimpleState) # Добавляем ноды workflow.add_node("node_1", node_1) workflow.add_node("node_2", node_2) workflow.add_node("node_3", node_3) # Определяем связи workflow.set_entry_point("node_1") workflow.add_edge("node_1", "node_2") workflow.add_edge("node_2", "node_3") workflow.add_edge("node_3", END) return workflow.compile() async def test_stream_modes(): """Тестирует разные режимы стриминга""" graph = create_test_graph() initial_state: SimpleState = { "counter": 0, "messages": [], "step": "start" } # Тест 1: updates print("\n" + "="*80) print("ТЕСТ 1: stream_mode=['updates']") print("="*80) event_count = 0 async for event in graph.astream(initial_state, stream_mode=["updates"]): event_count += 1 print(f"\n📨 Event #{event_count}") print(f" Type: {type(event)}") print(f" Content: {event}") print(f"\n✅ Получено событий: {event_count}") # Тест 2: messages print("\n" + "="*80) print("ТЕСТ 2: stream_mode=['messages']") print("="*80) event_count = 0 async for event in graph.astream(initial_state, stream_mode=["messages"]): event_count += 1 print(f"\n📨 Event #{event_count}") print(f" Type: {type(event)}") print(f" Content: {event}") print(f"\n✅ Получено событий: {event_count}") # Тест 3: updates + messages print("\n" + "="*80) print("ТЕСТ 3: stream_mode=['updates', 'messages']") print("="*80) event_count = 0 async for event in graph.astream(initial_state, stream_mode=["updates", "messages"]): event_count += 1 print(f"\n📨 Event #{event_count}") print(f" Type: {type(event)}") if isinstance(event, dict): print(f" Keys: {list(event.keys())}") elif isinstance(event, tuple): print(f" Tuple[0] type: {type(event[0])}") print(f" Tuple[1] type: {type(event[1])}") print(f" Content: {event}") print(f"\n✅ Получено событий: {event_count}") # Тест 4: values print("\n" + "="*80) print("ТЕСТ 4: stream_mode=['values']") print("="*80) event_count = 0 async for event in graph.astream(initial_state, stream_mode=["values"]): event_count += 1 print(f"\n📨 Event #{event_count}") print(f" Type: {type(event)}") print(f" Content: {event}") print(f"\n✅ Получено событий: {event_count}") # Тест 5: debug (все режимы) print("\n" + "="*80) print("ТЕСТ 5: stream_mode=['updates', 'messages', 'values', 'debug']") print("="*80) event_count = 0 async for event in graph.astream( initial_state, stream_mode=["updates", "messages", "values", "debug"] ): event_count += 1 print(f"\n📨 Event #{event_count}") print(f" Type: {type(event)}") if isinstance(event, tuple) and len(event) >= 2: print(f" Event type (tuple[0]): {event[0]}") print(f" Event data (tuple[1]): {event[1]}") else: print(f" Content: {event}") print(f"\n✅ Получено событий: {event_count}") async def test_with_callback(): """Тест с использованием callback для обработки событий""" print("\n" + "="*80) print("ТЕСТ 6: Callback обработка событий") print("="*80) graph = create_test_graph() initial_state: SimpleState = { "counter": 0, "messages": [], "step": "start" } collected_events = [] async def event_callback(event_type: str, event_data: Any): """Callback для обработки событий""" collected_events.append({ "type": event_type, "data": event_data }) print(f" 🔔 Callback: {event_type}") # Симуляция обработки событий с callback event_count = 0 async for event in graph.astream(initial_state, stream_mode=["updates", "messages"]): event_count += 1 print(f"\n📨 Event #{event_count}: {type(event)}") # Обрабатываем событие if isinstance(event, tuple) and len(event) >= 2: await event_callback(str(event[0]), event[1]) elif isinstance(event, dict): for node_name, node_data in event.items(): await event_callback(f"node_update_{node_name}", node_data) else: await event_callback("unknown", event) print(f"\n✅ Всего событий: {event_count}") print(f"✅ Callback вызовов: {len(collected_events)}") print(f"\n📋 Собранные события:") for i, evt in enumerate(collected_events, 1): print(f" {i}. {evt['type']}") async def main(): print("\n" + "="*80) print("ПРОСТОЙ ТЕСТ LANGGRAPH STREAMING") print("="*80 + "\n") await test_stream_modes() await test_with_callback() print("\n" + "="*80) print("ТЕСТИРОВАНИЕ ЗАВЕРШЕНО") print("="*80 + "\n") if __name__ == "__main__": asyncio.run(main())