Files
New-planet-ai-agent/services/cache_service.py

101 lines
3.8 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""Сервис кэширования для Redis."""
import json
from typing import Any, Dict, List, Optional
import redis.asyncio as redis
from dotenv import load_dotenv
load_dotenv()
class CacheService:
"""Сервис для работы с Redis кэшем."""
def __init__(self, redis_url: Optional[str] = None):
self.redis_url = redis_url or "redis://localhost:6379/0"
self._client: Optional[redis.Redis] = None
async def _get_client(self) -> redis.Redis:
"""Получить клиент Redis (lazy initialization)."""
if self._client is None:
self._client = await redis.from_url(self.redis_url, decode_responses=True)
return self._client
async def get_context(self, conversation_id: str, max_messages: int = 50) -> List[Dict[str, str]]:
"""
Получить контекст разговора из кэша.
Args:
conversation_id: ID разговора
max_messages: Максимальное количество сообщений
Returns:
Список сообщений в формате [{"role": "...", "content": "..."}]
"""
client = await self._get_client()
key = f"conversation:{conversation_id}"
data = await client.get(key)
if not data:
return []
messages = json.loads(data)
# Возвращаем последние N сообщений
return messages[-max_messages:] if len(messages) > max_messages else messages
async def save_context(self, conversation_id: str, messages: List[Dict[str, str]], ttl: int = 86400):
"""
Сохранить контекст разговора в кэш.
Args:
conversation_id: ID разговора
messages: Список сообщений
ttl: Время жизни в секундах (по умолчанию 24 часа)
"""
client = await self._get_client()
key = f"conversation:{conversation_id}"
# Ограничиваем количество сообщений для экономии памяти
max_messages = 100
if len(messages) > max_messages:
messages = messages[-max_messages:]
await client.setex(key, ttl, json.dumps(messages, ensure_ascii=False))
async def add_message(self, conversation_id: str, role: str, content: str):
"""
Добавить сообщение в контекст разговора.
Args:
conversation_id: ID разговора
role: Роль (user, assistant, system)
content: Содержимое сообщения
"""
messages = await self.get_context(conversation_id, max_messages=1000)
messages.append({"role": role, "content": content})
await self.save_context(conversation_id, messages)
async def clear_context(self, conversation_id: str):
"""Очистить контекст разговора."""
client = await self._get_client()
key = f"conversation:{conversation_id}"
await client.delete(key)
async def get(self, key: str) -> Optional[Any]:
"""Получить значение по ключу."""
client = await self._get_client()
data = await client.get(key)
return json.loads(data) if data else None
async def set(self, key: str, value: Any, ttl: int = 3600):
"""Установить значение с TTL."""
client = await self._get_client()
await client.setex(key, ttl, json.dumps(value, ensure_ascii=False))
async def close(self):
"""Закрыть соединение с Redis."""
if self._client:
await self._client.close()
self._client = None