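Orchestration mode is a session-level switch. When it is on, the model brings maximum thoroughness to every substantive request: it scouts the task itself first, then fans the work out to parallel subagents by default. When it is off, the same orchestration tool goes back to per-request opt-in.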
This mode is not an API parameter. It is assembled entirely from documented components:
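xhigh. There is no hidden level above the ones that page lists. The system field never changes, so the cached prefix stays intact. This example uses mid-conversation system messages, which are currently available only on Claude Opus 4.8. The fan-out itself multiplies token usage: a single request can spawn many subagent conversations, so reserve this mode for work that justifies the cost.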
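The listing's constants give a rough ceiling on that multiplication. The helper below, worst_case_api_calls (a hypothetical name, not part of the example), counts Messages API requests rather than tokens. Each queued subtask gets one subagent in the first wave and one verifier in the second, and each of those loops stops after at most MAX_SUBAGENT_TURNS requests. Journal hits lower the real number.

```python
def worst_case_api_calls(subtasks: int, subagent_turns: int, main_turns: int = 0) -> int:
    """Upper bound on Messages API requests for one Workflow call (illustrative).

    Two waves (subagents, then adversarial verifiers), one agent per subtask in
    each wave, each capped at subagent_turns requests. main_turns optionally adds
    the orchestrator's own requests.
    """
    waves = 2
    return waves * subtasks * subagent_turns + main_turns


# MAX_TOTAL_SUBTASKS=200 and MAX_SUBAGENT_TURNS=15 from the listing:
print(worst_case_api_calls(200, 15))  # 6000
# A focused review that the Workflow description sizes at about ten subtasks:
print(worst_case_api_calls(10, 15))  # 300
```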
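The example is a single file. Constants control the effort level, the shape of the fan-out, and how often the mode refresher is resent. MAX_CONCURRENT caps how many subagents run at the same time (the PHP port runs sequentially and ignores it). MAX_TOTAL_SUBTASKS caps how many the model can queue in a single Workflow call. Keeping the two separate lets the model plan a large backlog without launching all of it at once. When the DOC_TEST_MODE environment variable is set, the main loop and each subagent are limited to a single turn and the fan-out is limited to two subtasks. This lets an automated docs test harness confirm that the file builds and finishes quickly without running a full orchestration. Don't set this variable when you run the example yourself.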
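The split between the two caps can be checked without any API calls. This is a minimal sketch that assumes a thread pool, as the example uses. fan_out and fake_subagent are hypothetical stand-ins, and the cap values are shrunk for the demo. Slicing the prompt list enforces the queue cap, and max_workers enforces the in-flight cap.

```python
import concurrent.futures
import threading
import time

MAX_CONCURRENT = 3      # illustrative, smaller than the example's 10
MAX_TOTAL_SUBTASKS = 8  # illustrative, smaller than the example's 200


def fan_out(prompts: list[str]) -> tuple[list[str], int, int]:
    """Queue at most MAX_TOTAL_SUBTASKS prompts and run at most MAX_CONCURRENT at once.
    Returns (results, dropped_count, peak_in_flight)."""
    queued = prompts[:MAX_TOTAL_SUBTASKS]  # queue cap
    dropped = len(prompts) - len(queued)
    lock = threading.Lock()
    in_flight = 0
    peak = 0

    def fake_subagent(prompt: str) -> str:
        nonlocal in_flight, peak
        with lock:
            in_flight += 1
            peak = max(peak, in_flight)
        time.sleep(0.05)  # stand-in for a subagent's API calls
        with lock:
            in_flight -= 1
        return f"done: {prompt}"

    # In-flight cap: the pool never runs more than max_workers tasks at once.
    with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_CONCURRENT) as pool:
        results = list(pool.map(fake_subagent, queued))  # map preserves input order
    return results, dropped, peak


results, dropped, peak = fan_out([f"task {i}" for i in range(12)])
print(len(results), dropped)  # 8 4
```

The dropped count is reported instead of discarded silently. In the example, it becomes the note that tells the model to resubmit the remainder in a follow-up Workflow call.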
```python
import atexit
import concurrent.futures
import hashlib
import json
import os
import shutil
import subprocess
import sys
import tempfile
import threading

import anthropic

client = anthropic.Anthropic()

MODEL = "claude-opus-4-8"
EFFORT = "xhigh"
SYSTEM_PROMPT = "You are a helpful general-purpose agent. Answer the user's request directly."
REQUEST_TIMEOUT_SECONDS = 600
BASH_TIMEOUT_SECONDS = 60
TOOL_RESULT_MAX_CHARS = 8000
MAX_CONCURRENT = 10
DOC_TEST_MODE = bool(os.environ.get("DOC_TEST_MODE"))
MAX_TOTAL_SUBTASKS = 2 if DOC_TEST_MODE else 200
MAX_SUBAGENT_TURNS = 1 if DOC_TEST_MODE else 15
MAX_MAIN_TURNS = 1 if DOC_TEST_MODE else 30
TURNS_BETWEEN_REFRESHERS = 10
JOURNAL_PATH = os.environ.get("ORCH_JOURNAL") or "orchestration_journal.json"
```

The mode reminders (MODE_ENTER, MODE_REFRESH, MODE_EXIT) are deliberately short. They switch the mode and point to the tool description, which holds the heavyweight instructions. The full text is sent once when the mode turns on. The one-line refresher is resent only every TURNS_BETWEEN_REFRESHERS user turns, and the exit notice is sent once when the mode turns off.
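The bookkeeping that decides which reminder goes out on a given user turn can be exercised offline. ReminderSchedule is a hypothetical, stripped-down mirror of ModeAgent's set_mode and _due_system_messages logic. It returns labels instead of message dicts, so the schedule is easy to inspect.

```python
TURNS_BETWEEN_REFRESHERS = 10


class ReminderSchedule:
    """Mirror of ModeAgent's reminder bookkeeping, minus the API calls (illustrative)."""

    def __init__(self, mode_on: bool = True):
        self.mode_on = mode_on
        self.announced = False
        self.exit_pending = False
        self.turns_since_reminder = 0

    def set_mode(self, mode_on: bool) -> None:
        if mode_on == self.mode_on:
            return
        # An exit notice is owed only if the model was ever told the mode was on.
        self.exit_pending = (not mode_on) and self.announced
        self.mode_on = mode_on

    def next_turn(self) -> list[str]:
        """Labels of the system messages due with the next user turn."""
        due = []
        if self.exit_pending:
            self.exit_pending = False
            self.announced = False
            due.append("exit")
        if self.mode_on:
            if not self.announced:
                self.announced = True
                self.turns_since_reminder = 0
                due.append("enter")
            elif self.turns_since_reminder >= TURNS_BETWEEN_REFRESHERS:
                self.turns_since_reminder = 0
                due.append("refresh")
        self.turns_since_reminder += 1
        return due


schedule = ReminderSchedule()
log = {}
for turn in range(1, 26):
    if turn == 23:
        schedule.set_mode(False)  # user switches the mode off before turn 23
    due = schedule.next_turn()
    if due:
        log[turn] = due
print(log)  # {1: ['enter'], 11: ['refresh'], 21: ['refresh'], 23: ['exit']}
```

Turns that carry no label add nothing to the conversation, so most user turns leave the message list exactly as the user wrote it.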
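The Workflow tool carries the real behavioral contract. It holds the opt-in rule, the standing consent that applies while the mode is on, granularity guidance for sizing the fan-out, and the quality patterns the model can adopt (verification waves, a completeness critic, multi-phase sequencing). Subagents also get a report_findings tool, so their results come back as structured JSON rather than prose. The bash tool is the Anthropic-defined bash_20250124 tool, executed locally.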
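Structured results are only useful if the orchestrator can rely on their shape. The example passes block.input through unchanged. As an added assumption, a harness could re-check each payload against REPORT_TOOL's required fields and severity enum before using it. check_report is a hypothetical helper, and the sample payload is made up around the DOC_TEST_MODE fixture.

```python
SEVERITIES = ("high", "medium", "low", "info")


def check_report(payload: dict) -> list[str]:
    """List problems with a report_findings input; an empty list means it matches
    the required fields and severity enum declared in REPORT_TOOL's input_schema."""
    problems = []
    if not isinstance(payload.get("summary"), str):
        problems.append("summary must be a string")
    findings = payload.get("findings")
    if not isinstance(findings, list):
        return problems + ["findings must be an array"]
    for index, finding in enumerate(findings):
        if not isinstance(finding, dict):
            problems.append(f"findings[{index}] must be an object")
            continue
        for field in ("claim", "evidence"):
            if not isinstance(finding.get(field), str):
                problems.append(f"findings[{index}].{field} must be a string")
        if finding.get("severity") not in SEVERITIES:
            problems.append(f"findings[{index}].severity must be one of {SEVERITIES}")
    return problems


report = {
    "summary": "sample.py prints fib(10) using naive recursion.",
    "findings": [
        {"claim": "fib runs in exponential time.", "evidence": "sample.py line 2", "severity": "low"},
        {"claim": "The script prints 55.", "evidence": "output of python sample.py", "severity": "info"},
    ],
}
print(check_report(report))  # []
print(check_report({"summary": "x", "findings": [{"claim": "y"}]}))
```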
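The bash handler runs the requested command with a timeout. It captures stdout and stderr (joined, stdout first) and truncates the result, so a runaway command can't flood the context window. Commands run in the directory you launched the example from, so to point the agent at a project, launch it from there. When DOC_TEST_MODE is set, the harness instead gives bash a small temporary fixture directory, which is removed on exit. There is no sandbox: commands run with the privileges of the process that launched the example. For clarity, the example runs each call in a fresh subshell rather than maintaining the persistent session that the bash_20250124 contract describes. A production agent should back this tool with a long-lived shell, so that the working directory, the environment, and the restart action behave as documented.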
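One way to get that long-lived shell is sketched below under several assumptions. PersistentShell is a hypothetical class, not part of the example or of any SDK. It keeps one bash process open and writes each command to its stdin, followed by a printf of a unique marker and the exit status, then reads output until the marker appears. For brevity, it has no timeout and interleaves stderr with stdout. A command that reads stdin, or that leaves a quote unclosed, would stall it.

```python
import subprocess
import uuid
from typing import Optional


class PersistentShell:
    """Hypothetical long-lived bash session: cwd and exported variables persist
    between run() calls, and restart() starts a clean shell. No timeout handling."""

    def __init__(self, cwd: Optional[str] = None):
        self.cwd = cwd
        self._start()

    def _start(self) -> None:
        self.proc = subprocess.Popen(
            ["bash"],
            cwd=self.cwd,
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,  # interleave stderr with stdout
            text=True,
            errors="replace",
        )

    def run(self, command: str) -> tuple[str, int]:
        marker = f"__done_{uuid.uuid4().hex}__"
        # After the command, print the marker and $? on a line of their own.
        self.proc.stdin.write(f"{command}\nprintf '\\n{marker} %s\\n' \"$?\"\n")
        self.proc.stdin.flush()
        lines = []
        while True:
            line = self.proc.stdout.readline()
            if not line:
                raise RuntimeError("shell exited; call restart()")
            if line.startswith(marker):
                return "".join(lines).rstrip("\n"), int(line.split()[1])
            lines.append(line)

    def restart(self) -> None:
        self.close()
        self._start()

    def close(self) -> None:
        self.proc.stdin.close()
        self.proc.wait()


shell = PersistentShell()
shell.run("cd /tmp && export ORCH_SKETCH_VAR=kept")
print(shell.run("pwd"))  # ('/tmp', 0): the cd from the previous call persisted
shell.close()
```

With a class like this, handle_bash_block's restart branch could call shell.restart() instead of returning a canned message.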
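Each workflow subtask becomes its own small agent loop, equipped with the bash tool and report_findings, and running at the same effort level as the main loop. A per-request timeout bounds every API call, so a dropped connection degrades only one subagent instead of stalling the whole run.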
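The loop's control flow can be tested without spending tokens by driving it with a scripted stand-in for the model. subagent_loop, tool_use, and scripted are hypothetical test scaffolding: the real run_subagent calls client.messages.stream and executes bash for real. The branches are the same, though. pause_turn resends, any other non-tool stop returns the text, report_findings ends the subtask, and the turn cap stops a model that never reports.

```python
import json
from types import SimpleNamespace

MAX_SUBAGENT_TURNS = 3  # smaller than the example's 15, to keep the demo short


def tool_use(block_id: str, name: str, tool_input: dict) -> SimpleNamespace:
    """Fake a tool_use content block (hypothetical test helper)."""
    return SimpleNamespace(type="tool_use", id=block_id, name=name, input=tool_input)


def subagent_loop(call_model, prompt: str) -> str:
    """run_subagent's control flow, with call_model(messages) standing in for the API."""
    messages = [{"role": "user", "content": prompt}]
    for _ in range(MAX_SUBAGENT_TURNS):
        response = call_model(messages)
        messages.append({"role": "assistant", "content": response.content})
        if response.stop_reason == "pause_turn":
            continue  # send the paused turn back so the model can resume it
        if response.stop_reason != "tool_use":
            return "".join(b.text for b in response.content if b.type == "text")
        tool_results, report = [], None
        for block in response.content:
            if block.type != "tool_use":
                continue
            if block.name == "report_findings":
                report = json.dumps(block.input)
                output = "Findings recorded."
            else:  # pretend bash: echo the command instead of running it
                output = f"ran: {block.input.get('command')}"
            tool_results.append({"type": "tool_result", "tool_use_id": block.id, "content": output})
        if report is not None:
            return report  # report_findings ends the subtask
        messages.append({"role": "user", "content": tool_results})
    return "(subagent hit the turn limit before finishing)"


def scripted(*responses):
    """A fake model that replays canned responses in order."""
    replies = iter(responses)
    return lambda messages: next(replies)


finishes = scripted(
    SimpleNamespace(stop_reason="tool_use", content=[tool_use("t1", "bash", {"command": "ls"})]),
    SimpleNamespace(
        stop_reason="tool_use",
        content=[tool_use("t2", "report_findings", {"summary": "ok", "findings": []})],
    ),
)
print(subagent_loop(finishes, "List the files."))  # {"summary": "ok", "findings": []}


def never_reports(messages):
    return SimpleNamespace(stop_reason="tool_use", content=[tool_use("t", "bash", {"command": "ls"})])


print(subagent_loop(never_reports, "Loop forever."))  # (subagent hit the turn limit before finishing)
```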
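A fan-out that spawns dozens of subagents is expensive to restart from scratch. A small content-addressed journal makes it idempotent. Before a subagent is dispatched, the SHA-256 of its prompt is looked up in a local JSON file, and any recorded result is returned directly. If you rerun an interrupted run, only the subtasks that never finished are recomputed. The journal deduplicates across runs, not within a single fan-out wave. Delete the journal file to start fresh.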
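The resume behavior is easiest to see with an interruption in the middle. This single-threaded sketch keeps the example's key (SHA-256 of the prompt) and its write-to-temp-then-os.replace pattern, but drops the lock. The journal path argument, work, and the simulated failure are hypothetical.

```python
import hashlib
import json
import os
import tempfile


def journaled(path: str, prompt: str, compute) -> str:
    """Return the recorded result for this exact prompt, or compute and record it.
    Single-threaded sketch: unlike the example, it takes no lock around the write."""
    key = hashlib.sha256(prompt.encode()).hexdigest()  # content address of the prompt
    try:
        with open(path) as file:
            journal = json.load(file)
    except (OSError, json.JSONDecodeError):
        journal = {}
    if key in journal:
        return journal[key]
    result = compute()  # if this raises, nothing is recorded for the prompt
    journal[key] = result
    temp = f"{path}.tmp"
    with open(temp, "w") as file:
        json.dump(journal, file)
    os.replace(temp, path)  # swap in the new file in one step
    return result


calls: list[str] = []


def work(name: str, fail: bool = False):
    def compute() -> str:
        calls.append(name)
        if fail:
            raise RuntimeError("interrupted")  # simulate the run dying mid-subtask
        return f"result for {name}"
    return compute


with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "journal.json")
    journaled(path, "task A", work("A"))  # first run: A finishes...
    try:
        journaled(path, "task B", work("B", fail=True))  # ...then B is interrupted
    except RuntimeError:
        pass
    journaled(path, "task A", work("A"))  # rerun: A is a journal hit
    journaled(path, "task B", work("B"))  # only B is recomputed
print(calls)  # ['A', 'B', 'B']
```

Because the key covers the whole prompt, any edit to a subtask's wording is treated as new work, which is the intended behavior for a content-addressed cache.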
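The fan-out accepts at most MAX_TOTAL_SUBTASKS prompts and reports any it drops, so the model can resubmit them. It runs the prompts through the journal with at most MAX_CONCURRENT in flight (the PHP port runs sequentially). It also isolates failures, so a single broken subagent degrades to an error string instead of ending the run. After the first wave completes, a second wave reuses the same subagent path to try to refute each result. Each verifier re-derives the claims from the source and defaults to refuted when uncertain. Both the original results and their verdicts go back to the orchestrator, so it can weigh them together.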
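The two waves and the isolation boundary can be reproduced with a fake agent. two_wave_fan_out and fake_agent are hypothetical, and the "REFUTE:" prefix is a placeholder for verify_prompt_for. One subtask raises a TimeoutError to stand in for a dropped connection. Its failure becomes a string that the verification wave still processes.

```python
import concurrent.futures

MAX_CONCURRENT = 4  # illustrative


def two_wave_fan_out(subtasks: list[str], run_agent) -> list[tuple[str, str, str]]:
    """Wave 1 runs every subtask; wave 2 asks a verifier to refute each result.
    run_agent(prompt) -> str stands in for run_subagent (hypothetical)."""

    def run_one(prompt: str) -> str:
        try:
            return run_agent(prompt)
        except Exception as error:  # isolation boundary: degrade to an error string
            return f"(subagent failed: {type(error).__name__}: {error})"

    with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_CONCURRENT) as pool:
        results = list(pool.map(run_one, subtasks))
        # Both waves share the pool, so the second map must run before the with block exits.
        verdicts = list(pool.map(run_one, [f"REFUTE: {result}" for result in results]))
    return list(zip(subtasks, results, verdicts))


def fake_agent(prompt: str) -> str:
    if prompt == "check b":
        raise TimeoutError("request timed out")  # stand-in for a dropped connection
    if prompt.startswith("REFUTE: (subagent failed"):
        return "refuted: no result to verify"
    if prompt.startswith("REFUTE: "):
        return "confirmed: re-derived from source"
    return f"finding for {prompt}"


for row in two_wave_fan_out(["check a", "check b"], fake_agent):
    print(row)
# ('check a', 'finding for check a', 'confirmed: re-derived from source')
# ('check b', '(subagent failed: TimeoutError: request timed out)', 'refuted: no result to verify')
```

Submitting the verification wave only after the first wave finishes keeps the sketch simple. A pipelined variant could verify each result as soon as it arrives, at the cost of more bookkeeping.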
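The agent appends the user's message first, then any system messages that are due: the exit notice, the full mode text on entry, or the periodic refresher. Placing system messages after the user turn has two effects. Every cached byte ahead of them stays unchanged, and the placement satisfies the rule that system messages must follow a user turn.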
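Both properties can be checked on a plain message list. append_turn and system_messages_follow_user are hypothetical helpers. The second helper encodes one reading of the placement rule: a system message must come directly after a user turn or after another system message that does. Comparing message lists is only a proxy for prompt caching, which operates on the rendered request. Still, an append-only list with an unchanged system field is what keeps the earlier prefix identical.

```python
def append_turn(messages: list[dict], user_input: str, due: list[str]) -> None:
    """User turn first, then any due mid-conversation system messages (hypothetical helper)."""
    messages.append({"role": "user", "content": user_input})
    messages.extend({"role": "system", "content": text} for text in due)


def system_messages_follow_user(messages: list[dict]) -> bool:
    """Check that every system message sits directly after a user turn, allowing
    several system messages in a row (our reading of the placement rule)."""
    previous = None
    for message in messages:
        if message["role"] == "system" and previous not in ("user", "system"):
            return False
        previous = message["role"]
    return True


messages: list[dict] = []
append_turn(messages, "Review the repo.", ["Orchestration mode is on: ..."])
messages.append({"role": "assistant", "content": "Here is the review."})
snapshot = [dict(message) for message in messages]

append_turn(messages, "Now summarize.", ["Orchestration mode is off. ..."])
assert messages[: len(snapshot)] == snapshot  # earlier turns are untouched
print([message["role"] for message in messages])
# ['user', 'system', 'assistant', 'user', 'system']
print(system_messages_follow_user(messages))  # True
```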
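The bash tool in this example runs model-written commands directly on your machine with no sandbox, and the fan-out runs several such agents in parallel. Run it only in a directory and environment you are willing to expose. Add sandboxing before you adapt it for anything beyond local experimentation.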
Launch the example from the directory you want the agent to work in, such as the root of a repository you want reviewed:
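```bash
python orchestration_mode.py "Review this repository for flaky tests and propose fixes."
```

With the mode on, expect the model to scout with a few bash commands first, dispatch the Workflow tool without being asked, and synthesize the subagent reports into a final answer. Trivial or conversational requests stay solo, as the reminder instructs. The script then switches the mode off and sends a short follow-up, which the model should answer without a fan-out.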
This example is deliberately minimal. Harnesses built for real workloads usually add:
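The patterns in this example (mode reminders, standing consent in the tool description, journaling, and the verification wave) carry over unchanged. Only the execution infrastructure around them becomes more robust.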
```python
MODE_ENTER = (
    "Orchestration mode is on: optimize for the most exhaustive, correct answer rather than "
    "the fastest one. Use the Workflow tool on every substantive task, sized to the problem's "
    "natural decomposition rather than the maximum the tool allows. See the Workflow tool's "
    "description for standing consent, granularity guidance, and quality patterns. Work solo "
    "only on conversational or trivial turns."
)
MODE_REFRESH = (
    "Orchestration mode is still on. Use the Workflow tool; see its standing consent section."
)
MODE_EXIT = (
    "Orchestration mode is off. The Workflow tool's standard opt-in rule applies again."
)

WORKFLOW_TOOL = {
    "name": "Workflow",
    "description": (
        "Orchestrate a multi-agent workflow: split a large task into independent subtasks "
        "and run them as parallel agents, then collect their results.\n\n"
        "Opt-in: only use this tool when the user explicitly asks for a workflow, or when a "
        "system message confirms that orchestration mode is on.\n\n"
        "Quality patterns: adversarial verification (a second wave of agents checks the first "
        "wave's findings against the source), a completeness critic (one agent hunts for what "
        "the others missed), and multi-phase sequencing (understand, design, implement, and "
        "review as separate workflow calls, reading results between phases). A useful default "
        "is hybrid: scout inline first to discover the work-list, then fan out over it.\n\n"
        "Granularity: scope each subtask to a distinct concern, component, or question rather "
        "than per line or per file section. Scale the count to what the user asked for: a "
        "focused review of a module of a few hundred lines rarely needs more than about ten "
        "subtasks; a broad audit of a large codebase can justify more.\n\n"
        "Standing consent: while a system message confirms orchestration mode is on, that "
        "opt-in is standing. Author and run a workflow for every substantive task by default, "
        "and lean toward verifying findings adversarially. Work solo only on conversational "
        "turns or trivial mechanical edits. When a system message says the mode is off, "
        "revert to the opt-in rule above."
    ),
    "input_schema": {
        "type": "object",
        "properties": {
            "subtasks": {
                "type": "array",
                "items": {"type": "string"},
                "description": "Independent subtask prompts to run as parallel agents",
            }
        },
        "required": ["subtasks"],
    },
}

BASH_TOOL = {"type": "bash_20250124", "name": "bash"}

REPORT_TOOL = {
    "name": "report_findings",
    "description": (
        "Report the final findings for your subtask. Call this exactly once, when you are "
        "done investigating; it ends your task."
    ),
    "input_schema": {
        "type": "object",
        "properties": {
            "summary": {"type": "string", "description": "Two or three sentences of synthesis"},
            "findings": {
                "type": "array",
                "items": {
                    "type": "object",
                    "properties": {
                        "claim": {"type": "string", "description": "The finding, one sentence"},
                        "evidence": {
                            "type": "string",
                            "description": "How it was verified (file, line, or command output)",
                        },
                        "severity": {"type": "string", "enum": ["high", "medium", "low", "info"]},
                    },
                    "required": ["claim", "evidence", "severity"],
                },
            },
        },
        "required": ["summary", "findings"],
    },
}

# Run bash where the example was launched. In DOC_TEST_MODE the docs harness
# points it at a throwaway fixture directory instead, removed on exit.
if DOC_TEST_MODE:
    WORK_DIR = tempfile.mkdtemp(prefix="orchestration-")
    atexit.register(shutil.rmtree, WORK_DIR, ignore_errors=True)
    with open(os.path.join(WORK_DIR, "sample.py"), "w") as fixture:
        fixture.write(
            "def fib(n):\n"
            "    return n if n < 2 else fib(n - 1) + fib(n - 2)\n\n"
            "print(fib(10))\n"
        )
else:
    WORK_DIR = os.getcwd()
```
```python
def run_bash(command: str) -> tuple[str, bool]:
    """Run a shell command and return (output, is_error). No sandbox: example code only."""
    print(f"[bash] {command}", file=sys.stderr)
    try:
        proc = subprocess.run(
            ["bash", "-c", command],
            cwd=WORK_DIR,
            capture_output=True,
            text=True,
            errors="replace",
            timeout=BASH_TIMEOUT_SECONDS,
        )
    except subprocess.TimeoutExpired:
        return f"command timed out after {BASH_TIMEOUT_SECONDS}s", True
    output = (proc.stdout + proc.stderr).strip() or "(no output)"
    if len(output) > TOOL_RESULT_MAX_CHARS:
        output = output[:TOOL_RESULT_MAX_CHARS] + f"\n(truncated at {TOOL_RESULT_MAX_CHARS} chars)"
    if proc.returncode != 0:
        output = f"(exit code {proc.returncode})\n{output}"
    return output, proc.returncode != 0


def handle_bash_block(block) -> tuple[str, bool]:
    if block.input.get("restart") is True:
        return "Shell restarted.", False
    command = block.input.get("command")
    if not isinstance(command, str) or not command:
        return "bash error: no command was provided.", True
    return run_bash(command)


def run_subagent(model: str, prompt: str) -> str:
    """One subagent: a small nested agent loop with the bash tool plus report_findings.
    Subagents inherit the main loop's effort level."""
    subagent_system = (
        "You are one agent in a larger parallel fan-out, assigned a single subtask. "
        "Investigate it directly, using bash to check facts rather than guessing, and finish "
        "by calling report_findings exactly once. Return findings, not narration."
    )
    messages = [{"role": "user", "content": prompt}]
    for _ in range(MAX_SUBAGENT_TURNS):
        with client.messages.stream(
            model=model,
            max_tokens=64000,
            system=subagent_system,
            thinking={"type": "adaptive"},
            output_config={"effort": EFFORT},
            tools=[BASH_TOOL, REPORT_TOOL],
            messages=messages,
            timeout=REQUEST_TIMEOUT_SECONDS,
        ) as stream:
            response = stream.get_final_message()
        messages.append({"role": "assistant", "content": response.content})
        if response.stop_reason == "pause_turn":
            continue
        if response.stop_reason != "tool_use":
            text = "".join(block.text for block in response.content if block.type == "text")
            if response.stop_reason == "max_tokens":
                text += "\n\n(warning: subagent response was truncated at max_tokens)"
            return text
        tool_results = []
        report = None
        for block in response.content:
            if block.type != "tool_use":
                continue
            if block.name == "report_findings":
                report = json.dumps(block.input, indent=2)
                output, is_error = "Findings recorded.", False
            elif block.name == "bash":
                output, is_error = handle_bash_block(block)
            else:
                output, is_error = f"unknown tool: {block.name}", True
            tool_results.append(
                {
                    "type": "tool_result",
                    "tool_use_id": block.id,
                    "content": output,
                    "is_error": is_error,
                }
            )
        if report is not None:
            return report
        messages.append({"role": "user", "content": tool_results})
    return "(subagent hit the turn limit before finishing)"


_journal_lock = threading.Lock()
```
```python
def _load_journal() -> dict:
    try:
        with open(JOURNAL_PATH) as file:
            return json.load(file) or {}
    except (OSError, json.JSONDecodeError):
        return {}


def journaled(prompt: str, compute) -> str:
    """Return a cached result for this exact prompt, or compute and persist it. This
    makes the fan-out resumable: interrupt the run, rerun it, and only the subtasks
    that never finished are recomputed. Delete the journal file to start fresh."""
    key = hashlib.sha256(prompt.encode()).hexdigest()
    cached = _load_journal().get(key)
    if cached is not None:
        print(f"[journal] cache hit for {key[:12]}", file=sys.stderr)
        return cached
    result = compute()
    try:
        with _journal_lock:  # fan-out writes from many threads
            journal = _load_journal()
            journal[key] = result
            temp = f"{JOURNAL_PATH}.tmp"
            with open(temp, "w") as file:
                json.dump(journal, file)
            os.replace(temp, JOURNAL_PATH)  # atomic on POSIX and Windows
    except OSError as error:  # the journal is best-effort; never discard a computed result
        print(f"[journal] write failed: {error}", file=sys.stderr)
    return result


def normalize_subtasks(raw) -> list[str]:
    """Accept the subtasks input in whatever shape the model emits: an array, the array
    JSON-encoded as a single string, or a newline-separated list."""
    if isinstance(raw, str):
        try:
            raw = json.loads(raw)
        except json.JSONDecodeError:
            raw = raw.splitlines() if "\n" in raw else [raw]
    if not isinstance(raw, list):
        return []
    return [task.strip() for task in raw if isinstance(task, str) and task.strip()]


def verify_prompt_for(subtask: str, result: str) -> str:
    return (
        "Adversarially verify the subagent result below: try to REFUTE it. Re-derive the "
        "claims yourself with bash rather than trusting the result, and look for evidence "
        "that contradicts them. Default to refuted if uncertain. Call report_findings with "
        "summary 'refuted: <why>' or 'confirmed: <why>', citing the file:line or command "
        "output that decided it.\n\n"
        f"Subtask: {subtask}\n\nResult to verify:\n{result}"
    )
```
```python
def run_workflow(model: str, raw_subtasks) -> tuple[str, bool]:
    """Run subtasks as parallel subagents, then run a second verification wave over
    the results, and return both. MAX_TOTAL_SUBTASKS bounds how many the model can
    queue; MAX_CONCURRENT bounds how many run at once."""
    all_subtasks = normalize_subtasks(raw_subtasks)
    subtasks = all_subtasks[:MAX_TOTAL_SUBTASKS]
    dropped = len(all_subtasks) - len(subtasks)
    if not subtasks:
        return "Workflow error: no usable subtasks were provided.", True
    print(f"[workflow] fanning out {len(subtasks)} agents", file=sys.stderr)

    def run_one(prompt: str) -> str:
        try:
            return journaled(prompt, lambda: run_subagent(model, prompt))
        except Exception as error:  # isolation boundary: one bad subagent should not end the run
            return f"(subagent failed: {type(error).__name__}: {error})"

    with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_CONCURRENT) as pool:
        results = list(pool.map(run_one, subtasks))
        # The verification wave reuses the pool, so it must run before the pool shuts down.
        print(f"[workflow] verifying {len(results)} results", file=sys.stderr)
        verify_prompts = [verify_prompt_for(task, result) for task, result in zip(subtasks, results)]
        verdicts = list(pool.map(run_one, verify_prompts))
    joined = "\n\n".join(
        f"[agent {index + 1}: {task}]\n{result}\n\n[verify {index + 1}]\n{verdict}"
        for index, (task, result, verdict) in enumerate(zip(subtasks, results, verdicts))
    )
    if dropped > 0:
        joined = (
            f"(note: {dropped} subtasks beyond MAX_TOTAL_SUBTASKS={MAX_TOTAL_SUBTASKS} were not "
            "run; rerun them in a follow-up Workflow call)\n\n" + joined
        )
    return joined, False


class ModeAgent:
    """An agent loop whose orchestration mode is toggled with mid-conversation system messages."""

    def __init__(self, model: str, mode_on: bool = True):
        self.model = model
        self.mode_on = mode_on
        self.messages: list[dict] = []
        self._mode_announced = False
        self._exit_pending = False
        self._turns_since_reminder = 0

    def set_mode(self, mode_on: bool) -> None:
        """Turn the mode on or off. The notice is delivered with the next user turn."""
        if mode_on == self.mode_on:
            return
        if not mode_on:
            if self._mode_announced:
                self._exit_pending = True
        else:
            self._exit_pending = False
        self.mode_on = mode_on

    def _due_system_messages(self) -> list[dict]:
        """System messages owed on this turn: an exit notice, the full mode text on entry,
        or a one-line refresher every TURNS_BETWEEN_REFRESHERS user turns."""
        due = []
        if self._exit_pending:
            self._exit_pending = False
            self._mode_announced = False
            due.append({"role": "system", "content": MODE_EXIT})
        if self.mode_on:
            if not self._mode_announced:
                self._mode_announced = True
                self._turns_since_reminder = 0
                due.append({"role": "system", "content": MODE_ENTER})
            elif self._turns_since_reminder >= TURNS_BETWEEN_REFRESHERS:
                self._turns_since_reminder = 0
                due.append({"role": "system", "content": MODE_REFRESH})
        return due

    def turn(self, user_input: str) -> str:
        # Mid-conversation system messages follow the user turn they apply to, which keeps
        # the cached prefix ahead of them untouched.
        self.messages.append({"role": "user", "content": user_input})
        self.messages.extend(self._due_system_messages())
        self._turns_since_reminder += 1
        for _ in range(MAX_MAIN_TURNS):
            with client.messages.stream(
                model=self.model,
                max_tokens=64000,
                system=SYSTEM_PROMPT,  # static for the whole session
                thinking={"type": "adaptive"},
                output_config={"effort": EFFORT},
                tools=[WORKFLOW_TOOL, BASH_TOOL],
                messages=self.messages,
                timeout=REQUEST_TIMEOUT_SECONDS,
            ) as stream:
                response = stream.get_final_message()
            self.messages.append({"role": "assistant", "content": response.content})
            if response.stop_reason == "pause_turn":
                continue
            if response.stop_reason != "tool_use":
                text = "".join(block.text for block in response.content if block.type == "text")
                if response.stop_reason == "max_tokens":
                    # Drop the truncated assistant message so later turns don't build on it.
                    self.messages.pop()
                    text += "\n\n(warning: response was truncated at max_tokens)"
                return text
            tool_results = []
            for block in response.content:
                if block.type != "tool_use":
                    continue
                if block.name == "Workflow":
                    output, is_error = run_workflow(self.model, block.input.get("subtasks", []))
                elif block.name == "bash":
                    output, is_error = handle_bash_block(block)
                else:
                    output, is_error = f"unknown tool: {block.name}", True
                tool_results.append(
                    {
                        "type": "tool_result",
                        "tool_use_id": block.id,
                        "content": output,
                        "is_error": is_error,
                    }
                )
            self.messages.append({"role": "user", "content": tool_results})
        return "(hit the main loop turn limit before finishing)"


if __name__ == "__main__":
    task = (
        sys.argv[1]
        if len(sys.argv) > 1
        else "Explore the current directory, then give a thorough review: what it does, "
        "code-quality issues, and concrete improvements."
    )
    agent = ModeAgent(MODEL)
    print(agent.turn(task))
    agent.set_mode(False)
    print(agent.turn("Briefly summarize what you found above, no fan-out needed."))
```