Atomix — Agent 工具调用事务性:故障注入成功率 0-7% → 37-57%
Skill-Atomix-Transactional-Tool-Calls · 16-智能体工程
1. 解决的问题
Atomix 为 Agent 工具调用引入事务语义,解决多步 Agent 工作流在故障(网络抖动、服务超时、LLM 幻觉)下产生的中间态污染问题。无事务保护时,30% 故障注入场景的成功率仅 0-7%;Atomix Tx-Full 模式将其提升至 37-57%,媲美快照回滚(CR)。
2. 核心算法逻辑
Atomix 为 Agent 工具调用引入事务语义,解决多步 Agent 工作流在故障(网络抖动、服务超时、LLM 幻觉)下产生的中间态污染问题。无事务保护时,30% 故障注入场景的成功率仅 07%;Atomix TxFull 模式将其提升至 3757%,媲美快照回滚(CR)。
3. 业务应用场景
业务问题:补货 Agent 三步工作流:① 需求预测(调用预测 API)→ ② 安全库存计算(调用库存 API)→ ③ PO 生成(调用 ERP 下单 API)。若步骤②失败但步骤③已部分执行,会产生重复 PO 下单,损失 ¥1万-10万。
Atomix 保护: - 步骤①②标记为 BUFFERABLE(纯计算,不产生外部副作用) - 步骤③标记为 EXTERNALIZED,注册补偿函数 `cancel_po(po_id)` - progress predicate:三步均成功且 frontier 推进 → commit - 步骤②失败:abort → 步骤①②无副作用自动丢弃,步骤③若已创建 PO 则调用 `cancel_po` 回滚 - 重复下单风险归零,每次误操作损失风险消除
数据要求:ERP 系统支持 PO 撤销 API;补偿函数幂等(多次调用不产生额外效果)
4. 输入数据要求
请查看原始代码模板获取输入规格。
5. 输出结果
请查看原始代码模板获取输出规格。
6. 业务价值 / ROI
10万
7. 代码模板
代码块数量:1 · 路径:未检测到
"""
Atomix: 为 Agent 工具调用引入事务语义(epoch + frontier + 补偿)
参考: arXiv:2602.14849 — Atomix: Timely, Transactional Tool Use for Reliable Agentic Workflows
GitHub: https://github.com/mpi-dsg/atomix
"""
import uuid
from enum import Enum
from typing import Any, Callable, Optional
from dataclasses import dataclass, field
# ────────────────────────────────────────
# 1. ToolEffect 枚举
# ────────────────────────────────────────
class ToolEffect(Enum):
BUFFERABLE = "bufferable" # 纯内部效果,abort 时直接丢弃
EXTERNALIZED = "externalized" # 已产生外部副作用,abort 时需补偿
# ────────────────────────────────────────
# 2. Effect 数据类
# ────────────────────────────────────────
@dataclass
class Effect:
tool_name: str
effect_type: ToolEffect
result: Any
compensation_fn: Optional[Callable] = None
epoch: int = 0
# ────────────────────────────────────────
# 3. Transaction
# ────────────────────────────────────────
@dataclass
class Transaction:
tx_id: str = field(default_factory=lambda: str(uuid.uuid4())[:8])
epoch: int = 0
effects: list[Effect] = field(default_factory=list)
status: str = "active" # active | committed | aborted
# ────────────────────────────────────────
# 4. FrontierTracker
# ────────────────────────────────────────
class FrontierTracker:
def __init__(self):
self._frontiers: dict[str, int] = {} # resource_id → max confirmed epoch
def track(self, resource_id: str, epoch: int) -> None:
current = self._frontiers.get(resource_id, -1)
if epoch > current:
self._frontiers[resource_id] = epoch
def can_commit(self, tx: Transaction, required_resources: list[str]) -> bool:
for res in required_resources:
if self._frontiers.get(res, -1) < tx.epoch:
return False
return True
8. 论文来源
- 2602.14849