Add organization and task queue features

- Introduced new models for `Organization` and `ReviewTask` to manage organizations and review tasks.
- Implemented API endpoints for CRUD operations on organizations and tasks, including scanning organizations for repositories and PRs.
- Developed a background worker for sequential processing of review tasks with priority handling and automatic retries.
- Created frontend components for managing organizations and monitoring task queues, including real-time updates and filtering options.
- Added comprehensive documentation for organization features and quick start guides.
- Fixed UI issues and improved navigation for better user experience.
This commit is contained in:
Primakov Alexandr Alexandrovich
2025-10-13 00:10:04 +03:00
parent 70889421ea
commit 6ae2d0d8ec
18 changed files with 2725 additions and 3 deletions

View File

@@ -2,13 +2,15 @@
from fastapi import APIRouter
from app.api import repositories, reviews, webhooks
from app.api import repositories, reviews, webhooks, organizations, tasks
api_router = APIRouter()
api_router.include_router(repositories.router, prefix="/repositories", tags=["repositories"])
api_router.include_router(reviews.router, prefix="/reviews", tags=["reviews"])
api_router.include_router(webhooks.router, prefix="/webhooks", tags=["webhooks"])
api_router.include_router(organizations.router, prefix="/organizations", tags=["organizations"])
api_router.include_router(tasks.router, prefix="/tasks", tags=["tasks"])
__all__ = ["api_router"]

View File

@@ -0,0 +1,404 @@
"""Organizations API endpoints"""
from fastapi import APIRouter, Depends, HTTPException, BackgroundTasks
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select, func
from typing import List
import secrets
from app.database import get_db
from app.models import Organization, Repository, PullRequest, ReviewTask
from app.schemas.organization import (
OrganizationCreate,
OrganizationUpdate,
OrganizationResponse,
OrganizationList,
OrganizationScanResult
)
from app.utils import encrypt_token, decrypt_token
from app.config import settings
from app.services.gitea import GiteaService
from app.services.github import GitHubService
from app.services.bitbucket import BitbucketService
router = APIRouter()
@router.get("", response_model=OrganizationList)
async def get_organizations(
skip: int = 0,
limit: int = 100,
db: AsyncSession = Depends(get_db)
):
"""Get all organizations"""
# Count total
count_query = select(func.count(Organization.id))
count_result = await db.execute(count_query)
total = count_result.scalar()
# Get organizations
query = select(Organization).order_by(Organization.created_at.desc()).offset(skip).limit(limit)
result = await db.execute(query)
organizations = result.scalars().all()
# Add webhook URLs
items = []
for org in organizations:
webhook_url = f"http://{settings.host}:{settings.port}/api/webhooks/{org.platform.value}/org/{org.id}"
items.append(OrganizationResponse(
**org.__dict__,
webhook_url=webhook_url
))
return OrganizationList(items=items, total=total)
@router.post("", response_model=OrganizationResponse)
async def create_organization(
organization: OrganizationCreate,
db: AsyncSession = Depends(get_db)
):
"""Create a new organization"""
# Generate webhook secret if not provided
webhook_secret = organization.webhook_secret or secrets.token_urlsafe(32)
# Encrypt API token (если указан)
encrypted_token = encrypt_token(organization.api_token) if organization.api_token else None
# Create organization
db_organization = Organization(
name=organization.name,
platform=organization.platform,
base_url=organization.base_url.rstrip('/'),
api_token=encrypted_token,
webhook_secret=webhook_secret,
config=organization.config or {}
)
db.add(db_organization)
await db.commit()
await db.refresh(db_organization)
# Prepare response
webhook_url = f"http://{settings.host}:{settings.port}/api/webhooks/{db_organization.platform.value}/org/{db_organization.id}"
return OrganizationResponse(
**db_organization.__dict__,
webhook_url=webhook_url
)
@router.get("/{organization_id}", response_model=OrganizationResponse)
async def get_organization(
organization_id: int,
db: AsyncSession = Depends(get_db)
):
"""Get organization by ID"""
result = await db.execute(
select(Organization).where(Organization.id == organization_id)
)
organization = result.scalar_one_or_none()
if not organization:
raise HTTPException(status_code=404, detail="Organization not found")
webhook_url = f"http://{settings.host}:{settings.port}/api/webhooks/{organization.platform.value}/org/{organization.id}"
return OrganizationResponse(
**organization.__dict__,
webhook_url=webhook_url
)
@router.put("/{organization_id}", response_model=OrganizationResponse)
async def update_organization(
organization_id: int,
organization_update: OrganizationUpdate,
db: AsyncSession = Depends(get_db)
):
"""Update organization"""
result = await db.execute(
select(Organization).where(Organization.id == organization_id)
)
organization = result.scalar_one_or_none()
if not organization:
raise HTTPException(status_code=404, detail="Organization not found")
# Update fields
update_data = organization_update.model_dump(exclude_unset=True)
# Encrypt API token if provided and not empty
if "api_token" in update_data and update_data["api_token"]:
update_data["api_token"] = encrypt_token(update_data["api_token"])
elif "api_token" in update_data and not update_data["api_token"]:
# If empty string provided, don't update token
del update_data["api_token"]
for field, value in update_data.items():
setattr(organization, field, value)
await db.commit()
await db.refresh(organization)
webhook_url = f"http://{settings.host}:{settings.port}/api/webhooks/{organization.platform.value}/org/{organization.id}"
return OrganizationResponse(
**organization.__dict__,
webhook_url=webhook_url
)
@router.delete("/{organization_id}")
async def delete_organization(
organization_id: int,
db: AsyncSession = Depends(get_db)
):
"""Delete organization"""
result = await db.execute(
select(Organization).where(Organization.id == organization_id)
)
organization = result.scalar_one_or_none()
if not organization:
raise HTTPException(status_code=404, detail="Organization not found")
await db.delete(organization)
await db.commit()
return {"message": "Organization deleted successfully"}
@router.post("/{organization_id}/scan", response_model=OrganizationScanResult)
async def scan_organization(
organization_id: int,
background_tasks: BackgroundTasks,
db: AsyncSession = Depends(get_db)
):
"""Scan organization for repositories and PRs"""
# Get organization
result = await db.execute(
select(Organization).where(Organization.id == organization_id)
)
organization = result.scalar_one_or_none()
if not organization:
raise HTTPException(status_code=404, detail="Organization not found")
# Get API token
if organization.api_token:
try:
api_token = decrypt_token(organization.api_token)
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
else:
# Use master token
platform = organization.platform.value.lower()
if platform == "gitea":
api_token = settings.master_gitea_token
elif platform == "github":
api_token = settings.master_github_token
elif platform == "bitbucket":
api_token = settings.master_bitbucket_token
else:
raise HTTPException(status_code=400, detail=f"Unsupported platform: {organization.platform}")
if not api_token:
raise HTTPException(
status_code=400,
detail=f"API token not set and master token for {platform} not configured"
)
# Start scan
scan_result = OrganizationScanResult(
organization_id=organization_id,
repositories_found=0,
repositories_added=0,
pull_requests_found=0,
tasks_created=0,
errors=[]
)
try:
if organization.platform.value == "gitea":
await _scan_gitea_organization(organization, api_token, scan_result, db)
elif organization.platform.value == "github":
await _scan_github_organization(organization, api_token, scan_result, db)
elif organization.platform.value == "bitbucket":
await _scan_bitbucket_organization(organization, api_token, scan_result, db)
else:
raise HTTPException(status_code=400, detail=f"Unsupported platform: {organization.platform}")
# Update last scan time
from datetime import datetime
organization.last_scan_at = datetime.utcnow()
await db.commit()
except Exception as e:
scan_result.errors.append(str(e))
raise HTTPException(status_code=500, detail=f"Scan failed: {str(e)}")
return scan_result
async def _scan_gitea_organization(
organization: Organization,
api_token: str,
scan_result: OrganizationScanResult,
db: AsyncSession
):
"""Scan Gitea organization for repositories and PRs"""
import httpx
headers = {
"Authorization": f"token {api_token}",
"Content-Type": "application/json"
}
# Get all repositories in organization
url = f"{organization.base_url}/api/v1/orgs/{organization.name}/repos"
print(f"\n🔍 Сканирование организации {organization.name} на {organization.base_url}")
async with httpx.AsyncClient() as client:
response = await client.get(url, headers=headers)
response.raise_for_status()
repos = response.json()
scan_result.repositories_found = len(repos)
print(f" Найдено репозиториев: {len(repos)}")
for repo_data in repos:
repo_name = repo_data["name"]
repo_owner = repo_data["owner"]["login"]
repo_url = repo_data["html_url"]
# Check if repository already exists
existing_repo = await db.execute(
select(Repository).where(Repository.url == repo_url)
)
repository = existing_repo.scalar_one_or_none()
if not repository:
# Create new repository
repository = Repository(
name=f"{repo_owner}/{repo_name}",
platform=organization.platform,
url=repo_url,
api_token=organization.api_token, # Use same token as org
webhook_secret=organization.webhook_secret,
config=organization.config
)
db.add(repository)
await db.flush()
scan_result.repositories_added += 1
print(f" ✅ Добавлен репозиторий: {repo_owner}/{repo_name}")
# Scan PRs in this repository
await _scan_repository_prs(
repository,
organization.base_url,
repo_owner,
repo_name,
api_token,
scan_result,
db
)
await db.commit()
async def _scan_github_organization(
organization: Organization,
api_token: str,
scan_result: OrganizationScanResult,
db: AsyncSession
):
"""Scan GitHub organization"""
# TODO: Implement GitHub org scanning
scan_result.errors.append("GitHub organization scanning not yet implemented")
async def _scan_bitbucket_organization(
organization: Organization,
api_token: str,
scan_result: OrganizationScanResult,
db: AsyncSession
):
"""Scan Bitbucket organization"""
# TODO: Implement Bitbucket org scanning
scan_result.errors.append("Bitbucket organization scanning not yet implemented")
async def _scan_repository_prs(
repository: Repository,
base_url: str,
owner: str,
repo: str,
api_token: str,
scan_result: OrganizationScanResult,
db: AsyncSession
):
"""Scan repository for open PRs and create tasks"""
import httpx
headers = {
"Authorization": f"token {api_token}",
"Content-Type": "application/json"
}
# Get open PRs
url = f"{base_url}/api/v1/repos/{owner}/{repo}/pulls?state=open"
async with httpx.AsyncClient() as client:
response = await client.get(url, headers=headers)
response.raise_for_status()
prs = response.json()
for pr_data in prs:
pr_number = pr_data["number"]
scan_result.pull_requests_found += 1
# Check if PR already exists
existing_pr = await db.execute(
select(PullRequest).where(
PullRequest.repository_id == repository.id,
PullRequest.pr_number == pr_number
)
)
pull_request = existing_pr.scalar_one_or_none()
if not pull_request:
# Create new PR
pull_request = PullRequest(
repository_id=repository.id,
pr_number=pr_number,
title=pr_data["title"],
author=pr_data["user"]["login"],
source_branch=pr_data["head"]["ref"],
target_branch=pr_data["base"]["ref"],
url=pr_data["html_url"],
status="OPEN"
)
db.add(pull_request)
await db.flush()
# Check if task already exists for this PR
existing_task = await db.execute(
select(ReviewTask).where(
ReviewTask.pull_request_id == pull_request.id,
ReviewTask.status.in_(["pending", "in_progress"])
)
)
task = existing_task.scalar_one_or_none()
if not task:
# Create review task
task = ReviewTask(
pull_request_id=pull_request.id,
priority="normal"
)
db.add(task)
scan_result.tasks_created += 1
print(f" 📝 Создана задача для PR #{pr_number}: {pr_data['title']}")

197
backend/app/api/tasks.py Normal file
View File

@@ -0,0 +1,197 @@
"""Task Queue API endpoints"""
from fastapi import APIRouter, Depends, HTTPException
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select, func
from typing import List
from pydantic import BaseModel
from datetime import datetime
from app.database import get_db
from app.models import ReviewTask, PullRequest
from app.models.review_task import TaskStatusEnum, TaskPriorityEnum
from app.workers.task_worker import get_worker
router = APIRouter()
class TaskResponse(BaseModel):
"""Task response schema"""
id: int
pull_request_id: int
pr_number: int | None
pr_title: str | None
status: TaskStatusEnum
priority: TaskPriorityEnum
created_at: datetime
started_at: datetime | None
completed_at: datetime | None
error_message: str | None
retry_count: int
max_retries: int
class Config:
from_attributes = True
class TaskListResponse(BaseModel):
"""Task list response"""
items: List[TaskResponse]
total: int
pending: int
in_progress: int
completed: int
failed: int
class WorkerStatusResponse(BaseModel):
"""Worker status response"""
running: bool
current_task_id: int | None
poll_interval: int
@router.get("", response_model=TaskListResponse)
async def get_tasks(
status: TaskStatusEnum | None = None,
skip: int = 0,
limit: int = 100,
db: AsyncSession = Depends(get_db)
):
"""Get all tasks"""
# Count total
count_query = select(func.count(ReviewTask.id))
if status:
count_query = count_query.where(ReviewTask.status == status)
count_result = await db.execute(count_query)
total = count_result.scalar()
# Count by status
pending_count = await db.execute(
select(func.count(ReviewTask.id)).where(ReviewTask.status == TaskStatusEnum.PENDING)
)
in_progress_count = await db.execute(
select(func.count(ReviewTask.id)).where(ReviewTask.status == TaskStatusEnum.IN_PROGRESS)
)
completed_count = await db.execute(
select(func.count(ReviewTask.id)).where(ReviewTask.status == TaskStatusEnum.COMPLETED)
)
failed_count = await db.execute(
select(func.count(ReviewTask.id)).where(ReviewTask.status == TaskStatusEnum.FAILED)
)
# Get tasks with PR info
query = select(ReviewTask, PullRequest).join(
PullRequest, ReviewTask.pull_request_id == PullRequest.id
).order_by(ReviewTask.created_at.desc())
if status:
query = query.where(ReviewTask.status == status)
query = query.offset(skip).limit(limit)
result = await db.execute(query)
rows = result.all()
items = []
for task, pr in rows:
items.append(TaskResponse(
id=task.id,
pull_request_id=task.pull_request_id,
pr_number=pr.pr_number,
pr_title=pr.title,
status=task.status,
priority=task.priority,
created_at=task.created_at,
started_at=task.started_at,
completed_at=task.completed_at,
error_message=task.error_message,
retry_count=task.retry_count,
max_retries=task.max_retries
))
return TaskListResponse(
items=items,
total=total,
pending=pending_count.scalar(),
in_progress=in_progress_count.scalar(),
completed=completed_count.scalar(),
failed=failed_count.scalar()
)
@router.get("/worker/status", response_model=WorkerStatusResponse)
async def get_worker_status():
"""Get worker status"""
worker = get_worker()
if not worker:
return WorkerStatusResponse(
running=False,
current_task_id=None,
poll_interval=0
)
return WorkerStatusResponse(
running=worker.running,
current_task_id=worker.current_task_id,
poll_interval=worker.poll_interval
)
@router.post("/{task_id}/retry")
async def retry_task(
task_id: int,
db: AsyncSession = Depends(get_db)
):
"""Retry failed task"""
result = await db.execute(
select(ReviewTask).where(ReviewTask.id == task_id)
)
task = result.scalar_one_or_none()
if not task:
raise HTTPException(status_code=404, detail="Task not found")
if task.status not in [TaskStatusEnum.FAILED, TaskStatusEnum.COMPLETED]:
raise HTTPException(
status_code=400,
detail=f"Cannot retry task with status: {task.status}"
)
# Reset task
task.status = TaskStatusEnum.PENDING
task.error_message = None
task.retry_count = 0
task.started_at = None
task.completed_at = None
await db.commit()
return {"message": "Task queued for retry"}
@router.delete("/{task_id}")
async def delete_task(
task_id: int,
db: AsyncSession = Depends(get_db)
):
"""Delete task"""
result = await db.execute(
select(ReviewTask).where(ReviewTask.id == task_id)
)
task = result.scalar_one_or_none()
if not task:
raise HTTPException(status_code=404, detail="Task not found")
if task.status == TaskStatusEnum.IN_PROGRESS:
raise HTTPException(
status_code=400,
detail="Cannot delete task that is in progress"
)
await db.delete(task)
await db.commit()
return {"message": "Task deleted"}