paper2skills Playbook

Skill-Data-Drift-Detection

Skill-Data-Drift-Detection · 12-ML基础

causalexperimentforecastingrecommendationpricing广告与投放供应链与补货客服与VOC推荐与搜索定价与利润WF-A 智能补货WF-B 广告优化WF-C 客服分诊WF-E Review监控WF-F 动态定价WF-G Listing内容优化
实现难度⭐⭐☆☆☆
业务视角
适用角色数据分析师 / 数据工程师 · 运营负责人 · 产品经理
适用平台选品评分 · 差评预测 · 用户流失预警 · 广告出价预测
什么情况下用想用机器学习解决业务问题,但不知道该选什么模型;模型上线后效果越来越差不知道为什么
成功是什么样的选对算法工具减少 50% 试错时间,模型上线后可监控可解释,数据团队和业务团队建立共同语言
业务痛点
不知道该用什么模型模型准确率不稳定业务不相信模型结果模型黑盒说不清为什么这么预测

1. 解决的问题

核心思想:生产 ML 模型上线后,输入数据的分布会随时间偏移(用户行为变化、季节性、竞品冲击),导致模型悄然失效。数据漂移检测通过持续监控特征分布(统计漂移)和预测误差(性能漂移)两条并行轨道,在模型失效前触发告警和重训——区别于异常检测,漂移检测关注的是系统性、持续性的分布偏移,而非偶发性异常点。

2. 核心算法逻辑

核心思想:生产 ML 模型上线后,输入数据的分布会随时间偏移(用户行为变化、季节性、竞品冲击),导致模型悄然失效。数据漂移检测通过持续监控特征分布(统计漂移)和预测误差(性能漂移)两条并行轨道,在模型失效前触发告警和重训——区别于异常检测,漂移检测关注的是系统性、持续性的分布偏移,而非偶发性异常点。

3. 业务应用场景

场景 A:需求预测模型的 feature drift 监控

- 业务问题:baby sterilizer 需求预测模型(TFT)上线 6 个月后,MAPE 从 12% 悄悄涨到 28%,直到库存积压才被发现,损失约 $8,000 滞销成本。根本原因:618 大促后用户搜索行为永久性改变,模型未及时重训。 - 数据要求: - 每日预测特征快照(搜索量/历史销量/竞品数量/价格指数,共 N 列) - 模型训练时的基准特征分布(保存为 JSON 格式参考分布) - 大促日期日历(用于季节性窗口豁免) - 预期产出: - 每特征的 PSI 日报(标注超阈值特征) - 漂移类型判断(季节性豁免 / 真实漂移告警) - 重训建议(附 cost-aware ROI

场景 B:MAB 广告素材测试的 reward 分布漂移检测

4. 输入数据要求

请查看原始代码模板获取输入规格。

5. 输出结果

请查看原始代码模板获取输出规格。

6. 业务价值 / ROI

  • ROI 预估
  • DriftGuard 实测基准:检测到漂移后及时重训,ROI 达 417×(重训算力成本 vs. 预测误差导致的库存损失)
  • 母婴场景估算:需求预测 MAPE 从 28% 降回 12%,月库存偏差减少 ~16pp × 月销售额 $50,000 = 每月节省约 $8,000 滞销/缺货成本
  • MAB 场景:素材 CTR 漂移平均提前 4.2 天发现,按日广告预算 $200 估算,避免 4 天无效投放 ≈ $800/次
  • 实施难度:⭐⭐☆☆☆(2/5)— 纯 NumPy/pandas,无需 GPU,可接入现有监控脚本
  • 优先级评分:⭐⭐⭐⭐⭐(5/5)— 当前 8 个生产模型全部裸跑无监控,是最紧迫的横切面缺口

7. 代码模板

代码块数量:6 · 路径:paper2skills-code/ml_fundamentals/data_drift_detection

"""
Skill-Data-Drift-Detection
基于 arXiv:2601.08928 (DriftGuard, 2026) +
    OpenReview fUsgIfJYZs (Cry Wolf, ICLR 2026 Workshop)
母婴跨境电商 ML 模型生产数据漂移检测工具
"""

import numpy as np
import pandas as pd
from dataclasses import dataclass, field
from typing import Optional
from datetime import datetime, date
from enum import Enum


class DriftType(Enum):
    NO_DRIFT     = "无漂移"
    SEASONAL     = "季节性漂移(大促豁免)"
    WARNING      = "轻微漂移(监控)"
    ALERT        = "显著漂移(告警)"
    CRITICAL     = "严重漂移(触发重训)"


@dataclass
class FeatureDriftResult:
    feature_name: str
    psi: float
    drift_type: DriftType
    action: str


@dataclass
class ModelDriftReport:
    model_id: str
    check_date: str
    batch_size: int
    feature_results: list[FeatureDriftResult]
    overall_drift_type: DriftType
    drifted_features: list[str]
    recommendation: str
    retrain_roi_estimate: Optional[str] = None
    adwin_alerts: list[str] = field(default_factory=list)


# ── PSI 计算(批量统计检验)────────────────────────────────
def compute_psi(
    reference: np.ndarray,
    current: np.ndarray,
    n_bins: int = 10,
    min_count: float = 1e-6,
) -> float:
    """
    PSI = Σ (P_actual - P_expected) × ln(P_actual / P_expected)
    基于对称 KL 散度形式,bins 自适应参考分布分位数。
    """
    ref_clean = reference[~np.isnan(reference)]
    cur_clean = current[~np.isnan(current)]
    if len(ref_clean) == 0 or len(cur_clean) == 0:
        return 0.0

8. 论文来源

  • 2601.08928