"""Main reviewer agent using LangGraph"""

import re
import traceback
from datetime import datetime, timezone
from typing import Any, Awaitable, Callable, Dict, List, Optional, TypedDict

from langgraph.graph import StateGraph, END
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select

from app.agents.tools import CodeAnalyzer, detect_language, should_review_file
from app.agents.prompts import SYSTEM_PROMPT, SUMMARY_PROMPT
from app.models import Review, Comment, PullRequest, Repository
from app.models.review import ReviewStatusEnum
from app.models.comment import SeverityEnum
from app.services import GiteaService, GitHubService, BitbucketService
from app.services.base import BaseGitService
from app.config import settings


# Reasoning blocks emitted by the model: <think> ... </think>
_THINK_BLOCK_RE = re.compile(r"<think>.*?</think>", re.DOTALL | re.IGNORECASE)
# Two or more consecutive newlines (collapsed to exactly two)
_BLANK_LINES_RE = re.compile(r"\n\n+")
# Group 1: an existing fenced block or inline code span (left untouched).
# Group 2: a bare <...> tag (wrapped in backticks).
_CODE_OR_TAG_RE = re.compile(r"(```.*?```|`[^`\n]+`)|(<[^>]+>)", re.DOTALL)


class ReviewState(TypedDict):
    """State for the review workflow"""

    review_id: int
    pr_number: int
    repository_id: int
    status: str
    files: List[Dict[str, Any]]
    analyzed_files: List[str]
    comments: List[Dict[str, Any]]
    error: Optional[str]
    git_service: Optional[BaseGitService]
    # Written by fetch_pr_info. It has to be declared here: LangGraph only
    # carries keys that are part of the state schema from node to node.
    pr_info: Optional[Dict[str, Any]]


class ReviewerAgent:
    """Agent for reviewing code using LangGraph.

    The workflow is a linear graph:
    fetch_pr_info -> fetch_files -> analyze_files -> post_comments
    -> complete_review. Every node receives and returns a ``ReviewState``.
    When a node fails it records the message in ``state["error"]``; the
    following processing nodes then skip their work so the original error
    reaches ``complete_review`` unchanged.
    """

    # platform name -> (service class, name of the master-token setting)
    _PLATFORMS = {
        "gitea": (GiteaService, "master_gitea_token"),
        "github": (GitHubService, "master_github_token"),
        "bitbucket": (BitbucketService, "master_bitbucket_token"),
    }

    def __init__(self, db: AsyncSession):
        self.db = db
        self.analyzer = CodeAnalyzer(
            ollama_base_url=settings.ollama_base_url,
            model=settings.ollama_model
        )
        # Optional async callback used by run_review_stream to report steps.
        self._stream_callback = None
        self.graph = self._build_graph()

    def _build_graph(self) -> StateGraph:
        """Build and compile the LangGraph workflow.

        Note: the value returned is the result of ``workflow.compile()``.
        """
        workflow = StateGraph(ReviewState)

        # Add nodes
        workflow.add_node("fetch_pr_info", self.fetch_pr_info)
        workflow.add_node("fetch_files", self.fetch_files)
        workflow.add_node("analyze_files", self.analyze_files)
        workflow.add_node("post_comments", self.post_comments)
        workflow.add_node("complete_review", self.complete_review)

        # Set entry point
        workflow.set_entry_point("fetch_pr_info")

        # Add edges
        workflow.add_edge("fetch_pr_info", "fetch_files")
        workflow.add_edge("fetch_files", "analyze_files")
        workflow.add_edge("analyze_files", "post_comments")
        workflow.add_edge("post_comments", "complete_review")
        workflow.add_edge("complete_review", END)

        return workflow.compile()

    # ------------------------------------------------------------------
    # Text helpers
    # ------------------------------------------------------------------

    def _remove_think_blocks(self, text: str) -> str:
        """Remove <think>...</think> blocks from text.

        Runs of blank lines left behind are collapsed to a single blank line
        and the result is stripped.
        """
        text = _THINK_BLOCK_RE.sub('', text)
        text = _BLANK_LINES_RE.sub('\n\n', text)
        return text.strip()

    def _escape_html_in_text(self, text: str) -> str:
        """Wrap HTML-like tags in backticks so Markdown does not hide them.

        Every ``<...>`` sequence is wrapped in backticks. Text that is
        already inside an inline code span or a fenced code block is left
        as it is, so tags are never wrapped twice.
        """
        def replace_tag(match):
            if match.group(1) is not None:
                # Already code: keep verbatim.
                return match.group(1)
            return f"`{match.group(2)}`"

        return _CODE_OR_TAG_RE.sub(replace_tag, text)

    def _clean_llm_text(self, text: Optional[str]) -> str:
        """Strip <think> blocks and protect HTML-like tags in model output."""
        return self._escape_html_in_text(self._remove_think_blocks(text or ""))

    @staticmethod
    def _severity_label(comment_data: Dict[str, Any]) -> str:
        """Return the upper-cased severity of a comment, defaulting to INFO."""
        return str(comment_data.get("severity") or "INFO").upper()

    @staticmethod
    def _parse_severity(comment_data: Dict[str, Any]) -> SeverityEnum:
        """Map a comment's severity to SeverityEnum, falling back to info.

        The model may return a missing, null or unknown severity; that must
        not abort the whole posting step.
        """
        label = str(comment_data.get("severity") or "INFO").lower()
        try:
            return SeverityEnum(label)
        except ValueError:
            return SeverityEnum("info")

    # ------------------------------------------------------------------
    # Workflow helpers
    # ------------------------------------------------------------------

    async def _emit_step(self, step: str, message: str) -> None:
        """Send an ``agent_step`` event to the stream callback, if one is set."""
        callback = getattr(self, "_stream_callback", None)
        if callback:
            await callback({
                "type": "agent_step",
                "step": step,
                "message": message
            })

    def _fail(self, state: ReviewState, step: str, exc: Exception) -> ReviewState:
        """Log the exception and mark the state as failed."""
        print(f"❌ ОШИБКА в {step}: {exc}")
        traceback.print_exc()
        state["error"] = str(exc)
        state["status"] = "failed"
        return state

    async def _load_review(self, review_id: int) -> Review:
        """Load the Review row; raises if it does not exist (scalar_one)."""
        result = await self.db.execute(
            select(Review).where(Review.id == review_id)
        )
        return result.scalar_one()

    @staticmethod
    def _initial_state(review_id: int, pr_number: int, repository_id: int) -> ReviewState:
        """Build the state the workflow starts from."""
        return {
            "review_id": review_id,
            "pr_number": pr_number,
            "repository_id": repository_id,
            "status": "pending",
            "files": [],
            "analyzed_files": [],
            "comments": [],
            "error": None,
            "git_service": None,
            "pr_info": None
        }

    def _get_git_service(self, repository: Repository) -> BaseGitService:
        """Get appropriate Git service for repository.

        The repository URL is expected to look like
        ``https://git.example.com/owner/repo`` (an optional ``.git`` suffix
        is removed). The repository's own encrypted token is used when
        present, otherwise the master token configured for the platform.

        Raises:
            ValueError: the URL cannot be split into owner/repo, the platform
                is not supported, the project token cannot be decrypted, or
                no token is available at all.
        """
        from app.utils import decrypt_token

        # Parse repository URL to get owner and name
        parts = repository.url.rstrip('/').split('/')
        if len(parts) < 2:
            raise ValueError(
                f"Cannot parse owner/repo from repository URL: {repository.url}"
            )
        repo_name = parts[-1]
        if repo_name.endswith('.git'):
            # Strip only the suffix; '.git' inside the name must survive.
            repo_name = repo_name[:-len('.git')]
        repo_owner = parts[-2]
        base_url = '/'.join(parts[:-2])

        platform = repository.platform.value.lower()
        if platform not in self._PLATFORMS:
            raise ValueError(f"Unsupported platform: {repository.platform}")
        service_cls, master_token_setting = self._PLATFORMS[platform]

        # Choose the token: project token or master token
        if repository.api_token:
            # Use the project token
            try:
                decrypted_token = decrypt_token(repository.api_token)
            except ValueError as e:
                raise ValueError(
                    f"Не удалось расшифровать API токен для репозитория {repository.name}: {str(e)}"
                ) from e
            print(f" 🔑 Используется проектный токен")
        else:
            # Use the master token
            decrypted_token = getattr(settings, master_token_setting, None)
            if not decrypted_token:
                raise ValueError(
                    f"API токен не указан для репозитория {repository.name} "
                    f"и мастер токен для {platform} не настроен в .env (MASTER_{platform.upper()}_TOKEN)"
                )
            print(f" 🔑 Используется мастер {platform} токен")

        return service_cls(base_url, decrypted_token, repo_owner, repo_name)

    # ------------------------------------------------------------------
    # Graph nodes
    # ------------------------------------------------------------------

    async def fetch_pr_info(self, state: ReviewState) -> ReviewState:
        """Fetch PR information.

        Marks the review as FETCHING, creates the Git service for the
        repository and stores it in ``state["git_service"]``, then stores
        title/description/author/branches in ``state["pr_info"]``.
        """
        if state.get("error"):
            return state

        # Send step event
        await self._emit_step("fetch_pr_info", "Получение информации о PR...")

        try:
            # Update review status
            review = await self._load_review(state["review_id"])
            review.status = ReviewStatusEnum.FETCHING
            await self.db.commit()

            # Get repository
            result = await self.db.execute(
                select(Repository).where(Repository.id == state["repository_id"])
            )
            repository = result.scalar_one()

            # Initialize Git service
            git_service = self._get_git_service(repository)
            state["git_service"] = git_service

            # Fetch PR info
            pr_info = await git_service.get_pull_request(state["pr_number"])

            print("\n" + "📋"*40)
            print("ИНФОРМАЦИЯ О PR")
            print("📋"*40)
            print(f"\n📝 Название: {pr_info.title}")
            print(f"👤 Автор: {pr_info.author}")
            print(f"🔀 Ветки: {pr_info.source_branch} → {pr_info.target_branch}")
            print(f"📄 Описание:")
            print("-" * 80)
            print(pr_info.description if pr_info.description else "(без описания)")
            print("-" * 80)
            print("📋"*40 + "\n")

            # Store PR info in state
            state["pr_info"] = {
                "title": pr_info.title,
                "description": pr_info.description,
                "author": pr_info.author,
                "source_branch": pr_info.source_branch,
                "target_branch": pr_info.target_branch
            }
            state["status"] = "pr_info_fetched"
            return state
        except Exception as e:
            return self._fail(state, "fetch_pr_info", e)

    async def fetch_files(self, state: ReviewState) -> ReviewState:
        """Fetch changed files in PR.

        Files accepted by ``should_review_file`` are stored in
        ``state["files"]`` as dicts with path, status, additions, deletions,
        patch and detected language; the review is then marked ANALYZING.
        Skipped when an earlier step already failed.
        """
        if state.get("error"):
            return state

        # Send step event
        await self._emit_step("fetch_files", "Загрузка измененных файлов...")

        try:
            git_service = state["git_service"]

            print("\n" + "📥"*40)
            print("ПОЛУЧЕНИЕ ФАЙЛОВ ИЗ PR")
            print("📥"*40)

            # Get changed files
            files = await git_service.get_pr_files(state["pr_number"])

            print(f"\n📊 Получено файлов из API: {len(files)}")
            for i, f in enumerate(files, 1):
                print(f"\n {i}. {f.filename}")
                print(f" Status: {f.status}")
                print(f" +{f.additions} -{f.deletions}")
                print(f" Patch: {'ДА' if f.patch else 'НЕТ'} ({len(f.patch) if f.patch else 0} символов)")
                if f.patch:
                    print(f" Первые 200 символов patch:")
                    print(f" {f.patch[:200]}...")

            # Filter files that should be reviewed
            reviewable_files = []
            skipped_files = []
            for f in files:
                if should_review_file(f.filename):
                    reviewable_files.append({
                        "path": f.filename,
                        "status": f.status,
                        "additions": f.additions,
                        "deletions": f.deletions,
                        "patch": f.patch,
                        "language": detect_language(f.filename)
                    })
                else:
                    skipped_files.append(f.filename)

            print(f"\n✅ Файлов для ревью: {len(reviewable_files)}")
            for rf in reviewable_files:
                print(f" - {rf['path']} ({rf['language']})")

            if skipped_files:
                print(f"\n⏭️ Пропущено файлов: {len(skipped_files)}")
                for sf in skipped_files:
                    print(f" - {sf}")

            print("📥"*40 + "\n")

            state["files"] = reviewable_files
            state["status"] = "files_fetched"

            # Update review
            review = await self._load_review(state["review_id"])
            review.status = ReviewStatusEnum.ANALYZING
            await self.db.commit()

            return state
        except Exception as e:
            return self._fail(state, "fetch_files", e)

    async def analyze_files(self, state: ReviewState) -> ReviewState:
        """Analyze files and generate comments.

        Each file with a patch of at least 10 characters is sent to the
        analyzer together with the PR title and description. The returned
        comments get a ``file_path`` key and are collected in
        ``state["comments"]``; the review is then marked COMMENTING.
        Skipped when an earlier step already failed.
        """
        if state.get("error"):
            return state

        # Send step event
        await self._emit_step("analyze_files", "Анализ кода с помощью AI...")

        try:
            all_comments = []
            # pr_info may be absent or None; treat both as "no context".
            pr_info = state.get("pr_info") or {}
            total_files = len(state["files"])

            print("\n" + "🔬"*40)
            print("НАЧАЛО АНАЛИЗА ФАЙЛОВ")
            print("🔬"*40)
            print(f"Файлов для анализа: {total_files}")

            for i, file_info in enumerate(state["files"], 1):
                file_path = file_info["path"]
                patch = file_info.get("patch")
                language = file_info.get("language", "text")

                print(f"\n📂 Файл {i}/{total_files}: {file_path}")
                print(f" Язык: {language}")
                print(f" Размер patch: {len(patch) if patch else 0} символов")
                print(f" Additions: {file_info.get('additions')}, Deletions: {file_info.get('deletions')}")

                if not patch or len(patch) < 10:
                    print(f" ⚠️ ПРОПУСК: patch пустой или слишком маленький")
                    continue

                # Analyze diff with PR context
                comments = await self.analyzer.analyze_diff(
                    file_path=file_path,
                    diff=patch,
                    language=language,
                    pr_title=pr_info.get("title") or "",
                    pr_description=pr_info.get("description") or ""
                )

                print(f" 💬 Получено комментариев: {len(comments)}")

                # Add file path to each comment
                for comment in comments:
                    comment["file_path"] = file_path
                    all_comments.append(comment)

            print(f"\n✅ ИТОГО комментариев: {len(all_comments)}")
            print("🔬"*40 + "\n")

            state["comments"] = all_comments
            state["status"] = "analyzed"

            # Update review
            review = await self._load_review(state["review_id"])
            review.files_analyzed = total_files
            review.status = ReviewStatusEnum.COMMENTING
            await self.db.commit()

            return state
        except Exception as e:
            return self._fail(state, "analyze_files", e)

    async def post_comments(self, state: ReviewState) -> ReviewState:
        """Post comments to PR.

        Saves every generated comment to the database, asks the analyzer for
        a summary and creates a review on the Git platform: REQUEST_CHANGES
        when any comment has severity ERROR, COMMENT otherwise, APPROVE when
        there are no comments. A failure while talking to the Git platform
        is logged and does not fail the step. Skipped when an earlier step
        already failed.
        """
        if state.get("error"):
            return state

        # Send step event
        await self._emit_step("post_comments", "Публикация комментариев в PR...")

        try:
            # Use the id from the state: after a commit the ORM instance may
            # be expired, and reading its attributes would trigger a lazy
            # load that is not allowed in an async session.
            review_id = state["review_id"]
            review = await self._load_review(review_id)

            # Save comments to database
            db_comments = []
            for comment_data in state["comments"]:
                # Drop <think> blocks and protect HTML tags from Markdown
                message = self._clean_llm_text(comment_data.get("message"))

                self.db.add(Comment(
                    review_id=review_id,
                    file_path=comment_data["file_path"],
                    line_number=comment_data.get("line", 1),
                    content=message,
                    severity=self._parse_severity(comment_data),
                    posted=False
                ))
                db_comments.append({**comment_data, "message": message})

            await self.db.commit()

            # Post to Git platform
            git_service = state["git_service"]
            pr_info = state.get("pr_info") or {}

            # Generate summary
            summary = await self.analyzer.generate_summary(
                all_comments=db_comments,
                pr_title=pr_info.get("title") or "",
                pr_description=pr_info.get("description") or ""
            )
            # Drop <think> blocks and protect HTML tags in the summary
            summary = self._clean_llm_text(summary)

            if db_comments:
                # Format comments for API
                formatted_comments = [
                    {
                        "file_path": c["file_path"],
                        "line_number": c.get("line", 1),
                        "content": f"**{self._severity_label(c)}**: {c.get('message', '')}"
                    }
                    for c in db_comments
                ]

                try:
                    # Determine review status based on severity
                    has_errors = any(
                        self._severity_label(c) == 'ERROR' for c in db_comments
                    )
                    event = "REQUEST_CHANGES" if has_errors else "COMMENT"

                    await git_service.create_review(
                        pr_number=state["pr_number"],
                        comments=formatted_comments,
                        body=summary,
                        event=event
                    )

                    # Mark comments as posted
                    result = await self.db.execute(
                        select(Comment).where(Comment.review_id == review_id)
                    )
                    for comment in result.scalars().all():
                        comment.posted = True
                    await self.db.commit()
                except Exception as e:
                    print(f"Error posting comments to Git platform: {e}")
                    # Continue even if posting fails
            else:
                # No issues found - approve PR
                try:
                    await git_service.create_review(
                        pr_number=state["pr_number"],
                        comments=[],
                        body=summary,
                        event="APPROVE"  # Approve if no issues
                    )
                except Exception as e:
                    print(f"Error posting approval: {e}")

            review.comments_generated = len(db_comments)
            await self.db.commit()

            state["status"] = "commented"
            return state
        except Exception as e:
            return self._fail(state, "post_comments", e)

    async def complete_review(self, state: ReviewState) -> ReviewState:
        """Complete the review.

        Persists the final outcome: FAILED plus the error message when any
        step failed, COMPLETED otherwise, and stamps ``completed_at``. The
        returned state's ``status`` is ``"failed"`` or ``"completed"``
        accordingly.
        """
        try:
            review = await self._load_review(state["review_id"])
            failed = bool(state.get("error"))

            if failed:
                review.status = ReviewStatusEnum.FAILED
                review.error_message = state["error"]
            else:
                review.status = ReviewStatusEnum.COMPLETED

            # Naive UTC timestamp, same value datetime.utcnow() produced,
            # without using the deprecated call.
            review.completed_at = datetime.now(timezone.utc).replace(tzinfo=None)
            await self.db.commit()

            # Do not report success to the caller when a step failed.
            state["status"] = "failed" if failed else "completed"
            return state
        except Exception as e:
            return self._fail(state, "complete_review", e)

    # ------------------------------------------------------------------
    # Entry points
    # ------------------------------------------------------------------

    async def run_review(
        self,
        review_id: int,
        pr_number: int,
        repository_id: int
    ) -> Dict[str, Any]:
        """Run the review workflow and return the final state."""
        initial_state = self._initial_state(review_id, pr_number, repository_id)
        final_state = await self.graph.ainvoke(initial_state)
        return final_state

    async def run_review_stream(
        self,
        review_id: int,
        pr_number: int,
        repository_id: int,
        on_event: Optional[Callable[[Dict[str, Any]], Awaitable[None]]] = None
    ) -> Dict[str, Any]:
        """Run the review workflow with streaming events.

        Args:
            review_id: id of the Review row to update.
            pr_number: pull request number on the Git platform.
            repository_id: id of the Repository row.
            on_event: optional async callable that receives ``agent_step``
                event dicts, both from the nodes themselves and for every
                node update produced by the graph.

        Returns:
            The last state dict produced by a node, or the initial state when
            no node produced one. If streaming itself raised, the error is
            recorded in the returned state.
        """
        print(f"🎬 Starting review stream for PR #{pr_number}")

        # Store callback in instance for access in nodes
        self._stream_callback = on_event

        initial_state = self._initial_state(review_id, pr_number, repository_id)
        final_state = None
        stream_error = None
        event_count = 0

        # Stream through the graph
        print(f"📊 Starting graph stream with mode=['updates']")

        try:
            async for event in self.graph.astream(
                initial_state,
                stream_mode=["updates"]
            ):
                event_count += 1
                print(f"📨 Event #{event_count} received from graph: {type(event)}")

                # When stream_mode is a list, LangGraph yields
                # (mode, payload) tuples; a plain dict is accepted as well.
                payload = event
                if isinstance(event, tuple) and len(event) == 2:
                    payload = event[1]

                if isinstance(payload, dict) and len(str(payload)) >= 200:
                    preview = 'dict with keys: ' + str(list(payload.keys()))
                else:
                    preview = payload
                print(f" Event content: {preview}")

                if not isinstance(payload, dict):
                    print(f" ⚠️ Unexpected event type: {type(event)}")
                    continue

                # Node updates
                for node_name, node_data in payload.items():
                    print(f" 🔔 Node update: {node_name}")

                    if on_event:
                        print(f" 📤 Sending event to callback for node: {node_name}")
                        await on_event({
                            "type": "agent_step",
                            "step": node_name,
                            "data": node_data if not isinstance(node_data, dict) else {"status": "processing"}
                        })

                    # Store final state
                    if isinstance(node_data, dict):
                        final_state = node_data
        except Exception as e:
            print(f"❌ Error in graph streaming: {e}")
            traceback.print_exc()
            stream_error = str(e)
        finally:
            # Clear callback even when streaming is interrupted
            self._stream_callback = None

        print(f"✅ Graph streaming completed. Total events: {event_count}")

        result_state = final_state or initial_state
        if stream_error and not result_state.get("error"):
            # Do not hide a streaming failure from the caller.
            result_state["error"] = stream_error
            result_state["status"] = "failed"
        return result_state