Primakov Alexandr Alexandrovich 09cdd06307 init
2025-10-12 23:15:09 +03:00

300 lines
11 KiB
Python
Raw Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""Tools for the reviewer agent"""
import json
import re
from typing import List, Dict, Any, Optional
from langchain_ollama import OllamaLLM
from langchain_core.output_parsers import JsonOutputParser
from langchain_core.prompts import PromptTemplate
from app.agents.prompts import DIFF_REVIEW_PROMPT, CODE_REVIEW_PROMPT
class CodeAnalyzer:
    """Tool for analyzing code with Ollama.

    Wraps an ``OllamaLLM`` instance and provides helpers to review a diff or a
    full file, returning structured review comments parsed from the model's
    JSON output, plus a markdown summary generator.
    """

    def __init__(self, ollama_base_url: str, model: str):
        """Create the analyzer.

        Args:
            ollama_base_url: Base URL of the Ollama server.
            model: Name of the Ollama model to query.
        """
        self.llm = OllamaLLM(
            base_url=ollama_base_url,
            model=model,
            temperature=0.3,  # slightly raised for a more attentive analysis
            format="json",    # force the model to emit JSON
        )
        # JsonOutputParser guarantees the chain output is already-parsed JSON.
        self.json_parser = JsonOutputParser()

    def _extract_json_from_response(self, response: str) -> Dict[str, Any]:
        """Extract a JSON object from a raw LLM response string.

        Strips optional markdown code fences, then greedily matches the first
        ``{`` through the last ``}``. Returns ``{"comments": []}`` when no
        valid JSON object can be parsed.
        """
        # Remove markdown code blocks if present.
        response = response.strip()
        if response.startswith('```'):
            response = re.sub(r'^```(?:json)?\s*', '', response)
            response = re.sub(r'\s*```$', '', response)
        # Try to find a JSON object in the response (greedy: outermost braces).
        json_match = re.search(r'\{[\s\S]*\}', response)
        if json_match:
            try:
                json_str = json_match.group()
                print(f" 🔍 Найден JSON: {json_str[:200]}...")
                return json.loads(json_str)
            except json.JSONDecodeError as e:
                print(f" ❌ Ошибка парсинга JSON: {e}")
                print(f" 📄 JSON строка: {json_str[:500]}")
        else:
            print(f" ❌ JSON не найден в ответе!")
            print(f" 📄 Ответ: {response[:500]}")
        # If no valid JSON found, return empty comments.
        return {"comments": []}

    async def generate_summary(
        self,
        all_comments: List[Dict[str, Any]],
        pr_title: str = "",
        pr_description: str = ""
    ) -> str:
        """Generate an overall review summary in markdown.

        Args:
            all_comments: Review comments collected across all files; each is
                expected to carry a ``severity`` key (ERROR/WARNING/INFO).
            pr_title: PR title (currently unused in the summary text).
            pr_description: PR description (currently unused in the summary text).

        Returns:
            A markdown string with per-severity counts and recommendations.
        """
        if not all_comments:
            return """## 🤖 AI Code Review
✅ **Отличная работа!** Серьезных проблем не обнаружено.
Код выглядит хорошо и соответствует стандартам."""
        # Group comments by severity (case-insensitive).
        errors = [c for c in all_comments if c.get('severity', '').upper() == 'ERROR']
        warnings = [c for c in all_comments if c.get('severity', '').upper() == 'WARNING']
        infos = [c for c in all_comments if c.get('severity', '').upper() == 'INFO']
        summary = f"""## 🤖 AI Code Review
### 📊 Статистика
- **Всего проблем:** {len(all_comments)}
"""
        if errors:
            summary += f"- ❌ **Критичных:** {len(errors)}\n"
        if warnings:
            summary += f"- ⚠️ **Важных:** {len(warnings)}\n"
        if infos:
            summary += f"- **Рекомендаций:** {len(infos)}\n"
        summary += "\n### 💡 Рекомендации\n\n"
        if errors:
            summary += "⚠️ **Найдены критичные проблемы!** Пожалуйста, исправьте их перед мержем в main.\n\n"
        elif warnings:
            summary += "Найдены важные замечания. Рекомендуется исправить перед мержем.\n\n"
        else:
            summary += "Проблемы не критичны, но рекомендуется учесть.\n\n"
        summary += "📝 **Детальные комментарии для каждой проблемы опубликованы ниже.**\n"
        return summary

    async def analyze_diff(
        self,
        file_path: str,
        diff: str,
        language: Optional[str] = None,
        pr_title: str = "",
        pr_description: str = ""
    ) -> List[Dict[str, Any]]:
        """Analyze a code diff and return a list of review comments.

        Builds a prompt from ``DIFF_REVIEW_PROMPT`` (optionally including PR
        context), runs it through the LLM + JSON parser chain, and returns the
        ``comments`` list from the parsed result. On failure, attempts a
        manual JSON-extraction fallback; returns ``[]`` when nothing usable.
        """
        if not diff or not diff.strip():
            print(f"⚠️ Пустой diff для {file_path}")
            return []
        # Add PR context if available.
        pr_context = ""
        if pr_title or pr_description:
            pr_context = f"\n\n**КОНТЕКСТ PR:**\n"
            if pr_title:
                pr_context += f"Название: {pr_title}\n"
            if pr_description:
                pr_context += f"Описание: {pr_description}\n"
            pr_context += "\nОБЯЗАТЕЛЬНО проверь: соответствует ли код описанию PR!\n"
        # Get the JSON format instructions from the parser.
        format_instructions = self.json_parser.get_format_instructions()
        prompt = DIFF_REVIEW_PROMPT.format(
            file_path=file_path,
            diff=diff,
            pr_context=pr_context,
            format_instructions=format_instructions
        )
        print("\n" + "="*80)
        print(f"🔍 АНАЛИЗ ФАЙЛА: {file_path}")
        print("="*80)
        if pr_title or pr_description:
            print(f"\n📋 КОНТЕКСТ PR:")
            print("-" * 80)
            if pr_title:
                print(f"Название: {pr_title}")
            if pr_description:
                desc_short = pr_description[:200] + ("..." if len(pr_description) > 200 else "")
                print(f"Описание: {desc_short}")
            print("-" * 80)
        print(f"\n📝 DIFF ({len(diff)} символов):")
        print("-" * 80)
        # Show the first 800 characters of the diff.
        print(diff[:800] + ("...\n[обрезано]" if len(diff) > 800 else ""))
        print("-" * 80)
        print(f"\n💭 ПРОМПТ ({len(prompt)} символов):")
        print("-" * 80)
        print(prompt[:500] + "...")
        print("-" * 80)
        try:
            print(f"\n⏳ Отправка запроса к Ollama ({self.llm.model})...")
            # Build a chain of the LLM and the JSON parser.
            chain = self.llm | self.json_parser
            # Run it and get the parsed result.
            result = await chain.ainvoke(prompt)
            print(f"\n🤖 ОТВЕТ AI (распарсен через JsonOutputParser):")
            print("-" * 80)
            print(json.dumps(result, ensure_ascii=False, indent=2)[:500] + "...")
            print("-" * 80)
            comments = result.get("comments", [])
            if comments:
                print(f"\n✅ Найдено комментариев: {len(comments)}")
                for i, comment in enumerate(comments, 1):
                    print(f"\n {i}. Строка {comment.get('line', '?')}:")
                    print(f" Severity: {comment.get('severity', '?')}")
                    print(f" Message: {comment.get('message', '?')[:100]}...")
            else:
                print("\n⚠️ Комментариев не найдено! AI не нашел проблем.")
            print("="*80 + "\n")
            return comments
        except Exception as e:
            print(f"\n❌ ОШИБКА при анализе {file_path}: {e}")
            print(f" Тип ошибки: {type(e).__name__}")
            import traceback
            traceback.print_exc()
            # Fallback: try to extract JSON from the error payload manually.
            print("\n🔄 Попытка fallback парсинга...")
            try:
                if hasattr(e, 'args') and len(e.args) > 0:
                    response_text = str(e.args[0])
                    result = self._extract_json_from_response(response_text)
                    return result.get("comments", [])
            except Exception:
                # Was a bare `except:` — narrowed so KeyboardInterrupt/SystemExit
                # are no longer swallowed; best-effort fallback is preserved.
                pass
            return []

    async def analyze_code(
        self,
        file_path: str,
        code: str,
        language: str = "python",
        patch_info: str = ""
    ) -> List[Dict[str, Any]]:
        """Analyze full file content and return a list of review comments.

        Uses ``CODE_REVIEW_PROMPT`` and the manual JSON extractor (no parser
        chain). Returns ``[]`` for empty input or on any analysis error.
        """
        if not code or not code.strip():
            return []
        prompt = CODE_REVIEW_PROMPT.format(
            file_path=file_path,
            code=code,
            language=language,
            patch_info=patch_info
        )
        try:
            response = await self.llm.ainvoke(prompt)
            result = self._extract_json_from_response(response)
            return result.get("comments", [])
        except Exception as e:
            print(f"Error analyzing code for {file_path}: {e}")
            return []
def detect_language(file_path: str) -> str:
    """Map a file path to a programming-language name via its extension.

    The lookup is case-insensitive; any unknown or missing extension
    falls back to ``'text'``.
    """
    lang_by_ext = {
        '.py': 'python',
        '.js': 'javascript',
        '.ts': 'typescript',
        '.tsx': 'typescript',
        '.jsx': 'javascript',
        '.java': 'java',
        '.go': 'go',
        '.rs': 'rust',
        '.cpp': 'cpp',
        '.c': 'c',
        '.cs': 'csharp',
        '.php': 'php',
        '.rb': 'ruby',
        '.swift': 'swift',
        '.kt': 'kotlin',
        '.scala': 'scala',
        '.sh': 'bash',
        '.sql': 'sql',
        '.html': 'html',
        '.css': 'css',
        '.scss': 'scss',
        '.yaml': 'yaml',
        '.yml': 'yaml',
        '.json': 'json',
        '.xml': 'xml',
        '.md': 'markdown',
    }
    # rpartition yields an empty separator when there is no dot at all.
    _, dot, tail = file_path.rpartition('.')
    if not dot:
        return 'text'
    return lang_by_ext.get('.' + tail.lower(), 'text')
def should_review_file(file_path: str) -> bool:
    """Determine whether a file is worth sending to the AI reviewer.

    Skips binary assets, lockfiles, minified bundles, compiled artifacts,
    and anything inside generated/vendored directories.

    Args:
        file_path: Repo-relative path of the changed file.

    Returns:
        True when the file should be reviewed, False when it should be skipped.
    """
    # Skip binary, generated, and lock/minified files by suffix.
    skip_extensions = {
        '.png', '.jpg', '.jpeg', '.gif', '.svg', '.ico',
        '.pdf', '.zip', '.tar', '.gz',
        '.lock', '.min.js', '.min.css',
        '.pyc', '.pyo', '.class', '.o',
    }
    skip_patterns = [
        'node_modules/',
        'venv/',
        '.git/',
        'dist/',
        'build/',
        '__pycache__/',
        '.next/',
        '.nuxt/',
        'package-lock.json',
        'yarn.lock',
        'poetry.lock',
    ]
    # BUG FIX: the old code computed the extension as '.' + the last
    # dot-separated segment, which can never equal a multi-part suffix like
    # '.min.js' — so minified bundles were never skipped. Suffix matching
    # preserves every single-part skip and also catches '.min.js'/'.min.css'.
    lowered = file_path.lower()
    if any(lowered.endswith(ext) for ext in skip_extensions):
        return False
    # Skip anything inside generated/vendored locations.
    for pattern in skip_patterns:
        if pattern in file_path:
            return False
    return True