This commit is contained in:
Ilnar
2026-03-05 06:55:42 +03:00
commit 36a7d530c1
50 changed files with 3091 additions and 0 deletions

6
agents_service/.env Normal file
View File

@@ -0,0 +1,6 @@
GIGACHAT_CLIENT_ID=019cb2c4-b0cc-782b-af27-07bf919ce7c4
GIGACHAT_CLIENT_SECRET=58757e78-eaa4-4f67-b06a-2d02b655fd5d
GIGACHAT_SCOPE=GIGACHAT_API_PERS
GIGACHAT_MODEL=GigaChat
GIGACHAT_VERIFY_SSL_CERTS=false
PORT=8001

View File

@@ -0,0 +1,6 @@
GIGACHAT_CLIENT_ID=your_client_id_here
GIGACHAT_CLIENT_SECRET=your_client_secret_here
GIGACHAT_SCOPE=GIGACHAT_API_PERS
GIGACHAT_MODEL=GigaChat
GIGACHAT_VERIFY_SSL_CERTS=false
PORT=8001

13
agents_service/Dockerfile Normal file
View File

@@ -0,0 +1,13 @@
FROM python:3.11-slim
WORKDIR /app
ENV PYTHONDONTWRITEBYTECODE=1 PYTHONUNBUFFERED=1
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY src ./src
COPY .env.example ./.env.example
EXPOSE 8001
CMD ["python","-m","uvicorn","src.main:app","--host","0.0.0.0","--port","8001"]

14
agents_service/README.md Normal file
View File

@@ -0,0 +1,14 @@
# Agents Service (FastAPI)
```bash
python -m venv .venv
# Windows: .venv\Scripts\activate
pip install -r requirements.txt
cp .env.example .env
python -m uvicorn src.main:app --reload --port 8001
```
- Swagger: http://localhost:8001/docs
Docker: build from root with `docker compose up --build`

View File

@@ -0,0 +1,7 @@
fastapi>=0.110,<1
uvicorn[standard]>=0.29,<1
pydantic>=2.10,<3
python-dotenv>=1,<2
langchain>=0.3.27,<0.4
langchain-community>=0.3.0,<0.4
gigachat>=0.1.0

View File

@@ -0,0 +1,193 @@
from math import inf
import re
def safe_div(a, b):
return (a / b) if b else None
KPI_LABELS = {
"ctr": "CTR",
"cpc": "CPC",
"cr": "CR",
"cpl": "CPL",
"cpa": "CPA",
}
def parse_policy_text(text: str) -> dict:
"""Very small heuristic parser for free-form requests.
Supports phrases like:
'Оптимизируй по CPA, хороший до 800, средний до 1200, клики минимум 20'
"""
if not text:
return {}
t = text.lower()
kpi = None
for k in ["cpa","cpl","cpc","ctr","cr"]:
if k in t:
kpi = k
break
direction = "max" if kpi in ("ctr","cr") else "min"
nums = list(map(float, re.findall(r"(\d+[\.,]?\d*)", t)))
good = nums[0] if len(nums) >= 1 else None
ok = nums[1] if len(nums) >= 2 else None
min_clicks = 0
m = re.search(r"клик[аио]в?\s*(?:минимум|min)\s*(\d+)", t)
if m: min_clicks = int(m.group(1))
min_impr = 0
m = re.search(r"показ[а-я]*\s*(?:минимум|min)\s*(\d+)", t)
if m: min_impr = int(m.group(1))
return {
"mode": "text",
"primary_kpi": kpi or "cpl",
"direction": direction,
"good_threshold": good,
"ok_threshold": ok,
"min_clicks": min_clicks,
"min_impressions": min_impr,
"query_text": text,
}
class AnalyzeAgent:
def analyze(self, req: dict) -> dict:
rows = req["rows"]
objective = (req.get("objective") or "leads").lower()
policy = req.get("policy") or {}
if policy.get("mode") == "text" and policy.get("query_text"):
policy = {**parse_policy_text(policy.get("query_text")), **policy}
primary_kpi = (policy.get("primary_kpi") or "").lower() or ("cpl" if objective=="leads" else "cpa" if objective=="conversions" else "cpc")
direction = (policy.get("direction") or ("max" if primary_kpi in ("ctr","cr") else "min")).lower()
good_th = policy.get("good_threshold", None)
ok_th = policy.get("ok_threshold", None)
min_impr = int(policy.get("min_impressions") or 0)
min_clicks = int(policy.get("min_clicks") or 0)
# score per (variant, segment) first
by_variant = {}
for r in rows:
vid = r.get("variant_id")
fmt = r.get("format")
seg_id = r.get("segment_id")
seg_name = r.get("segment_name") or (f"Сегмент {seg_id}" if seg_id is not None else None)
impressions = int(r.get("impressions") or 0)
clicks = int(r.get("clicks") or 0)
conversions = int(r.get("conversions") or 0)
leads = int(r.get("leads") or 0)
spend = float(r.get("spend") or 0.0)
ctr = safe_div(clicks, impressions)
cr = safe_div(conversions, clicks)
cpc = safe_div(spend, clicks)
cpa = safe_div(spend, conversions) if conversions else None
cpl = safe_div(spend, leads) if leads else None
m = {
"impressions": impressions, "clicks": clicks, "conversions": conversions, "leads": leads, "spend": spend,
"ctr": ctr, "cr": cr, "cpc": cpc, "cpa": cpa, "cpl": cpl
}
entry = by_variant.setdefault(vid, {"variant_id": vid, "format": fmt, "totals": {"impressions":0,"clicks":0,"conversions":0,"leads":0,"spend":0.0}, "segments": []})
entry["format"] = entry["format"] or fmt
entry["totals"]["impressions"] += impressions
entry["totals"]["clicks"] += clicks
entry["totals"]["conversions"] += conversions
entry["totals"]["leads"] += leads
entry["totals"]["spend"] += spend
entry["segments"].append({"segment_id": seg_id, "segment_name": seg_name, "metrics": m})
# compute aggregated metrics per variant
scored=[]
for vid, data in by_variant.items():
t = data["totals"]
ctr = safe_div(t["clicks"], t["impressions"])
cr = safe_div(t["conversions"], t["clicks"])
cpc = safe_div(t["spend"], t["clicks"])
cpa = safe_div(t["spend"], t["conversions"]) if t["conversions"] else None
cpl = safe_div(t["spend"], t["leads"]) if t["leads"] else None
agg = {**t, "ctr": ctr, "cr": cr, "cpc": cpc, "cpa": cpa, "cpl": cpl}
# choose KPI value
kpi_value = agg.get(primary_kpi)
# low data rule
low_data = (t["impressions"] < min_impr) or (t["clicks"] < min_clicks)
status = "low_data" if low_data else "unknown"
if not low_data and kpi_value is not None:
if direction == "min":
if good_th is not None and kpi_value <= good_th:
status = "good"
elif ok_th is not None and kpi_value <= ok_th:
status = "ok"
else:
status = "bad"
else:
if good_th is not None and kpi_value >= good_th:
status = "good"
elif ok_th is not None and kpi_value >= ok_th:
status = "ok"
else:
status = "bad"
# sort key
if kpi_value is None:
sort_k = inf if direction=="min" else -inf
else:
sort_k = kpi_value if direction=="min" else -kpi_value
scored.append({
"variant_id": vid,
"format": data["format"],
"status": status,
"kpi": primary_kpi,
"kpi_value": kpi_value,
"metrics": agg,
"segments": data["segments"],
"_sort": (0 if status!="low_data" else 1, sort_k, -(ctr or 0)),
})
scored.sort(key=lambda x: x["_sort"])
ranking=[]
for i, s in enumerate(scored, start=1):
ranking.append({
"rank": i,
"variant_id": s["variant_id"],
"format": s["format"],
"status": s["status"],
"kpi": s["kpi"],
"kpi_value": s["kpi_value"],
"metrics": s["metrics"],
"segments": s["segments"],
})
# recommendations (deterministic, but "agent-like")
rec=[]
if ranking:
good = [x for x in ranking if x["status"]=="good"]
bad = [x for x in ranking if x["status"]=="bad"]
ok = [x for x in ranking if x["status"]=="ok"]
ld = [x for x in ranking if x["status"]=="low_data"]
if good:
rec.append(f"Масштабируйте: лучший текст #{good[0]['variant_id']} (статус: хороший по {KPI_LABELS.get(primary_kpi, primary_kpi)}).")
if ok:
rec.append("Средние варианты можно улучшить: проверьте УТП/CTA и уточните аудиторию сегмента.")
if bad:
rec.append("Плохие варианты лучше переписать: попробуйте другой заголовок/обещание, проверьте ограничения и соответствие сегменту.")
if ld:
rec.append("Для части вариантов мало данных. Наберите больше показов/кликов, затем повторите анализ.")
return {
"policy_used": {
"mode": policy.get("mode") or "thresholds",
"primary_kpi": primary_kpi,
"direction": direction,
"good_threshold": good_th,
"ok_threshold": ok_th,
"min_impressions": min_impr,
"min_clicks": min_clicks,
"query_text": policy.get("query_text"),
},
"ranking": ranking,
"recommendations": rec,
}

View File

@@ -0,0 +1,29 @@
from src.chains.text_generation import TextGenerationChain
class TextGenAgent:
def __init__(self):
self.chain = TextGenerationChain()
async def generate(self, req: dict) -> dict:
formats = req["formats"]
n = int(req.get("variants_per_format", 3))
brief = {
"product": req["product"],
"audience": req["audience"],
"usp": req.get("usp"),
"benefits": req.get("benefits") or [],
"constraints": req.get("constraints"),
"tone": req.get("tone"),
}
variants = []
for fmt in formats:
items = await self.chain.generate_for_format(brief, fmt, n)
variants.append({
"format": fmt,
"items": [{
"payload": it,
"placement_tips": "Тестируйте 2-3 варианта в одном канале, фиксируйте клики/лиды вручную.",
"expected_effect": "Гипотеза: улучшение CTR/CR за счёт другого УТП/CTA."
} for it in items],
})
return {"formats": formats, "variants": variants}

View File

@@ -0,0 +1,30 @@
from fastapi import APIRouter, HTTPException
from src.models.schemas import TextGenRequest, AnalyzeRequest
from src.agents.textgen_agent import TextGenAgent
from src.agents.analyze_agent import AnalyzeAgent
router = APIRouter()
_textgen = None
_analyze = AnalyzeAgent()
def get_textgen():
global _textgen
if _textgen is None:
_textgen = TextGenAgent()
return _textgen
@router.post("/texts/generate")
async def texts_generate(req: TextGenRequest):
try:
return await get_textgen().generate(req.model_dump())
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@router.post("/tests/analyze")
async def tests_analyze(req: AnalyzeRequest):
try:
return _analyze.analyze(req.model_dump())
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))

View File

@@ -0,0 +1,50 @@
import json
from langchain.prompts import PromptTemplate
from langchain.chains import LLMChain
from src.llm.gigachat_client import GigaChatClient
FORMAT_SPECS = {
"social_post": {
"instruction": "Пост для соцсетей: hook (1 строка), body (3-6 строк), cta (1 строка).",
"schema": {"hook":"...", "body":"...", "cta":"..."}
},
"search_ad": {
"instruction": "Поисковое объявление: 5 заголовков (<= 56 символов) и 5 описаний (<= 81 символ).",
"schema": {"headlines":["..."], "descriptions":["..."], "cta":"..."}
},
"email": {
"instruction": "Email: subject, preheader, body (коротко), cta.",
"schema": {"subject":"...", "preheader":"...", "body":"...", "cta":"..."}
}
}
def _escape_braces(s: str) -> str:
return s.replace("{","{{").replace("}","}}")
class TextGenerationChain:
def __init__(self):
self.client = GigaChatClient(temperature=0.7, max_tokens=2200)
async def generate_for_format(self, brief: dict, fmt: str, n: int) -> list[dict]:
spec = FORMAT_SPECS.get(fmt, {"instruction":"Рекламный текст + CTA.", "schema":{"text":"...", "cta":"..."}})
schema = _escape_braces(json.dumps(spec["schema"], ensure_ascii=False, indent=2))
prompt = PromptTemplate(
input_variables=["brief_json","n"],
template=(
"Ты маркетолог и копирайтер. Пиши по-русски.\n"
"Соблюдай ограничения, не обещай гарантии.\n"
f"Формат: {fmt}. {spec['instruction']}\n\n"
"Бриф (JSON): {brief_json}\n"
"Сгенерируй {n} вариантов.\n"
"Верни ТОЛЬКО JSON массив объектов, без markdown.\n"
"Пример схемы одного объекта:\n"
f"{schema}\n"
),
)
chain = LLMChain(llm=self.client.llm, prompt=prompt)
raw = await chain.apredict(brief_json=json.dumps(brief, ensure_ascii=False), n=str(n))
start, end = raw.find("["), raw.rfind("]")
if start == -1 or end == -1:
raise ValueError("LLM did not return JSON array")
return json.loads(raw[start:end+1])

View File

@@ -0,0 +1,22 @@
import os
import base64
from langchain_community.llms import GigaChat
class GigaChatClient:
def __init__(self, temperature: float = 0.6, max_tokens: int = 2000):
client_id = (os.getenv("GIGACHAT_CLIENT_ID") or "").strip().strip('"').strip("'")
client_secret = (os.getenv("GIGACHAT_CLIENT_SECRET") or "").strip().strip('"').strip("'")
if not client_id or not client_secret:
raise ValueError("Set GIGACHAT_CLIENT_ID and GIGACHAT_CLIENT_SECRET in .env")
credentials = base64.b64encode(f"{client_id}:{client_secret}".encode("utf-8")).decode("utf-8")
scope = (os.getenv("GIGACHAT_SCOPE") or "GIGACHAT_API_PERS").strip()
model = (os.getenv("GIGACHAT_MODEL") or "GigaChat").strip()
verify_ssl = (os.getenv("GIGACHAT_VERIFY_SSL_CERTS","false").lower() in ("1","true","yes","y","on"))
self.llm = GigaChat(
credentials=credentials,
scope=scope,
model=model,
temperature=temperature,
max_tokens=max_tokens,
verify_ssl_certs=verify_ssl,
)

View File

@@ -0,0 +1,15 @@
import os
from dotenv import load_dotenv
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
load_dotenv()
app = FastAPI(title="AdsAssistant Agents Service", version="0.1.0")
app.add_middleware(CORSMiddleware, allow_origins=["*"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"])
from src.api.routes import router # noqa
app.include_router(router, prefix="/api/v1")
@app.get("/health")
async def health():
return {"ok": True}

View File

@@ -0,0 +1,18 @@
from pydantic import BaseModel, Field
from typing import List, Dict, Any, Optional
class TextGenRequest(BaseModel):
product: str
audience: str
usp: Optional[str] = None
benefits: List[str] = []
constraints: Optional[str] = None
tone: Optional[str] = None
formats: List[str] = Field(..., min_length=1)
variants_per_format: int = Field(3, ge=1, le=10)
class AnalyzeRequest(BaseModel):
rows: List[Dict[str, Any]] # {variant_id, format, impressions, clicks, conversions, leads, spend}
objective: str = "leads"
policy: Optional[Dict[str, Any]] = None
notes: Optional[str] = None