Realtime Feature Collection — 流式特征采集与在线特征仓库:推荐系统实时个性化的数据基础设施
Skill-Realtime-Feature-Collection · 22-数据采集工程
causalexperimentoptimizationrecommendationdata_collectionpricing推荐与搜索数据采集与治理定价与利润WF-D 选品扫描WF-F 动态定价WF-G Listing内容优化
年化 ROI100 万元
实现难度⭐⭐⭐☆☆
业务视角
适用角色数据工程师 / 技术负责人 · 运营负责人 · 选品负责人
适用平台Amazon SP API + Keepa · TikTok Shop API · 跨境多平台数据湖
什么情况下用想监控竞品价格/评论/排名但没有稳定采集能力,手动太慢;多平台数据分散整合成本极高;数据管道不稳定经常断
成功是什么样的竞品价格/评论数据每日自动更新,多平台数据统一入仓,数据管道稳定性 >99%,取数时间从小时降到分钟
业务痛点
1. 解决的问题
后果:用户浏览了某款婴儿车 → 系统还在推荐她上周看过的奶粉 → CTR 下降,转化率损失。
2. 核心算法逻辑
核心问题:特征时效性悖论
3. 业务应用场景
业务背景:用户在母婴跨境平台刚完成奶瓶购买,当前推荐系统继续推送同品类奶瓶(特征延迟 24h)。如果能在 1 分钟内感知该购买行为,应立即切换推荐关联品类(奶嘴、奶瓶刷、消毒锅)。
| 特征名 | 类型 | 窗口 | 更新频率 | 存储 | |------|------|------|---------|------| | `user_last_purchase_category` | 字符串 | 最近 1 次 | 实时 | Redis STRING | | `user_recent_categories_30m` | 列表 | 最近 30 分钟 | 实时 | Redis LIST | | `user_purchase_count_1d` | 整数 | 今日累计 | 实时 | Redis INCR | | `user_brand_clicks_7d` | 字典 | 7 日
业务背景:某款婴儿睡袋在小红书突然爆红,但批处理特征仓库需要次日才能更新商品热度分。如果能实时采集多源行为信号(站内收藏、外部搜索词、竞品 listing 评分飙升),可以在 30 分钟内将该商品推荐给目标用户群。
4. 输入数据要求
请查看原始代码模板获取输入规格。
5. 输出结果
请查看原始代码模板获取输出规格。
6. 业务价值 / ROI
100 万元
7. 代码模板
代码块数量:10 · 路径:未检测到
"""
实时特征采集与在线特征仓库
整合:
- 流式特征计算(模拟 Kafka Consumer + Flink-style 窗口)
- 在线特征仓库(模拟 Redis,含 TTL 管理)
- 推理时特征注入(Inference-Time Feature Injection)
- 特征时效性衰减模型
论文来源:
2512.14734 (Tubi inference-time injection)
2501.08591 (OpenMLDB unified feature computation)
2409.00400 (bilibili batch query consistency)
"""
import time
import math
import queue
import threading
import random
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional, Tuple
from collections import defaultdict, deque
# ──────────────────────────────────────────────────────────
# 数据结构
# ──────────────────────────────────────────────────────────
@dataclass
class UserEvent:
"""用户行为事件(Kafka 消息格式)"""
user_id: str
event_type: str # "view" | "add_to_cart" | "purchase" | "share"
item_id: str
category: str
price: float
brand: str
timestamp: float = field(default_factory=time.time)
@dataclass
class FeatureVector:
"""特征向量,含时效性元数据"""
user_id: str
features: Dict[str, Any]
created_at: float = field(default_factory=time.time)
source: str = "batch" # "batch" | "realtime" | "merged"
def age_seconds(self) -> float:
return time.time() - self.created_at
# ──────────────────────────────────────────────────────────
# Mock Redis(在线特征仓库)
# ──────────────────────────────────────────────────────────
class MockRedis:
"""
模拟 Redis 在线特征仓库
支持 STRING / LIST / HASH / TTL
8. 论文来源
- 2409.00400
- 2501.08591
- 2512.14734