From a28b520b8ac4f6846ab7c2b3440cbd305ae59da5 Mon Sep 17 00:00:00 2001 From: Ilnar Date: Thu, 5 Mar 2026 10:30:13 +0300 Subject: [PATCH] Analizer Agent --- agents_service/src/agents/analyze_agent.py | 449 +++++++++++++-------- 1 file changed, 285 insertions(+), 164 deletions(-) diff --git a/agents_service/src/agents/analyze_agent.py b/agents_service/src/agents/analyze_agent.py index affdc4b..2681e14 100644 --- a/agents_service/src/agents/analyze_agent.py +++ b/agents_service/src/agents/analyze_agent.py @@ -1,193 +1,314 @@ -from math import inf +from __future__ import annotations + +import json import re +from math import inf +from typing import Any, Dict, List, Optional -def safe_div(a, b): - return (a / b) if b else None +from src.llm.gigachat_client import GigaChatClient -KPI_LABELS = { - "ctr": "CTR", - "cpc": "CPC", - "cr": "CR", - "cpl": "CPL", - "cpa": "CPA", -} -def parse_policy_text(text: str) -> dict: - """Very small heuristic parser for free-form requests. +def _extract_json(text: str) -> dict: + """Extract first JSON object from model output.""" + m = re.search(r"\{[\s\S]*\}", text) + if not m: + raise ValueError("LLM did not return JSON") + return json.loads(m.group(0)) + + +KPI_LABELS = {"ctr": "CTR", "cpc": "CPC", "cr": "CR", "cpl": "CPL", "cpa": "CPA"} + + +def _kpi_default_for_objective(objective: str) -> str: + obj = (objective or "leads").lower() + if obj in ("leads", "lead"): + return "cpl" + if obj in ("conversions", "conversion", "purchase"): + return "cpa" + if obj in ("clicks", "traffic"): + return "cpc" + return "cpl" + + +def _direction_for_kpi(kpi: str) -> str: + k = (kpi or "").lower() + return "max" if k in ("ctr", "cr") else "min" - Supports phrases like: - 'Оптимизируй по CPA, хороший до 800, средний до 1200, клики минимум 20' - """ - if not text: - return {} - t = text.lower() - kpi = None - for k in ["cpa","cpl","cpc","ctr","cr"]: - if k in t: - kpi = k - break - direction = "max" if kpi in ("ctr","cr") else "min" - nums = list(map(float, re.findall(r"(\d+[\.,]?\d*)", t))) - good = nums[0] if len(nums) >= 1 else None - ok = nums[1] if len(nums) >= 2 else None - min_clicks = 0 - m = re.search(r"клик[аио]в?\s*(?:минимум|min)\s*(\d+)", t) - if m: min_clicks = int(m.group(1)) - min_impr = 0 - m = re.search(r"показ[а-я]*\s*(?:минимум|min)\s*(\d+)", t) - if m: min_impr = int(m.group(1)) - return { - "mode": "text", - "primary_kpi": kpi or "cpl", - "direction": direction, - "good_threshold": good, - "ok_threshold": ok, - "min_clicks": min_clicks, - "min_impressions": min_impr, - "query_text": text, - } class AnalyzeAgent: - def analyze(self, req: dict) -> dict: - rows = req["rows"] - objective = (req.get("objective") or "leads").lower() - policy = req.get("policy") or {} - if policy.get("mode") == "text" and policy.get("query_text"): - policy = {**parse_policy_text(policy.get("query_text")), **policy} + """ + Agent #2 (LLM-first): - primary_kpi = (policy.get("primary_kpi") or "").lower() or ("cpl" if objective=="leads" else "cpa" if objective=="conversions" else "cpc") - direction = (policy.get("direction") or ("max" if primary_kpi in ("ctr","cr") else "min")).lower() - good_th = policy.get("good_threshold", None) - ok_th = policy.get("ok_threshold", None) - min_impr = int(policy.get("min_impressions") or 0) - min_clicks = int(policy.get("min_clicks") or 0) + Input: + - objective (goal) + - rows: list of segment-result rows per assignment (impressions/clicks/conversions/leads/spend) - # score per (variant, segment) first - by_variant = {} + Output: + - ranking of variants + - per-segment metrics table (CTR/CPC/CR/CPL/CPA) for each variant + - recommendations + """ + + def __init__(self): + self.llm: Optional[Any] = None + try: + self.llm = GigaChatClient(temperature=0.2, max_tokens=1600).llm + except Exception: + self.llm = None + + def _fallback_compute(self, objective: str, rows: List[dict]) -> dict: + """Deterministic fallback if LLM unavailable. Uses correct min/max logic.""" + def safe_div(a: float, b: float) -> Optional[float]: + return (a / b) if b else None + + primary = _kpi_default_for_objective(objective) + direction = _direction_for_kpi(primary) + + by_variant: Dict[Any, dict] = {} for r in rows: vid = r.get("variant_id") fmt = r.get("format") seg_id = r.get("segment_id") - seg_name = r.get("segment_name") or (f"Сегмент {seg_id}" if seg_id is not None else None) + seg_name = r.get("segment_name") or (f"Сегмент {seg_id}" if seg_id is not None else "Сегмент") - impressions = int(r.get("impressions") or 0) - clicks = int(r.get("clicks") or 0) - conversions = int(r.get("conversions") or 0) + imp = int(r.get("impressions") or 0) + clk = int(r.get("clicks") or 0) + conv = int(r.get("conversions") or 0) leads = int(r.get("leads") or 0) spend = float(r.get("spend") or 0.0) - ctr = safe_div(clicks, impressions) - cr = safe_div(conversions, clicks) - cpc = safe_div(spend, clicks) - cpa = safe_div(spend, conversions) if conversions else None - cpl = safe_div(spend, leads) if leads else None - - m = { - "impressions": impressions, "clicks": clicks, "conversions": conversions, "leads": leads, "spend": spend, - "ctr": ctr, "cr": cr, "cpc": cpc, "cpa": cpa, "cpl": cpl + metrics = { + "ctr": safe_div(clk, imp), + "cpc": safe_div(spend, clk), + "cr": safe_div(conv, clk), + "cpl": safe_div(spend, leads) if leads else None, + "cpa": safe_div(spend, conv) if conv else None, } - entry = by_variant.setdefault(vid, {"variant_id": vid, "format": fmt, "totals": {"impressions":0,"clicks":0,"conversions":0,"leads":0,"spend":0.0}, "segments": []}) + entry = by_variant.setdefault( + vid, + {"variant_id": vid, "format": fmt, "segments": [], "totals": {"impressions": 0, "clicks": 0, "conversions": 0, "leads": 0, "spend": 0.0}}, + ) entry["format"] = entry["format"] or fmt - entry["totals"]["impressions"] += impressions - entry["totals"]["clicks"] += clicks - entry["totals"]["conversions"] += conversions + entry["totals"]["impressions"] += imp + entry["totals"]["clicks"] += clk + entry["totals"]["conversions"] += conv entry["totals"]["leads"] += leads entry["totals"]["spend"] += spend - entry["segments"].append({"segment_id": seg_id, "segment_name": seg_name, "metrics": m}) + entry["segments"].append({"segment_id": seg_id, "segment_name": seg_name, "metrics": metrics}) - # compute aggregated metrics per variant - scored=[] - for vid, data in by_variant.items(): - t = data["totals"] - ctr = safe_div(t["clicks"], t["impressions"]) - cr = safe_div(t["conversions"], t["clicks"]) - cpc = safe_div(t["spend"], t["clicks"]) - cpa = safe_div(t["spend"], t["conversions"]) if t["conversions"] else None - cpl = safe_div(t["spend"], t["leads"]) if t["leads"] else None + scored = [] + for vid, v in by_variant.items(): + t = v["totals"] + agg = { + "ctr": safe_div(t["clicks"], t["impressions"]), + "cpc": safe_div(t["spend"], t["clicks"]), + "cr": safe_div(t["conversions"], t["clicks"]), + "cpl": safe_div(t["spend"], t["leads"]) if t["leads"] else None, + "cpa": safe_div(t["spend"], t["conversions"]) if t["conversions"] else None, + } + val = agg.get(primary) + sort_val = (inf if direction == "min" else -inf) if val is None else (val if direction == "min" else -val) + scored.append((sort_val, vid, v["format"], agg, v["segments"])) - agg = {**t, "ctr": ctr, "cr": cr, "cpc": cpc, "cpa": cpa, "cpl": cpl} + scored.sort(key=lambda x: x[0]) - # choose KPI value - kpi_value = agg.get(primary_kpi) - # low data rule - low_data = (t["impressions"] < min_impr) or (t["clicks"] < min_clicks) - status = "low_data" if low_data else "unknown" - if not low_data and kpi_value is not None: - if direction == "min": - if good_th is not None and kpi_value <= good_th: - status = "good" - elif ok_th is not None and kpi_value <= ok_th: - status = "ok" - else: - status = "bad" - else: - if good_th is not None and kpi_value >= good_th: - status = "good" - elif ok_th is not None and kpi_value >= ok_th: - status = "ok" - else: - status = "bad" - - # sort key - if kpi_value is None: - sort_k = inf if direction=="min" else -inf - else: - sort_k = kpi_value if direction=="min" else -kpi_value - scored.append({ - "variant_id": vid, - "format": data["format"], - "status": status, - "kpi": primary_kpi, - "kpi_value": kpi_value, - "metrics": agg, - "segments": data["segments"], - "_sort": (0 if status!="low_data" else 1, sort_k, -(ctr or 0)), - }) - - scored.sort(key=lambda x: x["_sort"]) - - ranking=[] - for i, s in enumerate(scored, start=1): - ranking.append({ - "rank": i, - "variant_id": s["variant_id"], - "format": s["format"], - "status": s["status"], - "kpi": s["kpi"], - "kpi_value": s["kpi_value"], - "metrics": s["metrics"], - "segments": s["segments"], - }) - - # recommendations (deterministic, but "agent-like") - rec=[] - if ranking: - good = [x for x in ranking if x["status"]=="good"] - bad = [x for x in ranking if x["status"]=="bad"] - ok = [x for x in ranking if x["status"]=="ok"] - ld = [x for x in ranking if x["status"]=="low_data"] - - if good: - rec.append(f"Масштабируйте: лучший текст #{good[0]['variant_id']} (статус: хороший по {KPI_LABELS.get(primary_kpi, primary_kpi)}).") - if ok: - rec.append("Средние варианты можно улучшить: проверьте УТП/CTA и уточните аудиторию сегмента.") - if bad: - rec.append("Плохие варианты лучше переписать: попробуйте другой заголовок/обещание, проверьте ограничения и соответствие сегменту.") - if ld: - rec.append("Для части вариантов мало данных. Наберите больше показов/кликов, затем повторите анализ.") + ranking = [] + for i, (_, vid, fmt, agg, segs) in enumerate(scored, start=1): + ranking.append( + { + "rank": i, + "variant_id": vid, + "format": fmt, + "status": "unknown", + "reason": None, + "kpi": primary, + "kpi_value": agg.get(primary), + "metrics": agg, + "segments": segs, + } + ) return { - "policy_used": { - "mode": policy.get("mode") or "thresholds", - "primary_kpi": primary_kpi, - "direction": direction, - "good_threshold": good_th, - "ok_threshold": ok_th, - "min_impressions": min_impr, - "min_clicks": min_clicks, - "query_text": policy.get("query_text"), - }, + "policy_used": {"primary_kpi": primary, "direction": direction}, "ranking": ranking, - "recommendations": rec, + "recommendations": ["LLM недоступен — показан детерминированный рейтинг по цели теста."], } + + def _llm_analyze(self, objective: str, rows: List[dict], policy: dict) -> dict: + """ + Ask GigaChat to: + - compute metrics + - choose KPI (or respect policy.primary_kpi) + - rank and set statuses + - return per-segment metrics table per variant + """ + primary_hint = (policy.get("primary_kpi") or "").lower().strip() or _kpi_default_for_objective(objective) + direction_hint = (policy.get("direction") or "").lower().strip() or _direction_for_kpi(primary_hint) + + system = ( + "Ты маркетинговый аналитик A/B тестов. " + "Тебе дают сырые данные: показы, клики, конверсии, лиды, расход по сегментам и вариантам. " + "Твоя задача — корректно вычислить метрики CTR/CPC/CR/CPL/CPA, выбрать основной KPI, " + "ранжировать варианты и выставить статусы good/ok/bad/low_data. " + "ВАЖНО: соблюдай направление оптимизации: для CPA/CPL/CPC меньше=лучше; для CTR/CR больше=лучше. " + "Отвечай СТРОГО валидным JSON без лишнего текста." + ) + + user = f"""Цель теста: {objective} + +Подсказка по KPI (если не противоречит здравому смыслу): +primary_kpi_hint: {primary_hint} +direction_hint: {direction_hint} + +Политика (может быть частично задана, пороги могут отсутствовать): +{json.dumps(policy, ensure_ascii=False)} + +Сырые данные rows: +- variant_id (int) +- format (str) +- segment_id (int) +- segment_name (str) +- impressions (int) +- clicks (int) +- conversions (int) +- leads (int) +- spend (float) + +rows: +{json.dumps(rows, ensure_ascii=False)} + +Нужно вернуть JSON строго формата: +{{ + "primary_kpi": "cpl|cpa|cpc|ctr|cr", + "direction": "min|max", + "ranking": [ + {{ + "rank": 1, + "variant_id": 1, + "status": "good|ok|bad|low_data", + "reason": "коротко почему (1 строка)", + "kpi_value": 123.45 + }} + ], + "variants": [ + {{ + "variant_id": 1, + "format": "search_ad|social_post|email", + "segments": [ + {{ + "segment_name": "Группа A", + "metrics": {{ + "ctr": 0.012, + "cpc": 40.0, + "cr": 0.0333, + "cpl": 480.0, + "cpa": 1200.0 + }} + }} + ], + "totals": {{ + "ctr": 0.012, + "cpc": 40.0, + "cr": 0.0333, + "cpl": 480.0, + "cpa": 1200.0 + }} + }} + ], + "recommendations": ["...", "..."] +}} + +Требования: +- ranking должен содержать ВСЕ variant_id ровно один раз. +- rank должен быть 1..N. +- variants должен содержать ВСЕ variant_id. +- Метрики считай так: + CTR = clicks / impressions + CPC = spend / clicks + CR = conversions / clicks + CPL = spend / leads + CPA = spend / conversions +- Если деление невозможно (0 в знаменателе) — ставь null. +""" + + raw = self.llm.invoke(f"{system}\n\n{user}") + text = raw if isinstance(raw, str) else getattr(raw, "content", str(raw)) + return _extract_json(text) + + def analyze(self, req: dict) -> dict: + objective = (req.get("objective") or "leads").lower() + rows = req.get("rows") or [] + policy = req.get("policy") or {} + + if self.llm is None: + return self._fallback_compute(objective, rows) + + try: + out = self._llm_analyze(objective, rows, policy) + except Exception: + # if LLM fails — return safe deterministic + return self._fallback_compute(objective, rows) + + # --- SAFETY: enforce correct ordering by returned KPI/direction using kpi_value --- + primary = (out.get("primary_kpi") or _kpi_default_for_objective(objective)).lower() + direction = (out.get("direction") or _direction_for_kpi(primary)).lower() + + ranking = out.get("ranking") or [] + # If LLM forgot ranks or order — fix it deterministically by kpi_value + if isinstance(ranking, list) and ranking: + # validate uniqueness + seen = set() + cleaned = [] + for r in ranking: + vid = r.get("variant_id") + if vid in seen: + continue + seen.add(vid) + cleaned.append(r) + + # sort by kpi_value using direction, None goes last + def key_fn(r): + v = r.get("kpi_value") + if v is None: + return (1, inf) + return (0, float(v) if direction == "min" else -float(v)) + + cleaned.sort(key=key_fn) + for i, r in enumerate(cleaned, start=1): + r["rank"] = i + out["ranking"] = cleaned + + # Build response compatible with your current backend/UI expectations: + # UI сейчас ждёт "ranking": [{..., "segments": [...] }]. + # Мы склеим из out["variants"] + out["ranking"]. + variants_map = {v["variant_id"]: v for v in (out.get("variants") or []) if "variant_id" in v} + final_ranking = [] + for r in out.get("ranking", []): + vid = r.get("variant_id") + v = variants_map.get(vid, {}) + final_ranking.append( + { + "rank": r.get("rank"), + "variant_id": vid, + "format": v.get("format"), + "status": r.get("status"), + "reason": r.get("reason"), + "kpi": primary, + "kpi_value": r.get("kpi_value"), + "metrics": v.get("totals") or {}, + "segments": [ + { + "segment_name": s.get("segment_name"), + "metrics": s.get("metrics") or {}, + } + for s in (v.get("segments") or []) + ], + } + ) + + return { + "policy_used": {"primary_kpi": primary, "direction": direction}, + "ranking": final_ranking, + "recommendations": out.get("recommendations") or [], + } \ No newline at end of file