code-review-agent/tests/test_langgraph_events.py

230 lines
8.2 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
Тестовый скрипт для проверки событий LangGraph
Запустить: python test_langgraph_events.py
"""
import asyncio
from backend.app.database import AsyncSessionLocal
from backend.app.agents.reviewer import ReviewerAgent
async def test_streaming():
"""Тест стриминга событий от LangGraph"""
print("="*80)
print("ТЕСТ СТРИМИНГА СОБЫТИЙ LANGGRAPH")
print("="*80)
# Параметры для теста (замените на реальные значения из вашей БД)
TEST_REVIEW_ID = 1
TEST_PR_NUMBER = 5
TEST_REPOSITORY_ID = 1
print(f"\n📋 Параметры теста:")
print(f" Review ID: {TEST_REVIEW_ID}")
print(f" PR Number: {TEST_PR_NUMBER}")
print(f" Repository ID: {TEST_REPOSITORY_ID}")
# Создаем сессию БД
async with AsyncSessionLocal() as db:
print(f"\n✅ Подключение к БД установлено")
# Создаем агента
agent = ReviewerAgent(db)
print(f"✅ ReviewerAgent создан")
# Счетчик событий
event_counter = {
'total': 0,
'updates': 0,
'messages': 0,
'other': 0
}
# Callback для событий
async def on_event(event: dict):
event_counter['total'] += 1
event_type = event.get('type', 'unknown')
print(f"\n{'='*80}")
print(f"📨 СОБЫТИЕ #{event_counter['total']} - Тип: {event_type}")
print(f"{'='*80}")
print(f"Полное событие: {event}")
print(f"{'='*80}\n")
print(f"\n🚀 Запуск review с стримингом...\n")
try:
# Запускаем review
result = await agent.run_review_stream(
review_id=TEST_REVIEW_ID,
pr_number=TEST_PR_NUMBER,
repository_id=TEST_REPOSITORY_ID,
on_event=on_event
)
print(f"\n{'='*80}")
print(f"✅ REVIEW ЗАВЕРШЕН")
print(f"{'='*80}")
print(f"\n📊 Статистика событий:")
print(f" Всего событий: {event_counter['total']}")
print(f" Updates: {event_counter['updates']}")
print(f" Messages: {event_counter['messages']}")
print(f" Other: {event_counter['other']}")
print(f"\n📝 Финальное состояние:")
print(f" Status: {result.get('status')}")
print(f" Files: {len(result.get('files', []))}")
print(f" Comments: {len(result.get('comments', []))}")
print(f" Error: {result.get('error')}")
except Exception as e:
print(f"\n❌ ОШИБКА при выполнении review:")
print(f" {e}")
import traceback
traceback.print_exc()
async def test_raw_graph_streaming():
"""Тест RAW стриминга напрямую из графа"""
print("\n" + "="*80)
print("ТЕСТ RAW СТРИМИНГА НАПРЯМУЮ ИЗ ГРАФА")
print("="*80)
# Параметры для теста
TEST_REVIEW_ID = 1
TEST_PR_NUMBER = 5
TEST_REPOSITORY_ID = 1
async with AsyncSessionLocal() as db:
agent = ReviewerAgent(db)
initial_state = {
"review_id": TEST_REVIEW_ID,
"pr_number": TEST_PR_NUMBER,
"repository_id": TEST_REPOSITORY_ID,
"status": "pending",
"files": [],
"analyzed_files": [],
"comments": [],
"error": None,
"git_service": None
}
print(f"\n🔍 Тест 1: stream_mode=['updates']")
print("-" * 80)
event_count = 0
try:
async for event in agent.graph.astream(initial_state, stream_mode=["updates"]):
event_count += 1
print(f"\n📨 Event #{event_count}")
print(f" Type: {type(event)}")
print(f" Content: {event}")
if event_count > 10: # Ограничение для безопасности
print("\n⚠️ Остановка после 10 событий")
break
except Exception as e:
print(f"\n❌ Ошибка: {e}")
import traceback
traceback.print_exc()
print(f"\nВсего событий 'updates': {event_count}")
# Тест 2: messages
print(f"\n\n🔍 Тест 2: stream_mode=['messages']")
print("-" * 80)
event_count = 0
try:
# Создаем новый агент для чистого теста
agent2 = ReviewerAgent(db)
async for event in agent2.graph.astream(initial_state, stream_mode=["messages"]):
event_count += 1
print(f"\n📨 Event #{event_count}")
print(f" Type: {type(event)}")
print(f" Content: {event}")
if event_count > 10:
print("\n⚠️ Остановка после 10 событий")
break
except Exception as e:
print(f"\n❌ Ошибка: {e}")
import traceback
traceback.print_exc()
print(f"\nВсего событий 'messages': {event_count}")
# Тест 3: updates + messages
print(f"\n\n🔍 Тест 3: stream_mode=['updates', 'messages']")
print("-" * 80)
event_count = 0
try:
agent3 = ReviewerAgent(db)
async for event in agent3.graph.astream(initial_state, stream_mode=["updates", "messages"]):
event_count += 1
print(f"\n📨 Event #{event_count}")
print(f" Type: {type(event)}")
# Детальный разбор события
if isinstance(event, dict):
print(f" Dict keys: {list(event.keys())}")
for key, value in event.items():
print(f" {key}: {type(value).__name__}")
elif isinstance(event, tuple):
print(f" Tuple length: {len(event)}")
for i, item in enumerate(event):
print(f" [{i}]: {type(item).__name__}")
else:
print(f" Content preview: {str(event)[:200]}")
if event_count > 10:
print("\n⚠️ Остановка после 10 событий")
break
except Exception as e:
print(f"\n❌ Ошибка: {e}")
import traceback
traceback.print_exc()
print(f"\nВсего событий 'updates + messages': {event_count}")
async def main():
"""Главная функция"""
import sys
print("\n" + "🔬"*40)
print("ТЕСТИРОВАНИЕ СОБЫТИЙ LANGGRAPH")
print("🔬"*40 + "\n")
print("Выберите тест:")
print("1. Полный review с callback (test_streaming)")
print("2. RAW стриминг напрямую из графа (test_raw_graph_streaming)")
print("3. Оба теста")
choice = input("\nВведите номер теста (1/2/3) [по умолчанию: 3]: ").strip() or "3"
if choice in ["1", "3"]:
print("\n" + "▶️"*40)
print("ЗАПУСК ТЕСТА 1: Полный review")
print("▶️"*40 + "\n")
await test_streaming()
if choice in ["2", "3"]:
print("\n" + "▶️"*40)
print("ЗАПУСК ТЕСТА 2: RAW стриминг")
print("▶️"*40 + "\n")
await test_raw_graph_streaming()
print("\n" + ""*40)
print("ВСЕ ТЕСТЫ ЗАВЕРШЕНЫ")
print(""*40 + "\n")
if __name__ == "__main__":
asyncio.run(main())