paper2skills Playbook

MAS Resource Scheduling — OS 调度原语驱动的多智能体资源管理

Skill-MAS-Resource-Scheduling · 10-MAS

causalexperimentforecastingoptimizationknowledge_graphmulti_agentpricing供应链与补货知识图谱与RAGMAS与智能体工程定价与利润风控与合规WF-A 智能补货WF-D 选品扫描WF-F 动态定价
年化 ROI12 万元
实现难度⭐⭐☆☆☆
业务视角
适用角色运营负责人 / CTO · 产品经理 · CEO
适用平台Amazon PPC + 库存 + 定价 多 Agent 协作 · TikTok 内容运营流水线
什么情况下用运营任务太碎,选品/定价/广告/客服同时跑,人手严重不足;重复性运营动作需要 7×24 响应但没有足够人力
成功是什么样的多个 AI Agent 协作自动完成跨系统运营任务,运营团队人效提升 3-5 倍,7×24 无人值守运营
业务痛点
运营人手不够任务太多价格变化没有及时响应重复性工作占据太多时间想做 7×24 监控但没人盯

1. 解决的问题

MAS 生产化最常见的失败不来自 Agent 逻辑,而来自资源竞争:多个 Agent 并行调用同一个限速 API,导致连接重置、HTTP 502、上下文泄漏、Zombie Agent 挂起。这些问题在操作系统领域早已有成熟解法——HiveMind 和 AgentRM 把 OS 调度理论直接搬到 MAS 层。

2. 核心算法逻辑

MAS 生产化最常见的失败不来自 Agent 逻辑,而来自资源竞争:多个 Agent 并行调用同一个限速 API,导致连接重置、HTTP 502、上下文泄漏、Zombie Agent 挂起。这些问题在操作系统领域早已有成熟解法——HiveMind 和 AgentRM 把 OS 调度理论直接搬到 MAS 层。

3. 业务应用场景

业务背景:双 11 大促前,需要同时启动 20 个选品扫描 Agent 评估候选 SKU。所有 Agent 共享同一个 GPT-4o API 配额(TPM 限制),历史上每次大促前都有 30-40% 的 Agent 因 429 错误失败,需要人工重跑。

业务背景:AIM-RM 库存 MAS 每天运行 500+ 次库存决策,运营反馈"有时候任务卡住不动"(Zombie Agent)。

4. 输入数据要求

请查看原始代码模板获取输入规格。

5. 输出结果

请查看原始代码模板获取输出规格。

6. 业务价值 / ROI

12 万元

7. 代码模板

代码块数量:7 · 路径:未检测到

import time
import threading
import queue
from collections import deque
from dataclasses import dataclass, field
from typing import Any, Callable, Dict, List, Optional
from enum import Enum


class Priority(Enum):
    CRITICAL = 0
    NORMAL = 1
    BACKGROUND = 2


class CircuitState(Enum):
    CLOSED = "closed"
    OPEN = "open"
    HALF_OPEN = "half_open"


@dataclass
class AgentTask:
    task_id: str
    agent_fn: Callable
    priority: Priority = Priority.NORMAL
    budget_tokens: int = 4000
    deadline_seconds: float = 60.0
    created_at: float = field(default_factory=time.time)
    mlfq_level: int = 0


@dataclass
class ContextLayer:
    session: Dict[str, Any] = field(default_factory=dict)
    task: Dict[str, Any] = field(default_factory=dict)
    agent: Dict[str, Any] = field(default_factory=dict)

    def clear_task(self):
        self.task.clear()

    def clear_agent(self):
        self.agent.clear()


class CircuitBreaker:
    def __init__(self, failure_threshold: int = 5, cooldown: float = 30.0):
        self.failure_threshold = failure_threshold
        self.cooldown = cooldown
        self.state = CircuitState.CLOSED
        self.failure_count = 0
        self.last_failure_time = 0.0

    def record_success(self):
        self.failure_count = 0
        self.state = CircuitState.CLOSED

    def record_failure(self):
        self.failure_count += 1
        self.last_failure_time = time.time()

8. 论文来源

  • 2603.13110
  • 2604.17111
  • 2605.06110