Analizer Agent

This commit is contained in:

View File

@@ -1,193 +1,314 @@
from math import inf
from __future__ import annotations
import json
import re
from math import inf
from typing import Any, Dict, List, Optional
def safe_div(a, b):
return (a / b) if b else None
from src.llm.gigachat_client import GigaChatClient
KPI_LABELS = {
"ctr": "CTR",
"cpc": "CPC",
"cr": "CR",
"cpl": "CPL",
"cpa": "CPA",
}
def parse_policy_text(text: str) -> dict:
"""Very small heuristic parser for free-form requests.
def _extract_json(text: str) -> dict:
"""Extract first JSON object from model output."""
m = re.search(r"\{[\s\S]*\}", text)
if not m:
raise ValueError("LLM did not return JSON")
return json.loads(m.group(0))
KPI_LABELS = {"ctr": "CTR", "cpc": "CPC", "cr": "CR", "cpl": "CPL", "cpa": "CPA"}
def _kpi_default_for_objective(objective: str) -> str:
obj = (objective or "leads").lower()
if obj in ("leads", "lead"):
return "cpl"
if obj in ("conversions", "conversion", "purchase"):
return "cpa"
if obj in ("clicks", "traffic"):
return "cpc"
return "cpl"
def _direction_for_kpi(kpi: str) -> str:
k = (kpi or "").lower()
return "max" if k in ("ctr", "cr") else "min"
Supports phrases like:
'Оптимизируй по CPA, хороший до 800, средний до 1200, клики минимум 20'
"""
if not text:
return {}
t = text.lower()
kpi = None
for k in ["cpa","cpl","cpc","ctr","cr"]:
if k in t:
kpi = k
break
direction = "max" if kpi in ("ctr","cr") else "min"
nums = list(map(float, re.findall(r"(\d+[\.,]?\d*)", t)))
good = nums[0] if len(nums) >= 1 else None
ok = nums[1] if len(nums) >= 2 else None
min_clicks = 0
m = re.search(r"клик[аио]в?\s*(?:минимум|min)\s*(\d+)", t)
if m: min_clicks = int(m.group(1))
min_impr = 0
m = re.search(r"показ[а-я]*\s*(?:минимум|min)\s*(\d+)", t)
if m: min_impr = int(m.group(1))
return {
"mode": "text",
"primary_kpi": kpi or "cpl",
"direction": direction,
"good_threshold": good,
"ok_threshold": ok,
"min_clicks": min_clicks,
"min_impressions": min_impr,
"query_text": text,
}
class AnalyzeAgent:
def analyze(self, req: dict) -> dict:
rows = req["rows"]
objective = (req.get("objective") or "leads").lower()
policy = req.get("policy") or {}
if policy.get("mode") == "text" and policy.get("query_text"):
policy = {**parse_policy_text(policy.get("query_text")), **policy}
"""
Agent #2 (LLM-first):
primary_kpi = (policy.get("primary_kpi") or "").lower() or ("cpl" if objective=="leads" else "cpa" if objective=="conversions" else "cpc")
direction = (policy.get("direction") or ("max" if primary_kpi in ("ctr","cr") else "min")).lower()
good_th = policy.get("good_threshold", None)
ok_th = policy.get("ok_threshold", None)
min_impr = int(policy.get("min_impressions") or 0)
min_clicks = int(policy.get("min_clicks") or 0)
Input:
- objective (goal)
- rows: list of segment-result rows per assignment (impressions/clicks/conversions/leads/spend)
# score per (variant, segment) first
by_variant = {}
Output:
- ranking of variants
- per-segment metrics table (CTR/CPC/CR/CPL/CPA) for each variant
- recommendations
"""
def __init__(self):
self.llm: Optional[Any] = None
try:
self.llm = GigaChatClient(temperature=0.2, max_tokens=1600).llm
except Exception:
self.llm = None
def _fallback_compute(self, objective: str, rows: List[dict]) -> dict:
"""Deterministic fallback if LLM unavailable. Uses correct min/max logic."""
def safe_div(a: float, b: float) -> Optional[float]:
return (a / b) if b else None
primary = _kpi_default_for_objective(objective)
direction = _direction_for_kpi(primary)
by_variant: Dict[Any, dict] = {}
for r in rows:
vid = r.get("variant_id")
fmt = r.get("format")
seg_id = r.get("segment_id")
seg_name = r.get("segment_name") or (f"Сегмент {seg_id}" if seg_id is not None else None)
seg_name = r.get("segment_name") or (f"Сегмент {seg_id}" if seg_id is not None else "Сегмент")
impressions = int(r.get("impressions") or 0)
clicks = int(r.get("clicks") or 0)
conversions = int(r.get("conversions") or 0)
imp = int(r.get("impressions") or 0)
clk = int(r.get("clicks") or 0)
conv = int(r.get("conversions") or 0)
leads = int(r.get("leads") or 0)
spend = float(r.get("spend") or 0.0)
ctr = safe_div(clicks, impressions)
cr = safe_div(conversions, clicks)
cpc = safe_div(spend, clicks)
cpa = safe_div(spend, conversions) if conversions else None
cpl = safe_div(spend, leads) if leads else None
m = {
"impressions": impressions, "clicks": clicks, "conversions": conversions, "leads": leads, "spend": spend,
"ctr": ctr, "cr": cr, "cpc": cpc, "cpa": cpa, "cpl": cpl
metrics = {
"ctr": safe_div(clk, imp),
"cpc": safe_div(spend, clk),
"cr": safe_div(conv, clk),
"cpl": safe_div(spend, leads) if leads else None,
"cpa": safe_div(spend, conv) if conv else None,
}
entry = by_variant.setdefault(vid, {"variant_id": vid, "format": fmt, "totals": {"impressions":0,"clicks":0,"conversions":0,"leads":0,"spend":0.0}, "segments": []})
entry = by_variant.setdefault(
vid,
{"variant_id": vid, "format": fmt, "segments": [], "totals": {"impressions": 0, "clicks": 0, "conversions": 0, "leads": 0, "spend": 0.0}},
)
entry["format"] = entry["format"] or fmt
entry["totals"]["impressions"] += impressions
entry["totals"]["clicks"] += clicks
entry["totals"]["conversions"] += conversions
entry["totals"]["impressions"] += imp
entry["totals"]["clicks"] += clk
entry["totals"]["conversions"] += conv
entry["totals"]["leads"] += leads
entry["totals"]["spend"] += spend
entry["segments"].append({"segment_id": seg_id, "segment_name": seg_name, "metrics": m})
entry["segments"].append({"segment_id": seg_id, "segment_name": seg_name, "metrics": metrics})
# compute aggregated metrics per variant
scored=[]
for vid, data in by_variant.items():
t = data["totals"]
ctr = safe_div(t["clicks"], t["impressions"])
cr = safe_div(t["conversions"], t["clicks"])
cpc = safe_div(t["spend"], t["clicks"])
cpa = safe_div(t["spend"], t["conversions"]) if t["conversions"] else None
cpl = safe_div(t["spend"], t["leads"]) if t["leads"] else None
scored = []
for vid, v in by_variant.items():
t = v["totals"]
agg = {
"ctr": safe_div(t["clicks"], t["impressions"]),
"cpc": safe_div(t["spend"], t["clicks"]),
"cr": safe_div(t["conversions"], t["clicks"]),
"cpl": safe_div(t["spend"], t["leads"]) if t["leads"] else None,
"cpa": safe_div(t["spend"], t["conversions"]) if t["conversions"] else None,
}
val = agg.get(primary)
sort_val = (inf if direction == "min" else -inf) if val is None else (val if direction == "min" else -val)
scored.append((sort_val, vid, v["format"], agg, v["segments"]))
agg = {**t, "ctr": ctr, "cr": cr, "cpc": cpc, "cpa": cpa, "cpl": cpl}
scored.sort(key=lambda x: x[0])
# choose KPI value
kpi_value = agg.get(primary_kpi)
# low data rule
low_data = (t["impressions"] < min_impr) or (t["clicks"] < min_clicks)
status = "low_data" if low_data else "unknown"
if not low_data and kpi_value is not None:
if direction == "min":
if good_th is not None and kpi_value <= good_th:
status = "good"
elif ok_th is not None and kpi_value <= ok_th:
status = "ok"
else:
status = "bad"
else:
if good_th is not None and kpi_value >= good_th:
status = "good"
elif ok_th is not None and kpi_value >= ok_th:
status = "ok"
else:
status = "bad"
# sort key
if kpi_value is None:
sort_k = inf if direction=="min" else -inf
else:
sort_k = kpi_value if direction=="min" else -kpi_value
scored.append({
"variant_id": vid,
"format": data["format"],
"status": status,
"kpi": primary_kpi,
"kpi_value": kpi_value,
"metrics": agg,
"segments": data["segments"],
"_sort": (0 if status!="low_data" else 1, sort_k, -(ctr or 0)),
})
scored.sort(key=lambda x: x["_sort"])
ranking=[]
for i, s in enumerate(scored, start=1):
ranking.append({
"rank": i,
"variant_id": s["variant_id"],
"format": s["format"],
"status": s["status"],
"kpi": s["kpi"],
"kpi_value": s["kpi_value"],
"metrics": s["metrics"],
"segments": s["segments"],
})
# recommendations (deterministic, but "agent-like")
rec=[]
if ranking:
good = [x for x in ranking if x["status"]=="good"]
bad = [x for x in ranking if x["status"]=="bad"]
ok = [x for x in ranking if x["status"]=="ok"]
ld = [x for x in ranking if x["status"]=="low_data"]
if good:
rec.append(f"Масштабируйте: лучший текст #{good[0]['variant_id']} (статус: хороший по {KPI_LABELS.get(primary_kpi, primary_kpi)}).")
if ok:
rec.append("Средние варианты можно улучшить: проверьте УТП/CTA и уточните аудиторию сегмента.")
if bad:
rec.append("Плохие варианты лучше переписать: попробуйте другой заголовок/обещание, проверьте ограничения и соответствие сегменту.")
if ld:
rec.append("Для части вариантов мало данных. Наберите больше показов/кликов, затем повторите анализ.")
ranking = []
for i, (_, vid, fmt, agg, segs) in enumerate(scored, start=1):
ranking.append(
{
"rank": i,
"variant_id": vid,
"format": fmt,
"status": "unknown",
"reason": None,
"kpi": primary,
"kpi_value": agg.get(primary),
"metrics": agg,
"segments": segs,
}
)
return {
"policy_used": {
"mode": policy.get("mode") or "thresholds",
"primary_kpi": primary_kpi,
"direction": direction,
"good_threshold": good_th,
"ok_threshold": ok_th,
"min_impressions": min_impr,
"min_clicks": min_clicks,
"query_text": policy.get("query_text"),
},
"policy_used": {"primary_kpi": primary, "direction": direction},
"ranking": ranking,
"recommendations": rec,
"recommendations": ["LLM недоступен — показан детерминированный рейтинг по цели теста."],
}
def _llm_analyze(self, objective: str, rows: List[dict], policy: dict) -> dict:
"""
Ask GigaChat to:
- compute metrics
- choose KPI (or respect policy.primary_kpi)
- rank and set statuses
- return per-segment metrics table per variant
"""
primary_hint = (policy.get("primary_kpi") or "").lower().strip() or _kpi_default_for_objective(objective)
direction_hint = (policy.get("direction") or "").lower().strip() or _direction_for_kpi(primary_hint)
system = (
"Ты маркетинговый аналитик A/B тестов. "
"Тебе дают сырые данные: показы, клики, конверсии, лиды, расход по сегментам и вариантам. "
"Твоя задача — корректно вычислить метрики CTR/CPC/CR/CPL/CPA, выбрать основной KPI, "
"ранжировать варианты и выставить статусы good/ok/bad/low_data. "
"ВАЖНО: соблюдай направление оптимизации: для CPA/CPL/CPC меньше=лучше; для CTR/CR больше=лучше. "
"Отвечай СТРОГО валидным JSON без лишнего текста."
)
user = f"""Цель теста: {objective}
Подсказка по KPI (если не противоречит здравому смыслу):
primary_kpi_hint: {primary_hint}
direction_hint: {direction_hint}
Политика (может быть частично задана, пороги могут отсутствовать):
{json.dumps(policy, ensure_ascii=False)}
Сырые данные rows:
- variant_id (int)
- format (str)
- segment_id (int)
- segment_name (str)
- impressions (int)
- clicks (int)
- conversions (int)
- leads (int)
- spend (float)
rows:
{json.dumps(rows, ensure_ascii=False)}
Нужно вернуть JSON строго формата:
{{
"primary_kpi": "cpl|cpa|cpc|ctr|cr",
"direction": "min|max",
"ranking": [
{{
"rank": 1,
"variant_id": 1,
"status": "good|ok|bad|low_data",
"reason": "коротко почему (1 строка)",
"kpi_value": 123.45
}}
],
"variants": [
{{
"variant_id": 1,
"format": "search_ad|social_post|email",
"segments": [
{{
"segment_name": "Группа A",
"metrics": {{
"ctr": 0.012,
"cpc": 40.0,
"cr": 0.0333,
"cpl": 480.0,
"cpa": 1200.0
}}
}}
],
"totals": {{
"ctr": 0.012,
"cpc": 40.0,
"cr": 0.0333,
"cpl": 480.0,
"cpa": 1200.0
}}
}}
],
"recommendations": ["...", "..."]
}}
Требования:
- ranking должен содержать ВСЕ variant_id ровно один раз.
- rank должен быть 1..N.
- variants должен содержать ВСЕ variant_id.
- Метрики считай так:
CTR = clicks / impressions
CPC = spend / clicks
CR = conversions / clicks
CPL = spend / leads
CPA = spend / conversions
- Если деление невозможно (0 в знаменателе) — ставь null.
"""
raw = self.llm.invoke(f"{system}\n\n{user}")
text = raw if isinstance(raw, str) else getattr(raw, "content", str(raw))
return _extract_json(text)
def analyze(self, req: dict) -> dict:
objective = (req.get("objective") or "leads").lower()
rows = req.get("rows") or []
policy = req.get("policy") or {}
if self.llm is None:
return self._fallback_compute(objective, rows)
try:
out = self._llm_analyze(objective, rows, policy)
except Exception:
# if LLM fails — return safe deterministic
return self._fallback_compute(objective, rows)
# --- SAFETY: enforce correct ordering by returned KPI/direction using kpi_value ---
primary = (out.get("primary_kpi") or _kpi_default_for_objective(objective)).lower()
direction = (out.get("direction") or _direction_for_kpi(primary)).lower()
ranking = out.get("ranking") or []
# If LLM forgot ranks or order — fix it deterministically by kpi_value
if isinstance(ranking, list) and ranking:
# validate uniqueness
seen = set()
cleaned = []
for r in ranking:
vid = r.get("variant_id")
if vid in seen:
continue
seen.add(vid)
cleaned.append(r)
# sort by kpi_value using direction, None goes last
def key_fn(r):
v = r.get("kpi_value")
if v is None:
return (1, inf)
return (0, float(v) if direction == "min" else -float(v))
cleaned.sort(key=key_fn)
for i, r in enumerate(cleaned, start=1):
r["rank"] = i
out["ranking"] = cleaned
# Build response compatible with your current backend/UI expectations:
# UI сейчас ждёт "ranking": [{..., "segments": [...] }].
# Мы склеим из out["variants"] + out["ranking"].
variants_map = {v["variant_id"]: v for v in (out.get("variants") or []) if "variant_id" in v}
final_ranking = []
for r in out.get("ranking", []):
vid = r.get("variant_id")
v = variants_map.get(vid, {})
final_ranking.append(
{
"rank": r.get("rank"),
"variant_id": vid,
"format": v.get("format"),
"status": r.get("status"),
"reason": r.get("reason"),
"kpi": primary,
"kpi_value": r.get("kpi_value"),
"metrics": v.get("totals") or {},
"segments": [
{
"segment_name": s.get("segment_name"),
"metrics": s.get("metrics") or {},
}
for s in (v.get("segments") or [])
],
}
)
return {
"policy_used": {"primary_kpi": primary, "direction": direction},
"ranking": final_ranking,
"recommendations": out.get("recommendations") or [],
}