Market Signal Realtime Collection — 实时市场信号采集:事件驱动感知与趋势冷启动检测
Skill-Market-Signal-Realtime-Collection · 22-数据采集工程
causalexperimentforecastingoptimizationdata_collectionpricing广告与投放供应链与补货数据采集与治理定价与利润WF-A 智能补货WF-B 广告优化WF-D 选品扫描WF-F 动态定价
实现难度⭐⭐☆☆☆
业务视角
适用角色数据工程师 / 技术负责人 · 运营负责人 · 选品负责人
适用平台Amazon SP API + Keepa · TikTok Shop API · 跨境多平台数据湖
什么情况下用想监控竞品价格/评论/排名但没有稳定采集能力,手动太慢;多平台数据分散整合成本极高;数据管道不稳定经常断
成功是什么样的竞品价格/评论数据每日自动更新,多平台数据统一入仓,数据管道稳定性 >99%,取数时间从小时降到分钟
业务痛点
1. 解决的问题
母婴爆品的"起飞时刻"通常在搜索量还很低的阶段。
2. 核心算法逻辑
传统爬虫调度的核心盲区:不知道"现在"值不值得爬。
3. 业务应用场景
业务背景:黑五/大促期间,竞品价格调整频率从"每天 1-2 次"激增到"每小时 3-5 次"。固定调度爬虫每小时一次,会错过 60-70% 的价格变动事件,导致跟价决策滞后。
数据要求: - 促销日历(CSV 或内部系统接口) - 历史价格时序(至少 3 个月)
预期产出: - 价格变动捕获率从 ~35%(固定调度)提升至 ~85%(事件感知调度) - ROI:大促期间提前 2-4 小时发现竞品降价 → 早 2-4 小时跟价 → 预估多转化 3-8%
4. 输入数据要求
请查看原始代码模板获取输入规格。
5. 输出结果
- `signal_strength >= 0.8` → 高频模式(5分钟/次),大促当天、黑五
- `signal_strength 0.5-0.8` → 中频模式(15-30分钟/次),大促前 2-3 天
- `signal_strength < 0.3` → 低频模式(120分钟/次),平静期
- `trend_score >= 0.75` → 立即启动专项爬取 + 通知选品团队
- `trend_score 0.5-0.75` → 加入观察列表,持续监控 3-7 天
6. 业务价值 / ROI
未自动抽取;请查看原始 Skill 卡片。
7. 代码模板
代码块数量:7 · 路径:未检测到
"""
Market Signal Realtime Collection
整合 EventCast(事件感知信号强度) + RTTP(趋势冷启动检测)
输出:驱动 Skill-Adaptive-Crawl-Scheduling 的动态信号
"""
import math
import re
from dataclasses import dataclass, field
from datetime import date, datetime, timedelta
from typing import Dict, List, Optional, Tuple
# ── 数据结构 ────────────────────────────────────────────────────────────────
@dataclass
class BusinessEvent:
"""业务事件(EventCast 的输入单元)"""
date: str # YYYY-MM-DD
event_type: str # "大促" / "节假日" / "flash_sale"
categories: List[str] # 受影响品类
intensity: str # "high" / "medium" / "low"
description: str = ""
@property
def intensity_score(self) -> float:
return {"high": 1.0, "medium": 0.6, "low": 0.3}.get(self.intensity, 0.3)
@dataclass
class SocialPost:
"""社交媒体帖子(RTTP 的输入单元)"""
post_id: str
content: str
platform: str # "tiktok" / "reddit" / "xiaohongshu"
author_authority: float = 0.5 # 0-1,作者历史母婴内容可信度
comment_count: int = 0
like_count: int = 0
share_count: int = 0
timestamp: str = ""
@property
def engagement_strength(self) -> float:
"""深度参与 > 浅层参与(评论/分享权重高于点赞)"""
score = (self.comment_count * 3.0 + self.share_count * 2.0 + self.like_count * 0.5)
# 对数归一化,防止超级帖子垄断
return min(1.0, math.log1p(score) / math.log1p(10000))
@dataclass
class TrendSignal:
"""趋势信号输出"""
query: str
trend_score: float
source_post_id: str
detected_at: str
category_hints: List[str] = field(default_factory=list)
@property
def is_alert(self) -> bool:
8. 论文来源
- 2601.17567
- 2602.07695