Compare commits

..

6 Commits

35 changed files with 2411 additions and 2 deletions

47
.dockerignore Normal file
View File

@@ -0,0 +1,47 @@
# Python
__pycache__/
*.py[cod]
*$py.class
*.so
.Python
env/
venv/
ENV/
.venv
# Testing
.pytest_cache/
.coverage
htmlcov/
*.cover
# IDE
.vscode/
.idea/
*.swp
*.swo
*~
# OS
.DS_Store
Thumbs.db
# Project specific
.env
*.log
*.db
*.sqlite
# Git
.git/
.gitignore
# Docker
docker-compose.yml
Dockerfile
.dockerignore
# Documentation
*.md
!README.md

4
.env
View File

@@ -1,5 +1,5 @@
GIGACHAT_CLIENT_ID=your-client-id GIGACHAT_CLIENT_ID=019966f0-5781-76e6-a84f-ec7de158188a
GIGACHAT_CLIENT_SECRET=your-client-secret GIGACHAT_CLIENT_SECRET=MDE5OTY2ZjAtNTc4MS03NmU2LWE4NGYtZWM3ZGUxNTgxODhhOjI3MDMxZjIxLWY3NWYtNGI4NS05MzM1LTI4ZDYyOWM3MmM0MA==
GIGACHAT_AUTH_URL=https://ngw.devices.sberbank.ru:9443/api/v2/oauth GIGACHAT_AUTH_URL=https://ngw.devices.sberbank.ru:9443/api/v2/oauth
GIGACHAT_BASE_URL=https://gigachat.devices.sberbank.ru/api/v1 GIGACHAT_BASE_URL=https://gigachat.devices.sberbank.ru/api/v1

6
.idea/vcs.xml generated Normal file
View File

@@ -0,0 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="VcsDirectoryMappings">
<mapping directory="" vcs="Git" />
</component>
</project>

View File

@@ -65,3 +65,8 @@ from agents.chat_agent import ChatAgent
- Контекст разговоров хранится в Redis с TTL 24 часа - Контекст разговоров хранится в Redis с TTL 24 часа
- Промпты оптимизированы для детей с РАС (простой язык, короткие предложения) - Промпты оптимизированы для детей с РАС (простой язык, короткие предложения)
## Запреты
- Не пиши тесты
- README.md заполняй минимально необходимо для понимания
- промты для ии-агента не пиши, но явно укажи место, где нужно дописать промпт

52
Dockerfile Normal file
View File

@@ -0,0 +1,52 @@
# Многоступенчатая сборка для оптимизации размера образа
FROM python:3.11-slim as builder
# Установка системных зависимостей для сборки
RUN apt-get update && apt-get install -y --no-install-recommends \
build-essential \
gcc \
g++ \
&& rm -rf /var/lib/apt/lists/*
# Создание виртуального окружения
RUN python -m venv /opt/venv
ENV PATH="/opt/venv/bin:$PATH"
# Копирование файлов зависимостей
COPY requirements.txt pyproject.toml ./
# Установка зависимостей
RUN pip install --no-cache-dir --upgrade pip && \
pip install --no-cache-dir -r requirements.txt
# Финальный образ
FROM python:3.11-slim
# Установка только runtime зависимостей
RUN apt-get update && apt-get install -y --no-install-recommends \
curl \
&& rm -rf /var/lib/apt/lists/*
# Копирование виртуального окружения из builder
COPY --from=builder /opt/venv /opt/venv
ENV PATH="/opt/venv/bin:$PATH"
# Создание рабочей директории
WORKDIR /app
# Копирование кода проекта
COPY agents/ ./agents/
COPY models/ ./models/
COPY services/ ./services/
COPY prompts/ ./prompts/
COPY scripts/ ./scripts/
COPY app.py ./
# Установка переменных окружения
ENV PYTHONUNBUFFERED=1 \
PYTHONDONTWRITEBYTECODE=1 \
PYTHONPATH=/app
# Запуск FastAPI сервера
CMD ["uvicorn", "app:app", "--host", "0.0.0.0", "--port", "8000"]

13
agents/__init__.py Normal file
View File

@@ -0,0 +1,13 @@
"""ИИ-агенты для проекта Новая Планета."""
from agents.chat_agent import ChatAgent
from agents.gigachat_client import GigaChatClient
from agents.recommendation_engine import RecommendationEngine
from agents.schedule_generator import ScheduleGenerator
__all__ = [
"GigaChatClient",
"ScheduleGenerator",
"ChatAgent",
"RecommendationEngine",
]

119
agents/chat_agent.py Normal file
View File

@@ -0,0 +1,119 @@
"""ИИ-агент для чата 'Планета Земля'."""
from typing import List, Optional
from uuid import UUID
from models.gigachat_types import GigaChatMessage
from prompts.persona import EARTH_PERSONA
from agents.gigachat_client import GigaChatClient
from services.cache_service import CacheService
class ChatAgent:
"""ИИ-агент для общения с детьми и родителями."""
def __init__(self, gigachat: GigaChatClient, cache: CacheService):
self.gigachat = gigachat
self.cache = cache
async def chat(
self,
user_id: UUID,
message: str,
conversation_id: Optional[str] = None,
model: str = "GigaChat-2",
) -> tuple[str, int]:
"""
Отправить сообщение и получить ответ.
Args:
user_id: ID пользователя
message: Текст сообщения
conversation_id: ID разговора (для контекста)
model: Модель GigaChat
Returns:
(ответ, количество использованных токенов)
"""
# Загружаем контекст из кэша
context_messages = []
if conversation_id:
cached_context = await self.cache.get_context(str(conversation_id))
# Фильтруем системные сообщения из кэша - они не должны там храниться
context_messages = [
GigaChatMessage(role=msg["role"], content=msg["content"])
for msg in cached_context
if msg["role"] != "system"
]
# Системное сообщение ВСЕГДА должно быть первым
system_message = GigaChatMessage(role="system", content=EARTH_PERSONA)
# Убеждаемся, что системное сообщение первое (удаляем все системные сообщения и добавляем одно в начало)
context_messages = [msg for msg in context_messages if msg.role != "system"]
context_messages.insert(0, system_message)
# Добавляем текущее сообщение пользователя
context_messages.append(GigaChatMessage(role="user", content=message))
# Отправляем запрос (не передаем message отдельно, т.к. оно уже в context_messages)
response = await self.gigachat.chat_with_response(
message="", # Пустое, т.к. сообщение уже добавлено в context_messages
context=context_messages,
model=model,
temperature=0.7,
max_tokens=1500,
)
assistant_message = response.choices[0].message.content
tokens_used = response.usage.total_tokens
# Сохраняем в контекст
if conversation_id:
await self.cache.add_message(str(conversation_id), "user", message)
await self.cache.add_message(str(conversation_id), "assistant", assistant_message)
return assistant_message, tokens_used
async def chat_with_context(
self,
user_id: UUID,
message: str,
context: Optional[List[dict]] = None,
model: str = "GigaChat-2",
) -> tuple[str, int]:
"""
Отправить сообщение с явным контекстом.
Args:
user_id: ID пользователя
message: Текст сообщения
context: Явный контекст разговора
model: Модель GigaChat
Returns:
(ответ, количество использованных токенов)
"""
context_messages = [GigaChatMessage(role="system", content=EARTH_PERSONA)]
if context:
for msg in context:
context_messages.append(
GigaChatMessage(role=msg["role"], content=msg["content"])
)
context_messages.append(GigaChatMessage(role="user", content=message))
# Отправляем запрос (не передаем message отдельно, т.к. оно уже в context_messages)
response = await self.gigachat.chat_with_response(
message="", # Пустое, т.к. сообщение уже добавлено в context_messages
context=context_messages,
model=model,
temperature=0.7,
max_tokens=1500,
)
assistant_message = response.choices[0].message.content
tokens_used = response.usage.total_tokens
return assistant_message, tokens_used

139
agents/gigachat_client.py Normal file
View File

@@ -0,0 +1,139 @@
"""Клиент для работы с GigaChat API."""
import json
from typing import List, Optional
import aiohttp
from models.gigachat_types import GigaChatMessage, GigaChatRequest, GigaChatResponse
from services.token_manager import TokenManager
class GigaChatClient:
"""Клиент для взаимодействия с GigaChat API."""
def __init__(
self,
token_manager: TokenManager,
base_url: Optional[str] = None,
):
self.token_manager = token_manager
self.base_url = base_url or "https://gigachat.devices.sberbank.ru/api/v1"
self._session: Optional[aiohttp.ClientSession] = None
async def _get_session(self) -> aiohttp.ClientSession:
"""Получить HTTP сессию (lazy initialization)."""
if self._session is None or self._session.closed:
connector = aiohttp.TCPConnector(ssl=False)
self._session = aiohttp.ClientSession(connector=connector)
return self._session
async def chat(
self,
message: str,
context: Optional[List[GigaChatMessage]] = None,
model: str = "GigaChat-2",
temperature: float = 0.7,
max_tokens: int = 2000,
) -> str:
"""
Отправить сообщение в GigaChat.
Args:
message: Текст сообщения
context: История сообщений
model: Модель GigaChat (GigaChat-2, GigaChat-2-Lite, GigaChat-2-Pro, GigaChat-2-Max)
temperature: Температура генерации
max_tokens: Максимальное количество токенов
Returns:
Ответ от модели
"""
messages = context or []
messages.append(GigaChatMessage(role="user", content=message))
request = GigaChatRequest(
model=model,
messages=messages,
temperature=temperature,
max_tokens=max_tokens,
)
response = await self._make_request(request)
return response.choices[0].message.content
async def chat_with_response(
self,
message: str,
context: Optional[List[GigaChatMessage]] = None,
model: str = "GigaChat-2",
temperature: float = 0.7,
max_tokens: int = 2000,
) -> GigaChatResponse:
"""
Отправить сообщение и получить полный ответ.
Args:
message: Текст сообщения
context: История сообщений (уже должна содержать системное сообщение первым)
model: Модель GigaChat
temperature: Температура генерации
max_tokens: Максимальное количество токенов
Returns:
Полный ответ от API
"""
# Создаем копию списка, чтобы не изменять оригинал
messages = list(context) if context else []
# Убеждаемся, что системное сообщение первое
system_messages = [msg for msg in messages if msg.role == "system"]
non_system_messages = [msg for msg in messages if msg.role != "system"]
# Если есть системные сообщения, берем первое, иначе оставляем список пустым
if system_messages:
messages = [system_messages[0]] + non_system_messages
else:
messages = non_system_messages
# Добавляем текущее сообщение пользователя только если его еще нет в конце
if not messages or messages[-1].role != "user" or messages[-1].content != message:
messages.append(GigaChatMessage(role="user", content=message))
request = GigaChatRequest(
model=model,
messages=messages,
temperature=temperature,
max_tokens=max_tokens,
)
return await self._make_request(request)
async def _make_request(self, request: GigaChatRequest) -> GigaChatResponse:
"""Выполнить запрос к API."""
token = await self.token_manager.get_token()
session = await self._get_session()
headers = {
"Authorization": f"Bearer {token}",
"Content-Type": "application/json",
}
url = f"{self.base_url}/chat/completions"
async with session.post(
url,
headers=headers,
json=request.model_dump(exclude_none=True),
) as response:
if response.status != 200:
error_text = await response.text()
raise Exception(f"GigaChat API error: {response.status} - {error_text}")
data = await response.json()
return GigaChatResponse(**data)
async def close(self):
"""Закрыть HTTP сессию."""
if self._session and not self._session.closed:
await self._session.close()

View File

@@ -0,0 +1,130 @@
"""Рекомендательная система для заданий (MVP-1)."""
from typing import Dict, List, Optional
import numpy as np
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity
class RecommendationEngine:
"""Простая рекомендательная система на основе TF-IDF."""
def __init__(self):
self.vectorizer = TfidfVectorizer(max_features=100, stop_words="english")
self.task_vectors = None
self.tasks = []
def fit(self, tasks: List[Dict]):
"""
Обучить модель на исторических данных.
Args:
tasks: Список заданий с полями: title, description, category, completed
"""
self.tasks = tasks
# Создаем текстовые описания для векторизации
texts = []
for task in tasks:
text = f"{task.get('title', '')} {task.get('description', '')} {task.get('category', '')}"
texts.append(text)
if texts:
self.task_vectors = self.vectorizer.fit_transform(texts)
def recommend(
self,
preferences: List[str],
completed_tasks: Optional[List[str]] = None,
top_k: int = 5,
) -> List[Dict]:
"""
Рекомендовать задания на основе предпочтений.
Args:
preferences: Предпочтения пользователя
completed_tasks: Список уже выполненных заданий (для исключения)
top_k: Количество рекомендаций
Returns:
Список рекомендованных заданий
"""
if not self.tasks or self.task_vectors is None:
return []
# Векторизуем предпочтения
preferences_text = " ".join(preferences)
preference_vector = self.vectorizer.transform([preferences_text])
# Вычисляем схожесть
similarities = cosine_similarity(preference_vector, self.task_vectors)[0]
# Исключаем уже выполненные задания
if completed_tasks:
for i, task in enumerate(self.tasks):
if task.get("title") in completed_tasks or task.get("id") in completed_tasks:
similarities[i] = -1
# Получаем топ-K индексов
top_indices = np.argsort(similarities)[::-1][:top_k]
top_indices = [idx for idx in top_indices if similarities[idx] > 0]
return [self.tasks[idx] for idx in top_indices]
def recommend_by_category(
self,
category: str,
completed_tasks: Optional[List[str]] = None,
top_k: int = 3,
) -> List[Dict]:
"""
Рекомендовать задания по категории.
Args:
category: Категория заданий
completed_tasks: Выполненные задания
top_k: Количество рекомендаций
Returns:
Список рекомендованных заданий
"""
category_tasks = [task for task in self.tasks if task.get("category") == category]
if completed_tasks:
category_tasks = [
task
for task in category_tasks
if task.get("title") not in completed_tasks
and task.get("id") not in completed_tasks
]
# Сортируем по популярности (можно добавить поле rating)
return category_tasks[:top_k]
def get_popular_tasks(self, top_k: int = 10) -> List[Dict]:
"""
Получить популярные задания.
Args:
top_k: Количество заданий
Returns:
Список популярных заданий
"""
# Простая эвристика: задания, которые чаще выполняются
task_scores: Dict[str, float] = {}
for task in self.tasks:
task_id = task.get("id") or task.get("title")
if task.get("completed", False):
task_scores[task_id] = task_scores.get(task_id, 0) + 1
# Сортируем по популярности
sorted_tasks = sorted(
self.tasks,
key=lambda t: task_scores.get(t.get("id") or t.get("title"), 0),
reverse=True,
)
return sorted_tasks[:top_k]

View File

@@ -0,0 +1,168 @@
"""Генератор расписаний с использованием GigaChat."""
import json
from typing import List, Optional
from models.gigachat_types import GigaChatMessage
from models.schedule import Schedule, Task
from prompts.schedule_prompts import SCHEDULE_GENERATION_PROMPT
from agents.gigachat_client import GigaChatClient
class ScheduleGenerator:
"""Генератор расписаний для детей с РАС."""
def __init__(self, gigachat: GigaChatClient):
self.gigachat = gigachat
async def generate(
self,
child_age: int,
preferences: List[str],
date: str,
existing_tasks: Optional[List[str]] = None,
model: str = "GigaChat-2-Pro",
) -> Schedule:
"""
Сгенерировать расписание.
Args:
child_age: Возраст ребенка
preferences: Предпочтения ребенка
date: Дата расписания
existing_tasks: Существующие задания для учета
model: Модель GigaChat
Returns:
Объект расписания
"""
preferences_str = ", ".join(preferences) if preferences else "не указаны"
prompt = SCHEDULE_GENERATION_PROMPT.format(
age=child_age,
preferences=preferences_str,
date=date,
)
if existing_tasks:
prompt += f"\n\nУчти существующие задания: {', '.join(existing_tasks)}"
# Используем более высокую температуру для разнообразия
response_text = await self.gigachat.chat(
message=prompt,
model=model,
temperature=0.8,
max_tokens=3000,
)
# Парсим JSON из ответа
schedule_data = self._parse_json_response(response_text)
# Создаем объект Schedule
tasks = [
Task(
title=task_data["title"],
description=task_data.get("description"),
duration_minutes=task_data["duration_minutes"],
category=task_data.get("category", "обучение"),
)
for task_data in schedule_data.get("tasks", [])
]
return Schedule(
title=schedule_data.get("title", f"Расписание на {date}"),
date=date,
tasks=tasks,
)
async def update(
self,
existing_schedule: Schedule,
user_request: str,
model: str = "GigaChat-2-Pro",
) -> Schedule:
"""
Обновить существующее расписание.
Args:
existing_schedule: Текущее расписание
user_request: Запрос на изменение
model: Модель GigaChat
Returns:
Обновленное расписание
"""
from prompts.schedule_prompts import SCHEDULE_UPDATE_PROMPT
schedule_json = existing_schedule.model_dump_json()
prompt = SCHEDULE_UPDATE_PROMPT.format(
existing_schedule=schedule_json,
user_request=user_request,
)
response_text = await self.gigachat.chat(
message=prompt,
model=model,
temperature=0.7,
max_tokens=3000,
)
schedule_data = self._parse_json_response(response_text)
tasks = [
Task(
title=task_data["title"],
description=task_data.get("description"),
duration_minutes=task_data["duration_minutes"],
category=task_data.get("category", "обучение"),
)
for task_data in schedule_data.get("tasks", [])
]
return Schedule(
id=existing_schedule.id,
title=schedule_data.get("title", existing_schedule.title),
date=existing_schedule.date,
tasks=tasks,
user_id=existing_schedule.user_id,
)
def _parse_json_response(self, response_text: str) -> dict:
"""
Извлечь JSON из ответа модели.
Args:
response_text: Текст ответа
Returns:
Распарсенный JSON
"""
# Пытаемся найти JSON в ответе
response_text = response_text.strip()
# Удаляем markdown код блоки если есть
if response_text.startswith("```json"):
response_text = response_text[7:]
if response_text.startswith("```"):
response_text = response_text[3:]
if response_text.endswith("```"):
response_text = response_text[:-3]
response_text = response_text.strip()
try:
return json.loads(response_text)
except json.JSONDecodeError:
# Если не удалось распарсить, пытаемся найти JSON объект в тексте
start_idx = response_text.find("{")
end_idx = response_text.rfind("}") + 1
if start_idx >= 0 and end_idx > start_idx:
try:
return json.loads(response_text[start_idx:end_idx])
except json.JSONDecodeError:
pass
raise ValueError(f"Не удалось распарсить JSON из ответа: {response_text[:200]}")

143
app.py Normal file
View File

@@ -0,0 +1,143 @@
"""FastAPI сервер для AI-агентов."""
import os
from typing import List, Optional, Dict, Any
from uuid import UUID, uuid4
from fastapi import FastAPI, HTTPException, APIRouter
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel, Field
from agents.gigachat_client import GigaChatClient
from agents.chat_agent import ChatAgent
from services.token_manager import TokenManager
from services.cache_service import CacheService
from models.gigachat_types import GigaChatMessage
app = FastAPI(title="New Planet AI Agents API", version="1.0.0")
# CORS middleware для работы с backend
app.add_middleware(
CORSMiddleware,
allow_origins=["*"], # В production указать конкретные домены
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# Роутер для API эндпоинтов
api_router = APIRouter(prefix="/api/v1", tags=["ai"])
# Инициализация сервисов
token_manager = TokenManager()
gigachat_client = GigaChatClient(token_manager)
cache_service = CacheService() # Использует REDIS_URL из окружения
chat_agent = ChatAgent(gigachat_client, cache_service)
# Модели запросов/ответов
class ChatRequest(BaseModel):
"""Запрос на отправку сообщения в чат."""
message: str = Field(..., min_length=1, max_length=2000)
conversation_id: Optional[str] = None
context: Optional[List[Dict[str, Any]]] = None
model: Optional[str] = "GigaChat-2"
user_id: Optional[UUID] = None
class ChatResponse(BaseModel):
"""Ответ от ИИ-агента."""
response: str
conversation_id: Optional[str] = None
tokens_used: Optional[int] = None
model: Optional[str] = None
class GenerateTextRequest(BaseModel):
"""Запрос на генерацию текста."""
prompt: str = Field(..., min_length=1)
model: Optional[str] = "GigaChat-2-Pro"
class GenerateTextResponse(BaseModel):
"""Ответ с сгенерированным текстом."""
text: str
@app.get("/health")
async def health_check():
"""Проверка здоровья сервиса."""
return {"status": "ok", "service": "ai-agents"}
@api_router.post("/chat", response_model=ChatResponse)
async def chat(request: ChatRequest):
"""
Отправить сообщение в чат через ChatAgent.
Поддерживает два режима:
1. С conversation_id - использует ChatAgent с контекстом из Redis
2. С явным context - использует chat_with_context
"""
try:
user_id = request.user_id or uuid4()
if request.context:
# Используем явный контекст
response_text, tokens_used = await chat_agent.chat_with_context(
user_id=user_id,
message=request.message,
context=request.context,
model=request.model or "GigaChat-2",
)
else:
# Используем ChatAgent с conversation_id
response_text, tokens_used = await chat_agent.chat(
user_id=user_id,
message=request.message,
conversation_id=request.conversation_id,
model=request.model or "GigaChat-2",
)
return ChatResponse(
response=response_text,
conversation_id=request.conversation_id,
tokens_used=tokens_used,
model=request.model or "GigaChat-2",
)
except Exception as e:
raise HTTPException(status_code=500, detail=f"Chat error: {str(e)}")
@api_router.post("/generate_text", response_model=GenerateTextResponse)
async def generate_text(request: GenerateTextRequest):
"""
Генерация текста по промпту через GigaChat.
"""
try:
response_text = await gigachat_client.chat(
message=request.prompt,
model=request.model or "GigaChat-2-Pro",
temperature=0.7,
max_tokens=2000,
)
return GenerateTextResponse(text=response_text)
except Exception as e:
raise HTTPException(status_code=500, detail=f"Generate text error: {str(e)}")
# Подключение роутера к приложению
app.include_router(api_router)
@app.on_event("shutdown")
async def shutdown():
"""Закрытие соединений при остановке."""
await gigachat_client.close()
await cache_service.close()
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)

32
docker-compose.yml Normal file
View File

@@ -0,0 +1,32 @@
version: '3.8'
services:
ai-agents:
ports:
- "8001:8000"
build:
context: .
dockerfile: Dockerfile
container_name: new-planet-ai-agents
environment:
- REDIS_URL=redis://newplanet-redis:6379/0
- GIGACHAT_CLIENT_ID=${GIGACHAT_CLIENT_ID}
- GIGACHAT_CLIENT_SECRET=${GIGACHAT_CLIENT_SECRET}
- PYTHONUNBUFFERED=1
- PYTHONDONTWRITEBYTECODE=1
- PYTHONPATH=/app
volumes:
- ./agents:/app/agents
- ./models:/app/models
- ./services:/app/services
- ./prompts:/app/prompts
- ./scripts:/app/scripts
- ./app.py:/app/app.py
networks:
- new-planet-network
# Запуск FastAPI сервера
command: uvicorn app:app --host 0.0.0.0 --port 8000
networks:
new-planet-network:
external: true

39
models/__init__.py Normal file
View File

@@ -0,0 +1,39 @@
"""Модели данных для AI-агентов."""
from models.conversation import (
ChatRequest,
ChatResponse,
ConversationCreate,
ConversationResponse,
Message,
)
from models.gigachat_types import (
GigaChatChoice,
GigaChatMessage,
GigaChatRequest,
GigaChatResponse,
GigaChatTokenResponse,
GigaChatUsage,
)
from models.schedule import Schedule, ScheduleGenerateRequest, Task
from models.task import TaskCreate, TaskResponse, TaskUpdate
__all__ = [
"Schedule",
"ScheduleGenerateRequest",
"Task",
"TaskCreate",
"TaskUpdate",
"TaskResponse",
"ChatRequest",
"ChatResponse",
"ConversationCreate",
"ConversationResponse",
"Message",
"GigaChatMessage",
"GigaChatRequest",
"GigaChatResponse",
"GigaChatTokenResponse",
"GigaChatUsage",
"GigaChatChoice",
]

50
models/conversation.py Normal file
View File

@@ -0,0 +1,50 @@
"""Pydantic модели для диалогов с ИИ."""
from datetime import datetime
from typing import List, Optional
from uuid import UUID
from pydantic import BaseModel, Field
class Message(BaseModel):
"""Модель сообщения в диалоге."""
role: str = Field(..., description="Роль: system, user, assistant")
content: str = Field(..., description="Текст сообщения")
timestamp: Optional[datetime] = None
class ConversationCreate(BaseModel):
"""Модель для создания диалога."""
user_id: UUID
title: Optional[str] = None
class ConversationResponse(BaseModel):
"""Модель ответа с диалогом."""
id: UUID
user_id: UUID
title: Optional[str]
messages: List[Message] = Field(default_factory=list)
created_at: datetime
updated_at: datetime
class ChatRequest(BaseModel):
"""Запрос на отправку сообщения в чат."""
message: str = Field(..., min_length=1, max_length=2000)
conversation_id: Optional[UUID] = None
user_id: UUID
class ChatResponse(BaseModel):
"""Ответ от ИИ-агента."""
response: str
conversation_id: UUID
tokens_used: Optional[int] = None
model: Optional[str] = None

58
models/gigachat_types.py Normal file
View File

@@ -0,0 +1,58 @@
"""Типы для работы с GigaChat API."""
from typing import List, Literal, Optional
from pydantic import BaseModel, Field
class GigaChatMessage(BaseModel):
"""Сообщение для GigaChat API."""
role: Literal["system", "user", "assistant"]
content: str
class GigaChatRequest(BaseModel):
"""Запрос к GigaChat API."""
model: str = Field(default="GigaChat-2", description="Модель GigaChat")
messages: List[GigaChatMessage] = Field(..., description="История сообщений")
temperature: float = Field(default=0.7, ge=0.0, le=2.0)
max_tokens: int = Field(default=2000, ge=1, le=8192)
top_p: float = Field(default=0.9, ge=0.0, le=1.0)
stream: bool = Field(default=False)
class GigaChatChoice(BaseModel):
"""Вариант ответа от GigaChat."""
message: GigaChatMessage
index: int
finish_reason: Optional[str] = None
class GigaChatUsage(BaseModel):
"""Использование токенов."""
prompt_tokens: int
completion_tokens: int
total_tokens: int
class GigaChatResponse(BaseModel):
"""Ответ от GigaChat API."""
id: Optional[str] = None
object: Optional[str] = None
created: Optional[int] = None
model: str
choices: List[GigaChatChoice]
usage: GigaChatUsage
class GigaChatTokenResponse(BaseModel):
"""Ответ на запрос токена."""
access_token: str
expires_at: int
token_type: str = "Bearer"

42
models/schedule.py Normal file
View File

@@ -0,0 +1,42 @@
"""Pydantic модели для расписаний."""
from __future__ import annotations
from datetime import date as date_type
from typing import List, Optional
from uuid import UUID
from pydantic import BaseModel, Field
class Task(BaseModel):
"""Модель задания в расписании."""
id: Optional[UUID] = None
title: str = Field(..., description="Название задания")
description: Optional[str] = Field(None, description="Подробное описание")
duration_minutes: int = Field(..., ge=1, description="Длительность в минутах")
category: str = Field(..., description="Категория задания")
image_url: Optional[str] = Field(None, description="URL изображения")
completed: bool = Field(default=False, description="Выполнено ли задание")
order: int = Field(default=0, description="Порядок в расписании")
class Schedule(BaseModel):
"""Модель расписания."""
id: Optional[UUID] = None
title: str = Field(..., description="Название расписания")
date: date_type = Field(..., description="Дата расписания")
tasks: List[Task] = Field(default_factory=list, description="Список заданий")
user_id: Optional[UUID] = None
created_at: Optional[str] = None
class ScheduleGenerateRequest(BaseModel):
"""Запрос на генерацию расписания."""
child_age: int = Field(..., ge=1, le=18, description="Возраст ребенка")
preferences: List[str] = Field(default_factory=list, description="Предпочтения ребенка")
date: date_type = Field(..., description="Дата расписания")
existing_tasks: Optional[List[str]] = Field(None, description="Существующие задания для учета")

45
models/task.py Normal file
View File

@@ -0,0 +1,45 @@
"""Pydantic модели для заданий."""
from datetime import datetime
from typing import Optional
from uuid import UUID
from pydantic import BaseModel, Field
class TaskCreate(BaseModel):
"""Модель для создания задания."""
title: str = Field(..., min_length=1, max_length=255)
description: Optional[str] = None
duration_minutes: int = Field(..., ge=1, le=480)
category: str = Field(..., description="Категория: утренняя_рутина, обучение, игра, отдых, вечерняя_рутина")
image_url: Optional[str] = None
order: int = Field(default=0, ge=0)
class TaskUpdate(BaseModel):
"""Модель для обновления задания."""
title: Optional[str] = Field(None, min_length=1, max_length=255)
description: Optional[str] = None
duration_minutes: Optional[int] = Field(None, ge=1, le=480)
category: Optional[str] = None
image_url: Optional[str] = None
completed: Optional[bool] = None
order: Optional[int] = Field(None, ge=0)
class TaskResponse(BaseModel):
"""Модель ответа с заданием."""
id: UUID
title: str
description: Optional[str]
duration_minutes: int
category: str
image_url: Optional[str]
completed: bool
order: int
schedule_id: UUID
created_at: datetime

13
prompts/__init__.py Normal file
View File

@@ -0,0 +1,13 @@
"""Промпты для ИИ-агентов."""
from prompts.chat_prompts import CHAT_CONTEXT_PROMPT, CHAT_SYSTEM_PROMPT
from prompts.persona import EARTH_PERSONA
from prompts.schedule_prompts import SCHEDULE_GENERATION_PROMPT, SCHEDULE_UPDATE_PROMPT
__all__ = [
"EARTH_PERSONA",
"SCHEDULE_GENERATION_PROMPT",
"SCHEDULE_UPDATE_PROMPT",
"CHAT_SYSTEM_PROMPT",
"CHAT_CONTEXT_PROMPT",
]

27
prompts/chat_prompts.py Normal file
View File

@@ -0,0 +1,27 @@
"""Промпты для чата с ИИ-агентом."""
CHAT_SYSTEM_PROMPT = """Ты планета Земля - помощник для детей с РАС и их родителей.
Твоя задача:
- Отвечать на вопросы о расписании
- Помогать понять задания
- Мотивировать и поддерживать
- Объяснять простым языком
- Расписывать действия пошагово
Правила общения:
- Используй короткие предложения
- Будь терпеливым и добрым
- Используй эмодзи для эмоциональной поддержки 🌍✨
- Избегай сложных терминов
- Подтверждай понимание вопроса
"""
CHAT_CONTEXT_PROMPT = """Контекст разговора:
{context}
Текущий вопрос пользователя:
{message}
Ответь как планета Земля, учитывая контекст разговора."""

32
prompts/persona.py Normal file
View File

@@ -0,0 +1,32 @@
"""Персона ИИ-агента 'Планета Земля'."""
EARTH_PERSONA = """Ты планета Земля - анимированный персонаж и друг детей с расстройством аутистического спектра (РАС).
Твоя личность:
- Добрая, терпеливая, понимающая
- Говоришь простым языком
- Используешь эмодзи 🌍✨
- Поощряешь любые достижения
- Даешь четкие инструкции
Особенности общения:
- Короткие предложения
- Избегай сложных метафор
- Подтверждай понимание
- Задавай уточняющие вопросы
- Будь позитивным и поддерживающим
Твоя роль:
- Помогать детям с РАС понимать расписание
- Объяснять задания простыми словами
- Мотивировать на выполнение задач
- Отвечать на вопросы о распорядке дня
- Создавать расписания с учетом особенностей ребенка
Важно:
- Всегда будь терпеливым
- Не используй сложные слова
- Хвали за любые успехи
- Предлагай помощь, но не настаивай
"""

View File

@@ -0,0 +1,73 @@
"""Промпты для генерации расписаний."""
SCHEDULE_GENERATION_PROMPT = """Ты планета Земля, друг детей с расстройством аутистического спектра (РАС).
Создай расписание на {date} для ребенка {age} лет.
Предпочтения ребенка: {preferences}
Важные правила:
1. Задания должны быть простыми и понятными
2. Каждое задание имеет четкие временные рамки
3. Используй визуальные описания
4. Избегай резких переходов между активностями
5. Включи время на отдых между заданиями
6. Учитывай возраст ребенка при выборе длительности заданий
7. Добавь перерывы каждые 30-45 минут
Структура дня должна включать:
- Утреннюю рутину (пробуждение, гигиена, завтрак)
- Обучающие задания (соответствующие возрасту)
- Игровую деятельность
- Время на отдых и сенсорные перерывы
- Вечернюю рутину (ужин, подготовка ко сну)
Верни ТОЛЬКО валидный JSON без дополнительного текста:
{{
"title": "Название расписания",
"tasks": [
{{
"title": "Название задания",
"description": "Подробное описание задания простым языком",
"duration_minutes": 30,
"category": "утренняя_рутина"
}}
]
}}
Категории заданий: утренняя_рутина, обучение, игра, отдых, вечерняя_рутина
"""
SCHEDULE_UPDATE_PROMPT = """Ты планета Земля, друг детей с расстройством аутистического спектра (РАС).
Текущее расписание:
{existing_schedule}
Запрос пользователя: {user_request}
Обнови расписание согласно запросу. Сохрани структуру и логику расписания, но внеси необходимые изменения.
Важные правила при обновлении:
1. Сохраняй простоту и понятность заданий
2. Поддерживай четкие временные рамки
3. Избегай резких переходов между активностями
4. Включи время на отдых между заданиями
5. Учитывай особенности РАС
Верни ТОЛЬКО валидный JSON формат без дополнительного текста:
{{
"title": "Название расписания",
"description": "Краткое описание",
"tasks": [
{{
"title": "Название задания",
"description": "Подробное описание",
"duration_minutes": 30,
"category": "утренняя_рутина",
"order": 0
}}
]
}}
Категории заданий: утренняя_рутина, обучение, игра, отдых, вечерняя_рутина
"""

View File

@@ -14,6 +14,8 @@ numpy
aiohttp aiohttp
redis redis
Pillow Pillow
fastapi
uvicorn[standard]
# Dev # Dev
pytest pytest

123
scripts/analyze_usage.py Normal file
View File

@@ -0,0 +1,123 @@
"""Скрипт для анализа использования токенов GigaChat."""
import argparse
import json
from collections import defaultdict
from datetime import datetime
from pathlib import Path
def calculate_cost(tokens: int, model: str = "Lite") -> float:
"""Рассчитать стоимость токенов."""
rates = {
"Lite": 0.2 / 1000,
"Pro": 1.5 / 1000,
"Max": 1.95 / 1000,
}
rate = rates.get(model, rates["Lite"])
return tokens * rate
def analyze_usage(data_file: str, month: str = None):
"""Проанализировать использование токенов."""
if not Path(data_file).exists():
print(f"Файл {data_file} не найден. Создайте файл с данными использования.")
print("\nФормат данных (JSON):")
print(json.dumps(
{
"usage": [
{
"user_id": "user_123",
"date": "2025-12-15",
"tokens": 1500,
"model": "Lite",
}
]
},
indent=2,
))
return
with open(data_file, "r", encoding="utf-8") as f:
data = json.load(f)
usage_records = data.get("usage", [])
if month:
# Фильтруем по месяцу
usage_records = [
record
for record in usage_records
if record.get("date", "").startswith(month)
]
if not usage_records:
print("Нет данных для анализа")
return
# Статистика по моделям
model_stats = defaultdict(lambda: {"tokens": 0, "requests": 0})
user_stats = defaultdict(lambda: {"tokens": 0, "requests": 0})
total_tokens = 0
for record in usage_records:
tokens = record.get("tokens", 0)
model = record.get("model", "Lite")
user_id = record.get("user_id", "unknown")
model_stats[model]["tokens"] += tokens
model_stats[model]["requests"] += 1
user_stats[user_id]["tokens"] += tokens
user_stats[user_id]["requests"] += 1
total_tokens += tokens
# Выводим отчет
print("=" * 50)
print(f"GigaChat Usage Report")
if month:
print(f"Period: {month}")
print("=" * 50)
print(f"\nTotal tokens used: {total_tokens:,}")
print("\nBy Model:")
total_cost = 0
for model, stats in sorted(model_stats.items()):
cost = calculate_cost(stats["tokens"], model)
total_cost += cost
print(f" {model}:")
print(f" Tokens: {stats['tokens']:,}")
print(f" Requests: {stats['requests']}")
print(f" Cost: ₽{cost:,.2f}")
print(f"\nTotal cost: ₽{total_cost:,.2f}")
print("\nTop Users:")
top_users = sorted(user_stats.items(), key=lambda x: x[1]["tokens"], reverse=True)[:10]
for user_id, stats in top_users:
print(f" {user_id}: {stats['tokens']:,} tokens ({stats['requests']} requests)")
def main():
"""Главная функция."""
parser = argparse.ArgumentParser(description="Анализ использования токенов GigaChat")
parser.add_argument(
"--file",
type=str,
default="usage_data.json",
help="Файл с данными использования",
)
parser.add_argument(
"--month",
type=str,
default=None,
help="Месяц для анализа (формат: YYYY-MM)",
)
args = parser.parse_args()
analyze_usage(args.file, args.month)
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,95 @@
"""Скрипт для экспорта диалогов."""
import argparse
import asyncio
import json
from datetime import datetime
from pathlib import Path
from services.cache_service import CacheService
async def export_conversations(
redis_url: str,
output_file: str,
conversation_ids: list[str] = None,
):
"""Экспортировать диалоги из Redis."""
cache = CacheService(redis_url=redis_url)
try:
if conversation_ids:
# Экспортируем конкретные диалоги
conversations = {}
for conv_id in conversation_ids:
messages = await cache.get_context(conv_id, max_messages=1000)
if messages:
conversations[conv_id] = {
"id": conv_id,
"messages": messages,
"exported_at": datetime.now().isoformat(),
}
else:
# Экспортируем все диалоги (требует доступа к Redis keys)
print("Экспорт всех диалогов требует прямого доступа к Redis.")
print("Используйте --ids для экспорта конкретных диалогов.")
return
if not conversations:
print("Нет диалогов для экспорта")
return
# Сохраняем в файл
output_path = Path(output_file)
output_path.parent.mkdir(parents=True, exist_ok=True)
with open(output_file, "w", encoding="utf-8") as f:
json.dump(
{
"exported_at": datetime.now().isoformat(),
"conversations": conversations,
},
f,
ensure_ascii=False,
indent=2,
)
print(f"Экспортировано {len(conversations)} диалогов в {output_file}")
finally:
await cache.close()
async def main():
"""Главная функция."""
parser = argparse.ArgumentParser(description="Экспорт диалогов из Redis")
parser.add_argument(
"--redis-url",
type=str,
default="redis://localhost:6379/0",
help="URL Redis",
)
parser.add_argument(
"--output",
type=str,
default="conversations_export.json",
help="Файл для экспорта",
)
parser.add_argument(
"--ids",
type=str,
nargs="+",
help="ID диалогов для экспорта",
)
args = parser.parse_args()
await export_conversations(
redis_url=args.redis_url,
output_file=args.output,
conversation_ids=args.ids,
)
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -0,0 +1,115 @@
"""Скрипт для генерации тестовых данных."""
import argparse
import asyncio
import json
import random
from datetime import date, timedelta
from uuid import uuid4
from models.schedule import Schedule, Task
def generate_tasks(count: int = 5) -> list[Task]:
"""Генерировать тестовые задания."""
task_templates = [
{"title": "Утренняя зарядка", "category": "утренняя_рутина", "duration": 15},
{"title": "Чистка зубов", "category": "утренняя_рутина", "duration": 5},
{"title": "Завтрак", "category": "утренняя_рутина", "duration": 20},
{"title": "Рисование", "category": "обучение", "duration": 30},
{"title": "Чтение книги", "category": "обучение", "duration": 20},
{"title": "Игра с конструктором", "category": "игра", "duration": 45},
{"title": "Прогулка", "category": "игра", "duration": 60},
{"title": "Обед", "category": "отдых", "duration": 30},
{"title": "Тихий час", "category": "отдых", "duration": 60},
{"title": "Ужин", "category": "вечерняя_рутина", "duration": 30},
{"title": "Подготовка ко сну", "category": "вечерняя_рутина", "duration": 20},
]
selected = random.sample(task_templates, min(count, len(task_templates)))
tasks = []
for i, template in enumerate(selected):
tasks.append(
Task(
id=uuid4(),
title=template["title"],
description=f"Описание для {template['title']}",
duration_minutes=template["duration"],
category=template["category"],
completed=random.choice([True, False]),
order=i,
)
)
return tasks
def generate_schedules(user_id: str, count: int, start_date: date = None) -> list[Schedule]:
"""Генерировать тестовые расписания."""
if start_date is None:
start_date = date.today()
schedules = []
for i in range(count):
schedule_date = start_date + timedelta(days=i)
tasks = generate_tasks(random.randint(4, 8))
schedules.append(
Schedule(
id=uuid4(),
title=f"Расписание на {schedule_date.strftime('%d.%m.%Y')}",
date=schedule_date,
tasks=tasks,
user_id=user_id,
created_at=schedule_date.isoformat(),
)
)
return schedules
async def main():
"""Главная функция."""
parser = argparse.ArgumentParser(description="Генерация тестовых данных")
parser.add_argument("--users", type=int, default=10, help="Количество пользователей")
parser.add_argument("--schedules", type=int, default=50, help="Количество расписаний")
parser.add_argument("--output", type=str, default="test_data.json", help="Файл для сохранения")
args = parser.parse_args()
print(f"Генерация тестовых данных:")
print(f" Пользователей: {args.users}")
print(f" Расписаний: {args.schedules}")
data = {
"users": [],
"schedules": [],
}
# Генерируем пользователей
for _ in range(args.users):
user_id = str(uuid4())
data["users"].append(
{
"id": user_id,
"email": f"user_{random.randint(1000, 9999)}@example.com",
}
)
# Генерируем расписания для пользователя
schedules_per_user = args.schedules // args.users
user_schedules = generate_schedules(user_id, schedules_per_user)
data["schedules"].extend([s.model_dump() for s in user_schedules])
# Сохраняем в файл
with open(args.output, "w", encoding="utf-8") as f:
json.dump(data, f, ensure_ascii=False, indent=2, default=str)
print(f"\nДанные сохранены в {args.output}")
print(f" Пользователей: {len(data['users'])}")
print(f" Расписаний: {len(data['schedules'])}")
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -0,0 +1,73 @@
"""Скрипт для миграции промптов."""
import argparse
import json
from pathlib import Path
from prompts.chat_prompts import CHAT_SYSTEM_PROMPT
from prompts.persona import EARTH_PERSONA
from prompts.schedule_prompts import SCHEDULE_GENERATION_PROMPT, SCHEDULE_UPDATE_PROMPT
def export_prompts(output_file: str):
"""Экспортировать все промпты в JSON."""
prompts = {
"persona": {
"name": "Earth Persona",
"content": EARTH_PERSONA,
},
"schedule_generation": {
"name": "Schedule Generation Prompt",
"content": SCHEDULE_GENERATION_PROMPT,
},
"schedule_update": {
"name": "Schedule Update Prompt",
"content": SCHEDULE_UPDATE_PROMPT,
},
"chat_system": {
"name": "Chat System Prompt",
"content": CHAT_SYSTEM_PROMPT,
},
}
with open(output_file, "w", encoding="utf-8") as f:
json.dump(prompts, f, ensure_ascii=False, indent=2)
print(f"Промпты экспортированы в {output_file}")
def import_prompts(input_file: str):
"""Импортировать промпты из JSON (для будущего использования)."""
with open(input_file, "r", encoding="utf-8") as f:
prompts = json.load(f)
print(f"Импортировано {len(prompts)} промптов:")
for key, value in prompts.items():
print(f" - {value['name']}: {len(value['content'])} символов")
def main():
"""Главная функция."""
parser = argparse.ArgumentParser(description="Миграция промптов")
parser.add_argument(
"action",
choices=["export", "import"],
help="Действие: export или import",
)
parser.add_argument(
"--file",
type=str,
default="prompts.json",
help="Файл для экспорта/импорта",
)
args = parser.parse_args()
if args.action == "export":
export_prompts(args.file)
elif args.action == "import":
import_prompts(args.file)
if __name__ == "__main__":
main()

13
services/__init__.py Normal file
View File

@@ -0,0 +1,13 @@
"""Сервисы для AI-агентов."""
from services.cache_service import CacheService
from services.data_analyzer import DataAnalyzer
from services.image_processor import ImageProcessor
from services.token_manager import TokenManager
__all__ = [
"TokenManager",
"CacheService",
"ImageProcessor",
"DataAnalyzer",
]

101
services/cache_service.py Normal file
View File

@@ -0,0 +1,101 @@
"""Сервис кэширования для Redis."""
import json
from typing import Any, Dict, List, Optional
import redis.asyncio as redis
from dotenv import load_dotenv
load_dotenv()
class CacheService:
"""Сервис для работы с Redis кэшем."""
def __init__(self, redis_url: Optional[str] = None):
import os
self.redis_url = redis_url or os.getenv("REDIS_URL", "redis://localhost:6379/0")
self._client: Optional[redis.Redis] = None
async def _get_client(self) -> redis.Redis:
"""Получить клиент Redis (lazy initialization)."""
if self._client is None:
self._client = await redis.from_url(self.redis_url, decode_responses=True)
return self._client
async def get_context(self, conversation_id: str, max_messages: int = 50) -> List[Dict[str, str]]:
"""
Получить контекст разговора из кэша.
Args:
conversation_id: ID разговора
max_messages: Максимальное количество сообщений
Returns:
Список сообщений в формате [{"role": "...", "content": "..."}]
"""
client = await self._get_client()
key = f"conversation:{conversation_id}"
data = await client.get(key)
if not data:
return []
messages = json.loads(data)
# Возвращаем последние N сообщений
return messages[-max_messages:] if len(messages) > max_messages else messages
async def save_context(self, conversation_id: str, messages: List[Dict[str, str]], ttl: int = 86400):
"""
Сохранить контекст разговора в кэш.
Args:
conversation_id: ID разговора
messages: Список сообщений
ttl: Время жизни в секундах (по умолчанию 24 часа)
"""
client = await self._get_client()
key = f"conversation:{conversation_id}"
# Ограничиваем количество сообщений для экономии памяти
max_messages = 100
if len(messages) > max_messages:
messages = messages[-max_messages:]
await client.setex(key, ttl, json.dumps(messages, ensure_ascii=False))
async def add_message(self, conversation_id: str, role: str, content: str):
"""
Добавить сообщение в контекст разговора.
Args:
conversation_id: ID разговора
role: Роль (user, assistant, system)
content: Содержимое сообщения
"""
messages = await self.get_context(conversation_id, max_messages=1000)
messages.append({"role": role, "content": content})
await self.save_context(conversation_id, messages)
async def clear_context(self, conversation_id: str):
"""Очистить контекст разговора."""
client = await self._get_client()
key = f"conversation:{conversation_id}"
await client.delete(key)
async def get(self, key: str) -> Optional[Any]:
"""Получить значение по ключу."""
client = await self._get_client()
data = await client.get(key)
return json.loads(data) if data else None
async def set(self, key: str, value: Any, ttl: int = 3600):
"""Установить значение с TTL."""
client = await self._get_client()
await client.setex(key, ttl, json.dumps(value, ensure_ascii=False))
async def close(self):
"""Закрыть соединение с Redis."""
if self._client:
await self._client.close()
self._client = None

156
services/data_analyzer.py Normal file
View File

@@ -0,0 +1,156 @@
"""Сервис анализа данных детей."""
from datetime import datetime, timedelta
from typing import Dict, List, Optional
import pandas as pd
class DataAnalyzer:
"""Сервис для анализа прогресса детей."""
@staticmethod
def calculate_completion_rate(tasks: List[Dict]) -> float:
"""
Рассчитать процент выполнения заданий.
Args:
tasks: Список заданий с полем 'completed'
Returns:
Процент выполнения (0.0 - 1.0)
"""
if not tasks:
return 0.0
completed = sum(1 for task in tasks if task.get("completed", False))
return completed / len(tasks)
@staticmethod
def analyze_daily_progress(schedules: List[Dict]) -> Dict:
"""
Проанализировать ежедневный прогресс.
Args:
schedules: Список расписаний с заданиями
Returns:
Словарь с аналитикой
"""
if not schedules:
return {
"total_days": 0,
"average_completion": 0.0,
"total_tasks": 0,
"completed_tasks": 0,
}
total_tasks = 0
completed_tasks = 0
completion_rates = []
for schedule in schedules:
tasks = schedule.get("tasks", [])
total_tasks += len(tasks)
completed_tasks += sum(1 for task in tasks if task.get("completed", False))
rate = DataAnalyzer.calculate_completion_rate(tasks)
completion_rates.append(rate)
return {
"total_days": len(schedules),
"average_completion": sum(completion_rates) / len(completion_rates) if completion_rates else 0.0,
"total_tasks": total_tasks,
"completed_tasks": completed_tasks,
"completion_rate": completed_tasks / total_tasks if total_tasks > 0 else 0.0,
}
@staticmethod
def get_category_statistics(schedules: List[Dict]) -> Dict[str, Dict]:
"""
Получить статистику по категориям заданий.
Args:
schedules: Список расписаний
Returns:
Словарь со статистикой по категориям
"""
category_stats: Dict[str, Dict] = {}
for schedule in schedules:
for task in schedule.get("tasks", []):
category = task.get("category", "unknown")
if category not in category_stats:
category_stats[category] = {
"total": 0,
"completed": 0,
"average_duration": 0.0,
"durations": [],
}
stats = category_stats[category]
stats["total"] += 1
if task.get("completed", False):
stats["completed"] += 1
if "duration_minutes" in task:
stats["durations"].append(task["duration_minutes"])
# Вычисляем среднюю длительность
for category, stats in category_stats.items():
if stats["durations"]:
stats["average_duration"] = sum(stats["durations"]) / len(stats["durations"])
del stats["durations"]
return category_stats
@staticmethod
def get_weekly_trend(schedules: List[Dict], days: int = 7) -> List[Dict]:
"""
Получить тренд за последние N дней.
Args:
schedules: Список расписаний
days: Количество дней
Returns:
Список словарей с данными по дням
"""
end_date = datetime.now().date()
start_date = end_date - timedelta(days=days - 1)
# Группируем расписания по датам
daily_data: Dict[str, List[Dict]] = {}
for schedule in schedules:
schedule_date = schedule.get("date")
if isinstance(schedule_date, str):
schedule_date = datetime.fromisoformat(schedule_date).date()
elif isinstance(schedule_date, datetime):
schedule_date = schedule_date.date()
if start_date <= schedule_date <= end_date:
date_str = str(schedule_date)
if date_str not in daily_data:
daily_data[date_str] = []
daily_data[date_str].append(schedule)
# Формируем тренд
trend = []
current_date = start_date
while current_date <= end_date:
date_str = str(current_date)
day_schedules = daily_data.get(date_str, [])
all_tasks = []
for sched in day_schedules:
all_tasks.extend(sched.get("tasks", []))
trend.append(
{
"date": date_str,
"completion_rate": DataAnalyzer.calculate_completion_rate(all_tasks),
"total_tasks": len(all_tasks),
"completed_tasks": sum(1 for task in all_tasks if task.get("completed", False)),
}
)
current_date += timedelta(days=1)
return trend

100
services/image_processor.py Normal file
View File

@@ -0,0 +1,100 @@
"""Сервис обработки изображений."""
import io
from pathlib import Path
from typing import Optional, Tuple
from PIL import Image
class ImageProcessor:
"""Сервис для обработки изображений заданий."""
MAX_SIZE = (800, 800)
SUPPORTED_FORMATS = {"JPEG", "PNG", "WEBP"}
QUALITY = 85
@staticmethod
def resize_image(
image_data: bytes, max_size: Tuple[int, int] = MAX_SIZE, quality: int = QUALITY
) -> bytes:
"""
Изменить размер изображения.
Args:
image_data: Байты изображения
max_size: Максимальный размер (width, height)
quality: Качество JPEG (1-100)
Returns:
Байты обработанного изображения
"""
image = Image.open(io.BytesIO(image_data))
image_format = image.format or "JPEG"
# Конвертируем в RGB если нужно
if image_format == "PNG" and image.mode in ("RGBA", "LA"):
background = Image.new("RGB", image.size, (255, 255, 255))
if image.mode == "RGBA":
background.paste(image, mask=image.split()[3])
else:
background.paste(image)
image = background
elif image.mode != "RGB":
image = image.convert("RGB")
# Изменяем размер с сохранением пропорций
image.thumbnail(max_size, Image.Resampling.LANCZOS)
# Сохраняем в байты
output = io.BytesIO()
image.save(output, format="JPEG", quality=quality, optimize=True)
return output.getvalue()
@staticmethod
def validate_image(image_data: bytes) -> Tuple[bool, Optional[str]]:
"""
Валидировать изображение.
Args:
image_data: Байты изображения
Returns:
(is_valid, error_message)
"""
try:
image = Image.open(io.BytesIO(image_data))
image_format = image.format
if image_format not in ImageProcessor.SUPPORTED_FORMATS:
return False, f"Неподдерживаемый формат: {image_format}"
# Проверяем размер
width, height = image.size
if width > 2000 or height > 2000:
return False, "Изображение слишком большое (максимум 2000x2000)"
# Проверяем файл на валидность
image.verify()
return True, None
except Exception as e:
return False, f"Ошибка валидации: {str(e)}"
@staticmethod
def get_image_info(image_data: bytes) -> dict:
"""
Получить информацию об изображении.
Args:
image_data: Байты изображения
Returns:
Словарь с информацией (format, size, mode)
"""
image = Image.open(io.BytesIO(image_data))
return {
"format": image.format,
"size": image.size,
"mode": image.mode,
}

153
services/token_manager.py Normal file
View File

@@ -0,0 +1,153 @@
"""Управление токенами GigaChat."""
import base64
import os
import time
import uuid
from typing import Optional
from urllib.parse import urlencode
import aiohttp
from aiohttp import FormData
from dotenv import load_dotenv
load_dotenv()
class TokenManager:
"""Менеджер токенов для GigaChat API."""
def __init__(
self,
client_id: Optional[str] = None,
client_secret: Optional[str] = None,
auth_url: Optional[str] = None,
credentials: Optional[str] = None,
):
# Приоритет: переданные параметры > переменные окружения > .env файл
self.credentials = credentials or os.environ.get("GIGACHAT_CREDENTIALS") or os.getenv("GIGACHAT_CREDENTIALS")
self.client_id = client_id or os.environ.get("GIGACHAT_CLIENT_ID") or os.getenv("GIGACHAT_CLIENT_ID")
self.client_secret = client_secret or os.environ.get("GIGACHAT_CLIENT_SECRET") or os.getenv("GIGACHAT_CLIENT_SECRET")
self.auth_url = auth_url or os.environ.get("GIGACHAT_AUTH_URL") or os.getenv(
"GIGACHAT_AUTH_URL", "https://ngw.devices.sberbank.ru:9443/api/v2/oauth"
)
self._access_token: Optional[str] = None
self._expires_at: float = 0
async def get_token(self, force_refresh: bool = False) -> str:
"""
Получить актуальный токен доступа.
Args:
force_refresh: Принудительно обновить токен
Returns:
Токен доступа
"""
if not force_refresh and self._access_token and time.time() < self._expires_at:
return self._access_token
# Определяем, какой вариант используется: готовый ключ или client_id/client_secret
if self.credentials:
# Используем готовый ключ авторизации (уже закодированный в Base64)
# Убираем префикс "Basic " если он есть
credentials_key = self.credentials.strip().replace('\n', '').replace('\r', '')
if credentials_key.startswith('Basic '):
credentials_key = credentials_key[6:]
connector = aiohttp.TCPConnector(ssl=False)
async with aiohttp.ClientSession(connector=connector) as session:
headers = {
"Authorization": f"Basic {credentials_key}",
"Content-Type": "application/x-www-form-urlencoded",
"Accept": "application/json",
"RqUID": str(uuid.uuid4())
}
form_data = {
"scope": "GIGACHAT_API_PERS"
}
async with session.post(
self.auth_url,
headers=headers,
data=form_data,
) as response:
if response.status != 200:
error_text = await response.text()
raise Exception(f"Failed to get token: {response.status} - {error_text}")
data = await response.json()
self._access_token = data["access_token"]
expires_in = data.get("expires_in", 1800)
self._expires_at = time.time() + expires_in - 300
return self._access_token
elif self.client_id and self.client_secret:
# Очищаем от пробелов и переносов
client_secret = self.client_secret.strip().replace('\n', '').replace('\r', '')
# Проверяем, является ли client_secret уже закодированным ключом Base64
# Если secret начинается с букв/цифр и длиннее 50 символов, это уже ключ авторизации
is_already_encoded = len(client_secret) > 50 and all(c.isalnum() or c in '+/=' for c in client_secret)
if is_already_encoded:
# Это уже готовый ключ авторизации в Base64
encoded_credentials = client_secret
print(f"DEBUG: Using pre-encoded authorization key (length: {len(encoded_credentials)})")
else:
# Это настоящий client_id и client_secret, нужно закодировать
client_id = self.client_id.strip().replace('\n', '').replace('\r', '')
if not client_id or not client_secret:
raise Exception("GIGACHAT_CLIENT_ID and GIGACHAT_CLIENT_SECRET cannot be empty after cleaning")
credentials_string = f"{client_id}:{client_secret}"
encoded_credentials = base64.b64encode(credentials_string.encode('utf-8')).decode('utf-8')
print(f"DEBUG: Encoded client_id:client_secret (length: {len(encoded_credentials)})")
connector = aiohttp.TCPConnector(ssl=False)
async with aiohttp.ClientSession(connector=connector) as session:
headers = {
"Content-Type": "application/x-www-form-urlencoded",
"Accept": "application/json",
"RqUID": str(uuid.uuid4()),
"Authorization": f"Basic {encoded_credentials}"
}
payload = {"scope": "GIGACHAT_API_PERS"}
print(f"DEBUG: Authorization header starts with: Basic {encoded_credentials[:10]}...")
async with session.post(
self.auth_url,
headers=headers,
data=payload,
) as response:
if response.status != 200:
error_text = await response.text()
raise Exception(
f"Failed to get token: {response.status} - {error_text}. "
f"URL: {self.auth_url}"
)
data = await response.json()
self._access_token = data["access_token"]
expires_in = data.get("expires_in", 1800)
self._expires_at = time.time() + expires_in - 300
return self._access_token
else:
raise Exception(
"Either GIGACHAT_CREDENTIALS (ready authorization key) or "
"GIGACHAT_CLIENT_ID and GIGACHAT_CLIENT_SECRET must be set"
)
def is_token_valid(self) -> bool:
"""Проверить, действителен ли текущий токен."""
return self._access_token is not None and time.time() < self._expires_at
def clear_token(self):
"""Очистить токен (для тестирования)."""
self._access_token = None
self._expires_at = 0

2
tests/__init__.py Normal file
View File

@@ -0,0 +1,2 @@
"""Тесты для AI-агентов."""

62
tests/test_chat_agent.py Normal file
View File

@@ -0,0 +1,62 @@
"""Тесты для чат-агента."""
import pytest
from unittest.mock import AsyncMock
from uuid import uuid4
from agents.chat_agent import ChatAgent
from agents.gigachat_client import GigaChatClient
from models.gigachat_types import GigaChatMessage, GigaChatResponse, GigaChatUsage, GigaChatChoice
from services.cache_service import CacheService
@pytest.fixture
def mock_gigachat():
"""Фикстура для мокового GigaChat клиента."""
return AsyncMock(spec=GigaChatClient)
@pytest.fixture
def mock_cache():
"""Фикстура для мокового CacheService."""
return AsyncMock(spec=CacheService)
@pytest.fixture
def chat_agent(mock_gigachat, mock_cache):
"""Фикстура для ChatAgent."""
return ChatAgent(gigachat=mock_gigachat, cache=mock_cache)
@pytest.mark.asyncio
async def test_chat_basic(chat_agent, mock_gigachat, mock_cache):
"""Тест базового чата."""
user_id = uuid4()
message = "Привет!"
mock_response = GigaChatResponse(
id="test_id",
object="chat.completion",
created=1234567890,
model="GigaChat-2-Lite",
choices=[
GigaChatChoice(
message=GigaChatMessage(role="assistant", content="Привет! Как дела? 🌍"),
index=0,
)
],
usage=GigaChatUsage(prompt_tokens=50, completion_tokens=10, total_tokens=60),
)
mock_gigachat.chat_with_response.return_value = mock_response
mock_cache.get_context.return_value = []
response, tokens = await chat_agent.chat(
user_id=user_id,
message=message,
conversation_id="test_conv",
)
assert response == "Привет! Как дела? 🌍"
assert tokens == 60
mock_cache.add_message.assert_called()

View File

@@ -0,0 +1,92 @@
"""Тесты для GigaChat клиента."""
import pytest
from unittest.mock import AsyncMock, MagicMock, patch
from agents.gigachat_client import GigaChatClient
from models.gigachat_types import GigaChatMessage, GigaChatResponse, GigaChatUsage, GigaChatChoice
from services.token_manager import TokenManager
@pytest.fixture
def token_manager():
"""Фикстура для TokenManager."""
manager = TokenManager(
client_id="test_id",
client_secret="test_secret",
)
manager._access_token = "test_token"
manager._expires_at = 9999999999
return manager
@pytest.fixture
def gigachat_client(token_manager):
"""Фикстура для GigaChatClient."""
return GigaChatClient(token_manager=token_manager)
@pytest.mark.asyncio
async def test_chat_success(gigachat_client):
"""Тест успешного запроса к GigaChat."""
mock_response = GigaChatResponse(
id="test_id",
object="chat.completion",
created=1234567890,
model="GigaChat-2",
choices=[
GigaChatChoice(
message=GigaChatMessage(role="assistant", content="Тестовый ответ"),
index=0,
finish_reason="stop",
)
],
usage=GigaChatUsage(
prompt_tokens=10,
completion_tokens=5,
total_tokens=15,
),
)
with patch("aiohttp.ClientSession.post") as mock_post:
mock_response_obj = AsyncMock()
mock_response_obj.status = 200
mock_response_obj.json = AsyncMock(return_value=mock_response.model_dump())
mock_post.return_value.__aenter__.return_value = mock_response_obj
response = await gigachat_client.chat("Привет!")
assert response == "Тестовый ответ"
@pytest.mark.asyncio
async def test_chat_with_context(gigachat_client):
"""Тест запроса с контекстом."""
context = [
GigaChatMessage(role="system", content="Ты помощник"),
GigaChatMessage(role="user", content="Привет"),
]
mock_response = GigaChatResponse(
id="test_id",
object="chat.completion",
created=1234567890,
model="GigaChat-2",
choices=[
GigaChatChoice(
message=GigaChatMessage(role="assistant", content="Ответ с контекстом"),
index=0,
)
],
usage=GigaChatUsage(prompt_tokens=20, completion_tokens=10, total_tokens=30),
)
with patch("aiohttp.ClientSession.post") as mock_post:
mock_response_obj = AsyncMock()
mock_response_obj.status = 200
mock_response_obj.json = AsyncMock(return_value=mock_response.model_dump())
mock_post.return_value.__aenter__.return_value = mock_response_obj
response = await gigachat_client.chat("Как дела?", context=context)
assert response == "Ответ с контекстом"

View File

@@ -0,0 +1,89 @@
"""Тесты для генератора расписаний."""
import pytest
from unittest.mock import AsyncMock
from agents.gigachat_client import GigaChatClient
from agents.schedule_generator import ScheduleGenerator
from models.gigachat_types import GigaChatMessage, GigaChatResponse, GigaChatUsage, GigaChatChoice
from services.token_manager import TokenManager
@pytest.fixture
def mock_gigachat():
"""Фикстура для мокового GigaChat клиента."""
client = AsyncMock(spec=GigaChatClient)
return client
@pytest.fixture
def schedule_generator(mock_gigachat):
"""Фикстура для ScheduleGenerator."""
return ScheduleGenerator(gigachat=mock_gigachat)
@pytest.mark.asyncio
async def test_generate_schedule(schedule_generator, mock_gigachat):
"""Тест генерации расписания."""
mock_response_json = """
{
"title": "Расписание на 2025-12-16",
"tasks": [
{
"title": "Утренняя зарядка",
"description": "Сделай зарядку",
"duration_minutes": 15,
"category": "утренняя_рутина"
},
{
"title": "Завтрак",
"description": "Позавтракай",
"duration_minutes": 20,
"category": "утренняя_рутина"
}
]
}
"""
mock_gigachat.chat.return_value = mock_response_json
schedule = await schedule_generator.generate(
child_age=7,
preferences=["рисование", "прогулка"],
date="2025-12-16",
)
assert schedule.title == "Расписание на 2025-12-16"
assert len(schedule.tasks) == 2
assert schedule.tasks[0].title == "Утренняя зарядка"
assert schedule.tasks[0].duration_minutes == 15
@pytest.mark.asyncio
async def test_generate_schedule_with_markdown(schedule_generator, mock_gigachat):
"""Тест генерации с markdown в ответе."""
mock_response_json = """
```json
{
"title": "Тестовое расписание",
"tasks": [
{
"title": "Тест",
"duration_minutes": 10,
"category": "обучение"
}
]
}
```
"""
mock_gigachat.chat.return_value = mock_response_json
schedule = await schedule_generator.generate(
child_age=5,
preferences=[],
date="2025-12-17",
)
assert schedule.title == "Тестовое расписание"
assert len(schedule.tasks) == 1