Added code samples for AI-Agents

This commit is contained in:
2025-12-17 20:22:46 +03:00
parent d66aed35d6
commit 0885618b25
29 changed files with 2007 additions and 0 deletions

123
scripts/analyze_usage.py Normal file
View File

@@ -0,0 +1,123 @@
"""Скрипт для анализа использования токенов GigaChat."""
import argparse
import json
from collections import defaultdict
from datetime import datetime
from pathlib import Path
def calculate_cost(tokens: int, model: str = "Lite") -> float:
"""Рассчитать стоимость токенов."""
rates = {
"Lite": 0.2 / 1000,
"Pro": 1.5 / 1000,
"Max": 1.95 / 1000,
}
rate = rates.get(model, rates["Lite"])
return tokens * rate
def analyze_usage(data_file: str, month: str = None):
"""Проанализировать использование токенов."""
if not Path(data_file).exists():
print(f"Файл {data_file} не найден. Создайте файл с данными использования.")
print("\nФормат данных (JSON):")
print(json.dumps(
{
"usage": [
{
"user_id": "user_123",
"date": "2025-12-15",
"tokens": 1500,
"model": "Lite",
}
]
},
indent=2,
))
return
with open(data_file, "r", encoding="utf-8") as f:
data = json.load(f)
usage_records = data.get("usage", [])
if month:
# Фильтруем по месяцу
usage_records = [
record
for record in usage_records
if record.get("date", "").startswith(month)
]
if not usage_records:
print("Нет данных для анализа")
return
# Статистика по моделям
model_stats = defaultdict(lambda: {"tokens": 0, "requests": 0})
user_stats = defaultdict(lambda: {"tokens": 0, "requests": 0})
total_tokens = 0
for record in usage_records:
tokens = record.get("tokens", 0)
model = record.get("model", "Lite")
user_id = record.get("user_id", "unknown")
model_stats[model]["tokens"] += tokens
model_stats[model]["requests"] += 1
user_stats[user_id]["tokens"] += tokens
user_stats[user_id]["requests"] += 1
total_tokens += tokens
# Выводим отчет
print("=" * 50)
print(f"GigaChat Usage Report")
if month:
print(f"Period: {month}")
print("=" * 50)
print(f"\nTotal tokens used: {total_tokens:,}")
print("\nBy Model:")
total_cost = 0
for model, stats in sorted(model_stats.items()):
cost = calculate_cost(stats["tokens"], model)
total_cost += cost
print(f" {model}:")
print(f" Tokens: {stats['tokens']:,}")
print(f" Requests: {stats['requests']}")
print(f" Cost: ₽{cost:,.2f}")
print(f"\nTotal cost: ₽{total_cost:,.2f}")
print("\nTop Users:")
top_users = sorted(user_stats.items(), key=lambda x: x[1]["tokens"], reverse=True)[:10]
for user_id, stats in top_users:
print(f" {user_id}: {stats['tokens']:,} tokens ({stats['requests']} requests)")
def main():
"""Главная функция."""
parser = argparse.ArgumentParser(description="Анализ использования токенов GigaChat")
parser.add_argument(
"--file",
type=str,
default="usage_data.json",
help="Файл с данными использования",
)
parser.add_argument(
"--month",
type=str,
default=None,
help="Месяц для анализа (формат: YYYY-MM)",
)
args = parser.parse_args()
analyze_usage(args.file, args.month)
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,95 @@
"""Скрипт для экспорта диалогов."""
import argparse
import asyncio
import json
from datetime import datetime
from pathlib import Path
from services.cache_service import CacheService
async def export_conversations(
redis_url: str,
output_file: str,
conversation_ids: list[str] = None,
):
"""Экспортировать диалоги из Redis."""
cache = CacheService(redis_url=redis_url)
try:
if conversation_ids:
# Экспортируем конкретные диалоги
conversations = {}
for conv_id in conversation_ids:
messages = await cache.get_context(conv_id, max_messages=1000)
if messages:
conversations[conv_id] = {
"id": conv_id,
"messages": messages,
"exported_at": datetime.now().isoformat(),
}
else:
# Экспортируем все диалоги (требует доступа к Redis keys)
print("Экспорт всех диалогов требует прямого доступа к Redis.")
print("Используйте --ids для экспорта конкретных диалогов.")
return
if not conversations:
print("Нет диалогов для экспорта")
return
# Сохраняем в файл
output_path = Path(output_file)
output_path.parent.mkdir(parents=True, exist_ok=True)
with open(output_file, "w", encoding="utf-8") as f:
json.dump(
{
"exported_at": datetime.now().isoformat(),
"conversations": conversations,
},
f,
ensure_ascii=False,
indent=2,
)
print(f"Экспортировано {len(conversations)} диалогов в {output_file}")
finally:
await cache.close()
async def main():
"""Главная функция."""
parser = argparse.ArgumentParser(description="Экспорт диалогов из Redis")
parser.add_argument(
"--redis-url",
type=str,
default="redis://localhost:6379/0",
help="URL Redis",
)
parser.add_argument(
"--output",
type=str,
default="conversations_export.json",
help="Файл для экспорта",
)
parser.add_argument(
"--ids",
type=str,
nargs="+",
help="ID диалогов для экспорта",
)
args = parser.parse_args()
await export_conversations(
redis_url=args.redis_url,
output_file=args.output,
conversation_ids=args.ids,
)
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -0,0 +1,115 @@
"""Скрипт для генерации тестовых данных."""
import argparse
import asyncio
import json
import random
from datetime import date, timedelta
from uuid import uuid4
from models.schedule import Schedule, Task
def generate_tasks(count: int = 5) -> list[Task]:
"""Генерировать тестовые задания."""
task_templates = [
{"title": "Утренняя зарядка", "category": "утренняя_рутина", "duration": 15},
{"title": "Чистка зубов", "category": "утренняя_рутина", "duration": 5},
{"title": "Завтрак", "category": "утренняя_рутина", "duration": 20},
{"title": "Рисование", "category": "обучение", "duration": 30},
{"title": "Чтение книги", "category": "обучение", "duration": 20},
{"title": "Игра с конструктором", "category": "игра", "duration": 45},
{"title": "Прогулка", "category": "игра", "duration": 60},
{"title": "Обед", "category": "отдых", "duration": 30},
{"title": "Тихий час", "category": "отдых", "duration": 60},
{"title": "Ужин", "category": "вечерняя_рутина", "duration": 30},
{"title": "Подготовка ко сну", "category": "вечерняя_рутина", "duration": 20},
]
selected = random.sample(task_templates, min(count, len(task_templates)))
tasks = []
for i, template in enumerate(selected):
tasks.append(
Task(
id=uuid4(),
title=template["title"],
description=f"Описание для {template['title']}",
duration_minutes=template["duration"],
category=template["category"],
completed=random.choice([True, False]),
order=i,
)
)
return tasks
def generate_schedules(user_id: str, count: int, start_date: date = None) -> list[Schedule]:
"""Генерировать тестовые расписания."""
if start_date is None:
start_date = date.today()
schedules = []
for i in range(count):
schedule_date = start_date + timedelta(days=i)
tasks = generate_tasks(random.randint(4, 8))
schedules.append(
Schedule(
id=uuid4(),
title=f"Расписание на {schedule_date.strftime('%d.%m.%Y')}",
date=schedule_date,
tasks=tasks,
user_id=user_id,
created_at=schedule_date.isoformat(),
)
)
return schedules
async def main():
"""Главная функция."""
parser = argparse.ArgumentParser(description="Генерация тестовых данных")
parser.add_argument("--users", type=int, default=10, help="Количество пользователей")
parser.add_argument("--schedules", type=int, default=50, help="Количество расписаний")
parser.add_argument("--output", type=str, default="test_data.json", help="Файл для сохранения")
args = parser.parse_args()
print(f"Генерация тестовых данных:")
print(f" Пользователей: {args.users}")
print(f" Расписаний: {args.schedules}")
data = {
"users": [],
"schedules": [],
}
# Генерируем пользователей
for _ in range(args.users):
user_id = str(uuid4())
data["users"].append(
{
"id": user_id,
"email": f"user_{random.randint(1000, 9999)}@example.com",
}
)
# Генерируем расписания для пользователя
schedules_per_user = args.schedules // args.users
user_schedules = generate_schedules(user_id, schedules_per_user)
data["schedules"].extend([s.model_dump() for s in user_schedules])
# Сохраняем в файл
with open(args.output, "w", encoding="utf-8") as f:
json.dump(data, f, ensure_ascii=False, indent=2, default=str)
print(f"\nДанные сохранены в {args.output}")
print(f" Пользователей: {len(data['users'])}")
print(f" Расписаний: {len(data['schedules'])}")
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -0,0 +1,73 @@
"""Скрипт для миграции промптов."""
import argparse
import json
from pathlib import Path
from prompts.chat_prompts import CHAT_SYSTEM_PROMPT
from prompts.persona import EARTH_PERSONA
from prompts.schedule_prompts import SCHEDULE_GENERATION_PROMPT, SCHEDULE_UPDATE_PROMPT
def export_prompts(output_file: str):
"""Экспортировать все промпты в JSON."""
prompts = {
"persona": {
"name": "Earth Persona",
"content": EARTH_PERSONA,
},
"schedule_generation": {
"name": "Schedule Generation Prompt",
"content": SCHEDULE_GENERATION_PROMPT,
},
"schedule_update": {
"name": "Schedule Update Prompt",
"content": SCHEDULE_UPDATE_PROMPT,
},
"chat_system": {
"name": "Chat System Prompt",
"content": CHAT_SYSTEM_PROMPT,
},
}
with open(output_file, "w", encoding="utf-8") as f:
json.dump(prompts, f, ensure_ascii=False, indent=2)
print(f"Промпты экспортированы в {output_file}")
def import_prompts(input_file: str):
"""Импортировать промпты из JSON (для будущего использования)."""
with open(input_file, "r", encoding="utf-8") as f:
prompts = json.load(f)
print(f"Импортировано {len(prompts)} промптов:")
for key, value in prompts.items():
print(f" - {value['name']}: {len(value['content'])} символов")
def main():
"""Главная функция."""
parser = argparse.ArgumentParser(description="Миграция промптов")
parser.add_argument(
"action",
choices=["export", "import"],
help="Действие: export или import",
)
parser.add_argument(
"--file",
type=str,
default="prompts.json",
help="Файл для экспорта/импорта",
)
args = parser.parse_args()
if args.action == "export":
export_prompts(args.file)
elif args.action == "import":
import_prompts(args.file)
if __name__ == "__main__":
main()