From cfba28f913aeb3541cf19876b61f24b4d80b2018 Mon Sep 17 00:00:00 2001 From: Primakov Alexandr Alexandrovich Date: Mon, 13 Oct 2025 13:46:35 +0300 Subject: [PATCH] fix: Correct LangGraph event handling - events are tuples not dicts + add test scripts --- TEST_STREAMING.md | 181 ++++++++++++++++++++++++++ backend/app/agents/reviewer.py | 56 +++++--- test_langgraph_events.py | 229 +++++++++++++++++++++++++++++++++ test_simple_graph.py | 228 ++++++++++++++++++++++++++++++++ 4 files changed, 674 insertions(+), 20 deletions(-) create mode 100644 TEST_STREAMING.md create mode 100644 test_langgraph_events.py create mode 100644 test_simple_graph.py diff --git a/TEST_STREAMING.md b/TEST_STREAMING.md new file mode 100644 index 0000000..692e31e --- /dev/null +++ b/TEST_STREAMING.md @@ -0,0 +1,181 @@ +# Тестирование LangGraph Streaming + +Эти скрипты помогут проверить, как работает стриминг событий из LangGraph. + +## Скрипты + +### 1. `test_simple_graph.py` - Простой тест (БЕЗ БД) + +**Рекомендуется запустить ПЕРВЫМ** для понимания как работает стриминг. + +Тестирует простой граф с 3 нодами без реального review. + +```bash +# Активировать venv +cd backend +source venv/Scripts/activate # Windows Git Bash +# или +source venv/bin/activate # Linux/Mac + +# Запустить +python ../test_simple_graph.py +``` + +**Что тестирует:** +- ✅ `stream_mode=['updates']` - обновления нод +- ✅ `stream_mode=['messages']` - сообщения (LLM calls) +- ✅ `stream_mode=['updates', 'messages']` - оба режима +- ✅ `stream_mode=['values']` - значения состояния +- ✅ `stream_mode=['debug']` - режим отладки +- ✅ Callback обработка событий + +**Ожидаемый результат:** +Должны появиться события для каждой ноды (node_1, node_2, node_3). + +--- + +### 2. `test_langgraph_events.py` - Полный тест (С БД) + +Тестирует реальный ReviewerAgent с настоящими данными из БД. + +⚠️ **Требует:** +- Работающую БД с данными +- Существующий Review ID, PR Number, Repository ID +- Настроенный `.env` файл + +```bash +# Активировать venv +cd backend +source venv/Scripts/activate # Windows +# или +source venv/bin/activate # Linux/Mac + +# Запустить +python ../test_langgraph_events.py +``` + +**Перед запуском:** +Отредактируйте в файле `test_langgraph_events.py`: + +```python +TEST_REVIEW_ID = 1 # ID существующего review +TEST_PR_NUMBER = 5 # Номер PR +TEST_REPOSITORY_ID = 1 # ID репозитория +``` + +**Что тестирует:** +- ✅ Полный цикл review с callback +- ✅ RAW стриминг напрямую из графа +- ✅ Все режимы: `updates`, `messages`, `updates + messages` + +--- + +## Запуск локально (быстрый старт) + +### Шаг 1: Простой тест +```bash +cd backend +source venv/Scripts/activate +python ../test_simple_graph.py +``` + +Смотрите вывод - должны быть события от каждой ноды. + +### Шаг 2: Проверка формата событий + +Обратите внимание на **тип** и **структуру** событий: + +``` +📨 Event #1 + Type: + Keys: ['node_1'] + Content: {'node_1': {...}} +``` + +или + +``` +📨 Event #1 + Type: + Tuple[0]: 'messages' + Tuple[1]: [AIMessage(...)] +``` + +### Шаг 3: Полный тест (если нужно) + +Отредактируйте параметры в `test_langgraph_events.py` и запустите: + +```bash +python ../test_langgraph_events.py +``` + +--- + +## Что искать в выводе + +### ✅ ХОРОШО: + +``` +📨 Event #1 + Type: + Content: {'node_1': {...}} + +📨 Event #2 + Type: + Content: {'node_2': {...}} +``` + +**События приходят!** Граф работает. + +### ❌ ПЛОХО: + +``` +✅ Получено событий: 0 +``` + +**События НЕ приходят!** Проблема с графом или версией LangGraph. + +--- + +## Отладка + +Если события не приходят: + +1. **Проверьте версию LangGraph:** + ```bash + pip show langgraph + ``` + Должна быть >= 0.1.0 + +2. **Проверьте, что граф компилируется:** + ```python + graph = workflow.compile() + print(graph) # Должен вывести информацию о графе + ``` + +3. **Попробуйте `ainvoke` вместо `astream`:** + ```python + result = await graph.ainvoke(initial_state) + print(result) # Должен вернуть финальное состояние + ``` + +4. **Проверьте логи:** + Включите DEBUG логирование: + ```python + import logging + logging.basicConfig(level=logging.DEBUG) + ``` + +--- + +## Результаты + +После запуска тестов вы узнаете: + +1. ✅ **Работает ли стриминг вообще?** +2. ✅ **Какой формат у событий?** +3. ✅ **Какие режимы стриминга поддерживаются?** +4. ✅ **Как правильно обрабатывать события?** + +Это поможет понять, почему события не доходят до фронтенда. + diff --git a/backend/app/agents/reviewer.py b/backend/app/agents/reviewer.py index 29eeadc..5a29fa6 100644 --- a/backend/app/agents/reviewer.py +++ b/backend/app/agents/reviewer.py @@ -555,28 +555,44 @@ class ReviewerAgent: stream_mode=["updates"] ): event_count += 1 - print(f"📨 Event #{event_count} received from graph: {type(event)}") - print(f" Event content: {event if not isinstance(event, dict) or len(str(event)) < 200 else 'dict with keys: ' + str(list(event.keys()))}") + print(f"📨 Event #{event_count} received from graph") + print(f" Type: {type(event)}") + + # LangGraph returns events as tuples: (event_type, data) + if isinstance(event, tuple) and len(event) >= 2: + event_type, event_data = event[0], event[1] + print(f" Event type: {event_type}") + print(f" Event data type: {type(event_data)}") + + # Handle 'updates' events (node updates) + if event_type == 'updates' and isinstance(event_data, dict): + for node_name, node_data in event_data.items(): + print(f" 🔔 Node update: {node_name}") + + if on_event: + print(f" 📤 Sending event to callback for node: {node_name}") + await on_event({ + "type": "agent_step", + "step": node_name, + "message": f"Шаг: {node_name}" + }) + + # Store final state + if isinstance(node_data, dict): + final_state = node_data + + # Handle 'values' events (state snapshots) + elif event_type == 'values': + print(f" 📊 State snapshot received") + if isinstance(event_data, dict): + final_state = event_data + + # Handle 'debug' events + elif event_type == 'debug': + print(f" 🐛 Debug event: {event_data.get('type') if isinstance(event_data, dict) else 'unknown'}") - # Handle different event types - if isinstance(event, dict): - # Node updates - for node_name, node_data in event.items(): - print(f" 🔔 Node update: {node_name}") - - if on_event: - print(f" 📤 Sending event to callback for node: {node_name}") - await on_event({ - "type": "agent_step", - "step": node_name, - "data": node_data if not isinstance(node_data, dict) else {"status": "processing"} - }) - - # Store final state - if isinstance(node_data, dict): - final_state = node_data else: - print(f" ⚠️ Unexpected event type: {type(event)}") + print(f" ⚠️ Unexpected event format: {event}") except Exception as e: print(f"❌ Error in graph streaming: {e}") diff --git a/test_langgraph_events.py b/test_langgraph_events.py new file mode 100644 index 0000000..3615a8e --- /dev/null +++ b/test_langgraph_events.py @@ -0,0 +1,229 @@ +""" +Тестовый скрипт для проверки событий LangGraph +Запустить: python test_langgraph_events.py +""" + +import asyncio +from backend.app.database import AsyncSessionLocal +from backend.app.agents.reviewer import ReviewerAgent + + +async def test_streaming(): + """Тест стриминга событий от LangGraph""" + print("="*80) + print("ТЕСТ СТРИМИНГА СОБЫТИЙ LANGGRAPH") + print("="*80) + + # Параметры для теста (замените на реальные значения из вашей БД) + TEST_REVIEW_ID = 1 + TEST_PR_NUMBER = 5 + TEST_REPOSITORY_ID = 1 + + print(f"\n📋 Параметры теста:") + print(f" Review ID: {TEST_REVIEW_ID}") + print(f" PR Number: {TEST_PR_NUMBER}") + print(f" Repository ID: {TEST_REPOSITORY_ID}") + + # Создаем сессию БД + async with AsyncSessionLocal() as db: + print(f"\n✅ Подключение к БД установлено") + + # Создаем агента + agent = ReviewerAgent(db) + print(f"✅ ReviewerAgent создан") + + # Счетчик событий + event_counter = { + 'total': 0, + 'updates': 0, + 'messages': 0, + 'other': 0 + } + + # Callback для событий + async def on_event(event: dict): + event_counter['total'] += 1 + event_type = event.get('type', 'unknown') + + print(f"\n{'='*80}") + print(f"📨 СОБЫТИЕ #{event_counter['total']} - Тип: {event_type}") + print(f"{'='*80}") + print(f"Полное событие: {event}") + print(f"{'='*80}\n") + + print(f"\n🚀 Запуск review с стримингом...\n") + + try: + # Запускаем review + result = await agent.run_review_stream( + review_id=TEST_REVIEW_ID, + pr_number=TEST_PR_NUMBER, + repository_id=TEST_REPOSITORY_ID, + on_event=on_event + ) + + print(f"\n{'='*80}") + print(f"✅ REVIEW ЗАВЕРШЕН") + print(f"{'='*80}") + print(f"\n📊 Статистика событий:") + print(f" Всего событий: {event_counter['total']}") + print(f" Updates: {event_counter['updates']}") + print(f" Messages: {event_counter['messages']}") + print(f" Other: {event_counter['other']}") + + print(f"\n📝 Финальное состояние:") + print(f" Status: {result.get('status')}") + print(f" Files: {len(result.get('files', []))}") + print(f" Comments: {len(result.get('comments', []))}") + print(f" Error: {result.get('error')}") + + except Exception as e: + print(f"\n❌ ОШИБКА при выполнении review:") + print(f" {e}") + import traceback + traceback.print_exc() + + +async def test_raw_graph_streaming(): + """Тест RAW стриминга напрямую из графа""" + print("\n" + "="*80) + print("ТЕСТ RAW СТРИМИНГА НАПРЯМУЮ ИЗ ГРАФА") + print("="*80) + + # Параметры для теста + TEST_REVIEW_ID = 1 + TEST_PR_NUMBER = 5 + TEST_REPOSITORY_ID = 1 + + async with AsyncSessionLocal() as db: + agent = ReviewerAgent(db) + + initial_state = { + "review_id": TEST_REVIEW_ID, + "pr_number": TEST_PR_NUMBER, + "repository_id": TEST_REPOSITORY_ID, + "status": "pending", + "files": [], + "analyzed_files": [], + "comments": [], + "error": None, + "git_service": None + } + + print(f"\n🔍 Тест 1: stream_mode=['updates']") + print("-" * 80) + + event_count = 0 + try: + async for event in agent.graph.astream(initial_state, stream_mode=["updates"]): + event_count += 1 + print(f"\n📨 Event #{event_count}") + print(f" Type: {type(event)}") + print(f" Content: {event}") + + if event_count > 10: # Ограничение для безопасности + print("\n⚠️ Остановка после 10 событий") + break + except Exception as e: + print(f"\n❌ Ошибка: {e}") + import traceback + traceback.print_exc() + + print(f"\n✅ Всего событий 'updates': {event_count}") + + # Тест 2: messages + print(f"\n\n🔍 Тест 2: stream_mode=['messages']") + print("-" * 80) + + event_count = 0 + try: + # Создаем новый агент для чистого теста + agent2 = ReviewerAgent(db) + + async for event in agent2.graph.astream(initial_state, stream_mode=["messages"]): + event_count += 1 + print(f"\n📨 Event #{event_count}") + print(f" Type: {type(event)}") + print(f" Content: {event}") + + if event_count > 10: + print("\n⚠️ Остановка после 10 событий") + break + except Exception as e: + print(f"\n❌ Ошибка: {e}") + import traceback + traceback.print_exc() + + print(f"\n✅ Всего событий 'messages': {event_count}") + + # Тест 3: updates + messages + print(f"\n\n🔍 Тест 3: stream_mode=['updates', 'messages']") + print("-" * 80) + + event_count = 0 + try: + agent3 = ReviewerAgent(db) + + async for event in agent3.graph.astream(initial_state, stream_mode=["updates", "messages"]): + event_count += 1 + print(f"\n📨 Event #{event_count}") + print(f" Type: {type(event)}") + + # Детальный разбор события + if isinstance(event, dict): + print(f" Dict keys: {list(event.keys())}") + for key, value in event.items(): + print(f" {key}: {type(value).__name__}") + elif isinstance(event, tuple): + print(f" Tuple length: {len(event)}") + for i, item in enumerate(event): + print(f" [{i}]: {type(item).__name__}") + else: + print(f" Content preview: {str(event)[:200]}") + + if event_count > 10: + print("\n⚠️ Остановка после 10 событий") + break + except Exception as e: + print(f"\n❌ Ошибка: {e}") + import traceback + traceback.print_exc() + + print(f"\n✅ Всего событий 'updates + messages': {event_count}") + + +async def main(): + """Главная функция""" + import sys + + print("\n" + "🔬"*40) + print("ТЕСТИРОВАНИЕ СОБЫТИЙ LANGGRAPH") + print("🔬"*40 + "\n") + + print("Выберите тест:") + print("1. Полный review с callback (test_streaming)") + print("2. RAW стриминг напрямую из графа (test_raw_graph_streaming)") + print("3. Оба теста") + + choice = input("\nВведите номер теста (1/2/3) [по умолчанию: 3]: ").strip() or "3" + + if choice in ["1", "3"]: + print("\n" + "▶️"*40) + print("ЗАПУСК ТЕСТА 1: Полный review") + print("▶️"*40 + "\n") + await test_streaming() + + if choice in ["2", "3"]: + print("\n" + "▶️"*40) + print("ЗАПУСК ТЕСТА 2: RAW стриминг") + print("▶️"*40 + "\n") + await test_raw_graph_streaming() + + print("\n" + "✅"*40) + print("ВСЕ ТЕСТЫ ЗАВЕРШЕНЫ") + print("✅"*40 + "\n") + + +if __name__ == "__main__": + asyncio.run(main()) + diff --git a/test_simple_graph.py b/test_simple_graph.py new file mode 100644 index 0000000..6167ace --- /dev/null +++ b/test_simple_graph.py @@ -0,0 +1,228 @@ +""" +Упрощенный тест LangGraph без реального review +Проверяет только механизм стриминга событий +""" + +import asyncio +from langgraph.graph import StateGraph, END +from typing import TypedDict, Annotated, Any +import operator + + +# Простое состояние для теста +class SimpleState(TypedDict): + counter: Annotated[int, operator.add] + messages: list[str] + step: str + + +# Простые ноды +async def node_1(state: SimpleState) -> SimpleState: + """Первая нода""" + print(" [NODE 1] Выполняется...") + await asyncio.sleep(0.5) + return { + "counter": 1, + "messages": ["Node 1 completed"], + "step": "node_1" + } + + +async def node_2(state: SimpleState) -> SimpleState: + """Вторая нода""" + print(" [NODE 2] Выполняется...") + await asyncio.sleep(0.5) + return { + "counter": 1, + "messages": ["Node 2 completed"], + "step": "node_2" + } + + +async def node_3(state: SimpleState) -> SimpleState: + """Третья нода""" + print(" [NODE 3] Выполняется...") + await asyncio.sleep(0.5) + return { + "counter": 1, + "messages": ["Node 3 completed"], + "step": "node_3" + } + + +def create_test_graph(): + """Создает тестовый граф""" + workflow = StateGraph(SimpleState) + + # Добавляем ноды + workflow.add_node("node_1", node_1) + workflow.add_node("node_2", node_2) + workflow.add_node("node_3", node_3) + + # Определяем связи + workflow.set_entry_point("node_1") + workflow.add_edge("node_1", "node_2") + workflow.add_edge("node_2", "node_3") + workflow.add_edge("node_3", END) + + return workflow.compile() + + +async def test_stream_modes(): + """Тестирует разные режимы стриминга""" + graph = create_test_graph() + + initial_state: SimpleState = { + "counter": 0, + "messages": [], + "step": "start" + } + + # Тест 1: updates + print("\n" + "="*80) + print("ТЕСТ 1: stream_mode=['updates']") + print("="*80) + + event_count = 0 + async for event in graph.astream(initial_state, stream_mode=["updates"]): + event_count += 1 + print(f"\n📨 Event #{event_count}") + print(f" Type: {type(event)}") + print(f" Content: {event}") + + print(f"\n✅ Получено событий: {event_count}") + + # Тест 2: messages + print("\n" + "="*80) + print("ТЕСТ 2: stream_mode=['messages']") + print("="*80) + + event_count = 0 + async for event in graph.astream(initial_state, stream_mode=["messages"]): + event_count += 1 + print(f"\n📨 Event #{event_count}") + print(f" Type: {type(event)}") + print(f" Content: {event}") + + print(f"\n✅ Получено событий: {event_count}") + + # Тест 3: updates + messages + print("\n" + "="*80) + print("ТЕСТ 3: stream_mode=['updates', 'messages']") + print("="*80) + + event_count = 0 + async for event in graph.astream(initial_state, stream_mode=["updates", "messages"]): + event_count += 1 + print(f"\n📨 Event #{event_count}") + print(f" Type: {type(event)}") + + if isinstance(event, dict): + print(f" Keys: {list(event.keys())}") + elif isinstance(event, tuple): + print(f" Tuple[0] type: {type(event[0])}") + print(f" Tuple[1] type: {type(event[1])}") + + print(f" Content: {event}") + + print(f"\n✅ Получено событий: {event_count}") + + # Тест 4: values + print("\n" + "="*80) + print("ТЕСТ 4: stream_mode=['values']") + print("="*80) + + event_count = 0 + async for event in graph.astream(initial_state, stream_mode=["values"]): + event_count += 1 + print(f"\n📨 Event #{event_count}") + print(f" Type: {type(event)}") + print(f" Content: {event}") + + print(f"\n✅ Получено событий: {event_count}") + + # Тест 5: debug (все режимы) + print("\n" + "="*80) + print("ТЕСТ 5: stream_mode=['updates', 'messages', 'values', 'debug']") + print("="*80) + + event_count = 0 + async for event in graph.astream( + initial_state, + stream_mode=["updates", "messages", "values", "debug"] + ): + event_count += 1 + print(f"\n📨 Event #{event_count}") + print(f" Type: {type(event)}") + + if isinstance(event, tuple) and len(event) >= 2: + print(f" Event type (tuple[0]): {event[0]}") + print(f" Event data (tuple[1]): {event[1]}") + else: + print(f" Content: {event}") + + print(f"\n✅ Получено событий: {event_count}") + + +async def test_with_callback(): + """Тест с использованием callback для обработки событий""" + print("\n" + "="*80) + print("ТЕСТ 6: Callback обработка событий") + print("="*80) + + graph = create_test_graph() + + initial_state: SimpleState = { + "counter": 0, + "messages": [], + "step": "start" + } + + collected_events = [] + + async def event_callback(event_type: str, event_data: Any): + """Callback для обработки событий""" + collected_events.append({ + "type": event_type, + "data": event_data + }) + print(f" 🔔 Callback: {event_type}") + + # Симуляция обработки событий с callback + event_count = 0 + async for event in graph.astream(initial_state, stream_mode=["updates", "messages"]): + event_count += 1 + print(f"\n📨 Event #{event_count}: {type(event)}") + + # Обрабатываем событие + if isinstance(event, tuple) and len(event) >= 2: + await event_callback(str(event[0]), event[1]) + elif isinstance(event, dict): + for node_name, node_data in event.items(): + await event_callback(f"node_update_{node_name}", node_data) + else: + await event_callback("unknown", event) + + print(f"\n✅ Всего событий: {event_count}") + print(f"✅ Callback вызовов: {len(collected_events)}") + print(f"\n📋 Собранные события:") + for i, evt in enumerate(collected_events, 1): + print(f" {i}. {evt['type']}") + + +async def main(): + print("\n" + "="*80) + print("ПРОСТОЙ ТЕСТ LANGGRAPH STREAMING") + print("="*80 + "\n") + + await test_stream_modes() + await test_with_callback() + + print("\n" + "="*80) + print("ТЕСТИРОВАНИЕ ЗАВЕРШЕНО") + print("="*80 + "\n") + + +if __name__ == "__main__": + asyncio.run(main()) +