74 lines
2.4 KiB
Python
74 lines
2.4 KiB
Python
from typing import Optional
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
from app.crud import user as crud_user
|
|
from app.core.security import verify_password, create_access_token, create_refresh_token, decode_token
|
|
from app.schemas.user import UserCreate
|
|
from app.schemas.token import Token
|
|
from datetime import timedelta
|
|
|
|
|
|
class AuthService:
    """Authentication workflows: login, registration, token checks and refresh."""

    @staticmethod
    def _token_claims(subject, email) -> dict:
        """Build the claims dict passed to the token factories.

        A new dict is returned on every call so the access- and refresh-token
        factories never share (and cannot mutate) the same object.

        The subject is coerced to ``str``: RFC 7519 defines ``sub`` as a
        string, and several JWT libraries reject non-string subjects when
        decoding. Consumers that need the original id type must convert the
        decoded ``sub`` back themselves.
        """
        return {"sub": str(subject), "email": email}

    async def authenticate(
        self,
        db: AsyncSession,
        email: str,
        password: str,
    ) -> Optional[Token]:
        """Authenticate a user by email and password.

        Args:
            db: Database session used to look up the user.
            email: Email address identifying the account.
            password: Plain-text password to check against the stored hash.

        Returns:
            A ``Token`` holding a new access token and refresh token with
            ``token_type="bearer"``, or ``None`` when no user has that email
            or the password does not verify.
        """
        db_user = await crud_user.get_by_email(db, email)
        # Short-circuit keeps the original order: the password is only
        # checked when a user record was found.
        if not db_user or not verify_password(password, db_user.hashed_password):
            return None

        return Token(
            access_token=create_access_token(
                data=self._token_claims(db_user.id, db_user.email)
            ),
            refresh_token=create_refresh_token(
                data=self._token_claims(db_user.id, db_user.email)
            ),
            token_type="bearer",
        )

    async def register(
        self,
        db: AsyncSession,
        user_in: UserCreate,
    ):
        """Register a new user.

        Args:
            db: Database session used for the lookup and the insert.
            user_in: Registration data; its ``email`` and ``password`` are read.

        Returns:
            Whatever ``crud_user.create`` returns for the new user.

        Raises:
            ValueError: If a user with the same email already exists.
        """
        # NOTE(review): this check-then-create sequence is not atomic;
        # presumably a unique constraint on email backs it up - confirm.
        if await crud_user.get_by_email(db, user_in.email):
            raise ValueError("User with this email already exists")

        # Kept as a function-scope import, as in the original code.
        from app.core.security import get_password_hash

        hashed_password = get_password_hash(user_in.password)
        return await crud_user.create(db, user_in, hashed_password)

    def verify_token(self, token: str) -> Optional[dict]:
        """Decode a token and accept it only if it is an access token.

        Args:
            token: Encoded token string.

        Returns:
            The decoded payload when it is truthy and its ``type`` claim
            equals ``"access"``; otherwise ``None``.
        """
        payload = decode_token(token)
        if payload and payload.get("type") == "access":
            return payload
        return None

    def refresh_access_token(self, refresh_token: str) -> Optional[str]:
        """Issue a new access token from a refresh token.

        Args:
            refresh_token: Encoded refresh token string.

        Returns:
            A new access token carrying the refresh token's ``sub`` and
            ``email`` claims, or ``None`` when the payload is falsy, its
            ``type`` claim is not ``"refresh"``, or it has no ``sub``.
        """
        payload = decode_token(refresh_token)
        if not payload or payload.get("type") != "refresh":
            return None

        subject = payload.get("sub")
        if subject is None:
            # Never mint an access token that identifies nobody.
            return None

        return create_access_token(
            data=self._token_claims(subject, payload.get("email"))
        )
|
|
|
|
|
|
# Module-level AuthService instance, created once when this module is imported.
auth_service = AuthService()
|
|
|