"""Task Worker for sequential review processing""" import asyncio import logging from datetime import datetime from sqlalchemy import select from sqlalchemy.ext.asyncio import AsyncSession from app.database import AsyncSessionLocal from app.models import ReviewTask, PullRequest, Repository, Review from app.models.review_task import TaskStatusEnum from app.agents.reviewer import ReviewerAgent from app.config import settings logger = logging.getLogger(__name__) class ReviewTaskWorker: """Worker that processes review tasks sequentially""" def __init__(self): self.running = False self.current_task_id = None self.poll_interval = 10 # секунд между проверками async def start(self): """Start the worker""" self.running = True logger.info("🚀 Task Worker запущен") while self.running: try: await self._process_next_task() except Exception as e: logger.error(f"❌ Ошибка в Task Worker: {e}") import traceback traceback.print_exc() # Подождать перед следующей проверкой await asyncio.sleep(self.poll_interval) async def stop(self): """Stop the worker""" self.running = False logger.info("⏹️ Task Worker остановлен") async def _process_next_task(self): """Process next pending task""" async with AsyncSessionLocal() as db: # Проверяем есть ли уже выполняющаяся задача in_progress_query = select(ReviewTask).where( ReviewTask.status == TaskStatusEnum.IN_PROGRESS ) result = await db.execute(in_progress_query) in_progress = result.scalar_one_or_none() if in_progress: # Уже есть задача в работе, ждем logger.debug(f"⏳ Задача #{in_progress.id} уже выполняется") return # Берем следующую pending задачу (с приоритетом) pending_query = select(ReviewTask).where( ReviewTask.status == TaskStatusEnum.PENDING ).order_by( ReviewTask.priority.desc(), # HIGH > NORMAL > LOW ReviewTask.created_at.asc() # Старые первыми ).limit(1) result = await db.execute(pending_query) task = result.scalar_one_or_none() if not task: # Нет задач в очереди return logger.info(f"\n{'='*80}") logger.info(f"📋 Начало обработки задачи #{task.id}") logger.info(f" PR ID: {task.pull_request_id}") logger.info(f" Приоритет: {task.priority}") logger.info(f"={'='*80}\n") # Отмечаем задачу как in_progress task.status = TaskStatusEnum.IN_PROGRESS task.started_at = datetime.utcnow() self.current_task_id = task.id await db.commit() try: # Выполняем review await self._execute_review(task, db) # Успешно завершено task.status = TaskStatusEnum.COMPLETED task.completed_at = datetime.utcnow() logger.info(f"✅ Задача #{task.id} успешно завершена") except Exception as e: # Ошибка при выполнении task.retry_count += 1 task.error_message = str(e) if task.retry_count >= task.max_retries: # Превышено количество попыток task.status = TaskStatusEnum.FAILED task.completed_at = datetime.utcnow() logger.error(f"❌ Задача #{task.id} провалена после {task.retry_count} попыток: {e}") else: # Вернуть в pending для повторной попытки task.status = TaskStatusEnum.PENDING logger.warning(f"⚠️ Задача #{task.id} вернулась в очередь (попытка {task.retry_count}/{task.max_retries}): {e}") import traceback traceback.print_exc() finally: self.current_task_id = None await db.commit() async def _execute_review(self, task: ReviewTask, db: AsyncSession): """Execute review for the task""" # Get PR with repository result = await db.execute( select(PullRequest).where(PullRequest.id == task.pull_request_id) ) pull_request = result.scalar_one_or_none() if not pull_request: raise ValueError(f"PullRequest {task.pull_request_id} not found") # Get repository result = await db.execute( select(Repository).where(Repository.id == pull_request.repository_id) ) repository = result.scalar_one_or_none() if not repository: raise ValueError(f"Repository {pull_request.repository_id} not found") # Check if review already exists and is not failed existing_review = await db.execute( select(Review).where( Review.pull_request_id == pull_request.id ).order_by(Review.started_at.desc()) ) review = existing_review.scalar_one_or_none() if review and review.status not in ["failed", "pending"]: logger.info(f" Review already exists with status: {review.status}") return # Create new review if doesn't exist if not review: review = Review( pull_request_id=pull_request.id, status="pending" ) db.add(review) await db.commit() await db.refresh(review) # Run review agent with streaming logger.info(f" 🤖 Запуск AI review для PR #{pull_request.pr_number}") # Import broadcast function from app.main import manager from datetime import datetime as dt # Send initial "review started" message logger.info(f" 📢 Отправка начального сообщения о старте review...") try: # Save initial event to database from app.models.review_event import ReviewEvent initial_db_event = ReviewEvent( review_id=review.id, event_type="review_started", message=f"Начало review для PR #{pull_request.pr_number}", data={ "repository_id": repository.id, "repository_name": f"{repository.repo_owner}/{repository.repo_name}" } ) db.add(initial_db_event) await db.commit() logger.info(f" 💾 Начальное событие сохранено в БД: {initial_db_event.id}") # Broadcast initial message initial_message = { "type": "review_started", "review_id": review.id, "pr_number": pull_request.pr_number, "timestamp": dt.utcnow().isoformat(), "data": { "message": f"Начало review для PR #{pull_request.pr_number}", "repository_id": repository.id, "repository_name": f"{repository.repo_owner}/{repository.repo_name}" } } await manager.broadcast(initial_message) logger.info(f" ✅ Начальное сообщение отправлено: {len(manager.active_connections)} подключений") except Exception as e: logger.error(f" ❌ Ошибка отправки начального сообщения: {e}") import traceback traceback.print_exc() # Create event handler async def on_review_event(event: dict): """Handle review events and broadcast to clients""" try: # Prepare event data event_data = { "type": event.get("type", "agent_update"), "review_id": review.id, "pr_number": pull_request.pr_number, "timestamp": dt.utcnow().isoformat(), "data": event } logger.info(f" 🔔 Broadcasting event: type={event.get('type')}, connections={len(manager.active_connections)}") # Save event to database from app.models.review_event import ReviewEvent db_event = ReviewEvent( review_id=review.id, event_type=event.get("type", "agent_update"), step=event.get("step"), message=event.get("message"), data=event ) db.add(db_event) await db.commit() logger.debug(f" 💾 Event saved to DB: {db_event.id}") # Broadcast to all connected clients await manager.broadcast(event_data) # Log the event if event.get("type") == "agent_step": step = event.get("step", "unknown") logger.info(f" 📍 Step: {step}") elif event.get("type") == "llm_message": message = event.get("message", "")[:100] logger.info(f" 💬 LLM: {message}...") except Exception as e: logger.error(f" ❌ Ошибка broadcast события: {e}") import traceback traceback.print_exc() agent = ReviewerAgent(db) await agent.run_review_stream( review_id=review.id, pr_number=pull_request.pr_number, repository_id=repository.id, on_event=on_review_event ) logger.info(f" ✅ Review завершен для PR #{pull_request.pr_number}") # Send completion message try: # Save completion event to database from app.models.review_event import ReviewEvent completion_db_event = ReviewEvent( review_id=review.id, event_type="review_completed", message=f"Review завершен для PR #{pull_request.pr_number}", data={} ) db.add(completion_db_event) await db.commit() logger.info(f" 💾 Событие завершения сохранено в БД: {completion_db_event.id}") # Broadcast completion message completion_message = { "type": "review_completed", "review_id": review.id, "pr_number": pull_request.pr_number, "timestamp": dt.utcnow().isoformat(), "data": { "message": f"Review завершен для PR #{pull_request.pr_number}" } } await manager.broadcast(completion_message) logger.info(f" 📢 Сообщение о завершении отправлено: {len(manager.active_connections)} подключений") except Exception as e: logger.error(f" ❌ Ошибка отправки сообщения о завершении: {e}") import traceback traceback.print_exc() # Global worker instance _worker_instance: ReviewTaskWorker | None = None async def start_worker(): """Start the global worker instance""" global _worker_instance if _worker_instance is None: _worker_instance = ReviewTaskWorker() # Запускаем в фоне asyncio.create_task(_worker_instance.start()) async def stop_worker(): """Stop the global worker instance""" global _worker_instance if _worker_instance: await _worker_instance.stop() _worker_instance = None def get_worker() -> ReviewTaskWorker | None: """Get the current worker instance""" return _worker_instance