from datetime import datetime, timedelta from typing import Optional, Dict, Any from jose import JWTError, jwt import bcrypt from app.core.config import settings def verify_password(plain_password: str, hashed_password: str) -> bool: """Проверка пароля""" # Используем bcrypt напрямую для проверки try: return bcrypt.checkpw(plain_password.encode('utf-8'), hashed_password.encode('utf-8')) except (ValueError, TypeError, AttributeError): return False def _truncate_password_to_72_bytes(password: str) -> str: """Обрезает пароль до 72 байт, корректно обрабатывая UTF-8""" password_bytes = password.encode('utf-8') if len(password_bytes) <= 72: return password # Обрезаем до 72 байт password_bytes = password_bytes[:72] # Удаляем неполные UTF-8 последовательности в конце # (байты, которые начинаются с 10xxxxxx, но не являются началом символа) while password_bytes and (password_bytes[-1] & 0xC0) == 0x80: password_bytes = password_bytes[:-1] return password_bytes.decode('utf-8', errors='replace') def get_password_hash(password: str) -> str: """Хеширование пароля""" # bcrypt имеет ограничение в 72 байта # Обрезаем пароль до 72 байт перед хешированием password_bytes = password.encode('utf-8') if len(password_bytes) > 72: # Обрезаем до 72 байт password_bytes = password_bytes[:72] # Удаляем неполные UTF-8 последовательности в конце while password_bytes and (password_bytes[-1] & 0xC0) == 0x80: password_bytes = password_bytes[:-1] password = password_bytes.decode('utf-8', errors='replace') password_bytes = password.encode('utf-8') # Используем bcrypt напрямую, чтобы избежать проблем с инициализацией passlib salt = bcrypt.gensalt() hashed = bcrypt.hashpw(password_bytes, salt) return hashed.decode('utf-8') def create_access_token(data: Dict[str, Any], expires_delta: Optional[timedelta] = None) -> str: """Создание JWT access token""" to_encode = data.copy() if expires_delta: expire = datetime.utcnow() + expires_delta else: expire = datetime.utcnow() + timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES) to_encode.update({"exp": expire, "type": "access"}) encoded_jwt = jwt.encode(to_encode, settings.SECRET_KEY, algorithm=settings.ALGORITHM) return encoded_jwt def create_refresh_token(data: Dict[str, Any]) -> str: """Создание JWT refresh token""" to_encode = data.copy() expire = datetime.utcnow() + timedelta(days=settings.REFRESH_TOKEN_EXPIRE_DAYS) to_encode.update({"exp": expire, "type": "refresh"}) encoded_jwt = jwt.encode(to_encode, settings.SECRET_KEY, algorithm=settings.ALGORITHM) return encoded_jwt def decode_token(token: str) -> Optional[Dict[str, Any]]: """Декодирование JWT token""" try: payload = jwt.decode(token, settings.SECRET_KEY, algorithms=[settings.ALGORITHM]) return payload except JWTError: return None