paper2skills Playbook

MAS Dynamic KG Collaboration — 多智能体动态知识图谱协同:实时构建、冲突解决、协同进化

Skill-MAS-Dynamic-KG-Collaboration · 10-MAS

causalexperimentragknowledge_graphmulti_agentfraud_detectionpricing广告与投放知识图谱与RAGMAS与智能体工程定价与利润风控与合规WF-B 广告优化WF-D 选品扫描WF-F 动态定价
实现难度⭐⭐⭐☆☆
业务视角
适用角色运营负责人 / CTO · 产品经理 · CEO
适用平台Amazon PPC + 库存 + 定价 多 Agent 协作 · TikTok 内容运营流水线
什么情况下用运营任务太碎,选品/定价/广告/客服同时跑,人手严重不足;重复性运营动作需要 7×24 响应但没有足够人力
成功是什么样的多个 AI Agent 协作自动完成跨系统运营任务,运营团队人效提升 3-5 倍,7×24 无人值守运营
业务痛点
运营人手不够任务太多价格变化没有及时响应重复性工作占据太多时间想做 7×24 监控但没人盯

1. 解决的问题

`Skill-Helicase-Supply-Chain-KG-MAS` 解决的是"如何让 MAS 构建一个静态知识图谱"——一次性构建,然后查询。动态 KG 协同解决的是更难的问题:知识在持续演变,多个 Agent 同时读写 KG,如何保持 KG 的一致性、处理冲突、并让 KG 与 Agent 共同进化?

2. 核心算法逻辑

SkillHelicaseSupplyChainKGMAS 解决的是"如何让 MAS 构建一个静态知识图谱"——一次性构建,然后查询。动态 KG 协同解决的是更难的问题:知识在持续演变,多个 Agent 同时读写 KG,如何保持 KG 的一致性、处理冲突、并让 KG 与 Agent 共同进化?

3. 业务应用场景

业务背景:品牌维护一个竞品 KG(记录竞品价格、评论数量、新品发布、Amazon BSR 排名)。每天有 50+ 条新信息需要写入 KG,同时多个分析 Agent 并发读写,经常出现数据冲突(不同 Agent 报告同一产品的不同价格)。

业务背景:WF-D 选品扫描 MAS 每次评估都会产生"经验"(哪些品类值得进入、哪些合规风险高、哪些季节性强),但这些经验存在 Agent 的 context 里,下次启动后全部遗失。

4. 输入数据要求

请查看原始代码模板获取输入规格。

5. 输出结果

请查看原始代码模板获取输出规格。

6. 业务价值 / ROI

未自动抽取;请查看原始 Skill 卡片。

7. 代码模板

代码块数量:7 · 路径:未检测到

from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional, Set, Tuple
import time


@dataclass
class Triple:
    subject: str
    predicate: str
    obj: Any
    source: str = "unknown"
    confidence: float = 1.0
    timestamp: float = field(default_factory=time.time)

    def key(self) -> str:
        return f"{self.subject}|{self.predicate}"


@dataclass
class ConflictReport:
    key: str
    existing: Triple
    incoming: Triple
    conflict_type: str


class MemGraphRAG:
    """
    三层共享记忆 KG:本体层 / 事实层 / 段落层
    三 Agent 流水线:Extractor → Conflict Detector → Resolution
    """

    SOURCE_PRIORITY = {"official": 3, "media": 2, "social": 1, "unknown": 0}

    def __init__(self):
        self.ontology: Dict[str, List[str]] = {}
        self.facts: Dict[str, Triple] = {}
        self.passages: Dict[str, str] = {}

    def define_schema(self, entity_type: str, allowed_predicates: List[str]):
        self.ontology[entity_type] = allowed_predicates

    def extract_triples(self, text: str, source: str = "unknown") -> List[Triple]:
        triples = []
        for line in text.strip().split("\n"):
            parts = [p.strip() for p in line.split("|") if p.strip()]
            if len(parts) == 3:
                triples.append(Triple(parts[0], parts[1], parts[2], source=source))
        return triples

    def detect_conflicts(self, candidates: List[Triple]) -> Tuple[List[Triple], List[ConflictReport]]:
        clean, conflicts = [], []
        for t in candidates:
            existing = self.facts.get(t.key())
            if existing is None:
                clean.append(t)
            elif str(existing.obj) == str(t.obj):
                clean.append(t)
            else:
                conflict_type = (

8. 论文来源

  • 2603.20059
  • 2605.10064
  • 2606.00610