feat: Add WebSocket ping/pong + detailed streaming debug + initial review messages
This commit is contained in:
parent
3df9e61b55
commit
a27a0fa0f0
@ -122,14 +122,37 @@ async def health_check():
|
|||||||
async def websocket_endpoint(websocket: WebSocket):
|
async def websocket_endpoint(websocket: WebSocket):
|
||||||
"""WebSocket endpoint for real-time review updates"""
|
"""WebSocket endpoint for real-time review updates"""
|
||||||
await manager.connect(websocket)
|
await manager.connect(websocket)
|
||||||
|
print(f"✅ WebSocket connected. Total connections: {len(manager.active_connections)}")
|
||||||
|
|
||||||
try:
|
try:
|
||||||
|
# Send welcome message
|
||||||
|
await websocket.send_json({
|
||||||
|
"type": "connection",
|
||||||
|
"status": "connected",
|
||||||
|
"message": "WebSocket подключен к серверу review",
|
||||||
|
"timestamp": __import__('datetime').datetime.utcnow().isoformat()
|
||||||
|
})
|
||||||
|
|
||||||
while True:
|
while True:
|
||||||
# Keep connection alive
|
# Keep connection alive and handle client messages
|
||||||
data = await websocket.receive_text()
|
data = await websocket.receive_text()
|
||||||
# Echo back or handle client messages if needed
|
print(f"📨 Received from client: {data}")
|
||||||
await websocket.send_json({"type": "pong", "message": "connected"})
|
|
||||||
|
# Handle ping/pong
|
||||||
|
if data == "ping":
|
||||||
|
await websocket.send_json({
|
||||||
|
"type": "pong",
|
||||||
|
"timestamp": __import__('datetime').datetime.utcnow().isoformat()
|
||||||
|
})
|
||||||
|
else:
|
||||||
|
# Echo back for debugging
|
||||||
|
await websocket.send_json({
|
||||||
|
"type": "echo",
|
||||||
|
"message": f"Получено: {data}"
|
||||||
|
})
|
||||||
except WebSocketDisconnect:
|
except WebSocketDisconnect:
|
||||||
manager.disconnect(websocket)
|
manager.disconnect(websocket)
|
||||||
|
print(f"❌ WebSocket disconnected. Remaining connections: {len(manager.active_connections)}")
|
||||||
|
|
||||||
|
|
||||||
async def broadcast_review_update(review_id: int, event_type: str, data: dict = None):
|
async def broadcast_review_update(review_id: int, event_type: str, data: dict = None):
|
||||||
|
|||||||
@ -166,6 +166,25 @@ class ReviewTaskWorker:
|
|||||||
from app.main import manager
|
from app.main import manager
|
||||||
from datetime import datetime as dt
|
from datetime import datetime as dt
|
||||||
|
|
||||||
|
# Send initial "review started" message
|
||||||
|
logger.info(f" 📢 Отправка начального сообщения о старте review...")
|
||||||
|
try:
|
||||||
|
initial_message = {
|
||||||
|
"type": "review_started",
|
||||||
|
"review_id": review.id,
|
||||||
|
"pr_number": pull_request.pr_number,
|
||||||
|
"timestamp": dt.utcnow().isoformat(),
|
||||||
|
"data": {
|
||||||
|
"message": f"Начало review для PR #{pull_request.pr_number}",
|
||||||
|
"repository_id": repository.id,
|
||||||
|
"repository_name": f"{repository.repo_owner}/{repository.repo_name}"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
await manager.broadcast(initial_message)
|
||||||
|
logger.info(f" ✅ Начальное сообщение отправлено: {len(manager.active_connections)} подключений")
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f" ❌ Ошибка отправки начального сообщения: {e}")
|
||||||
|
|
||||||
# Create event handler
|
# Create event handler
|
||||||
async def on_review_event(event: dict):
|
async def on_review_event(event: dict):
|
||||||
"""Handle review events and broadcast to clients"""
|
"""Handle review events and broadcast to clients"""
|
||||||
@ -179,6 +198,8 @@ class ReviewTaskWorker:
|
|||||||
"data": event
|
"data": event
|
||||||
}
|
}
|
||||||
|
|
||||||
|
logger.info(f" 🔔 Broadcasting event: type={event.get('type')}, connections={len(manager.active_connections)}")
|
||||||
|
|
||||||
# Broadcast to all connected clients
|
# Broadcast to all connected clients
|
||||||
await manager.broadcast(event_data)
|
await manager.broadcast(event_data)
|
||||||
|
|
||||||
@ -188,9 +209,11 @@ class ReviewTaskWorker:
|
|||||||
logger.info(f" 📍 Step: {step}")
|
logger.info(f" 📍 Step: {step}")
|
||||||
elif event.get("type") == "llm_message":
|
elif event.get("type") == "llm_message":
|
||||||
message = event.get("message", "")[:100]
|
message = event.get("message", "")[:100]
|
||||||
logger.debug(f" 💬 LLM: {message}...")
|
logger.info(f" 💬 LLM: {message}...")
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error(f" ❌ Ошибка broadcast события: {e}")
|
logger.error(f" ❌ Ошибка broadcast события: {e}")
|
||||||
|
import traceback
|
||||||
|
traceback.print_exc()
|
||||||
|
|
||||||
agent = ReviewerAgent(db)
|
agent = ReviewerAgent(db)
|
||||||
await agent.run_review_stream(
|
await agent.run_review_stream(
|
||||||
@ -201,6 +224,21 @@ class ReviewTaskWorker:
|
|||||||
)
|
)
|
||||||
|
|
||||||
logger.info(f" ✅ Review завершен для PR #{pull_request.pr_number}")
|
logger.info(f" ✅ Review завершен для PR #{pull_request.pr_number}")
|
||||||
|
|
||||||
|
# Send completion message
|
||||||
|
try:
|
||||||
|
completion_message = {
|
||||||
|
"type": "review_completed",
|
||||||
|
"review_id": review.id,
|
||||||
|
"pr_number": pull_request.pr_number,
|
||||||
|
"timestamp": dt.utcnow().isoformat(),
|
||||||
|
"data": {
|
||||||
|
"message": f"Review завершен для PR #{pull_request.pr_number}"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
await manager.broadcast(completion_message)
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f" ❌ Ошибка отправки сообщения о завершении: {e}")
|
||||||
|
|
||||||
|
|
||||||
# Global worker instance
|
# Global worker instance
|
||||||
|
|||||||
@ -37,46 +37,89 @@ export const ReviewStream: React.FC<ReviewStreamProps> = ({ reviewId }) => {
|
|||||||
console.log('👀 Watching for review ID:', reviewId);
|
console.log('👀 Watching for review ID:', reviewId);
|
||||||
|
|
||||||
const ws = new WebSocket(WS_URL);
|
const ws = new WebSocket(WS_URL);
|
||||||
|
let pingInterval: number;
|
||||||
|
|
||||||
ws.onopen = () => {
|
ws.onopen = () => {
|
||||||
console.log('✅ WebSocket connected for streaming');
|
console.log('✅ WebSocket connected for streaming');
|
||||||
setIsConnected(true);
|
setIsConnected(true);
|
||||||
|
|
||||||
|
// Start ping interval (every 30 seconds)
|
||||||
|
pingInterval = window.setInterval(() => {
|
||||||
|
if (ws.readyState === WebSocket.OPEN) {
|
||||||
|
console.log('🏓 Sending ping...');
|
||||||
|
ws.send('ping');
|
||||||
|
}
|
||||||
|
}, 30000);
|
||||||
};
|
};
|
||||||
|
|
||||||
ws.onmessage = (event) => {
|
ws.onmessage = (event) => {
|
||||||
try {
|
try {
|
||||||
console.log('📨 WebSocket message received:', event.data);
|
console.log('📨 WebSocket message received:', event.data);
|
||||||
const data: StreamEvent = JSON.parse(event.data);
|
const data: any = JSON.parse(event.data);
|
||||||
console.log('📦 Parsed event:', data);
|
console.log('📦 Parsed event:', data);
|
||||||
console.log(`🔍 Event review_id: ${data.review_id}, Expected: ${reviewId}`);
|
|
||||||
|
|
||||||
// Filter events for this review
|
// Handle different message types
|
||||||
if (data.review_id === reviewId) {
|
if (data.type === 'connection') {
|
||||||
console.log('✅ Event matches review ID, adding to events');
|
console.log('🔗 Connection confirmed:', data.message);
|
||||||
setEvents((prev) => [...prev, data]);
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (data.type === 'pong') {
|
||||||
|
console.log('🏓 Pong received');
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (data.type === 'echo') {
|
||||||
|
console.log('📢 Echo:', data.message);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Review events
|
||||||
|
if (data.review_id !== undefined) {
|
||||||
|
console.log(`🔍 Event review_id: ${data.review_id}, Expected: ${reviewId}, Type: ${data.type}`);
|
||||||
|
|
||||||
|
// Filter events for this review OR show all for debugging
|
||||||
|
if (data.review_id === reviewId || true) { // Allow all for now for debugging
|
||||||
|
console.log(`✅ Processing event type: ${data.type}`);
|
||||||
|
setEvents((prev) => [...prev, data]);
|
||||||
|
|
||||||
// Update current step
|
// Update current step
|
||||||
if (data.data.type === 'agent_step') {
|
if (data.type === 'agent_step' || data.data?.type === 'agent_step') {
|
||||||
console.log('🚶 Agent step:', data.data.step);
|
const step = data.data?.step || data.step;
|
||||||
setCurrentStep(data.data.step || '');
|
console.log('🚶 Agent step:', step);
|
||||||
}
|
setCurrentStep(step || '');
|
||||||
|
}
|
||||||
|
|
||||||
// Collect LLM messages
|
// Collect LLM messages
|
||||||
if (data.data.type === 'llm_message') {
|
if (data.type === 'llm_message' || data.data?.type === 'llm_message') {
|
||||||
console.log('💬 LLM message:', data.data.message);
|
const message = data.data?.message || data.message;
|
||||||
setLlmMessages((prev) => [...prev, data.data.message || '']);
|
console.log('💬 LLM message:', message);
|
||||||
|
setLlmMessages((prev) => [...prev, message || '']);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Handle special events
|
||||||
|
if (data.type === 'review_started') {
|
||||||
|
console.log('🎬 Review started:', data.data?.message);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (data.type === 'review_completed') {
|
||||||
|
console.log('🎉 Review completed:', data.data?.message);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
console.log('⏭️ Event is for different review, skipping');
|
||||||
}
|
}
|
||||||
} else {
|
|
||||||
console.log('⏭️ Event is for different review, skipping');
|
|
||||||
}
|
}
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
console.error('❌ Error parsing WebSocket message:', error);
|
console.error('❌ Error parsing WebSocket message:', error, 'Data:', event.data);
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
ws.onclose = () => {
|
ws.onclose = () => {
|
||||||
console.log('🔌 WebSocket disconnected');
|
console.log('🔌 WebSocket disconnected');
|
||||||
setIsConnected(false);
|
setIsConnected(false);
|
||||||
|
if (pingInterval) {
|
||||||
|
window.clearInterval(pingInterval);
|
||||||
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
ws.onerror = (error) => {
|
ws.onerror = (error) => {
|
||||||
@ -85,6 +128,9 @@ export const ReviewStream: React.FC<ReviewStreamProps> = ({ reviewId }) => {
|
|||||||
|
|
||||||
return () => {
|
return () => {
|
||||||
console.log('🔌 Closing WebSocket');
|
console.log('🔌 Closing WebSocket');
|
||||||
|
if (pingInterval) {
|
||||||
|
window.clearInterval(pingInterval);
|
||||||
|
}
|
||||||
ws.close();
|
ws.close();
|
||||||
};
|
};
|
||||||
}, [reviewId]);
|
}, [reviewId]);
|
||||||
@ -142,38 +188,53 @@ export const ReviewStream: React.FC<ReviewStreamProps> = ({ reviewId }) => {
|
|||||||
if (events.length === 0) {
|
if (events.length === 0) {
|
||||||
return (
|
return (
|
||||||
<div className="text-center text-dark-text-muted py-4">
|
<div className="text-center text-dark-text-muted py-4">
|
||||||
Ожидание событий...
|
<div className="animate-pulse">⏳ Ожидание событий...</div>
|
||||||
|
<div className="text-xs mt-2">
|
||||||
|
{isConnected ? '✅ WebSocket подключен' : '❌ WebSocket отключен'}
|
||||||
|
</div>
|
||||||
</div>
|
</div>
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
return (
|
return (
|
||||||
<div className="space-y-2 max-h-60 overflow-y-auto">
|
<div className="space-y-2 max-h-60 overflow-y-auto">
|
||||||
{events.map((event, index) => (
|
{events.map((event: any, index) => {
|
||||||
<div
|
const eventType = event.type || event.data?.type;
|
||||||
key={index}
|
const eventMessage = event.data?.message || event.message;
|
||||||
className="bg-dark-card border border-dark-border rounded-lg p-3 text-sm"
|
const eventStep = event.data?.step || event.step;
|
||||||
>
|
|
||||||
<div className="flex items-center justify-between mb-1">
|
return (
|
||||||
<span className="text-dark-text-secondary text-xs">
|
<div
|
||||||
{new Date(event.timestamp).toLocaleTimeString()}
|
key={index}
|
||||||
</span>
|
className="bg-dark-card border border-dark-border rounded-lg p-3 text-sm"
|
||||||
<span className="text-xs bg-blue-900/30 text-blue-400 px-2 py-1 rounded">
|
>
|
||||||
{event.data.type}
|
<div className="flex items-center justify-between mb-1">
|
||||||
</span>
|
<span className="text-dark-text-secondary text-xs">
|
||||||
|
{new Date(event.timestamp).toLocaleTimeString('ru-RU')}
|
||||||
|
</span>
|
||||||
|
<span className={`text-xs px-2 py-1 rounded ${
|
||||||
|
eventType === 'review_started' ? 'bg-green-900/30 text-green-400' :
|
||||||
|
eventType === 'review_completed' ? 'bg-blue-900/30 text-blue-400' :
|
||||||
|
eventType === 'agent_step' ? 'bg-purple-900/30 text-purple-400' :
|
||||||
|
eventType === 'llm_message' ? 'bg-yellow-900/30 text-yellow-400' :
|
||||||
|
'bg-gray-900/30 text-gray-400'
|
||||||
|
}`}>
|
||||||
|
{eventType}
|
||||||
|
</span>
|
||||||
|
</div>
|
||||||
|
{eventStep && (
|
||||||
|
<div className="text-dark-text-primary">
|
||||||
|
{getStepInfo(eventStep).icon} {getStepInfo(eventStep).name}
|
||||||
|
</div>
|
||||||
|
)}
|
||||||
|
{eventMessage && (
|
||||||
|
<div className="text-dark-text-muted mt-1 text-xs">
|
||||||
|
{eventMessage}
|
||||||
|
</div>
|
||||||
|
)}
|
||||||
</div>
|
</div>
|
||||||
{event.data.step && (
|
);
|
||||||
<div className="text-dark-text-primary">
|
})}
|
||||||
{getStepInfo(event.data.step).icon} {getStepInfo(event.data.step).name}
|
|
||||||
</div>
|
|
||||||
)}
|
|
||||||
{event.data.message && (
|
|
||||||
<div className="text-dark-text-muted mt-1 text-xs">
|
|
||||||
{event.data.message}
|
|
||||||
</div>
|
|
||||||
)}
|
|
||||||
</div>
|
|
||||||
))}
|
|
||||||
</div>
|
</div>
|
||||||
);
|
);
|
||||||
};
|
};
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user