diff --git a/backend/app/workers/task_worker.py b/backend/app/workers/task_worker.py index d0c1d1e..351909f 100644 --- a/backend/app/workers/task_worker.py +++ b/backend/app/workers/task_worker.py @@ -9,6 +9,7 @@ from sqlalchemy.ext.asyncio import AsyncSession from app.database import AsyncSessionLocal from app.models import ReviewTask, PullRequest, Repository, Review from app.models.review_task import TaskStatusEnum +from app.models.review import ReviewStatusEnum from app.agents.reviewer import ReviewerAgent from app.config import settings @@ -28,6 +29,9 @@ class ReviewTaskWorker: self.running = True logger.info("🚀 Task Worker запущен") + # Очищаем зависшие задачи при старте + await self._cleanup_stuck_tasks() + while self.running: try: await self._process_next_task() @@ -44,6 +48,61 @@ class ReviewTaskWorker: self.running = False logger.info("⏹️ Task Worker остановлен") + async def _cleanup_stuck_tasks(self): + """Cleanup tasks that were IN_PROGRESS when server stopped""" + async with AsyncSessionLocal() as db: + try: + # Находим все задачи в статусе IN_PROGRESS + stuck_query = select(ReviewTask).where( + ReviewTask.status == TaskStatusEnum.IN_PROGRESS + ) + result = await db.execute(stuck_query) + stuck_tasks = result.scalars().all() + + if stuck_tasks: + logger.info(f"🔧 Найдено {len(stuck_tasks)} зависших задач, возвращаем в очередь...") + for task in stuck_tasks: + logger.info(f" ↩️ Задача #{task.id} (PR #{task.pull_request_id}) → PENDING") + task.status = TaskStatusEnum.PENDING + task.started_at = None + # Не увеличиваем retry_count, это не была ошибка + + await db.commit() + logger.info(f"✅ Зависшие задачи очищены и возвращены в очередь") + else: + logger.info("✅ Зависших задач не найдено") + + # Также очищаем зависшие reviews (которые были в процессе работы) + stuck_review_statuses = [ + ReviewStatusEnum.FETCHING, + ReviewStatusEnum.ANALYZING, + ReviewStatusEnum.COMMENTING + ] + stuck_reviews_query = select(Review).where( + Review.status.in_(stuck_review_statuses) + ) + result = await db.execute(stuck_reviews_query) + stuck_reviews = result.scalars().all() + + if stuck_reviews: + logger.info(f"🔧 Найдено {len(stuck_reviews)} зависших reviews, помечаем как failed...") + for review in stuck_reviews: + logger.info(f" ⚠️ Review #{review.id} (статус: {review.status}) → FAILED") + review.status = ReviewStatusEnum.FAILED + review.error_message = "Review прерван при перезапуске сервера" + from datetime import datetime + review.completed_at = datetime.utcnow() + + await db.commit() + logger.info(f"✅ Зависшие reviews помечены как failed") + else: + logger.info("✅ Зависших reviews не найдено") + + except Exception as e: + logger.error(f"❌ Ошибка при очистке зависших задач: {e}") + import traceback + traceback.print_exc() + async def _process_next_task(self): """Process next pending task""" async with AsyncSessionLocal() as db: