"""Сервис кэширования для Redis.""" import json from typing import Any, Dict, List, Optional import redis.asyncio as redis from dotenv import load_dotenv load_dotenv() class CacheService: """Сервис для работы с Redis кэшем.""" def __init__(self, redis_url: Optional[str] = None): self.redis_url = redis_url or "redis://localhost:6379/0" self._client: Optional[redis.Redis] = None async def _get_client(self) -> redis.Redis: """Получить клиент Redis (lazy initialization).""" if self._client is None: self._client = await redis.from_url(self.redis_url, decode_responses=True) return self._client async def get_context(self, conversation_id: str, max_messages: int = 50) -> List[Dict[str, str]]: """ Получить контекст разговора из кэша. Args: conversation_id: ID разговора max_messages: Максимальное количество сообщений Returns: Список сообщений в формате [{"role": "...", "content": "..."}] """ client = await self._get_client() key = f"conversation:{conversation_id}" data = await client.get(key) if not data: return [] messages = json.loads(data) # Возвращаем последние N сообщений return messages[-max_messages:] if len(messages) > max_messages else messages async def save_context(self, conversation_id: str, messages: List[Dict[str, str]], ttl: int = 86400): """ Сохранить контекст разговора в кэш. Args: conversation_id: ID разговора messages: Список сообщений ttl: Время жизни в секундах (по умолчанию 24 часа) """ client = await self._get_client() key = f"conversation:{conversation_id}" # Ограничиваем количество сообщений для экономии памяти max_messages = 100 if len(messages) > max_messages: messages = messages[-max_messages:] await client.setex(key, ttl, json.dumps(messages, ensure_ascii=False)) async def add_message(self, conversation_id: str, role: str, content: str): """ Добавить сообщение в контекст разговора. Args: conversation_id: ID разговора role: Роль (user, assistant, system) content: Содержимое сообщения """ messages = await self.get_context(conversation_id, max_messages=1000) messages.append({"role": role, "content": content}) await self.save_context(conversation_id, messages) async def clear_context(self, conversation_id: str): """Очистить контекст разговора.""" client = await self._get_client() key = f"conversation:{conversation_id}" await client.delete(key) async def get(self, key: str) -> Optional[Any]: """Получить значение по ключу.""" client = await self._get_client() data = await client.get(key) return json.loads(data) if data else None async def set(self, key: str, value: Any, ttl: int = 3600): """Установить значение с TTL.""" client = await self._get_client() await client.setex(key, ttl, json.dumps(value, ensure_ascii=False)) async def close(self): """Закрыть соединение с Redis.""" if self._client: await self._client.close() self._client = None