code-review-agent/backend/app/api/organizations.py
Primakov Alexandr Alexandrovich 6ae2d0d8ec Add organization and task queue features
- Introduced new models for `Organization` and `ReviewTask` to manage organizations and review tasks.
- Implemented API endpoints for CRUD operations on organizations and tasks, including scanning organizations for repositories and PRs.
- Developed a background worker for sequential processing of review tasks with priority handling and automatic retries.
- Created frontend components for managing organizations and monitoring task queues, including real-time updates and filtering options.
- Added comprehensive documentation for organization features and quick start guides.
- Fixed UI issues and improved navigation for better user experience.
2025-10-13 00:10:04 +03:00

405 lines
13 KiB
Python

"""Organizations API endpoints"""
from fastapi import APIRouter, Depends, HTTPException, BackgroundTasks
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select, func
from typing import List
import secrets
from app.database import get_db
from app.models import Organization, Repository, PullRequest, ReviewTask
from app.schemas.organization import (
OrganizationCreate,
OrganizationUpdate,
OrganizationResponse,
OrganizationList,
OrganizationScanResult
)
from app.utils import encrypt_token, decrypt_token
from app.config import settings
from app.services.gitea import GiteaService
from app.services.github import GitHubService
from app.services.bitbucket import BitbucketService
# Router that the organization endpoints in this module register themselves on.
# The URL prefix is not set here; presumably it is supplied where the
# application includes this router (not visible in this file).
router = APIRouter()
@router.get("", response_model=OrganizationList)
async def get_organizations(
    skip: int = 0,
    limit: int = 100,
    db: AsyncSession = Depends(get_db)
):
    """Return one page of organizations, newest first, with the overall count.

    Args:
        skip: Number of rows to skip (applied as the query OFFSET).
        limit: Maximum number of rows to return (applied as the query LIMIT).
        db: Async database session provided by the ``get_db`` dependency.

    Returns:
        An ``OrganizationList`` whose ``items`` each carry a computed
        ``webhook_url`` and whose ``total`` counts every organization row,
        not only the rows on the returned page.
    """
    # The total ignores skip/limit on purpose so clients can paginate.
    total_rows = (await db.execute(select(func.count(Organization.id)))).scalar()

    page_stmt = (
        select(Organization)
        .order_by(Organization.created_at.desc())
        .offset(skip)
        .limit(limit)
    )
    page = (await db.execute(page_stmt)).scalars().all()

    # webhook_url is derived from the configured host/port rather than read
    # from the ORM row, so it is attached while building each response item.
    webhook_base = f"http://{settings.host}:{settings.port}/api/webhooks"
    items = [
        OrganizationResponse(
            **org.__dict__,
            webhook_url=f"{webhook_base}/{org.platform.value}/org/{org.id}"
        )
        for org in page
    ]

    return OrganizationList(items=items, total=total_rows)
@router.post("", response_model=OrganizationResponse)
async def create_organization(
    organization: OrganizationCreate,
    db: AsyncSession = Depends(get_db)
):
    """Persist a new organization and return it together with its webhook URL.

    Args:
        organization: Validated creation payload.
        db: Async database session provided by the ``get_db`` dependency.

    Returns:
        ``OrganizationResponse`` built from the stored row plus a computed
        ``webhook_url``.
    """
    # A random secret is generated when the caller did not supply one.
    secret = organization.webhook_secret
    if not secret:
        secret = secrets.token_urlsafe(32)

    # The API token is optional; it is stored encrypted when given.
    stored_token = None
    if organization.api_token:
        stored_token = encrypt_token(organization.api_token)

    new_org = Organization(
        name=organization.name,
        platform=organization.platform,
        # Trailing slashes are stripped so URL building elsewhere can append
        # "/api/..." directly.
        base_url=organization.base_url.rstrip('/'),
        api_token=stored_token,
        webhook_secret=secret,
        config=organization.config or {}
    )
    db.add(new_org)
    await db.commit()
    # Reload the row after commit so its attributes are populated before
    # __dict__ is read below.
    await db.refresh(new_org)

    hook = (
        f"http://{settings.host}:{settings.port}"
        f"/api/webhooks/{new_org.platform.value}/org/{new_org.id}"
    )
    return OrganizationResponse(**new_org.__dict__, webhook_url=hook)
@router.get("/{organization_id}", response_model=OrganizationResponse)
async def get_organization(
    organization_id: int,
    db: AsyncSession = Depends(get_db)
):
    """Fetch a single organization by primary key.

    Args:
        organization_id: Identifier taken from the URL path.
        db: Async database session provided by the ``get_db`` dependency.

    Returns:
        ``OrganizationResponse`` for the row, including a computed
        ``webhook_url``.

    Raises:
        HTTPException: 404 when no organization has this id.
    """
    lookup = select(Organization).where(Organization.id == organization_id)
    found = (await db.execute(lookup)).scalar_one_or_none()
    if found is None:
        raise HTTPException(status_code=404, detail="Organization not found")

    hook = (
        f"http://{settings.host}:{settings.port}"
        f"/api/webhooks/{found.platform.value}/org/{found.id}"
    )
    return OrganizationResponse(**found.__dict__, webhook_url=hook)
@router.put("/{organization_id}", response_model=OrganizationResponse)
async def update_organization(
    organization_id: int,
    organization_update: OrganizationUpdate,
    db: AsyncSession = Depends(get_db)
):
    """Apply a partial update to an organization.

    Only fields explicitly present in the request body are written
    (``exclude_unset=True``).

    Args:
        organization_id: Identifier taken from the URL path.
        organization_update: Partial update payload.
        db: Async database session provided by the ``get_db`` dependency.

    Returns:
        ``OrganizationResponse`` for the refreshed row, including a computed
        ``webhook_url``.

    Raises:
        HTTPException: 404 when no organization has this id.
    """
    result = await db.execute(
        select(Organization).where(Organization.id == organization_id)
    )
    organization = result.scalar_one_or_none()
    if not organization:
        raise HTTPException(status_code=404, detail="Organization not found")

    update_data = organization_update.model_dump(exclude_unset=True)

    if "api_token" in update_data:
        if update_data["api_token"]:
            # Tokens are stored encrypted, same as on creation.
            update_data["api_token"] = encrypt_token(update_data["api_token"])
        else:
            # An empty token means "leave the stored token unchanged".
            del update_data["api_token"]

    # Keep base_url normalized the same way create_organization does, so URL
    # building that appends "/api/..." never produces a double slash.
    new_base_url = update_data.get("base_url")
    if isinstance(new_base_url, str):
        update_data["base_url"] = new_base_url.rstrip("/")

    for field, value in update_data.items():
        setattr(organization, field, value)

    await db.commit()
    # Reload the row after commit so its attributes are populated before
    # __dict__ is read below.
    await db.refresh(organization)

    webhook_url = f"http://{settings.host}:{settings.port}/api/webhooks/{organization.platform.value}/org/{organization.id}"
    return OrganizationResponse(
        **organization.__dict__,
        webhook_url=webhook_url
    )
@router.delete("/{organization_id}")
async def delete_organization(
    organization_id: int,
    db: AsyncSession = Depends(get_db)
):
    """Remove an organization row.

    Args:
        organization_id: Identifier taken from the URL path.
        db: Async database session provided by the ``get_db`` dependency.

    Returns:
        A dict with a single ``message`` key confirming the deletion.

    Raises:
        HTTPException: 404 when no organization has this id.
    """
    lookup = select(Organization).where(Organization.id == organization_id)
    target = (await db.execute(lookup)).scalar_one_or_none()
    if target is None:
        raise HTTPException(status_code=404, detail="Organization not found")

    await db.delete(target)
    await db.commit()

    confirmation = {"message": "Organization deleted successfully"}
    return confirmation
@router.post("/{organization_id}/scan", response_model=OrganizationScanResult)
async def scan_organization(
    organization_id: int,
    background_tasks: BackgroundTasks,
    db: AsyncSession = Depends(get_db)
):
    """Scan an organization for repositories and open PRs.

    The scan runs inline within the request. ``background_tasks`` is accepted
    but not used by this function.

    Args:
        organization_id: Identifier taken from the URL path.
        background_tasks: FastAPI background task registry (currently unused).
        db: Async database session provided by the ``get_db`` dependency.

    Returns:
        ``OrganizationScanResult`` with the counters and errors filled in by
        the platform-specific scanner.

    Raises:
        HTTPException: 404 when the organization does not exist; 400 when the
            platform is unsupported, the stored token cannot be decrypted, or
            no token is available; 500 when the scan itself fails.
    """
    from datetime import datetime

    result = await db.execute(
        select(Organization).where(Organization.id == organization_id)
    )
    organization = result.scalar_one_or_none()
    if not organization:
        raise HTTPException(status_code=404, detail="Organization not found")

    # Resolve the scanner before doing any work. Raising the 400 here, outside
    # the try block below, keeps it from being rewritten into a 500.
    platform = organization.platform.value.lower()
    scanners = {
        "gitea": _scan_gitea_organization,
        "github": _scan_github_organization,
        "bitbucket": _scan_bitbucket_organization,
    }
    scanner = scanners.get(platform)
    if scanner is None:
        raise HTTPException(status_code=400, detail=f"Unsupported platform: {organization.platform}")

    # Prefer the organization's own token; otherwise use the platform-wide
    # master token from settings.
    if organization.api_token:
        try:
            api_token = decrypt_token(organization.api_token)
        except ValueError as e:
            raise HTTPException(status_code=400, detail=str(e)) from e
    else:
        if platform == "gitea":
            api_token = settings.master_gitea_token
        elif platform == "github":
            api_token = settings.master_github_token
        else:
            # Only "bitbucket" can reach this branch: platform was validated
            # against the scanner table above.
            api_token = settings.master_bitbucket_token

        if not api_token:
            raise HTTPException(
                status_code=400,
                detail=f"API token not set and master token for {platform} not configured"
            )

    scan_result = OrganizationScanResult(
        organization_id=organization_id,
        repositories_found=0,
        repositories_added=0,
        pull_requests_found=0,
        tasks_created=0,
        errors=[]
    )

    try:
        await scanner(organization, api_token, scan_result, db)

        # NOTE(review): naive UTC timestamp kept as in the original; confirm
        # the column type before switching to a timezone-aware value.
        organization.last_scan_at = datetime.utcnow()
        await db.commit()
    except HTTPException:
        # Deliberate HTTP errors keep their own status code.
        raise
    except Exception as e:
        # Discard rows that were flushed but not committed by the failed scan.
        await db.rollback()
        raise HTTPException(status_code=500, detail=f"Scan failed: {str(e)}") from e

    return scan_result
async def _scan_gitea_organization(
    organization: Organization,
    api_token: str,
    scan_result: OrganizationScanResult,
    db: AsyncSession
):
    """Scan a Gitea organization for repositories and their open PRs.

    Repositories are fetched page by page, so organizations with more
    repositories than one API page are covered completely. Repositories not
    yet known (matched by URL) are inserted, and every repository is then
    scanned for open PRs.

    Args:
        organization: Organization row supplying ``base_url``, ``name`` and
            the defaults copied onto new repositories.
        api_token: Plain-text token sent in the ``Authorization`` header.
        scan_result: Mutable result object; its counters are updated in place.
        db: Async database session; committed once at the end.

    Raises:
        httpx.HTTPStatusError: When the Gitea API answers with an error status.
    """
    import httpx

    headers = {
        "Authorization": f"token {api_token}",
        "Content-Type": "application/json"
    }

    url = f"{organization.base_url}/api/v1/orgs/{organization.name}/repos"

    print(f"\n🔍 Сканирование организации {organization.name} на {organization.base_url}")

    page_size = 50
    repos = []
    seen_urls = set()
    async with httpx.AsyncClient() as client:
        page = 1
        while True:
            response = await client.get(
                url,
                headers=headers,
                params={"page": page, "limit": page_size}
            )
            response.raise_for_status()

            # Stop when a page contributes nothing new. This covers the normal
            # empty last page and also a server that ignores `page` and keeps
            # returning the same data, which would otherwise loop forever.
            fresh = [r for r in response.json() if r["html_url"] not in seen_urls]
            if not fresh:
                break
            seen_urls.update(r["html_url"] for r in fresh)
            repos.extend(fresh)
            page += 1

    scan_result.repositories_found = len(repos)
    print(f" Найдено репозиториев: {len(repos)}")

    for repo_data in repos:
        repo_name = repo_data["name"]
        repo_owner = repo_data["owner"]["login"]
        repo_url = repo_data["html_url"]

        # Repositories are matched by URL.
        existing_repo = await db.execute(
            select(Repository).where(Repository.url == repo_url)
        )
        repository = existing_repo.scalar_one_or_none()

        if not repository:
            repository = Repository(
                name=f"{repo_owner}/{repo_name}",
                platform=organization.platform,
                url=repo_url,
                # Copy the organization's stored token value as-is.
                api_token=organization.api_token,
                webhook_secret=organization.webhook_secret,
                config=organization.config
            )
            db.add(repository)
            # Flush so repository.id is assigned before PRs reference it.
            await db.flush()
            scan_result.repositories_added += 1
            print(f" ✅ Добавлен репозиторий: {repo_owner}/{repo_name}")

        await _scan_repository_prs(
            repository,
            organization.base_url,
            repo_owner,
            repo_name,
            api_token,
            scan_result,
            db
        )

    await db.commit()
async def _scan_github_organization(
    organization: Organization,
    api_token: str,
    scan_result: OrganizationScanResult,
    db: AsyncSession
):
    """Placeholder for GitHub organization scanning.

    No scanning is performed; the function only records a "not yet
    implemented" message in ``scan_result.errors``. ``organization``,
    ``api_token`` and ``db`` are currently unused.
    """
    # TODO: Implement GitHub org scanning
    not_implemented = "GitHub organization scanning not yet implemented"
    scan_result.errors.append(not_implemented)
async def _scan_bitbucket_organization(
    organization: Organization,
    api_token: str,
    scan_result: OrganizationScanResult,
    db: AsyncSession
):
    """Placeholder for Bitbucket organization scanning.

    No scanning is performed; the function only records a "not yet
    implemented" message in ``scan_result.errors``. ``organization``,
    ``api_token`` and ``db`` are currently unused.
    """
    # TODO: Implement Bitbucket org scanning
    not_implemented = "Bitbucket organization scanning not yet implemented"
    scan_result.errors.append(not_implemented)
async def _scan_repository_prs(
    repository: Repository,
    base_url: str,
    owner: str,
    repo: str,
    api_token: str,
    scan_result: OrganizationScanResult,
    db: AsyncSession
):
    """Scan one repository for open PRs and queue review tasks for them.

    Open PRs are fetched page by page. Each PR not yet stored for this
    repository is inserted, and a review task is created unless the PR
    already has at least one task in ``pending`` or ``in_progress`` status.

    Args:
        repository: Repository row the PRs belong to (its ``id`` must be set).
        base_url: Base URL of the Gitea instance, without trailing slash.
        owner: Repository owner login used in the API path.
        repo: Repository name used in the API path.
        api_token: Plain-text token sent in the ``Authorization`` header.
        scan_result: Mutable result object; its counters are updated in place.
        db: Async database session; rows are added and flushed, not committed.

    Raises:
        httpx.HTTPStatusError: When the API answers with an error status.
    """
    import httpx

    headers = {
        "Authorization": f"token {api_token}",
        "Content-Type": "application/json"
    }

    url = f"{base_url}/api/v1/repos/{owner}/{repo}/pulls"

    page_size = 50
    prs = []
    seen_numbers = set()
    async with httpx.AsyncClient() as client:
        page = 1
        while True:
            response = await client.get(
                url,
                headers=headers,
                params={"state": "open", "page": page, "limit": page_size}
            )
            response.raise_for_status()

            # Stop when a page contributes nothing new. This covers the normal
            # empty last page and also a server that ignores `page` and keeps
            # returning the same data, which would otherwise loop forever.
            fresh = [pr for pr in response.json() if pr["number"] not in seen_numbers]
            if not fresh:
                break
            seen_numbers.update(pr["number"] for pr in fresh)
            prs.extend(fresh)
            page += 1

    for pr_data in prs:
        pr_number = pr_data["number"]
        scan_result.pull_requests_found += 1

        existing_pr = await db.execute(
            select(PullRequest).where(
                PullRequest.repository_id == repository.id,
                PullRequest.pr_number == pr_number
            )
        )
        pull_request = existing_pr.scalar_one_or_none()

        if not pull_request:
            pull_request = PullRequest(
                repository_id=repository.id,
                pr_number=pr_number,
                title=pr_data["title"],
                author=pr_data["user"]["login"],
                source_branch=pr_data["head"]["ref"],
                target_branch=pr_data["base"]["ref"],
                url=pr_data["html_url"],
                status="OPEN"
            )
            db.add(pull_request)
            # Flush so pull_request.id is assigned before the task references it.
            await db.flush()

        # Only existence matters here. limit(1) + first() tolerates a PR that
        # has several active tasks, where scalar_one_or_none() would raise
        # MultipleResultsFound and abort the whole scan.
        active_task_rows = await db.execute(
            select(ReviewTask).where(
                ReviewTask.pull_request_id == pull_request.id,
                ReviewTask.status.in_(["pending", "in_progress"])
            ).limit(1)
        )
        task = active_task_rows.scalars().first()

        if not task:
            task = ReviewTask(
                pull_request_id=pull_request.id,
                priority="normal"
            )
            db.add(task)
            scan_result.tasks_created += 1
            print(f" 📝 Создана задача для PR #{pr_number}: {pr_data['title']}")