"""Tools for the reviewer agent"""
|
||
|
||
import json
|
||
import re
|
||
from typing import List, Dict, Any, Optional
|
||
from langchain_ollama import OllamaLLM
|
||
from langchain_core.output_parsers import JsonOutputParser
|
||
from langchain_core.prompts import PromptTemplate
|
||
from app.agents.prompts import DIFF_REVIEW_PROMPT, CODE_REVIEW_PROMPT
|
||
|
||
|
||
class CodeAnalyzer:
    """Tool for analyzing code with Ollama"""

    def __init__(self, ollama_base_url: str, model: str):
        self.llm = OllamaLLM(
            base_url=ollama_base_url,
            model=model,
            temperature=0.3,  # raised so the model analyzes more attentively
            format="json"  # force JSON-formatted output
        )
        # Use JsonOutputParser so responses are guaranteed to parse as JSON
        self.json_parser = JsonOutputParser()

    def _extract_json_from_response(self, response: str) -> Dict[str, Any]:
        """Extract JSON from an LLM response"""
        # Remove markdown code blocks if present
        response = response.strip()
        if response.startswith('```'):
            response = re.sub(r'^```(?:json)?\s*', '', response)
            response = re.sub(r'\s*```$', '', response)

        # Try to find JSON in the response
        json_match = re.search(r'\{[\s\S]*\}', response)
        if json_match:
            try:
                json_str = json_match.group()
                print(f"   🔍 Found JSON: {json_str[:200]}...")
                return json.loads(json_str)
            except json.JSONDecodeError as e:
                print(f"   ❌ JSON parsing error: {e}")
                print(f"   📄 JSON string: {json_str[:500]}")
        else:
            print("   ❌ No JSON found in the response!")
            print(f"   📄 Response: {response[:500]}")

        # If no valid JSON was found, return empty comments
        return {"comments": []}

    async def generate_summary(
        self,
        all_comments: List[Dict[str, Any]],
        pr_title: str = "",
        pr_description: str = ""
    ) -> str:
        """Generate overall review summary in markdown"""
        if not all_comments:
            return """## 🤖 AI Code Review

✅ **Great work!** No serious issues were found.

The code looks good and meets the standards."""

        # Group comments by severity
        errors = [c for c in all_comments if c.get('severity', '').upper() == 'ERROR']
        warnings = [c for c in all_comments if c.get('severity', '').upper() == 'WARNING']
        infos = [c for c in all_comments if c.get('severity', '').upper() == 'INFO']

        summary = f"""## 🤖 AI Code Review

### 📊 Statistics

- **Total issues:** {len(all_comments)}
"""

        if errors:
            summary += f"- ❌ **Critical:** {len(errors)}\n"
        if warnings:
            summary += f"- ⚠️ **Important:** {len(warnings)}\n"
        if infos:
            summary += f"- ℹ️ **Suggestions:** {len(infos)}\n"

        summary += "\n### 💡 Recommendations\n\n"

        if errors:
            summary += "⚠️ **Critical issues were found!** Please fix them before merging into main.\n\n"
        elif warnings:
            summary += "Important remarks were found. Fixing them before merging is recommended.\n\n"
        else:
            summary += "The issues are not critical, but addressing them is recommended.\n\n"

        summary += "📝 **Detailed comments for each issue are posted below.**\n"

        return summary

    async def analyze_diff(
        self,
        file_path: str,
        diff: str,
        language: Optional[str] = None,
        pr_title: str = "",
        pr_description: str = "",
        on_llm_chunk: Optional[Callable] = None
    ) -> List[Dict[str, Any]]:
        """Analyze a code diff and return comments"""

        if not diff or not diff.strip():
            print(f"⚠️ Empty diff for {file_path}")
            return []

        # Add PR context if available
        pr_context = ""
        if pr_title or pr_description:
            pr_context = "\n\n**PR CONTEXT:**\n"
            if pr_title:
                pr_context += f"Title: {pr_title}\n"
            if pr_description:
                pr_context += f"Description: {pr_description}\n"
            pr_context += "\nMANDATORY check: does the code match the PR description!\n"

        # Get JSON format instructions from the parser
        format_instructions = self.json_parser.get_format_instructions()

        prompt = DIFF_REVIEW_PROMPT.format(
            file_path=file_path,
            diff=diff,
            pr_context=pr_context,
            format_instructions=format_instructions
        )

print("\n" + "="*80)
|
||
print(f"🔍 АНАЛИЗ ФАЙЛА: {file_path}")
|
||
print("="*80)
|
||
|
||
if pr_title or pr_description:
|
||
print(f"\n📋 КОНТЕКСТ PR:")
|
||
print("-" * 80)
|
||
if pr_title:
|
||
print(f"Название: {pr_title}")
|
||
if pr_description:
|
||
desc_short = pr_description[:200] + ("..." if len(pr_description) > 200 else "")
|
||
print(f"Описание: {desc_short}")
|
||
print("-" * 80)
|
||
|
||
print(f"\n📝 DIFF ({len(diff)} символов):")
|
||
print("-" * 80)
|
||
# Показываем первые 800 символов diff
|
||
print(diff[:800] + ("...\n[обрезано]" if len(diff) > 800 else ""))
|
||
print("-" * 80)
|
||
print(f"\n💭 ПРОМПТ ({len(prompt)} символов):")
|
||
print("-" * 80)
|
||
print(prompt[:500] + "...")
|
||
print("-" * 80)
|
||
|
||
        try:
            print(f"\n⏳ Sending request to Ollama ({self.llm.model})...")

            # Assemble the full response from streaming chunks
            full_response = ""
            chunk_count = 0

            print("\n🤖 STREAMING AI response:")
            print("-" * 80)

            # Stream the response
            async for chunk in self.llm.astream(prompt):
                chunk_count += 1
                full_response += chunk

                # Forward the chunk through the callback
                if on_llm_chunk:
                    await on_llm_chunk(chunk, file_path)

                # Echo to the console
                print(chunk, end='', flush=True)

            print("\n" + "-" * 80)
            print(f"✅ Received {chunk_count} chunks, {len(full_response)} chars total")

            # Parse the final result
            result = self.json_parser.parse(full_response)

            print("\n🤖 PARSED result:")
            print("-" * 80)
            print(json.dumps(result, ensure_ascii=False, indent=2)[:500] + "...")
            print("-" * 80)

            comments = result.get("comments", [])

            if comments:
                print(f"\n✅ Comments found: {len(comments)}")
                for i, comment in enumerate(comments, 1):
                    print(f"\n  {i}. Line {comment.get('line', '?')}:")
                    print(f"     Severity: {comment.get('severity', '?')}")
                    print(f"     Message: {comment.get('message', '?')[:100]}...")
            else:
                print("\n⚠️ No comments found! The AI reported no issues.")

            print("=" * 80 + "\n")

            return comments

        except Exception as e:
            print(f"\n❌ ERROR while analyzing {file_path}: {e}")
            print(f"   Error type: {type(e).__name__}")
            import traceback
            traceback.print_exc()

            # Fallback: try to extract JSON from the raw text by hand
            print("\n🔄 Attempting fallback parsing...")
            try:
                if hasattr(e, 'args') and len(e.args) > 0:
                    response_text = str(e.args[0])
                    result = self._extract_json_from_response(response_text)
                    return result.get("comments", [])
            except Exception:
                pass

            return []

    async def analyze_code(
        self,
        file_path: str,
        code: str,
        language: str = "python",
        patch_info: str = ""
    ) -> List[Dict[str, Any]]:
        """Analyze full code content and return comments"""

        if not code or not code.strip():
            return []

        prompt = CODE_REVIEW_PROMPT.format(
            file_path=file_path,
            code=code,
            language=language,
            patch_info=patch_info
        )

        try:
            response = await self.llm.ainvoke(prompt)
            result = self._extract_json_from_response(response)
            return result.get("comments", [])
        except Exception as e:
            print(f"Error analyzing code for {file_path}: {e}")
            return []


def detect_language(file_path: str) -> str:
    """Detect programming language from file extension"""
    extension_map = {
        '.py': 'python',
        '.js': 'javascript',
        '.ts': 'typescript',
        '.tsx': 'typescript',
        '.jsx': 'javascript',
        '.java': 'java',
        '.go': 'go',
        '.rs': 'rust',
        '.cpp': 'cpp',
        '.c': 'c',
        '.cs': 'csharp',
        '.php': 'php',
        '.rb': 'ruby',
        '.swift': 'swift',
        '.kt': 'kotlin',
        '.scala': 'scala',
        '.sh': 'bash',
        '.sql': 'sql',
        '.html': 'html',
        '.css': 'css',
        '.scss': 'scss',
        '.yaml': 'yaml',
        '.yml': 'yaml',
        '.json': 'json',
        '.xml': 'xml',
        '.md': 'markdown',
    }

    ext = '.' + file_path.split('.')[-1] if '.' in file_path else ''
    return extension_map.get(ext.lower(), 'text')


def should_review_file(file_path: str) -> bool:
    """Determine if a file should be reviewed"""
    # Skip binary, generated, and config files
    skip_extensions = {
        '.png', '.jpg', '.jpeg', '.gif', '.svg', '.ico',
        '.pdf', '.zip', '.tar', '.gz',
        '.lock', '.min.js', '.min.css',
        '.pyc', '.pyo', '.class', '.o',
    }

    skip_patterns = [
        'node_modules/',
        'venv/',
        '.git/',
        'dist/',
        'build/',
        '__pycache__/',
        '.next/',
        '.nuxt/',
        'package-lock.json',
        'yarn.lock',
        'poetry.lock',
    ]

    # Check extensions with endswith so compound suffixes like '.min.js'
    # match too (splitting on the last '.' alone would only ever see '.js')
    lowered = file_path.lower()
    if any(lowered.endswith(ext) for ext in skip_extensions):
        return False

    # Check path patterns
    for pattern in skip_patterns:
        if pattern in file_path:
            return False

    return True
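

# --- Usage sketch (illustrative only) -----------------------------------------
# A minimal, hedged example of how these tools might be wired together. The
# Ollama URL and model name below are assumptions, not project configuration:
# point them at whatever your local Ollama instance actually serves.
if __name__ == "__main__":
    import asyncio

    async def _demo() -> None:
        # The pure helpers need no LLM at all.
        print(detect_language("app/main.py"))            # -> 'python'
        print(should_review_file("package-lock.json"))   # -> False

        # Hypothetical diff for a quick end-to-end smoke test; assumes an
        # Ollama server on the default port with the named model pulled.
        analyzer = CodeAnalyzer(
            ollama_base_url="http://localhost:11434",  # assumption: default Ollama port
            model="llama3",                            # assumption: any local model
        )
        diff = (
            "@@ -1,2 +1,2 @@\n"
            "-def add(a, b): return a + b\n"
            "+def add(a, b): return a - b\n"
        )
        comments = await analyzer.analyze_diff("example.py", diff)
        print(f"Got {len(comments)} comment(s)")

    asyncio.run(_demo())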