"""Управление токенами GigaChat.""" import base64 import os import time import uuid from typing import Optional from urllib.parse import urlencode import aiohttp from aiohttp import FormData from dotenv import load_dotenv load_dotenv() class TokenManager: """Менеджер токенов для GigaChat API.""" def __init__( self, client_id: Optional[str] = None, client_secret: Optional[str] = None, auth_url: Optional[str] = None, credentials: Optional[str] = None, ): # Приоритет: переданные параметры > переменные окружения > .env файл self.credentials = credentials or os.environ.get("GIGACHAT_CREDENTIALS") or os.getenv("GIGACHAT_CREDENTIALS") self.client_id = client_id or os.environ.get("GIGACHAT_CLIENT_ID") or os.getenv("GIGACHAT_CLIENT_ID") self.client_secret = client_secret or os.environ.get("GIGACHAT_CLIENT_SECRET") or os.getenv("GIGACHAT_CLIENT_SECRET") self.auth_url = auth_url or os.environ.get("GIGACHAT_AUTH_URL") or os.getenv( "GIGACHAT_AUTH_URL", "https://ngw.devices.sberbank.ru:9443/api/v2/oauth" ) self._access_token: Optional[str] = None self._expires_at: float = 0 async def get_token(self, force_refresh: bool = False) -> str: """ Получить актуальный токен доступа. Args: force_refresh: Принудительно обновить токен Returns: Токен доступа """ if not force_refresh and self._access_token and time.time() < self._expires_at: return self._access_token # Определяем, какой вариант используется: готовый ключ или client_id/client_secret if self.credentials: # Используем готовый ключ авторизации (уже закодированный в Base64) # Убираем префикс "Basic " если он есть credentials_key = self.credentials.strip().replace('\n', '').replace('\r', '') if credentials_key.startswith('Basic '): credentials_key = credentials_key[6:] connector = aiohttp.TCPConnector(ssl=False) async with aiohttp.ClientSession(connector=connector) as session: headers = { "Authorization": f"Basic {credentials_key}", "Content-Type": "application/x-www-form-urlencoded", "Accept": "application/json", "RqUID": str(uuid.uuid4()) } form_data = { "scope": "GIGACHAT_API_PERS" } async with session.post( self.auth_url, headers=headers, data=form_data, ) as response: if response.status != 200: error_text = await response.text() raise Exception(f"Failed to get token: {response.status} - {error_text}") data = await response.json() self._access_token = data["access_token"] expires_in = data.get("expires_in", 1800) self._expires_at = time.time() + expires_in - 300 return self._access_token elif self.client_id and self.client_secret: # Очищаем от пробелов и переносов client_secret = self.client_secret.strip().replace('\n', '').replace('\r', '') # Проверяем, является ли client_secret уже закодированным ключом Base64 # Если secret начинается с букв/цифр и длиннее 50 символов, это уже ключ авторизации is_already_encoded = len(client_secret) > 50 and all(c.isalnum() or c in '+/=' for c in client_secret) if is_already_encoded: # Это уже готовый ключ авторизации в Base64 encoded_credentials = client_secret print(f"DEBUG: Using pre-encoded authorization key (length: {len(encoded_credentials)})") else: # Это настоящий client_id и client_secret, нужно закодировать client_id = self.client_id.strip().replace('\n', '').replace('\r', '') if not client_id or not client_secret: raise Exception("GIGACHAT_CLIENT_ID and GIGACHAT_CLIENT_SECRET cannot be empty after cleaning") credentials_string = f"{client_id}:{client_secret}" encoded_credentials = base64.b64encode(credentials_string.encode('utf-8')).decode('utf-8') print(f"DEBUG: Encoded client_id:client_secret (length: {len(encoded_credentials)})") connector = aiohttp.TCPConnector(ssl=False) async with aiohttp.ClientSession(connector=connector) as session: headers = { "Content-Type": "application/x-www-form-urlencoded", "Accept": "application/json", "RqUID": str(uuid.uuid4()), "Authorization": f"Basic {encoded_credentials}" } payload = {"scope": "GIGACHAT_API_PERS"} print(f"DEBUG: Authorization header starts with: Basic {encoded_credentials[:10]}...") async with session.post( self.auth_url, headers=headers, data=payload, ) as response: if response.status != 200: error_text = await response.text() raise Exception( f"Failed to get token: {response.status} - {error_text}. " f"URL: {self.auth_url}" ) data = await response.json() self._access_token = data["access_token"] expires_in = data.get("expires_in", 1800) self._expires_at = time.time() + expires_in - 300 return self._access_token else: raise Exception( "Either GIGACHAT_CREDENTIALS (ready authorization key) or " "GIGACHAT_CLIENT_ID and GIGACHAT_CLIENT_SECRET must be set" ) def is_token_valid(self) -> bool: """Проверить, действителен ли текущий токен.""" return self._access_token is not None and time.time() < self._expires_at def clear_token(self): """Очистить токен (для тестирования).""" self._access_token = None self._expires_at = 0