"""Tools for the reviewer agent""" import json import re from typing import List, Dict, Any, Optional from langchain_ollama import OllamaLLM from langchain_core.output_parsers import JsonOutputParser from langchain_core.prompts import PromptTemplate from app.agents.prompts import DIFF_REVIEW_PROMPT, CODE_REVIEW_PROMPT class CodeAnalyzer: """Tool for analyzing code with Ollama""" def __init__(self, ollama_base_url: str, model: str): self.llm = OllamaLLM( base_url=ollama_base_url, model=model, temperature=0.3, # Увеличили для более внимательного анализа format="json" # Форсируем JSON формат ) # Используем JsonOutputParser для гарантированного JSON self.json_parser = JsonOutputParser() def _extract_json_from_response(self, response: str) -> Dict[str, Any]: """Extract JSON from LLM response""" # Remove markdown code blocks if present response = response.strip() if response.startswith('```'): response = re.sub(r'^```(?:json)?\s*', '', response) response = re.sub(r'\s*```$', '', response) # Try to find JSON in the response json_match = re.search(r'\{[\s\S]*\}', response) if json_match: try: json_str = json_match.group() print(f" 🔍 Найден JSON: {json_str[:200]}...") return json.loads(json_str) except json.JSONDecodeError as e: print(f" ❌ Ошибка парсинга JSON: {e}") print(f" 📄 JSON строка: {json_str[:500]}") else: print(f" ❌ JSON не найден в ответе!") print(f" 📄 Ответ: {response[:500]}") # If no valid JSON found, return empty comments return {"comments": []} async def generate_summary( self, all_comments: List[Dict[str, Any]], pr_title: str = "", pr_description: str = "" ) -> str: """Generate overall review summary in markdown""" if not all_comments: return """## 🤖 AI Code Review ✅ **Отличная работа!** Серьезных проблем не обнаружено. Код выглядит хорошо и соответствует стандартам.""" # Группируем по severity errors = [c for c in all_comments if c.get('severity', '').upper() == 'ERROR'] warnings = [c for c in all_comments if c.get('severity', '').upper() == 'WARNING'] infos = [c for c in all_comments if c.get('severity', '').upper() == 'INFO'] summary = f"""## 🤖 AI Code Review ### 📊 Статистика - **Всего проблем:** {len(all_comments)} """ if errors: summary += f"- ❌ **Критичных:** {len(errors)}\n" if warnings: summary += f"- ⚠️ **Важных:** {len(warnings)}\n" if infos: summary += f"- ℹ️ **Рекомендаций:** {len(infos)}\n" summary += "\n### 💡 Рекомендации\n\n" if errors: summary += "⚠️ **Найдены критичные проблемы!** Пожалуйста, исправьте их перед мержем в main.\n\n" elif warnings: summary += "Найдены важные замечания. Рекомендуется исправить перед мержем.\n\n" else: summary += "Проблемы не критичны, но рекомендуется учесть.\n\n" summary += "📝 **Детальные комментарии для каждой проблемы опубликованы ниже.**\n" return summary async def analyze_diff( self, file_path: str, diff: str, language: Optional[str] = None, pr_title: str = "", pr_description: str = "", on_llm_chunk: Optional[callable] = None ) -> List[Dict[str, Any]]: """Analyze code diff and return comments""" if not diff or not diff.strip(): print(f"⚠️ Пустой diff для {file_path}") return [] # Add PR context if available pr_context = "" if pr_title or pr_description: pr_context = f"\n\n**КОНТЕКСТ PR:**\n" if pr_title: pr_context += f"Название: {pr_title}\n" if pr_description: pr_context += f"Описание: {pr_description}\n" pr_context += "\nОБЯЗАТЕЛЬНО проверь: соответствует ли код описанию PR!\n" # Получаем инструкции по формату JSON от парсера format_instructions = self.json_parser.get_format_instructions() prompt = DIFF_REVIEW_PROMPT.format( file_path=file_path, diff=diff, pr_context=pr_context, format_instructions=format_instructions ) print("\n" + "="*80) print(f"🔍 АНАЛИЗ ФАЙЛА: {file_path}") print("="*80) if pr_title or pr_description: print(f"\n📋 КОНТЕКСТ PR:") print("-" * 80) if pr_title: print(f"Название: {pr_title}") if pr_description: desc_short = pr_description[:200] + ("..." if len(pr_description) > 200 else "") print(f"Описание: {desc_short}") print("-" * 80) print(f"\n📝 DIFF ({len(diff)} символов):") print("-" * 80) # Показываем первые 800 символов diff print(diff[:800] + ("...\n[обрезано]" if len(diff) > 800 else "")) print("-" * 80) print(f"\n💭 ПРОМПТ ({len(prompt)} символов):") print("-" * 80) print(prompt[:500] + "...") print("-" * 80) try: print(f"\n⏳ Отправка запроса к Ollama ({self.llm.model})...") # Собираем полный ответ из streaming chunks full_response = "" chunk_count = 0 print(f"\n🤖 STREAMING AI ответ:") print("-" * 80) # Используем streaming async for chunk in self.llm.astream(prompt): chunk_count += 1 full_response += chunk # Отправляем chunk через callback if on_llm_chunk: await on_llm_chunk(chunk, file_path) # Показываем в консоли print(chunk, end='', flush=True) print("\n" + "-" * 80) print(f"✅ Получено {chunk_count} chunks, всего {len(full_response)} символов") # Парсим финальный результат result = self.json_parser.parse(full_response) print(f"\n🤖 РАСПАРСЕННЫЙ результат:") print("-" * 80) print(json.dumps(result, ensure_ascii=False, indent=2)[:500] + "...") print("-" * 80) comments = result.get("comments", []) if comments: print(f"\n✅ Найдено комментариев: {len(comments)}") for i, comment in enumerate(comments, 1): print(f"\n {i}. Строка {comment.get('line', '?')}:") print(f" Severity: {comment.get('severity', '?')}") print(f" Message: {comment.get('message', '?')[:100]}...") else: print("\n⚠️ Комментариев не найдено! AI не нашел проблем.") print("="*80 + "\n") return comments except Exception as e: print(f"\n❌ ОШИБКА при анализе {file_path}: {e}") print(f" Тип ошибки: {type(e).__name__}") import traceback traceback.print_exc() # Fallback: попытка извлечь JSON вручную print("\n🔄 Попытка fallback парсинга...") try: if hasattr(e, 'args') and len(e.args) > 0: response_text = str(e.args[0]) result = self._extract_json_from_response(response_text) return result.get("comments", []) except: pass return [] async def analyze_code( self, file_path: str, code: str, language: str = "python", patch_info: str = "" ) -> List[Dict[str, Any]]: """Analyze full code content and return comments""" if not code or not code.strip(): return [] prompt = CODE_REVIEW_PROMPT.format( file_path=file_path, code=code, language=language, patch_info=patch_info ) try: response = await self.llm.ainvoke(prompt) result = self._extract_json_from_response(response) return result.get("comments", []) except Exception as e: print(f"Error analyzing code for {file_path}: {e}") return [] def detect_language(file_path: str) -> str: """Detect programming language from file extension""" extension_map = { '.py': 'python', '.js': 'javascript', '.ts': 'typescript', '.tsx': 'typescript', '.jsx': 'javascript', '.java': 'java', '.go': 'go', '.rs': 'rust', '.cpp': 'cpp', '.c': 'c', '.cs': 'csharp', '.php': 'php', '.rb': 'ruby', '.swift': 'swift', '.kt': 'kotlin', '.scala': 'scala', '.sh': 'bash', '.sql': 'sql', '.html': 'html', '.css': 'css', '.scss': 'scss', '.yaml': 'yaml', '.yml': 'yaml', '.json': 'json', '.xml': 'xml', '.md': 'markdown', } ext = '.' + file_path.split('.')[-1] if '.' in file_path else '' return extension_map.get(ext.lower(), 'text') def should_review_file(file_path: str) -> bool: """Determine if file should be reviewed""" # Skip binary, generated, and config files skip_extensions = { '.png', '.jpg', '.jpeg', '.gif', '.svg', '.ico', '.pdf', '.zip', '.tar', '.gz', '.lock', '.min.js', '.min.css', '.pyc', '.pyo', '.class', '.o', } skip_patterns = [ 'node_modules/', 'venv/', '.git/', 'dist/', 'build/', '__pycache__/', '.next/', '.nuxt/', 'package-lock.json', 'yarn.lock', 'poetry.lock', ] # Check extension ext = '.' + file_path.split('.')[-1] if '.' in file_path else '' if ext.lower() in skip_extensions: return False # Check patterns for pattern in skip_patterns: if pattern in file_path: return False return True