Files
New-planet-ai-agent/services/token_manager.py

154 lines
7.0 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""Управление токенами GigaChat."""
import base64
import os
import time
import uuid
from typing import Optional
from urllib.parse import urlencode
import aiohttp
from aiohttp import FormData
from dotenv import load_dotenv
load_dotenv()
class TokenManager:
"""Менеджер токенов для GigaChat API."""
def __init__(
self,
client_id: Optional[str] = None,
client_secret: Optional[str] = None,
auth_url: Optional[str] = None,
credentials: Optional[str] = None,
):
# Приоритет: переданные параметры > переменные окружения > .env файл
self.credentials = credentials or os.environ.get("GIGACHAT_CREDENTIALS") or os.getenv("GIGACHAT_CREDENTIALS")
self.client_id = client_id or os.environ.get("GIGACHAT_CLIENT_ID") or os.getenv("GIGACHAT_CLIENT_ID")
self.client_secret = client_secret or os.environ.get("GIGACHAT_CLIENT_SECRET") or os.getenv("GIGACHAT_CLIENT_SECRET")
self.auth_url = auth_url or os.environ.get("GIGACHAT_AUTH_URL") or os.getenv(
"GIGACHAT_AUTH_URL", "https://ngw.devices.sberbank.ru:9443/api/v2/oauth"
)
self._access_token: Optional[str] = None
self._expires_at: float = 0
async def get_token(self, force_refresh: bool = False) -> str:
"""
Получить актуальный токен доступа.
Args:
force_refresh: Принудительно обновить токен
Returns:
Токен доступа
"""
if not force_refresh and self._access_token and time.time() < self._expires_at:
return self._access_token
# Определяем, какой вариант используется: готовый ключ или client_id/client_secret
if self.credentials:
# Используем готовый ключ авторизации (уже закодированный в Base64)
# Убираем префикс "Basic " если он есть
credentials_key = self.credentials.strip().replace('\n', '').replace('\r', '')
if credentials_key.startswith('Basic '):
credentials_key = credentials_key[6:]
connector = aiohttp.TCPConnector(ssl=False)
async with aiohttp.ClientSession(connector=connector) as session:
headers = {
"Authorization": f"Basic {credentials_key}",
"Content-Type": "application/x-www-form-urlencoded",
"Accept": "application/json",
"RqUID": str(uuid.uuid4())
}
form_data = {
"scope": "GIGACHAT_API_PERS"
}
async with session.post(
self.auth_url,
headers=headers,
data=form_data,
) as response:
if response.status != 200:
error_text = await response.text()
raise Exception(f"Failed to get token: {response.status} - {error_text}")
data = await response.json()
self._access_token = data["access_token"]
expires_in = data.get("expires_in", 1800)
self._expires_at = time.time() + expires_in - 300
return self._access_token
elif self.client_id and self.client_secret:
# Очищаем от пробелов и переносов
client_secret = self.client_secret.strip().replace('\n', '').replace('\r', '')
# Проверяем, является ли client_secret уже закодированным ключом Base64
# Если secret начинается с букв/цифр и длиннее 50 символов, это уже ключ авторизации
is_already_encoded = len(client_secret) > 50 and all(c.isalnum() or c in '+/=' for c in client_secret)
if is_already_encoded:
# Это уже готовый ключ авторизации в Base64
encoded_credentials = client_secret
print(f"DEBUG: Using pre-encoded authorization key (length: {len(encoded_credentials)})")
else:
# Это настоящий client_id и client_secret, нужно закодировать
client_id = self.client_id.strip().replace('\n', '').replace('\r', '')
if not client_id or not client_secret:
raise Exception("GIGACHAT_CLIENT_ID and GIGACHAT_CLIENT_SECRET cannot be empty after cleaning")
credentials_string = f"{client_id}:{client_secret}"
encoded_credentials = base64.b64encode(credentials_string.encode('utf-8')).decode('utf-8')
print(f"DEBUG: Encoded client_id:client_secret (length: {len(encoded_credentials)})")
connector = aiohttp.TCPConnector(ssl=False)
async with aiohttp.ClientSession(connector=connector) as session:
headers = {
"Content-Type": "application/x-www-form-urlencoded",
"Accept": "application/json",
"RqUID": str(uuid.uuid4()),
"Authorization": f"Basic {encoded_credentials}"
}
payload = {"scope": "GIGACHAT_API_PERS"}
print(f"DEBUG: Authorization header starts with: Basic {encoded_credentials[:10]}...")
async with session.post(
self.auth_url,
headers=headers,
data=payload,
) as response:
if response.status != 200:
error_text = await response.text()
raise Exception(
f"Failed to get token: {response.status} - {error_text}. "
f"URL: {self.auth_url}"
)
data = await response.json()
self._access_token = data["access_token"]
expires_in = data.get("expires_in", 1800)
self._expires_at = time.time() + expires_in - 300
return self._access_token
else:
raise Exception(
"Either GIGACHAT_CREDENTIALS (ready authorization key) or "
"GIGACHAT_CLIENT_ID and GIGACHAT_CLIENT_SECRET must be set"
)
def is_token_valid(self) -> bool:
"""Проверить, действителен ли текущий токен."""
return self._access_token is not None and time.time() < self._expires_at
def clear_token(self):
"""Очистить токен (для тестирования)."""
self._access_token = None
self._expires_at = 0