"Orchestration mode"(编排模式)是一个会话级开关:当它开启时,模型会对每个实质性请求投入最大程度的彻底性,先自行侦察任务,然后默认将工作扇出到并行的子智能体。当它关闭时,同一个编排工具会恢复为按请求逐次选择启用。
该模式不是一个 API 参数。它完全由已记录在文档中的组件构建而成:
xhigh)运行。不存在高于该页面所列级别的隐藏级别。system 字段始终不变,因此缓存的前缀保持完整。本示例使用对话中系统消息,该功能目前仅在 Claude Opus 4.8 上可用。扇出本身会成倍增加令牌使用量:单个请求可能会衍生出许多子智能体对话,因此请将该模式保留用于值得付出此成本的工作。
本示例是一个单文件。常量控制 effort 级别、扇出形态以及模式刷新提醒的重发频率。MAX_CONCURRENT 限制同时运行的子智能体数量(PHP 移植版本是顺序执行的,会忽略此值);MAX_TOTAL_SUBTASKS 限制模型在单次 Workflow 调用中可排队的子任务数量。将两者分开可以让模型规划大量待办任务,而不必一次性全部启动。DOC_TEST_MODE 检查会在设置该环境变量时将循环限制为单个回合,以便自动化文档测试工具可以验证文件能够编译并快速完成,而无需运行完整的编排流程;当您自己运行示例时,请保持该变量未设置。
import atexit
import concurrent.futures
import hashlib
import json
import os
import shutil
import subprocess
import sys
import tempfile
import threading
import anthropic
client = anthropic.Anthropic()
MODEL = "claude-opus-4-8"
EFFORT = "xhigh"
SYSTEM_PROMPT = "You are a helpful general-purpose agent. Answer the user's request directly."
REQUEST_TIMEOUT_SECONDS = 600
BASH_TIMEOUT_SECONDS = 60
TOOL_RESULT_MAX_CHARS = 8000
MAX_CONCURRENT = 10
DOC_TEST_MODE = bool(os.environ.get("DOC_TEST_MODE"))
MAX_TOTAL_SUBTASKS = 2 if DOC_TEST_MODE else 200
MAX_SUBAGENT_TURNS = 1 if DOC_TEST_MODE else 15
MAX_MAIN_TURNS = 1 if DOC_TEST_MODE else 30
TURNS_BETWEEN_REFRESHERS = 10
JOURNAL_PATH = os.environ.get("ORCH_JOURNAL") or "orchestration_journal.json"这些提醒刻意保持简短。它们负责切换模式并指向工具描述,而重量级的指令都位于工具描述中。完整文本在模式开启时发送一次,刷新提醒仅在经过几个用户回合后重新发送,退出通知在模式关闭时发送一次。
MODE_ENTER = (
"Orchestration mode is on: optimize for the most exhaustive, correct answer rather than "
"the fastest one. Use the Workflow tool on every substantive task, sized to the problem's "
"natural decomposition rather than the maximum the tool allows. See the Workflow tool's "
"description for standing consent, granularity guidance, and quality patterns. Work solo "
"only on conversational or trivial turns."
)
MODE_REFRESH = (
"Orchestration mode is still on. Use the Workflow tool; see its standing consent section."
)
MODE_EXIT = (
"Orchestration mode is off. The Workflow tool's standard opt-in rule applies again."
)Workflow 工具承载了真正的行为契约:选择启用规则、模式开启期间适用的常设许可、用于确定扇出规模的粒度指导,以及模型可以采用的质量模式(验证波次、完整性评审员、多阶段排序)。子智能体还会获得一个 report_findings 工具,以便它们的结果以结构化 JSON 而非散文形式返回,而 bash 工具是 Anthropic 定义的 bash_20250124 工具,在本地运行。
WORKFLOW_TOOL = {
"name": "Workflow",
"description": (
"Orchestrate a multi-agent workflow: split a large task into independent subtasks "
"and run them as parallel agents, then collect their results.\n\n"
"Opt-in: only use this tool when the user explicitly asks for a workflow, or when a "
"system message confirms that orchestration mode is on.\n\n"
"Quality patterns: adversarial verification (a second wave of agents checks the first "
"wave's findings against the source), a completeness critic (one agent hunts for what "
"the others missed), and multi-phase sequencing (understand, design, implement, and "
"review as separate workflow calls, reading results between phases). A useful default "
"is hybrid: scout inline first to discover the work-list, then fan out over it.\n\n"
"Granularity: scope each subtask to a distinct concern, component, or question rather "
"than per line or per file section. Scale the count to what the user asked for: a "
"focused review of a module of a few hundred lines rarely needs more than about ten "
"subtasks; a broad audit of a large codebase can justify more.\n\n"
"Standing consent: while a system message confirms orchestration mode is on, that "
"opt-in is standing. Author and run a workflow for every substantive task by default, "
"and lean toward verifying findings adversarially. Work solo only on conversational "
"turns or trivial mechanical edits. When a system message says the mode is off, "
"revert to the opt-in rule above."
),
"input_schema": {
"type": "object",
"properties": {
"subtasks": {
"type": "array",
"items": {"type": "string"},
"description": "Independent subtask prompts to run as parallel agents",
}
},
"required": ["subtasks"],
},
}
BASH_TOOL = {"type": "bash_20250124", "name": "bash"}
REPORT_TOOL = {
"name": "report_findings",
"description": (
"Report the final findings for your subtask. Call this exactly once, when you are "
"done investigating; it ends your task."
),
"input_schema": {
"type": "object",
"properties": {
"summary": {"type": "string", "description": "Two or three sentences of synthesis"},
"findings": {
"type": "array",
"items": {
"type": "object",
"properties": {
"claim": {"type": "string", "description": "The finding, one sentence"},
"evidence": {
"type": "string",
"description": "How it was verified (file, line, or command output)",
},
"severity": {"type": "string", "enum": ["high", "medium", "low", "info"]},
},
"required": ["claim", "evidence", "severity"],
},
},
},
"required": ["summary", "findings"],
},
}bash 处理程序以超时限制运行请求的命令,捕获合并后的 stdout 和 stderr,并截断结果,以防失控的命令淹没上下文窗口。命令在您启动示例的目录中运行,因此若要将其指向某个项目,就意味着要在该项目目录中启动它;当设置了 DOC_TEST_MODE 时,测试工具会改为给 bash 提供一个小型的临时固定目录,该目录会在退出时被删除。这里没有沙箱:命令以启动示例的进程的权限运行。为清晰起见,本示例在每次调用时都在一个新的子 shell 中运行,而不是维护 bash_20250124 契约所描述的持久会话;生产环境的智能体应使用长期运行的 shell 来支持该工具,以便工作目录、环境和 restart 操作按文档所述运行。
# Run bash where the example was launched. In DOC_TEST_MODE the docs harness
# points it at a throwaway fixture directory instead, removed on exit.
if DOC_TEST_MODE:
WORK_DIR = tempfile.mkdtemp(prefix="orchestration-")
atexit.register(shutil.rmtree, WORK_DIR, ignore_errors=True)
with open(os.path.join(WORK_DIR, "sample.py"), "w") as fixture:
fixture.write(
"def fib(n):\n"
" return n if n < 2 else fib(n - 1) + fib(n - 2)\n\n"
"print(fib(10))\n"
)
else:
WORK_DIR = os.getcwd()
def run_bash(command: str) -> tuple[str, bool]:
"""Run a shell command and return (output, is_error). No sandbox: example code only."""
print(f"[bash] {command}", file=sys.stderr)
try:
proc = subprocess.run(
["bash", "-c", command],
cwd=WORK_DIR,
capture_output=True,
text=True,
errors="replace",
timeout=BASH_TIMEOUT_SECONDS,
)
except subprocess.TimeoutExpired:
return f"command timed out after {BASH_TIMEOUT_SECONDS}s", True
output = (proc.stdout + proc.stderr).strip() or "(no output)"
if len(output) > TOOL_RESULT_MAX_CHARS:
output = output[:TOOL_RESULT_MAX_CHARS] + f"\n(truncated at {TOOL_RESULT_MAX_CHARS} chars)"
if proc.returncode != 0:
output = f"(exit code {proc.returncode})\n{output}"
return output, proc.returncode != 0
def handle_bash_block(block) -> tuple[str, bool]:
if block.input.get("restart") is True:
return "Shell restarted.", False
command = block.input.get("command")
if not isinstance(command, str) or not command:
return "bash error: no command was provided.", True
return run_bash(command)每个工作流子任务都会成为一个独立的小型智能体循环,配备 bash 工具,并以与主循环相同的 effort 级别运行。每个请求的超时限制约束了每次 API 调用,因此连接断开只会降级单个子智能体,而不会使整个运行停滞。
def run_subagent(model: str, prompt: str) -> str:
"""One subagent: a small nested agent loop with the bash tool plus report_findings.
Subagents inherit the main loop's effort level."""
subagent_system = (
"You are one agent in a larger parallel fan-out, assigned a single subtask. "
"Investigate it directly, using bash to check facts rather than guessing, and finish "
"by calling report_findings exactly once. Return findings, not narration."
)
messages = [{"role": "user", "content": prompt}]
for _ in range(MAX_SUBAGENT_TURNS):
with client.messages.stream(
model=model,
max_tokens=64000,
system=subagent_system,
thinking={"type": "adaptive"},
output_config={"effort": EFFORT},
tools=[BASH_TOOL, REPORT_TOOL],
messages=messages,
timeout=REQUEST_TIMEOUT_SECONDS,
) as stream:
response = stream.get_final_message()
messages.append({"role": "assistant", "content": response.content})
if response.stop_reason == "pause_turn":
continue
if response.stop_reason != "tool_use":
text = "".join(block.text for block in response.content if block.type == "text")
if response.stop_reason == "max_tokens":
text += "\n\n(warning: subagent response was truncated at max_tokens)"
return text
tool_results = []
report = None
for block in response.content:
if block.type != "tool_use":
continue
if block.name == "report_findings":
report = json.dumps(block.input, indent=2)
output, is_error = "Findings recorded.", False
elif block.name == "bash":
output, is_error = handle_bash_block(block)
else:
output, is_error = f"unknown tool: {block.name}", True
tool_results.append(
{
"type": "tool_result",
"tool_use_id": block.id,
"content": output,
"is_error": is_error,
}
)
if report is not None:
return report
messages.append({"role": "user", "content": tool_results})
return "(subagent hit the turn limit before finishing)"衍生出数十个子智能体的扇出从头重启成本很高。一个小型的内容寻址日志使其具有幂等性:在分派子智能体之前,在本地 JSON 文件中查找其提示的 SHA-256 哈希值,如果存在记录的结果则直接返回。中断运行后重新运行,只有从未完成的子任务才会被重新计算。该日志在多次运行之间去重,而不是在单个扇出波次内去重;删除日志文件即可重新开始。
_journal_lock = threading.Lock()
def _load_journal() -> dict:
try:
with open(JOURNAL_PATH) as file:
return json.load(file) or {}
except (OSError, json.JSONDecodeError):
return {}
def journaled(prompt: str, compute) -> str:
"""Return a cached result for this exact prompt, or compute and persist it. This
makes the fan-out resumable: interrupt the run, rerun it, and only the subtasks
that never finished are recomputed. Delete the journal file to start fresh."""
key = hashlib.sha256(prompt.encode()).hexdigest()
cached = _load_journal().get(key)
if cached is not None:
print(f"[journal] cache hit for {key[:12]}", file=sys.stderr)
return cached
result = compute()
try:
with _journal_lock: # fan-out writes from many threads
journal = _load_journal()
journal[key] = result
temp = f"{JOURNAL_PATH}.tmp"
with open(temp, "w") as file:
json.dump(journal, file)
os.replace(temp, JOURNAL_PATH) # atomic on POSIX and Windows
except OSError as error: # the journal is best-effort; never discard a computed result
print(f"[journal] write failed: {error}", file=sys.stderr)
return result扇出最多接受 MAX_TOTAL_SUBTASKS 个提示,通过日志运行它们,同时最多有 MAX_CONCURRENT 个在执行中(PHP 移植版本为顺序执行),并隔离失败,使单个出错的子智能体降级为一个错误字符串,而不是终止整个运行。第一波完成后,第二波会复用相同的子智能体路径来尝试反驳每个结果:每个验证器都从源头重新推导声明,在不确定时默认判定为已反驳。原始结果及其判定都会返回给编排器,以便它可以综合权衡。
def normalize_subtasks(raw) -> list[str]:
"""Accept the subtasks input in whatever shape the model emits: an array, the array
JSON-encoded as a single string, or a newline-separated list."""
if isinstance(raw, str):
try:
raw = json.loads(raw)
except json.JSONDecodeError:
raw = raw.splitlines() if "\n" in raw else [raw]
if not isinstance(raw, list):
return []
return [task.strip() for task in raw if isinstance(task, str) and task.strip()]
def verify_prompt_for(subtask: str, result: str) -> str:
return (
"Adversarially verify the subagent result below: try to REFUTE it. Re-derive the "
"claims yourself with bash rather than trusting the result, and look for evidence "
"that contradicts them. Default to refuted if uncertain. Call report_findings with "
"summary 'refuted: <why>' or 'confirmed: <why>', citing the file:line or command "
"output that decided it.\n\n"
f"Subtask: {subtask}\n\nResult to verify:\n{result}"
)
def run_workflow(model: str, raw_subtasks) -> tuple[str, bool]:
"""Run subtasks as parallel subagents, then run a second verification wave over
the results, and return both. MAX_TOTAL_SUBTASKS bounds how many the model can
queue; MAX_CONCURRENT bounds how many run at once."""
all_subtasks = normalize_subtasks(raw_subtasks)
subtasks = all_subtasks[:MAX_TOTAL_SUBTASKS]
dropped = len(all_subtasks) - len(subtasks)
if not subtasks:
return "Workflow error: no usable subtasks were provided.", True
print(f"[workflow] fanning out {len(subtasks)} agents", file=sys.stderr)
def run_one(prompt: str) -> str:
try:
return journaled(prompt, lambda: run_subagent(model, prompt))
except Exception as error: # isolation boundary: one bad subagent should not end the run
return f"(subagent failed: {type(error).__name__}: {error})"
with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_CONCURRENT) as pool:
results = list(pool.map(run_one, subtasks))
print(f"[workflow] verifying {len(results)} results", file=sys.stderr)
verify_prompts = [verify_prompt_for(task, result) for task, result in zip(subtasks, results)]
verdicts = list(pool.map(run_one, verify_prompts))
joined = "\n\n".join(
f"[agent {index + 1}: {task}]\n{result}\n\n[verify {index + 1}]\n{verdict}"
for index, (task, result, verdict) in enumerate(zip(subtasks, results, verdicts))
)
if dropped > 0:
joined = (
f"(note: {dropped} subtasks beyond MAX_TOTAL_SUBTASKS={MAX_TOTAL_SUBTASKS} were not "
"run; rerun them in a follow-up Workflow call)\n\n" + joined
)
return joined, False智能体首先追加用户的消息,然后追加任何到期的系统消息:退出通知、进入时的完整模式文本,或定期刷新提醒。将系统消息放在用户回合之后,可以保持其前面的每个缓存字节不受影响,并满足系统消息必须跟在用户回合之后的放置规则。
class ModeAgent:
"""An agent loop whose orchestration mode is toggled with mid-conversation system messages."""
def __init__(self, model: str, mode_on: bool = True):
self.model = model
self.mode_on = mode_on
self.messages: list[dict] = []
self._mode_announced = False
self._exit_pending = False
self._turns_since_reminder = 0
def set_mode(self, mode_on: bool) -> None:
"""Turn the mode on or off. The notice is delivered with the next user turn."""
if mode_on == self.mode_on:
return
if not mode_on:
if self._mode_announced:
self._exit_pending = True
else:
self._exit_pending = False
self.mode_on = mode_on
def _due_system_messages(self) -> list[dict]:
"""System messages owed on this turn: an exit notice, the full mode text on entry,
or a one-line refresher every TURNS_BETWEEN_REFRESHERS user turns."""
due = []
if self._exit_pending:
self._exit_pending = False
self._mode_announced = False
due.append({"role": "system", "content": MODE_EXIT})
if self.mode_on:
if not self._mode_announced:
self._mode_announced = True
self._turns_since_reminder = 0
due.append({"role": "system", "content": MODE_ENTER})
elif self._turns_since_reminder >= TURNS_BETWEEN_REFRESHERS:
self._turns_since_reminder = 0
due.append({"role": "system", "content": MODE_REFRESH})
return due
def turn(self, user_input: str) -> str:
# Mid-conversation system messages follow the user turn they apply to, which keeps
# the cached prefix ahead of them untouched.
self.messages.append({"role": "user", "content": user_input})
self.messages.extend(self._due_system_messages())
self._turns_since_reminder += 1
for _ in range(MAX_MAIN_TURNS):
with client.messages.stream(
model=self.model,
max_tokens=64000,
system=SYSTEM_PROMPT, # static for the whole session
thinking={"type": "adaptive"},
output_config={"effort": EFFORT},
tools=[WORKFLOW_TOOL, BASH_TOOL],
messages=self.messages,
timeout=REQUEST_TIMEOUT_SECONDS,
) as stream:
response = stream.get_final_message()
self.messages.append({"role": "assistant", "content": response.content})
if response.stop_reason == "pause_turn":
continue
if response.stop_reason != "tool_use":
text = "".join(block.text for block in response.content if block.type == "text")
if response.stop_reason == "max_tokens":
# Drop the truncated assistant message so later turns don't build on it.
self.messages.pop()
text += "\n\n(warning: response was truncated at max_tokens)"
return text
tool_results = []
for block in response.content:
if block.type != "tool_use":
continue
if block.name == "Workflow":
output, is_error = run_workflow(self.model, block.input.get("subtasks", []))
elif block.name == "bash":
output, is_error = handle_bash_block(block)
else:
output, is_error = f"unknown tool: {block.name}", True
tool_results.append(
{
"type": "tool_result",
"tool_use_id": block.id,
"content": output,
"is_error": is_error,
}
)
self.messages.append({"role": "user", "content": tool_results})
return "(hit the main loop turn limit before finishing)"本示例中的 bash 工具会直接在您的机器上运行模型编写的命令,没有任何沙箱,而且扇出会并行运行多个此类智能体。请在您愿意暴露的目录和环境中运行它,并在将其改编用于本地实验以外的任何用途之前添加沙箱机制。
if __name__ == "__main__":
task = (
sys.argv[1]
if len(sys.argv) > 1
else "Explore the current directory, then give a thorough review: what it does, "
"code-quality issues, and concrete improvements."
)
agent = ModeAgent(MODEL)
print(agent.turn(task))
agent.set_mode(False)
print(agent.turn("Briefly summarize what you found above, no fan-out needed."))从您希望智能体工作的目录启动示例,例如要审查的代码仓库的根目录:
python orchestration_mode.py "Review this repository for flaky tests and propose fixes."当模式开启时,预期模型会先用几个 bash 命令进行侦察,主动分派 Workflow 工具,并将子智能体的报告综合成最终答案。正如提醒所指示的,琐碎或对话性的请求仍由单个智能体处理。
本示例刻意保持精简。面向真实工作负载的工具框架通常会添加:
本示例中的模式(模式提醒、工具描述中的常设许可、日志记录和验证波次)可以原封不动地沿用;只有围绕它们的执行基础设施需要变得更加健壮。
模式提醒所使用的机制,以及它如何与提示缓存交互。
API 接受的 effort 级别以及如何选择。
定义工具、处理工具调用和工具结果。
本示例在本地执行的 Anthropic 定义的 bash 工具。
Was this page helpful?