paper2skills Playbook

Cross-Org Agent Protocol — 跨组织多智能体协调协议:多委托人、联邦编排、工作区委托

Skill-Cross-Org-Agent-Protocol · 10-MAS

causalexperimentoptimizationmulti_agentpricing广告与投放供应链与补货数据采集与治理MAS与智能体工程定价与利润风控与合规WF-A 智能补货WF-B 广告优化WF-E Review监控WF-F 动态定价WF-G Listing内容优化
实现难度⭐⭐⭐⭐☆
业务视角
适用角色运营负责人 / CTO · 产品经理 · CEO
适用平台Amazon PPC + 库存 + 定价 多 Agent 协作 · TikTok 内容运营流水线
什么情况下用运营任务太碎,选品/定价/广告/客服同时跑,人手严重不足;重复性运营动作需要 7×24 响应但没有足够人力
成功是什么样的多个 AI Agent 协作自动完成跨系统运营任务,运营团队人效提升 3-5 倍,7×24 无人值守运营
业务痛点
运营人手不够任务太多价格变化没有及时响应重复性工作占据太多时间想做 7×24 监控但没人盯

1. 解决的问题

MCP(Model Context Protocol):解决 Agent 与工具/数据源的连接(Host ↔ Server)

2. 核心算法逻辑

现有 MAS 协议栈有两个标准:

3. 业务应用场景

业务背景:母婴品牌与代工工厂建立"智能协同"关系——品牌的采购 Agent 需要直接与工厂的产能规划 Agent 协作,实时获取产能数据、协商交期。但两方 Agent 系统分属不同公司,各有利益。

业务背景:品牌在 Amazon、TikTok、独立站三个平台各有独立 Agent 系统(不同供应商提供)。大促期间需要三个系统协同调配预算,但三方 Agent 来自不同厂商,无共同 API。

4. 输入数据要求

请查看原始代码模板获取输入规格。

5. 输出结果

请查看原始代码模板获取输出规格。

6. 业务价值 / ROI

未自动抽取;请查看原始 Skill 卡片。

7. 代码模板

代码块数量:5 · 路径:未检测到

from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional, Tuple
from enum import Enum
import time
import hashlib


class MPACLayer(Enum):
    SESSION = "session"
    INTENT = "intent"
    OPERATION = "operation"
    CONFLICT = "conflict"
    GOVERNANCE = "governance"


class MPACMessageType(Enum):
    HELLO = "HELLO"
    AUTH_REQUEST = "AUTH_REQUEST"
    AUTH_GRANT = "AUTH_GRANT"
    AUTH_DENY = "AUTH_DENY"
    DECLARE_INTENT = "DECLARE_INTENT"
    ACCEPT_INTENT = "ACCEPT_INTENT"
    COUNTER_INTENT = "COUNTER_INTENT"
    REJECT_INTENT = "REJECT_INTENT"
    REQUEST = "REQUEST"
    RESPONSE = "RESPONSE"
    DISPUTE = "DISPUTE"
    ARBITRATE = "ARBITRATE"
    RESOLVE = "RESOLVE"
    LOG = "LOG"
    TERMINATE = "TERMINATE"


@dataclass
class MPACMessage:
    msg_type: MPACMessageType
    sender_id: str
    receiver_id: str
    layer: MPACLayer
    payload: Dict[str, Any]
    session_id: str
    timestamp: float = field(default_factory=time.time)
    signature: str = ""

    def sign(self, private_key_sim: str) -> "MPACMessage":
        content = f"{self.msg_type.value}{self.sender_id}{self.payload}{self.timestamp}"
        self.signature = hashlib.md5((content + private_key_sim).encode()).hexdigest()[:16]
        return self

    def verify(self, public_key_sim: str) -> bool:
        content = f"{self.msg_type.value}{self.sender_id}{self.payload}{self.timestamp}"
        expected = hashlib.md5((content + public_key_sim).encode()).hexdigest()[:16]
        return self.signature == expected


class MPACSession:
    def __init__(self, session_id: str, principal_a: str, principal_b: str):
        self.session_id = session_id
        self.principal_a = principal_a
        self.principal_b = principal_b

8. 论文来源

  • 2602.15055
  • 2604.09744