TDP — DAG 任务解耦规划:82% Token 节省 + 错误隔离
Skill-DAG-Task-Decomposition-Planning · 16-智能体工程
causalexperimentforecastingmulti_agentvisual_generation广告与投放供应链与补货推荐与搜索MAS与智能体工程风控与合规视觉内容生成WF-A 智能补货WF-B 广告优化WF-G Listing内容优化
实现难度⭐⭐☆☆☆
业务优先级⭐⭐⭐⭐⭐
业务视角
适用角色CTO / 技术负责人 · 产品经理 · 数据工程师
适用平台跨境运营 AI Agent 工程落地 · Amazon SP API + LLM 集成 · 多平台数据采集 Agent
什么情况下用想把 AI 集成到业务系统,但 LLM 稳定性差、幻觉问题、成本控制都是挑战;Agent 任务失败了不知道哪步出了问题
成功是什么样的AI Agent 在生产环境稳定运行,失败可追踪,成本可控,复杂任务完成率 >85%
业务痛点
1. 解决的问题
传统 LLM Agent 在执行复杂任务时,把所有历史消息塞入 context window("full history" 模式),导致两个问题
2. 核心算法逻辑
传统 LLM Agent 在执行复杂任务时,把所有历史消息塞入 context window("full history" 模式),导致两个问题:
3. 业务应用场景
业务场景:Amazon FBA 补货决策,需要综合需求预测、安全库存、MOQ 约束三路计算。
痛点:顺序执行时,需求预测步骤的输出作为文本传入安全库存计算,一旦预测值描述模糊("约 500 件"),安全库存 Agent 可能基于错误理解做出偏差计算,最终补货单出现雪崩错误。
效果: - 每节点 context 平均 ~200 tokens(vs 全历史 ~1,500 tokens)= token -87% - MOQ 验证失败时,仅重算 moq_validation 节点,其他节点结果缓存复用 - 错误不传播:需求预测临时失败 → 整个补货 DAG 暂停,不产生错误采购单
4. 输入数据要求
请查看原始代码模板获取输入规格。
5. 输出结果
请查看原始代码模板获取输出规格。
6. 业务价值 / ROI
- Claude API 调用成本:假设月均 100 次上架流程 × $0.05/次 → TDP 后 ~$0.009/次,月省 ~$4,100
- 上架错误重跑:平均每次全流程重跑 ~15min → TDP 局部重算 ~3min,运营效率 +80%
- 错误雪崩导致的错误采购单:每次损失估算 $2,000-8,000 → TDP 完全规避
7. 代码模板
代码块数量:4 · 路径:未检测到
"""
TDP: Task Decoupled Planning for LLM Agents
DAG 任务解耦规划 — 82% Token 节省 + 错误隔离
论文: arXiv 2601.07577 | 2026年1月
"""
from __future__ import annotations
from dataclasses import dataclass, field
from typing import Any, Callable, Optional
from collections import deque
import copy
# ─── 数据结构 ────────────────────────────────────────────────────────────────
@dataclass
class DAGNode:
"""DAG 任务节点:每节点有独立的 scoped context"""
node_id: str
task_desc: str
dependencies: list[str] = field(default_factory=list) # 上游节点 ID
scoped_context: dict[str, Any] = field(default_factory=dict) # 仅包含本节点所需输入
output: Optional[Any] = None
status: str = "pending" # pending / running / done / failed
# ─── DAG 图结构 ──────────────────────────────────────────────────────────────
class TaskDAG:
"""有向无环图任务结构"""
def __init__(self):
self._nodes: dict[str, DAGNode] = {}
def add_node(self, node: DAGNode) -> None:
self._nodes[node.node_id] = node
def get_node(self, node_id: str) -> Optional[DAGNode]:
return self._nodes.get(node_id)
def topological_sort(self) -> list[str]:
"""Kahn 算法拓扑排序"""
in_degree = {nid: 0 for nid in self._nodes}
for node in self._nodes.values():
for dep in node.dependencies:
in_degree[node.node_id] += 1
queue = deque(nid for nid, deg in in_degree.items() if deg == 0)
order = []
while queue:
nid = queue.popleft()
order.append(nid)
for node in self._nodes.values():
if nid in node.dependencies:
in_degree[node.node_id] -= 1
if in_degree[node.node_id] == 0:
queue.append(node.node_id)
if len(order) != len(self._nodes):
8. 论文来源
- 2601.07577