Files
2025-12-13 14:39:50 +03:00

52 lines
1.7 KiB
Python

from typing import Optional
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select
from app.models.user import User
from app.schemas.user import UserCreate, UserUpdate
from app.crud.base import CRUDBase
class CRUDUser(CRUDBase[User]):
    """CRUD operations for the ``User`` model, built on ``CRUDBase``."""

    async def get_by_email(self, db: AsyncSession, email: str) -> Optional[User]:
        """Return the user with the given email, or ``None`` if none exists.

        Args:
            db: Async session used to execute the query.
            email: Value compared for equality against ``User.email``.

        Returns:
            The matching ``User``, or ``None`` when no row matches.
        """
        result = await db.execute(select(User).where(User.email == email))
        return result.scalar_one_or_none()

    async def create(self, db: AsyncSession, obj_in: UserCreate, hashed_password: str) -> User:
        """Create a user, commit it and return the refreshed instance.

        Args:
            db: Async session the new row is added to and committed on.
            obj_in: Input schema; its ``email``, ``role`` and ``full_name``
                are copied onto the new ``User``.
            hashed_password: Stored as given; this method performs no hashing.

        Returns:
            The newly created ``User`` after ``db.refresh``.
        """
        db_obj = User(
            email=obj_in.email,
            hashed_password=hashed_password,
            role=obj_in.role,
            full_name=obj_in.full_name,
        )
        db.add(db_obj)
        await db.commit()
        await db.refresh(db_obj)
        return db_obj

    async def update(
        self,
        db: AsyncSession,
        db_obj: User,
        obj_in: UserUpdate,
    ) -> User:
        """Apply the explicitly set fields of ``obj_in`` to ``db_obj`` and commit.

        Only fields the caller actually set are applied
        (``model_dump(exclude_unset=True)``). A ``password`` entry is never
        written to the model as is: it is replaced by ``hashed_password``.
        A ``password`` explicitly set to ``None`` is ignored, leaving the
        stored hash unchanged.

        Args:
            db: Async session used to commit and refresh.
            db_obj: The persistent ``User`` instance to modify.
            obj_in: Update schema carrying the new values.

        Returns:
            The same ``db_obj`` after commit and ``db.refresh``.
        """
        update_data = obj_in.model_dump(exclude_unset=True)
        if "password" in update_data:
            # Always drop the plain-text value so it can never be set on the model.
            password = update_data.pop("password")
            # An explicit ``password: null`` survives exclude_unset; hashing None
            # would fail (or store a hash of nothing), so skip it.
            if password is not None:
                # NOTE(review): kept as a function-level import, presumably to
                # avoid a circular import with app.core.security — confirm.
                from app.core.security import get_password_hash

                update_data["hashed_password"] = get_password_hash(password)
        for field, value in update_data.items():
            setattr(db_obj, field, value)
        await db.commit()
        await db.refresh(db_obj)
        return db_obj
# Module-level CRUDUser instance constructed with the User model.
# NOTE(review): presumably the shared object other modules import — confirm against callers.
user = CRUDUser(User)