paper2skills Playbook

Data Collection Agent Pipeline — LLM Agent 自动化多源数据采集 Pipeline

Skill-Data-Collection-Agent-Pipeline · 09-DataAgent-LLM

causalexperimentmulti_agentdata_collectionpricing客服与VOC推荐与搜索数据采集与治理MAS与智能体工程定价与利润WF-C 客服分诊WF-D 选品扫描WF-E Review监控WF-F 动态定价WF-G Listing内容优化
实现难度⭐⭐⭐☆☆
业务视角
适用角色数据分析师 / 运营负责人 · CEO · 供应链负责人
适用平台Amazon SP API · Shopify · TikTok Ads API · 多平台数据整合
什么情况下用数据需求太多,数据团队排期 2 周;非技术人员(采购/客服/运营)有数据问题但不会 SQL;重复报表占用大量时间
成功是什么样的业务方用自然语言自助查数据,常规报表自动化,数据驱动决策响应速度从「天」变「分钟」
业务痛点
数据需求排期太长不会 SQL 只能等数据团队老板临时要数据没法马上出分析师时间都花在取数上

1. 解决的问题

传统数据采集依赖人工编写爬虫脚本,每个数据源需要单独维护。

2. 核心算法逻辑

传统数据采集依赖人工编写爬虫脚本,每个数据源需要单独维护。当数据源频繁变更(电商平台 DOM 更新、API 版本迭代、反爬策略升级)时,维护成本极高。LLM Agent 数据采集 Pipeline 解决的核心问题是:

3. 业务应用场景

业务背景:选品团队需要每周采集 Amazon US/UK/DE 三站点婴儿安全座椅品类 Top 200 SKU 的完整竞品数据(价格、评分、评论数、主图、A+页面关键词),以往需要 3 名数据专员耗时 2 天完成。

量化对比: - 人工耗时:3 人 × 2 天 = 48 人时 - Agent 耗时:2 小时 14 分(含重试),运营监督 30 分钟 - 效率提升:约 20 倍,数据新鲜度从"周更"提升至"日更"

业务背景:产品团队需要汇聚 Amazon + Walmart + BabyList + Reddit 四平台的母乳泵用户评论,构建 VOC(Voice of Customer)数据库,每月手工整理耗时约 40 小时。

4. 输入数据要求

请查看原始代码模板获取输入规格。

5. 输出结果

请查看原始代码模板获取输出规格。

6. 业务价值 / ROI

未自动抽取;请查看原始 Skill 卡片。

7. 代码模板

代码块数量:3 · 路径:未检测到

"""
LLM Agent 自动化数据采集 Pipeline
整合 ReAct 规划执行 + 多源融合 + 实体对齐去重
arXiv 参考: 2403.08299 (WebVoyager: LLM Web Agent),
           2404.11584 (AgentBench),
           2502.09986 (DataHarvester: LLM-Driven Multi-Source Collection)
"""

import time
import json
import hashlib
import numpy as np
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional, Callable
from enum import Enum


# ── 数据结构 ─────────────────────────────────────────────────────────────

class ActionType(Enum):
    FETCH_URL = "fetch_url"
    PARSE_HTML = "parse_html"
    CALL_API = "call_api"
    EXTRACT_STRUCTURED = "extract_structured"
    DEDUP_AND_MERGE = "dedup_and_merge"
    DONE = "done"


@dataclass
class Action:
    action_type: ActionType
    params: Dict[str, Any]


@dataclass
class Observation:
    success: bool
    data: Any
    error: Optional[str] = None
    metadata: Dict[str, Any] = field(default_factory=dict)


@dataclass
class CollectedRecord:
    source: str
    entity_id: str       # 原始 ID(如 ASIN)
    fields: Dict[str, Any]
    embedding: Optional[np.ndarray] = None  # 用于实体对齐

    def fingerprint(self) -> str:
        """基于核心字段的内容指纹,用于去重"""
        key_fields = {k: v for k, v in self.fields.items()
                      if k in ["title", "brand", "price", "asin", "sku"]}
        return hashlib.md5(json.dumps(key_fields, sort_keys=True).encode()).hexdigest()


# ── 模拟工具函数(生产中替换为真实实现)────────────────────────────────────

def mock_fetch_url(url: str) -> Observation:
    """模拟 HTTP 请求,生产中替换为 httpx/playwright"""

8. 论文来源

  • 2210.03629
  • 2308.03688
  • 2401.13919
  • 2403.08299
  • 2404.11584
  • 2502.09986