from datetime import timedelta
from typing import Optional

from sqlalchemy.ext.asyncio import AsyncSession

from app.crud import user as crud_user
from app.core.security import (
    create_access_token,
    create_refresh_token,
    decode_token,
    get_password_hash,
    verify_password,
)
from app.schemas.user import UserCreate
from app.schemas.token import Token


class AuthService:
    """Login, registration and token verification/refresh operations."""

    @staticmethod
    def _token_claims(subject, email) -> dict:
        """Return a new claims dict carrying the ``sub`` and ``email`` entries.

        A fresh dict is built on every call so that each token factory
        receives its own object.
        """
        return {"sub": subject, "email": email}

    async def authenticate(
        self,
        db: AsyncSession,
        email: str,
        password: str,
    ) -> Optional[Token]:
        """Authenticate a user by email and password.

        Args:
            db: Session handed to ``crud_user.get_by_email``.
            email: Email used to look the user up.
            password: Plain-text password checked against the stored hash.

        Returns:
            A ``Token`` holding an access token, a refresh token and
            ``token_type="bearer"``; ``None`` when no user is found for the
            email or the password does not verify.
        """
        db_user = await crud_user.get_by_email(db, email)
        if not db_user:
            return None
        if not verify_password(password, db_user.hashed_password):
            return None

        # NOTE(review): db_user.id is stored under "sub" unchanged. RFC 7519
        # defines "sub" as a string and some JWT libraries reject non-string
        # subjects when decoding -- confirm the id type and what decode_token
        # accepts before relying on this claim.
        access_token = create_access_token(
            data=self._token_claims(db_user.id, db_user.email)
        )
        refresh_token = create_refresh_token(
            data=self._token_claims(db_user.id, db_user.email)
        )

        return Token(
            access_token=access_token,
            refresh_token=refresh_token,
            token_type="bearer",
        )

    async def register(self, db: AsyncSession, user_in: UserCreate):
        """Register a new user.

        Args:
            db: Session handed to the ``crud_user`` helpers.
            user_in: Registration payload; its ``email`` and ``password``
                attributes are read here.

        Returns:
            Whatever ``crud_user.create`` returns for the new user.

        Raises:
            ValueError: If ``crud_user.get_by_email`` already finds a user
                for ``user_in.email``.
        """
        # Refuse to create a second account for an email that is already taken.
        existing_user = await crud_user.get_by_email(db, user_in.email)
        if existing_user:
            raise ValueError("User with this email already exists")

        hashed_password = get_password_hash(user_in.password)
        return await crud_user.create(db, user_in, hashed_password)

    def verify_token(self, token: str) -> Optional[dict]:
        """Decode ``token`` and return its payload if it is an access token.

        Returns:
            The decoded payload when it is truthy and its ``"type"`` entry
            equals ``"access"``; otherwise ``None``.
        """
        payload = decode_token(token)
        if payload and payload.get("type") == "access":
            return payload
        return None

    def refresh_access_token(self, refresh_token: str) -> Optional[str]:
        """Create a new access token from a refresh token.

        Returns:
            The result of ``create_access_token`` built from the refresh
            token's ``"sub"`` and ``"email"`` entries when the decoded payload
            is truthy and its ``"type"`` entry equals ``"refresh"``;
            otherwise ``None``.
        """
        payload = decode_token(refresh_token)
        if payload and payload.get("type") == "refresh":
            return create_access_token(
                data=self._token_claims(payload.get("sub"), payload.get("email"))
            )
        return None


auth_service = AuthService()