Primakov Alexandr Alexandrovich 6ae2d0d8ec Add organization and task queue features
- Introduced new models for `Organization` and `ReviewTask` to manage organizations and review tasks.
- Implemented API endpoints for CRUD operations on organizations and tasks, including scanning organizations for repositories and PRs.
- Developed a background worker for sequential processing of review tasks with priority handling and automatic retries.
- Created frontend components for managing organizations and monitoring task queues, including real-time updates and filtering options.
- Added comprehensive documentation for organization features and quick start guides.
- Fixed UI issues and improved navigation for better user experience.
2025-10-13 00:10:04 +03:00

198 lines
5.3 KiB
Python

"""Task Queue API endpoints"""
from fastapi import APIRouter, Depends, HTTPException
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select, func
from typing import List
from pydantic import BaseModel
from datetime import datetime
from app.database import get_db
from app.models import ReviewTask, PullRequest
from app.models.review_task import TaskStatusEnum, TaskPriorityEnum
from app.workers.task_worker import get_worker
# Router on which the task-queue endpoints of this module are registered.
# NOTE(review): the URL prefix is presumably applied where this router is
# included into the application — not visible from this file.
router = APIRouter()
class TaskResponse(BaseModel):
    """API representation of one review task plus basic pull-request details.

    In this module the ``pr_number`` / ``pr_title`` fields are filled from the
    joined pull request, while every other field is copied from the task row.
    """

    # Identity of the task and of the pull request it belongs to.
    id: int
    pull_request_id: int

    # Pull-request details; the schema allows them to be absent.
    pr_number: int | None
    pr_title: str | None

    # Queue state.
    status: TaskStatusEnum
    priority: TaskPriorityEnum

    # Lifecycle timestamps; started/completed may be unset.
    created_at: datetime
    started_at: datetime | None
    completed_at: datetime | None

    # Last recorded error text, if any.
    error_message: str | None

    # Retry bookkeeping.
    retry_count: int
    max_retries: int

    class Config:
        # Permit building the schema from objects exposing matching attributes.
        # NOTE(review): class-based Config is the legacy spelling in pydantic
        # v2 — consider ``model_config`` once the pydantic version is confirmed.
        from_attributes = True
class TaskListResponse(BaseModel):
    """One page of tasks together with per-status counters."""

    # Tasks on the requested page.
    items: List[TaskResponse]

    # Number of tasks matching the request's status filter (all tasks when
    # no filter is given), as computed by ``get_tasks``.
    total: int

    # Task counts for individual statuses.
    pending: int
    in_progress: int
    completed: int
    failed: int
class WorkerStatusResponse(BaseModel):
    """Snapshot of the task worker's state as reported by the status endpoint."""

    # Whether the worker reports itself as running.
    running: bool

    # Id of the task the worker reports as current, if any.
    current_task_id: int | None

    # Worker poll interval; the status endpoint reports 0 when no worker exists.
    # NOTE(review): unit is presumably seconds — confirm against the worker.
    poll_interval: int
@router.get("", response_model=TaskListResponse)
async def get_tasks(
    status: TaskStatusEnum | None = None,
    skip: int = 0,
    limit: int = 100,
    db: AsyncSession = Depends(get_db)
):
    """Return one page of review tasks together with per-status counters.

    Args:
        status: When given, only tasks in this status are listed and counted
            in ``total``.
        skip: Number of matching tasks to skip (pagination offset).
        limit: Maximum number of tasks to return.
        db: Async database session supplied by the ``get_db`` dependency.

    Returns:
        A ``TaskListResponse`` whose ``items`` are ordered newest first.
        ``total`` honours the ``status`` filter, whereas ``pending``,
        ``in_progress``, ``completed`` and ``failed`` always describe the
        whole task table.
    """
    # A single grouped query replaces one COUNT round-trip per status.
    tally_result = await db.execute(
        select(ReviewTask.status, func.count(ReviewTask.id)).group_by(ReviewTask.status)
    )
    status_tallies = tally_result.all()

    def tally_for(wanted: TaskStatusEnum) -> int:
        # Match with == instead of a dict lookup: an Enum member hashes by its
        # name, so it would miss a raw-value key even when the two compare equal.
        return sum(count for row_status, count in status_tallies if row_status == wanted)

    if status is None:
        total = sum(count for _, count in status_tallies)
    else:
        total = tally_for(status)

    # NOTE(review): ``total`` counts task rows only, while the page below uses
    # an inner join with PullRequest; the two could diverge if a task's pull
    # request row were missing.
    page_query = (
        select(ReviewTask, PullRequest)
        .join(PullRequest, ReviewTask.pull_request_id == PullRequest.id)
        # The id tie-breaker keeps page order deterministic when several
        # tasks share the same created_at value.
        .order_by(ReviewTask.created_at.desc(), ReviewTask.id.desc())
    )
    if status is not None:
        page_query = page_query.where(ReviewTask.status == status)
    page_query = page_query.offset(skip).limit(limit)

    page_rows = (await db.execute(page_query)).all()

    items = [
        TaskResponse(
            id=task.id,
            pull_request_id=task.pull_request_id,
            pr_number=pr.pr_number,
            pr_title=pr.title,
            status=task.status,
            priority=task.priority,
            created_at=task.created_at,
            started_at=task.started_at,
            completed_at=task.completed_at,
            error_message=task.error_message,
            retry_count=task.retry_count,
            max_retries=task.max_retries,
        )
        for task, pr in page_rows
    ]

    return TaskListResponse(
        items=items,
        total=total,
        pending=tally_for(TaskStatusEnum.PENDING),
        in_progress=tally_for(TaskStatusEnum.IN_PROGRESS),
        completed=tally_for(TaskStatusEnum.COMPLETED),
        failed=tally_for(TaskStatusEnum.FAILED),
    )
@router.get("/worker/status", response_model=WorkerStatusResponse)
async def get_worker_status():
    """Report the state of the task worker.

    Returns:
        A ``WorkerStatusResponse`` mirroring the worker's ``running``,
        ``current_task_id`` and ``poll_interval`` attributes. When
        ``get_worker()`` yields no worker, the response says not running,
        with no current task and a poll interval of 0.
    """
    active_worker = get_worker()

    if active_worker:
        return WorkerStatusResponse(
            running=active_worker.running,
            current_task_id=active_worker.current_task_id,
            poll_interval=active_worker.poll_interval,
        )

    # No worker available: answer with the "stopped" placeholder values.
    return WorkerStatusResponse(running=False, current_task_id=None, poll_interval=0)
@router.post("/{task_id}/retry")
async def retry_task(
    task_id: int,
    db: AsyncSession = Depends(get_db)
):
    """Put a finished task back into the queue.

    Both FAILED and COMPLETED tasks may be re-queued. The task is reset to
    PENDING with its error message, retry counter and start/completion
    timestamps cleared, and the change is committed.

    Args:
        task_id: Id of the task to re-queue.
        db: Async database session supplied by the ``get_db`` dependency.

    Returns:
        A dict with a confirmation ``message``.

    Raises:
        HTTPException: 404 if no task has this id; 400 if the task is in any
            status other than FAILED or COMPLETED.
    """
    lookup = await db.execute(
        select(ReviewTask).where(ReviewTask.id == task_id)
    )
    task = lookup.scalar_one_or_none()

    if task is None:
        raise HTTPException(status_code=404, detail="Task not found")

    if task.status not in (TaskStatusEnum.FAILED, TaskStatusEnum.COMPLETED):
        # Report the plain status value. Interpolating the Enum member itself
        # can render as "TaskStatusEnum.<NAME>", depending on the enum's base
        # class and the Python version, which is noise for API clients.
        current_status = getattr(task.status, "value", task.status)
        raise HTTPException(
            status_code=400,
            detail=f"Cannot retry task with status: {current_status}"
        )

    # Reset the task to a freshly queued state.
    task.status = TaskStatusEnum.PENDING
    task.error_message = None
    task.retry_count = 0
    task.started_at = None
    task.completed_at = None

    await db.commit()

    return {"message": "Task queued for retry"}
@router.delete("/{task_id}")
async def delete_task(
    task_id: int,
    db: AsyncSession = Depends(get_db)
):
    """Remove a task from the queue unless it is currently being processed.

    Args:
        task_id: Id of the task to delete.
        db: Async database session supplied by the ``get_db`` dependency.

    Returns:
        A dict with a confirmation ``message``.

    Raises:
        HTTPException: 404 if no task has this id; 400 if the task's status
            is IN_PROGRESS.
    """
    stmt = select(ReviewTask).where(ReviewTask.id == task_id)
    task = (await db.execute(stmt)).scalar_one_or_none()

    if not task:
        raise HTTPException(status_code=404, detail="Task not found")

    # Refuse to delete a task whose status says it is being processed.
    being_processed = task.status == TaskStatusEnum.IN_PROGRESS
    if being_processed:
        raise HTTPException(
            status_code=400,
            detail="Cannot delete task that is in progress"
        )

    await db.delete(task)
    await db.commit()

    return {"message": "Task deleted"}