From c9dc4860115c76689749ca60cb5bb5207fe2b383 Mon Sep 17 00:00:00 2001 From: Primakov Alexandr Alexandrovich Date: Mon, 13 Oct 2025 12:58:58 +0300 Subject: [PATCH] fix: Add manual step events in each graph node + detailed streaming debug --- backend/app/agents/reviewer.py | 104 ++++++++++++++++++++++++--------- 1 file changed, 78 insertions(+), 26 deletions(-) diff --git a/backend/app/agents/reviewer.py b/backend/app/agents/reviewer.py index 3b6053a..29eeadc 100644 --- a/backend/app/agents/reviewer.py +++ b/backend/app/agents/reviewer.py @@ -142,6 +142,14 @@ class ReviewerAgent: async def fetch_pr_info(self, state: ReviewState) -> ReviewState: """Fetch PR information""" + # Send step event + if hasattr(self, '_stream_callback') and self._stream_callback: + await self._stream_callback({ + "type": "agent_step", + "step": "fetch_pr_info", + "message": "Получение информации о PR..." + }) + try: # Update review status result = await self.db.execute( @@ -198,6 +206,14 @@ class ReviewerAgent: async def fetch_files(self, state: ReviewState) -> ReviewState: """Fetch changed files in PR""" + # Send step event + if hasattr(self, '_stream_callback') and self._stream_callback: + await self._stream_callback({ + "type": "agent_step", + "step": "fetch_files", + "message": "Загрузка измененных файлов..." + }) + try: git_service = state["git_service"] @@ -269,6 +285,14 @@ class ReviewerAgent: async def analyze_files(self, state: ReviewState) -> ReviewState: """Analyze files and generate comments""" + # Send step event + if hasattr(self, '_stream_callback') and self._stream_callback: + await self._stream_callback({ + "type": "agent_step", + "step": "analyze_files", + "message": "Анализ кода с помощью AI..." + }) + try: all_comments = [] @@ -335,6 +359,14 @@ class ReviewerAgent: async def post_comments(self, state: ReviewState) -> ReviewState: """Post comments to PR""" + # Send step event + if hasattr(self, '_stream_callback') and self._stream_callback: + await self._stream_callback({ + "type": "agent_step", + "step": "post_comments", + "message": "Публикация комментариев в PR..." + }) + try: # Save comments to database result = await self.db.execute( @@ -494,6 +526,11 @@ class ReviewerAgent: on_event: callable = None ) -> Dict[str, Any]: """Run the review workflow with streaming events""" + print(f"🎬 Starting review stream for PR #{pr_number}") + + # Store callback in instance for access in nodes + self._stream_callback = on_event + initial_state: ReviewState = { "review_id": review_id, "pr_number": pr_number, @@ -507,34 +544,49 @@ class ReviewerAgent: } final_state = None + event_count = 0 # Stream through the graph - async for event in self.graph.astream( - initial_state, - stream_mode=["updates", "messages"] - ): - # Handle different event types - if isinstance(event, dict): - # Node updates - for node_name, node_data in event.items(): - if on_event: - await on_event({ - "type": "agent_step", - "step": node_name, - "data": node_data - }) - - # Store final state - if isinstance(node_data, dict): - final_state = node_data - - # Handle message events (LLM calls) - elif hasattr(event, '__class__') and 'message' in event.__class__.__name__.lower(): - if on_event: - await on_event({ - "type": "llm_message", - "message": str(event) - }) + print(f"📊 Starting graph stream with mode=['updates']") + + try: + async for event in self.graph.astream( + initial_state, + stream_mode=["updates"] + ): + event_count += 1 + print(f"📨 Event #{event_count} received from graph: {type(event)}") + print(f" Event content: {event if not isinstance(event, dict) or len(str(event)) < 200 else 'dict with keys: ' + str(list(event.keys()))}") + + # Handle different event types + if isinstance(event, dict): + # Node updates + for node_name, node_data in event.items(): + print(f" 🔔 Node update: {node_name}") + + if on_event: + print(f" 📤 Sending event to callback for node: {node_name}") + await on_event({ + "type": "agent_step", + "step": node_name, + "data": node_data if not isinstance(node_data, dict) else {"status": "processing"} + }) + + # Store final state + if isinstance(node_data, dict): + final_state = node_data + else: + print(f" ⚠️ Unexpected event type: {type(event)}") + + except Exception as e: + print(f"❌ Error in graph streaming: {e}") + import traceback + traceback.print_exc() + + print(f"✅ Graph streaming completed. Total events: {event_count}") + + # Clear callback + self._stream_callback = None return final_state or initial_state