From a27a0fa0f049ce3314564a95a30dfcd1b299d41d Mon Sep 17 00:00:00 2001 From: Primakov Alexandr Alexandrovich Date: Mon, 13 Oct 2025 10:30:56 +0300 Subject: [PATCH] feat: Add WebSocket ping/pong + detailed streaming debug + initial review messages --- backend/app/main.py | 29 ++++- backend/app/workers/task_worker.py | 40 +++++- frontend/src/components/ReviewStream.tsx | 147 ++++++++++++++++------- 3 files changed, 169 insertions(+), 47 deletions(-) diff --git a/backend/app/main.py b/backend/app/main.py index bd2c8c6..d7cac27 100644 --- a/backend/app/main.py +++ b/backend/app/main.py @@ -122,14 +122,37 @@ async def health_check(): async def websocket_endpoint(websocket: WebSocket): """WebSocket endpoint for real-time review updates""" await manager.connect(websocket) + print(f"✅ WebSocket connected. Total connections: {len(manager.active_connections)}") + try: + # Send welcome message + await websocket.send_json({ + "type": "connection", + "status": "connected", + "message": "WebSocket подключен к серверу review", + "timestamp": __import__('datetime').datetime.utcnow().isoformat() + }) + while True: - # Keep connection alive + # Keep connection alive and handle client messages data = await websocket.receive_text() - # Echo back or handle client messages if needed - await websocket.send_json({"type": "pong", "message": "connected"}) + print(f"📨 Received from client: {data}") + + # Handle ping/pong + if data == "ping": + await websocket.send_json({ + "type": "pong", + "timestamp": __import__('datetime').datetime.utcnow().isoformat() + }) + else: + # Echo back for debugging + await websocket.send_json({ + "type": "echo", + "message": f"Получено: {data}" + }) except WebSocketDisconnect: manager.disconnect(websocket) + print(f"❌ WebSocket disconnected. Remaining connections: {len(manager.active_connections)}") async def broadcast_review_update(review_id: int, event_type: str, data: dict = None): diff --git a/backend/app/workers/task_worker.py b/backend/app/workers/task_worker.py index f64693a..aac578c 100644 --- a/backend/app/workers/task_worker.py +++ b/backend/app/workers/task_worker.py @@ -166,6 +166,25 @@ class ReviewTaskWorker: from app.main import manager from datetime import datetime as dt + # Send initial "review started" message + logger.info(f" 📢 Отправка начального сообщения о старте review...") + try: + initial_message = { + "type": "review_started", + "review_id": review.id, + "pr_number": pull_request.pr_number, + "timestamp": dt.utcnow().isoformat(), + "data": { + "message": f"Начало review для PR #{pull_request.pr_number}", + "repository_id": repository.id, + "repository_name": f"{repository.repo_owner}/{repository.repo_name}" + } + } + await manager.broadcast(initial_message) + logger.info(f" ✅ Начальное сообщение отправлено: {len(manager.active_connections)} подключений") + except Exception as e: + logger.error(f" ❌ Ошибка отправки начального сообщения: {e}") + # Create event handler async def on_review_event(event: dict): """Handle review events and broadcast to clients""" @@ -179,6 +198,8 @@ class ReviewTaskWorker: "data": event } + logger.info(f" 🔔 Broadcasting event: type={event.get('type')}, connections={len(manager.active_connections)}") + # Broadcast to all connected clients await manager.broadcast(event_data) @@ -188,9 +209,11 @@ class ReviewTaskWorker: logger.info(f" 📍 Step: {step}") elif event.get("type") == "llm_message": message = event.get("message", "")[:100] - logger.debug(f" 💬 LLM: {message}...") + logger.info(f" 💬 LLM: {message}...") except Exception as e: logger.error(f" ❌ Ошибка broadcast события: {e}") + import traceback + traceback.print_exc() agent = ReviewerAgent(db) await agent.run_review_stream( @@ -201,6 +224,21 @@ class ReviewTaskWorker: ) logger.info(f" ✅ Review завершен для PR #{pull_request.pr_number}") + + # Send completion message + try: + completion_message = { + "type": "review_completed", + "review_id": review.id, + "pr_number": pull_request.pr_number, + "timestamp": dt.utcnow().isoformat(), + "data": { + "message": f"Review завершен для PR #{pull_request.pr_number}" + } + } + await manager.broadcast(completion_message) + except Exception as e: + logger.error(f" ❌ Ошибка отправки сообщения о завершении: {e}") # Global worker instance diff --git a/frontend/src/components/ReviewStream.tsx b/frontend/src/components/ReviewStream.tsx index ed727a1..8711294 100644 --- a/frontend/src/components/ReviewStream.tsx +++ b/frontend/src/components/ReviewStream.tsx @@ -37,46 +37,89 @@ export const ReviewStream: React.FC = ({ reviewId }) => { console.log('👀 Watching for review ID:', reviewId); const ws = new WebSocket(WS_URL); + let pingInterval: number; ws.onopen = () => { console.log('✅ WebSocket connected for streaming'); setIsConnected(true); + + // Start ping interval (every 30 seconds) + pingInterval = window.setInterval(() => { + if (ws.readyState === WebSocket.OPEN) { + console.log('🏓 Sending ping...'); + ws.send('ping'); + } + }, 30000); }; ws.onmessage = (event) => { try { console.log('📨 WebSocket message received:', event.data); - const data: StreamEvent = JSON.parse(event.data); + const data: any = JSON.parse(event.data); console.log('📦 Parsed event:', data); - console.log(`🔍 Event review_id: ${data.review_id}, Expected: ${reviewId}`); - // Filter events for this review - if (data.review_id === reviewId) { - console.log('✅ Event matches review ID, adding to events'); - setEvents((prev) => [...prev, data]); + // Handle different message types + if (data.type === 'connection') { + console.log('🔗 Connection confirmed:', data.message); + return; + } + + if (data.type === 'pong') { + console.log('🏓 Pong received'); + return; + } + + if (data.type === 'echo') { + console.log('📢 Echo:', data.message); + return; + } + + // Review events + if (data.review_id !== undefined) { + console.log(`🔍 Event review_id: ${data.review_id}, Expected: ${reviewId}, Type: ${data.type}`); + + // Filter events for this review OR show all for debugging + if (data.review_id === reviewId || true) { // Allow all for now for debugging + console.log(`✅ Processing event type: ${data.type}`); + setEvents((prev) => [...prev, data]); - // Update current step - if (data.data.type === 'agent_step') { - console.log('🚶 Agent step:', data.data.step); - setCurrentStep(data.data.step || ''); - } + // Update current step + if (data.type === 'agent_step' || data.data?.type === 'agent_step') { + const step = data.data?.step || data.step; + console.log('🚶 Agent step:', step); + setCurrentStep(step || ''); + } - // Collect LLM messages - if (data.data.type === 'llm_message') { - console.log('💬 LLM message:', data.data.message); - setLlmMessages((prev) => [...prev, data.data.message || '']); + // Collect LLM messages + if (data.type === 'llm_message' || data.data?.type === 'llm_message') { + const message = data.data?.message || data.message; + console.log('💬 LLM message:', message); + setLlmMessages((prev) => [...prev, message || '']); + } + + // Handle special events + if (data.type === 'review_started') { + console.log('🎬 Review started:', data.data?.message); + } + + if (data.type === 'review_completed') { + console.log('🎉 Review completed:', data.data?.message); + } + } else { + console.log('⏭️ Event is for different review, skipping'); } - } else { - console.log('⏭️ Event is for different review, skipping'); } } catch (error) { - console.error('❌ Error parsing WebSocket message:', error); + console.error('❌ Error parsing WebSocket message:', error, 'Data:', event.data); } }; ws.onclose = () => { console.log('🔌 WebSocket disconnected'); setIsConnected(false); + if (pingInterval) { + window.clearInterval(pingInterval); + } }; ws.onerror = (error) => { @@ -85,6 +128,9 @@ export const ReviewStream: React.FC = ({ reviewId }) => { return () => { console.log('🔌 Closing WebSocket'); + if (pingInterval) { + window.clearInterval(pingInterval); + } ws.close(); }; }, [reviewId]); @@ -142,38 +188,53 @@ export const ReviewStream: React.FC = ({ reviewId }) => { if (events.length === 0) { return (
- Ожидание событий... +
⏳ Ожидание событий...
+
+ {isConnected ? '✅ WebSocket подключен' : '❌ WebSocket отключен'} +
); } return (
- {events.map((event, index) => ( -
-
- - {new Date(event.timestamp).toLocaleTimeString()} - - - {event.data.type} - + {events.map((event: any, index) => { + const eventType = event.type || event.data?.type; + const eventMessage = event.data?.message || event.message; + const eventStep = event.data?.step || event.step; + + return ( +
+
+ + {new Date(event.timestamp).toLocaleTimeString('ru-RU')} + + + {eventType} + +
+ {eventStep && ( +
+ {getStepInfo(eventStep).icon} {getStepInfo(eventStep).name} +
+ )} + {eventMessage && ( +
+ {eventMessage} +
+ )}
- {event.data.step && ( -
- {getStepInfo(event.data.step).icon} {getStepInfo(event.data.step).name} -
- )} - {event.data.message && ( -
- {event.data.message} -
- )} -
- ))} + ); + })}
); };