diff --git a/backend/app/agents/reviewer.py b/backend/app/agents/reviewer.py
index 14031d3..3b6053a 100644
--- a/backend/app/agents/reviewer.py
+++ b/backend/app/agents/reviewer.py
@@ -485,4 +485,56 @@ class ReviewerAgent:
         final_state = await self.graph.ainvoke(initial_state)
         return final_state
+
+    async def run_review_stream(
+        self,
+        review_id: int,
+        pr_number: int,
+        repository_id: int,
+        on_event: callable = None
+    ) -> Dict[str, Any]:
+        """Run the review workflow with streaming events"""
+        initial_state: ReviewState = {
+            "review_id": review_id,
+            "pr_number": pr_number,
+            "repository_id": repository_id,
+            "status": "pending",
+            "files": [],
+            "analyzed_files": [],
+            "comments": [],
+            "error": None,
+            "git_service": None
+        }
+
+        final_state = None
+
+        # Stream through the graph
+        async for event in self.graph.astream(
+            initial_state,
+            stream_mode=["updates", "messages"]
+        ):
+            # Handle different event types
+            if isinstance(event, dict):
+                # Node updates
+                for node_name, node_data in event.items():
+                    if on_event:
+                        await on_event({
+                            "type": "agent_step",
+                            "step": node_name,
+                            "data": node_data
+                        })
+
+                    # Store final state
+                    if isinstance(node_data, dict):
+                        final_state = node_data
+
+            # Handle message events (LLM calls)
+            elif hasattr(event, '__class__') and 'message' in event.__class__.__name__.lower():
+                if on_event:
+                    await on_event({
+                        "type": "llm_message",
+                        "message": str(event)
+                    })
+
+        return final_state or initial_state
diff --git a/backend/app/schemas/__init__.py b/backend/app/schemas/__init__.py
index 5b049b6..3b459cf 100644
--- a/backend/app/schemas/__init__.py
+++ b/backend/app/schemas/__init__.py
@@ -16,6 +16,13 @@ from app.schemas.webhook import (
     GitHubWebhook,
     BitbucketWebhook
 )
+from app.schemas.streaming import (
+    StreamEvent,
+    AgentStepEvent,
+    LLMStreamEvent,
+    ReviewProgressEvent,
+    StreamEventType
+)
 
 __all__ = [
     "RepositoryCreate",
@@ -28,5 +35,10 @@ __all__ = [
     "GiteaWebhook",
     "GitHubWebhook",
     "BitbucketWebhook",
+    "StreamEvent",
+    "AgentStepEvent",
+    "LLMStreamEvent",
+    "ReviewProgressEvent",
+    "StreamEventType",
 ]
diff --git a/backend/app/schemas/streaming.py b/backend/app/schemas/streaming.py
new file mode 100644
index 0000000..df6cc2c
--- /dev/null
+++ b/backend/app/schemas/streaming.py
@@ -0,0 +1,55 @@
+"""Streaming event schemas"""
+
+from typing import Optional, Any, Dict, Literal
+from pydantic import BaseModel
+
+
+class StreamEventType:
+    """Stream event types"""
+    AGENT_START = "agent_start"
+    AGENT_UPDATE = "agent_update"
+    AGENT_STEP = "agent_step"
+    LLM_START = "llm_start"
+    LLM_STREAM = "llm_stream"
+    LLM_END = "llm_end"
+    AGENT_ERROR = "agent_error"
+    AGENT_COMPLETE = "agent_complete"
+
+
+class StreamEvent(BaseModel):
+    """Base streaming event"""
+    type: str
+    review_id: int
+    timestamp: str
+    data: Dict[str, Any]
+
+
+class AgentStepEvent(BaseModel):
+    """Agent step event"""
+    type: Literal["agent_step"] = "agent_step"
+    review_id: int
+    step: str  # fetch_pr_info, fetch_files, analyze_files, post_comments
+    status: str  # started, completed, failed
+    message: str
+    data: Optional[Dict[str, Any]] = None
+
+
+class LLMStreamEvent(BaseModel):
+    """LLM streaming event"""
+    type: Literal["llm_stream"] = "llm_stream"
+    review_id: int
+    file_path: Optional[str] = None
+    chunk: str  # Chunk of the LLM response
+    is_complete: bool = False
+
+
+class ReviewProgressEvent(BaseModel):
+    """Review progress event"""
+    type: Literal["review_progress"] = "review_progress"
+    review_id: int
+    total_files: int
+    analyzed_files: int
+    total_comments: int
+    current_step: str
+    message: str
diff --git a/backend/app/workers/task_worker.py b/backend/app/workers/task_worker.py
index 109b8f4..f64693a 100644
--- a/backend/app/workers/task_worker.py
+++ b/backend/app/workers/task_worker.py
@@ -159,14 +159,45 @@ class ReviewTaskWorker:
             await db.commit()
             await db.refresh(review)
 
-            # Run review agent
+            # Run review agent with streaming
             logger.info(f"  🤖 Запуск AI review для PR #{pull_request.pr_number}")
 
+            # Import broadcast function
+            from app.main import manager
+            from datetime import datetime as dt
+
+            # Create event handler
+            async def on_review_event(event: dict):
+                """Handle review events and broadcast to clients"""
+                try:
+                    # Prepare event data
+                    event_data = {
+                        "type": event.get("type", "agent_update"),
+                        "review_id": review.id,
+                        "pr_number": pull_request.pr_number,
+                        "timestamp": dt.utcnow().isoformat(),
+                        "data": event
+                    }
+
+                    # Broadcast to all connected clients
+                    await manager.broadcast(event_data)
+
+                    # Log the event
+                    if event.get("type") == "agent_step":
+                        step = event.get("step", "unknown")
+                        logger.info(f"  📍 Step: {step}")
+                    elif event.get("type") == "llm_message":
+                        message = event.get("message", "")[:100]
+                        logger.debug(f"  💬 LLM: {message}...")
+                except Exception as e:
+                    logger.error(f"  ❌ Ошибка broadcast события: {e}")
+
             agent = ReviewerAgent(db)
-            await agent.run_review(
+            await agent.run_review_stream(
                 review_id=review.id,
                 pr_number=pull_request.pr_number,
-                repository_id=repository.id
+                repository_id=repository.id,
+                on_event=on_review_event
             )
 
             logger.info(f"  ✅ Review завершен для PR #{pull_request.pr_number}")
diff --git a/frontend/src/components/ReviewStream.tsx b/frontend/src/components/ReviewStream.tsx
new file mode 100644
index 0000000..20c7347
--- /dev/null
+++ b/frontend/src/components/ReviewStream.tsx
@@ -0,0 +1,224 @@
+import React, { useEffect, useState } from 'react';
+import { WS_URL } from '../api/websocket';
+
+interface StreamEvent {
+  type: string;
+  review_id: number;
+  pr_number: number;
+  timestamp: string;
+  data: {
+    type?: string;
+    step?: string;
+    message?: string;
+    data?: any;
+  };
+}
+
+interface ReviewStreamProps {
+  reviewId: number;
+}
+
+const STEP_NAMES: Record<string, { name: string; icon: string }> = {
+  fetch_pr_info: { name: 'Получение информации о PR', icon: '📋' },
+  fetch_files: { name: 'Загрузка файлов', icon: '📂' },
+  analyze_files: { name: 'Анализ кода', icon: '🔍' },
+  post_comments: { name: 'Публикация комментариев', icon: '💬' },
+  complete_review: { name: 'Завершение review', icon: '✅' },
+};
+
+export const ReviewStream: React.FC<ReviewStreamProps> = ({ reviewId }) => {
+  const [events, setEvents] = useState<StreamEvent[]>([]);
+  const [currentStep, setCurrentStep] = useState('');
+  const [isConnected, setIsConnected] = useState(false);
+  const [llmMessages, setLlmMessages] = useState<string[]>([]);
+
+  useEffect(() => {
+    const ws = new WebSocket(WS_URL);
+
+    ws.onopen = () => {
+      console.log('WebSocket connected for streaming');
+      setIsConnected(true);
+    };
+
+    ws.onmessage = (event) => {
+      try {
+        const data: StreamEvent = JSON.parse(event.data);
+
+        // Filter events for this review
+        if (data.review_id === reviewId) {
+          setEvents((prev) => [...prev, data]);
+
+          // Update current step
+          if (data.data.type === 'agent_step') {
+            setCurrentStep(data.data.step || '');
+          }
+
+          // Collect LLM messages
+          if (data.data.type === 'llm_message') {
+            setLlmMessages((prev) => [...prev, data.data.message || '']);
+          }
+        }
+      } catch (error) {
+        console.error('Error parsing WebSocket message:', error);
+      }
+    };
+
+    ws.onclose = () => {
+      console.log('WebSocket disconnected');
+      setIsConnected(false);
+    };
+
+    ws.onerror = (error) => {
+      console.error('WebSocket error:', error);
+    };
+
+    return () => {
+      ws.close();
+    };
+  }, [reviewId]);
+
+  const getStepInfo = (step: string) => {
+    return STEP_NAMES[step] || { name: step, icon: '⚙️' };
+  };
+
+  const renderStepProgress = () => {
+    const steps = Object.keys(STEP_NAMES);
+    const currentIndex = steps.indexOf(currentStep);
+
+    return (
+
+
+        {steps.map((step, index) => {
+          const stepInfo = getStepInfo(step);
+          const isActive = index === currentIndex;
+          const isCompleted = index < currentIndex;
+
+          return (
+
+
+                {stepInfo.icon}
+
+              {index < steps.length - 1 && (
+
+              )}
+
+          );
+        })}
+
+      {currentStep && (
+
+          {getStepInfo(currentStep).name}
+
+      )}
+
+    );
+  };
+
+  const renderEvents = () => {
+    if (events.length === 0) {
+      return (
+
+          Ожидание событий...
+
+      );
+    }
+
+    return (
+
+        {events.map((event, index) => (
+
+
+              {new Date(event.timestamp).toLocaleTimeString()}
+
+
+              {event.data.type}
+
+
+            {event.data.step && (
+
+                {getStepInfo(event.data.step).icon} {getStepInfo(event.data.step).name}
+
+            )}
+            {event.data.message && (
+
+                {event.data.message}
+
+            )}
+
+        ))}
+
+    );
+  };
+
+  const renderLLMMessages = () => {
+    if (llmMessages.length === 0) return null;
+
+    return (
+
+
+          🤖 Ответы LLM
+
+
+        {llmMessages.slice(-5).map((message, index) => (
+
+            {message}
+
+        ))}
+
+
+    );
+  };
+
+  return (
+
+
+
+
+          🔄 Процесс Review
+
+
+
+            {isConnected ? 'Подключено' : 'Отключено'}
+
+
+
+      {renderStepProgress()}
+
+
+
+          📝 События
+
+        {renderEvents()}
+
+
+      {renderLLMMessages()}
+
+  );
+};
diff --git a/frontend/src/pages/ReviewDetail.tsx b/frontend/src/pages/ReviewDetail.tsx
index 056d222..18580d1 100644
--- a/frontend/src/pages/ReviewDetail.tsx
+++ b/frontend/src/pages/ReviewDetail.tsx
@@ -5,6 +5,7 @@ import { getReview, retryReview } from '../api/client';
 import { wsClient } from '../api/websocket';
 import ReviewProgress from '../components/ReviewProgress';
 import CommentsList from '../components/CommentsList';
+import { ReviewStream } from '../components/ReviewStream';
 import { formatDistance } from 'date-fns';
 import { ru } from 'date-fns/locale';
 
@@ -127,6 +128,11 @@ export default function ReviewDetail() {
+          {/* Real-time Review Stream (only during review) */}
+          {['pending', 'fetching', 'analyzing', 'commenting'].includes(review.status) && (
+            <ReviewStream reviewId={review.id} />
+          )}
+
           {/* Progress */}
 
             Прогресс