67 lines
2.2 KiB
Python
67 lines
2.2 KiB
Python
from typing import Generic, TypeVar, Type, Optional, List, Dict, Any
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
from sqlalchemy import select, update, delete
|
|
from sqlalchemy.orm import selectinload
|
|
from app.db.base import BaseModel
|
|
|
|
# Type variable for the model class that a CRUDBase instance is parameterized
# with; bound to BaseModel, so only BaseModel subclasses are accepted.
ModelType = TypeVar("ModelType", bound=BaseModel)
|
|
|
|
|
|
class CRUDBase(Generic[ModelType]):
    """Generic asynchronous CRUD helper for a single model class.

    Every method takes the ``AsyncSession`` to operate on. The write methods
    (``create``, ``update``, ``delete``) commit the session themselves and
    roll it back before re-raising if the write fails, so the session stays
    usable for the caller.
    """

    def __init__(self, model: Type[ModelType]):
        """Remember the model class that all queries are built against."""
        self.model = model

    async def _commit(self, db: AsyncSession) -> None:
        """Commit ``db``; on failure roll back and re-raise the error."""
        try:
            await db.commit()
        except Exception:
            # A failed flush/commit leaves the transaction inactive; without
            # a rollback, later use of this session keeps raising.
            await db.rollback()
            raise

    async def get(self, db: AsyncSession, id: str) -> Optional[ModelType]:
        """Return the object whose ``id`` equals ``id``, or ``None`` if absent."""
        result = await db.execute(select(self.model).where(self.model.id == id))
        return result.scalar_one_or_none()

    async def get_multi(
        self,
        db: AsyncSession,
        skip: int = 0,
        limit: int = 100,
        filters: Optional[Dict[str, Any]] = None,
    ) -> List[ModelType]:
        """Return a page of objects, optionally narrowed by equality filters.

        Args:
            db: Session used to run the query.
            skip: Number of rows to skip (SQL ``OFFSET``).
            limit: Maximum number of rows to return (SQL ``LIMIT``).
            filters: Mapping of attribute name to required value. Each entry
                adds an ``attribute == value`` condition; keys that are not
                attributes of the model are silently ignored.

        Returns:
            The matching objects as a list. No ``ORDER BY`` is applied, so
            the row order is whatever the database returns.
        """
        query = select(self.model)

        if filters:
            for key, value in filters.items():
                if hasattr(self.model, key):
                    query = query.where(getattr(self.model, key) == value)

        query = query.offset(skip).limit(limit)
        result = await db.execute(query)
        # ``all()`` is typed as a Sequence; materialize a real list so the
        # return value matches the annotation.
        return list(result.scalars().all())

    async def create(self, db: AsyncSession, obj_in: Dict[str, Any]) -> ModelType:
        """Create an object from ``obj_in``, commit, and return it refreshed.

        ``obj_in`` is passed to the model constructor as keyword arguments.
        If the commit fails the session is rolled back and the error is
        re-raised.
        """
        db_obj = self.model(**obj_in)
        db.add(db_obj)
        await self._commit(db)
        await db.refresh(db_obj)
        return db_obj

    async def update(
        self,
        db: AsyncSession,
        db_obj: ModelType,
        obj_in: Dict[str, Any],
    ) -> ModelType:
        """Apply ``obj_in`` to ``db_obj``, commit, and return it refreshed.

        Only keys that are existing attributes of ``db_obj`` are applied, and
        entries whose value is ``None`` are skipped, so this method cannot be
        used to reset a field to ``None``. If the commit fails the session is
        rolled back and the error is re-raised.
        """
        for field, value in obj_in.items():
            if hasattr(db_obj, field) and value is not None:
                setattr(db_obj, field, value)

        await self._commit(db)
        await db.refresh(db_obj)
        return db_obj

    async def delete(self, db: AsyncSession, id: str) -> bool:
        """Delete the object with the given ``id``.

        Issues a single ``DELETE`` statement and commits.

        Returns:
            ``True`` if at least one row was deleted, ``False`` otherwise.
        """
        statement = delete(self.model).where(self.model.id == id)
        try:
            result = await db.execute(statement)
            await db.commit()
        except Exception:
            # The statement itself can fail (e.g. a constraint violation),
            # not just the commit, so both are covered by the rollback.
            await db.rollback()
            raise
        return result.rowcount > 0
|
|
|