feat: Enhance review process with streaming events and detailed logging

This commit is contained in:
Primakov Alexandr Alexandrovich
2025-10-13 17:26:41 +03:00
parent a762d09b3b
commit 2f29ccff74
10 changed files with 309 additions and 205 deletions

View File

@@ -526,7 +526,11 @@ class ReviewerAgent:
on_event: callable = None
) -> Dict[str, Any]:
"""Run the review workflow with streaming events"""
print(f"\n{'='*80}")
print(f"🎬 Starting review stream for PR #{pr_number}")
print(f" Review ID: {review_id}")
print(f" Callback: {on_event is not None}")
print(f"{'='*80}\n")
# Store callback in instance for access in nodes
self._stream_callback = on_event
@@ -545,9 +549,10 @@ class ReviewerAgent:
final_state = None
event_count = 0
callback_count = 0
# Stream through the graph
print(f"📊 Starting graph stream with mode=['updates']")
print(f"📊 Starting graph.astream() with mode=['updates']\n")
try:
async for event in self.graph.astream(
@@ -555,33 +560,59 @@ class ReviewerAgent:
stream_mode=["updates"]
):
event_count += 1
print(f"📨 Event #{event_count} received from graph")
print(f" Type: {type(event)}")
print(f" Event content: {event}")
print(f"\n{''*80}")
print(f"📨 STREAM Event #{event_count}")
print(f" Type: {type(event).__name__}")
print(f" Is tuple: {isinstance(event, tuple)}")
print(f" Content: {event}")
print(f"{''*80}")
# LangGraph returns events as dict: {node_name: node_output}
if isinstance(event, dict):
for node_name, node_data in event.items():
print(f" 🔔 Node update: {node_name}")
print(f" 🔔 Node data type: {type(node_data)}")
if on_event:
print(f" 📤 Sending event to callback for node: {node_name}")
await on_event({
"type": "agent_step",
"step": node_name,
"message": f"Шаг: {node_name}",
"data": {
"status": node_data.get("status") if isinstance(node_data, dict) else None
}
})
# Store final state
if isinstance(node_data, dict):
final_state = node_data
# LangGraph returns events as tuple: ('updates', {node_name: node_output})
if isinstance(event, tuple) and len(event) == 2:
event_type, event_data = event[0], event[1]
print(f"✓ Tuple detected:")
print(f" [0] event_type: '{event_type}'")
print(f" [1] event_data type: {type(event_data).__name__}")
# Handle 'updates' events
if event_type == 'updates' and isinstance(event_data, dict):
print(f"✓ Updates event with dict data")
for node_name, node_state in event_data.items():
print(f"\n 🔔 Node: '{node_name}'")
print(f" State type: {type(node_state).__name__}")
if on_event:
callback_count += 1
print(f" 📤 Calling callback #{callback_count}...")
try:
await on_event({
"type": "agent_step",
"step": node_name,
"message": f"Шаг: {node_name}",
"data": {
"status": node_state.get("status") if isinstance(node_state, dict) else None
}
})
print(f" ✓ Callback executed successfully")
except Exception as e:
print(f" ❌ Callback error: {e}")
import traceback
traceback.print_exc()
else:
print(f" ⚠️ No callback set!")
# Store final state
if isinstance(node_state, dict):
final_state = node_state
else:
print(f" ⚠️ Not an 'updates' event or data is not dict")
print(f" event_type={event_type}, isinstance(event_data, dict)={isinstance(event_data, dict)}")
else:
print(f" ⚠️ Unexpected event format (not dict): {type(event)}")
print(f" ❌ NOT a tuple or wrong length!")
print(f" isinstance(event, tuple)={isinstance(event, tuple)}")
if isinstance(event, tuple):
print(f" len(event)={len(event)}")
except Exception as e:
print(f"❌ Error in graph streaming: {e}")

View File

@@ -131,9 +131,41 @@ async def get_review(
async def run_review_task(review_id: int, pr_number: int, repository_id: int, db: AsyncSession):
"""Background task to run review"""
"""Background task to run review with streaming"""
from app.main import manager
from datetime import datetime as dt
# Create event handler for streaming
async def on_review_event(event: dict):
"""Handle review events and broadcast to clients"""
try:
event_data = {
"type": event.get("type", "agent_update"),
"review_id": review_id,
"pr_number": pr_number,
"timestamp": dt.utcnow().isoformat(),
"data": event
}
# Save to DB
from app.models.review_event import ReviewEvent
db_event = ReviewEvent(
review_id=review_id,
event_type=event.get("type", "agent_update"),
step=event.get("step"),
message=event.get("message"),
data=event
)
db.add(db_event)
await db.commit()
# Broadcast
await manager.broadcast(event_data)
except Exception as e:
print(f"Error in review event handler: {e}")
agent = ReviewerAgent(db)
await agent.run_review(review_id, pr_number, repository_id)
await agent.run_review_stream(review_id, pr_number, repository_id, on_event=on_review_event)
@router.post("/{review_id}/retry")

View File

@@ -13,11 +13,43 @@ router = APIRouter()
async def start_review_task(review_id: int, pr_number: int, repository_id: int):
"""Background task to start review"""
"""Background task to start review with streaming"""
from app.database import async_session_maker
from app.main import manager
from datetime import datetime as dt
async with async_session_maker() as db:
# Create event handler for streaming
async def on_review_event(event: dict):
"""Handle review events and broadcast to clients"""
try:
event_data = {
"type": event.get("type", "agent_update"),
"review_id": review_id,
"pr_number": pr_number,
"timestamp": dt.utcnow().isoformat(),
"data": event
}
# Save to DB
from app.models.review_event import ReviewEvent
db_event = ReviewEvent(
review_id=review_id,
event_type=event.get("type", "agent_update"),
step=event.get("step"),
message=event.get("message"),
data=event
)
db.add(db_event)
await db.commit()
# Broadcast
await manager.broadcast(event_data)
except Exception as e:
print(f"Error in webhook review event handler: {e}")
agent = ReviewerAgent(db)
await agent.run_review(review_id, pr_number, repository_id)
await agent.run_review_stream(review_id, pr_number, repository_id, on_event=on_review_event)
@router.post("/gitea/{repository_id}")

View File

@@ -29,11 +29,23 @@ class ConnectionManager:
async def broadcast(self, message: dict):
"""Broadcast message to all connected clients"""
for connection in self.active_connections:
print(f"\n[BROADCAST] Sending to {len(self.active_connections)} clients")
print(f"[BROADCAST] Message type: {message.get('type')}")
print(f"[BROADCAST] Message: {str(message)[:200]}...")
sent_count = 0
error_count = 0
for i, connection in enumerate(self.active_connections):
try:
await connection.send_json(message)
except Exception:
pass
sent_count += 1
print(f"[BROADCAST] ✓ Sent to client #{i+1}")
except Exception as e:
error_count += 1
print(f"[BROADCAST] ✗ Failed to send to client #{i+1}: {e}")
print(f"[BROADCAST] Result: {sent_count} sent, {error_count} failed")
# Create connection manager

View File

@@ -206,6 +206,14 @@ class ReviewTaskWorker:
# Create event handler
async def on_review_event(event: dict):
"""Handle review events and broadcast to clients"""
print(f"\n{'*'*80}")
print(f"CALLBACK INVOKED!")
print(f" Event type: {event.get('type')}")
print(f" Event step: {event.get('step')}")
print(f" Event message: {event.get('message')}")
print(f" Active WS connections: {len(manager.active_connections)}")
print(f"{'*'*80}")
try:
# Prepare event data
event_data = {
@@ -216,6 +224,7 @@ class ReviewTaskWorker:
"data": event
}
print(f" Prepared event_data: {event_data}")
logger.info(f" 🔔 Broadcasting event: type={event.get('type')}, connections={len(manager.active_connections)}")
# Save event to database
@@ -229,10 +238,13 @@ class ReviewTaskWorker:
)
db.add(db_event)
await db.commit()
print(f" ✓ Event saved to DB: {db_event.id}")
logger.debug(f" 💾 Event saved to DB: {db_event.id}")
# Broadcast to all connected clients
print(f" Broadcasting to {len(manager.active_connections)} connections...")
await manager.broadcast(event_data)
print(f" ✓ Broadcast completed")
# Log the event
if event.get("type") == "agent_step":
@@ -242,6 +254,7 @@ class ReviewTaskWorker:
message = event.get("message", "")[:100]
logger.info(f" 💬 LLM: {message}...")
except Exception as e:
print(f" ❌ ERROR in callback: {e}")
logger.error(f" ❌ Ошибка broadcast события: {e}")
import traceback
traceback.print_exc()

View File

@@ -0,0 +1 @@