オーケストレーションモードはセッションレベルのスイッチです。オンのとき、モデルはすべての実質的なリクエストに対して最大限の徹底性を発揮し、タスク自体を偵察してから、デフォルトで並列サブエージェントに作業を「fan-out」(ファンアウト)します。オフのとき、同じオーケストレーションツールはリクエストごとのオプトインに戻ります。
このモードはAPIパラメータではありません。ドキュメント化された要素のみから構築されています。
xhigh などのドキュメント化された Effort 値で実行されます。そのページに記載されているもの以上の隠しレベルはありません。system フィールドは変更されないため、キャッシュされたプレフィックスはそのまま維持されます。この例では会話途中のシステムメッセージを使用しており、これは現在 Claude Opus 4.8 でのみ利用可能です。ファンアウト自体がトークン使用量を倍増させます。1つのリクエストが多数のサブエージェント会話を生成する可能性があるため、そのコストに見合う作業にのみこのモードを使用してください。
この例は単一ファイルです。定数はエフォートレベル、ファンアウトの形状、モードリフレッシャーを再送信する頻度を制御します。MAX_CONCURRENT は同時に実行されるサブエージェントの数を制限します(PHP版は逐次実行のためこれを無視します)。MAX_TOTAL_SUBTASKS は、モデルが1回のWorkflow呼び出しでキューに入れられるサブタスクの数を制限します。この2つを分けることで、モデルは大きなバックログを一度にすべて起動することなく計画できます。DOC_TEST_MODE チェックは、その環境変数が設定されている場合にループを1ターンに制限します。これにより、自動ドキュメントハーネスがオーケストレーション全体を実行せずにファイルがコンパイルされ迅速に終了することを検証できます。自分で例を実行する場合は未設定のままにしてください。
import atexit
import concurrent.futures
import hashlib
import json
import os
import shutil
import subprocess
import sys
import tempfile
import threading
import anthropic
client = anthropic.Anthropic()
MODEL = "claude-opus-4-8"
EFFORT = "xhigh"
SYSTEM_PROMPT = "You are a helpful general-purpose agent. Answer the user's request directly."
REQUEST_TIMEOUT_SECONDS = 600
BASH_TIMEOUT_SECONDS = 60
TOOL_RESULT_MAX_CHARS = 8000
MAX_CONCURRENT = 10
DOC_TEST_MODE = bool(os.environ.get("DOC_TEST_MODE"))
MAX_TOTAL_SUBTASKS = 2 if DOC_TEST_MODE else 200
MAX_SUBAGENT_TURNS = 1 if DOC_TEST_MODE else 15
MAX_MAIN_TURNS = 1 if DOC_TEST_MODE else 30
TURNS_BETWEEN_REFRESHERS = 10
JOURNAL_PATH = os.environ.get("ORCH_JOURNAL") or "orchestration_journal.json"リマインダーは意図的に短くしています。モードを切り替え、重要な指示が記載されているツール説明を指し示すだけです。モードがオンになったときに全文が1回送信され、リフレッシャーは数回のユーザーターンの後にのみ再送信され、モードがオフになったときに終了通知が1回送信されます。
MODE_ENTER = (
"Orchestration mode is on: optimize for the most exhaustive, correct answer rather than "
"the fastest one. Use the Workflow tool on every substantive task, sized to the problem's "
"natural decomposition rather than the maximum the tool allows. See the Workflow tool's "
"description for standing consent, granularity guidance, and quality patterns. Work solo "
"only on conversational or trivial turns."
)
MODE_REFRESH = (
"Orchestration mode is still on. Use the Workflow tool; see its standing consent section."
)
MODE_EXIT = (
"Orchestration mode is off. The Workflow tool's standard opt-in rule applies again."
)Workflowツールが実際の動作契約を担います。オプトインルール、モードがオンの間に適用される継続的な同意、ファンアウトのサイズを決めるための粒度ガイダンス、モデルが活用できる品質パターン(検証ウェーブ、完全性クリティック、マルチフェーズシーケンシング)などです。サブエージェントには report_findings ツールも与えられ、結果が散文ではなく構造化されたJSONとして返されます。bashツールは、ローカルで実行されるAnthropic定義の bash_20250124 ツールです。
WORKFLOW_TOOL = {
"name": "Workflow",
"description": (
"Orchestrate a multi-agent workflow: split a large task into independent subtasks "
"and run them as parallel agents, then collect their results.\n\n"
"Opt-in: only use this tool when the user explicitly asks for a workflow, or when a "
"system message confirms that orchestration mode is on.\n\n"
"Quality patterns: adversarial verification (a second wave of agents checks the first "
"wave's findings against the source), a completeness critic (one agent hunts for what "
"the others missed), and multi-phase sequencing (understand, design, implement, and "
"review as separate workflow calls, reading results between phases). A useful default "
"is hybrid: scout inline first to discover the work-list, then fan out over it.\n\n"
"Granularity: scope each subtask to a distinct concern, component, or question rather "
"than per line or per file section. Scale the count to what the user asked for: a "
"focused review of a module of a few hundred lines rarely needs more than about ten "
"subtasks; a broad audit of a large codebase can justify more.\n\n"
"Standing consent: while a system message confirms orchestration mode is on, that "
"opt-in is standing. Author and run a workflow for every substantive task by default, "
"and lean toward verifying findings adversarially. Work solo only on conversational "
"turns or trivial mechanical edits. When a system message says the mode is off, "
"revert to the opt-in rule above."
),
"input_schema": {
"type": "object",
"properties": {
"subtasks": {
"type": "array",
"items": {"type": "string"},
"description": "Independent subtask prompts to run as parallel agents",
}
},
"required": ["subtasks"],
},
}
BASH_TOOL = {"type": "bash_20250124", "name": "bash"}
REPORT_TOOL = {
"name": "report_findings",
"description": (
"Report the final findings for your subtask. Call this exactly once, when you are "
"done investigating; it ends your task."
),
"input_schema": {
"type": "object",
"properties": {
"summary": {"type": "string", "description": "Two or three sentences of synthesis"},
"findings": {
"type": "array",
"items": {
"type": "object",
"properties": {
"claim": {"type": "string", "description": "The finding, one sentence"},
"evidence": {
"type": "string",
"description": "How it was verified (file, line, or command output)",
},
"severity": {"type": "string", "enum": ["high", "medium", "low", "info"]},
},
"required": ["claim", "evidence", "severity"],
},
},
},
"required": ["summary", "findings"],
},
}bashハンドラーは、要求されたコマンドをタイムアウト付きで実行し、stdoutとstderrを結合してキャプチャし、暴走したコマンドがコンテキストウィンドウを溢れさせないように結果を切り詰めます。コマンドは例を起動したディレクトリで実行されるため、プロジェクトを対象にする場合はそこから起動してください。DOC_TEST_MODE が設定されている場合、ハーネスは代わりに小さな使い捨てのフィクスチャディレクトリをbashに与え、終了時に削除します。ここにはサンドボックスはありません。コマンドは例を起動したプロセスの権限で実行されます。わかりやすさのため、この例では bash_20250124 契約が記述する永続セッションを維持するのではなく、各呼び出しを新しいサブシェルで実行します。本番エージェントでは、作業ディレクトリ、環境、restart アクションがドキュメントどおりに動作するように、長寿命のシェルでツールをバックアップすべきです。
# Run bash where the example was launched. In DOC_TEST_MODE the docs harness
# points it at a throwaway fixture directory instead, removed on exit.
if DOC_TEST_MODE:
WORK_DIR = tempfile.mkdtemp(prefix="orchestration-")
atexit.register(shutil.rmtree, WORK_DIR, ignore_errors=True)
with open(os.path.join(WORK_DIR, "sample.py"), "w") as fixture:
fixture.write(
"def fib(n):\n"
" return n if n < 2 else fib(n - 1) + fib(n - 2)\n\n"
"print(fib(10))\n"
)
else:
WORK_DIR = os.getcwd()
def run_bash(command: str) -> tuple[str, bool]:
"""Run a shell command and return (output, is_error). No sandbox: example code only."""
print(f"[bash] {command}", file=sys.stderr)
try:
proc = subprocess.run(
["bash", "-c", command],
cwd=WORK_DIR,
capture_output=True,
text=True,
errors="replace",
timeout=BASH_TIMEOUT_SECONDS,
)
except subprocess.TimeoutExpired:
return f"command timed out after {BASH_TIMEOUT_SECONDS}s", True
output = (proc.stdout + proc.stderr).strip() or "(no output)"
if len(output) > TOOL_RESULT_MAX_CHARS:
output = output[:TOOL_RESULT_MAX_CHARS] + f"\n(truncated at {TOOL_RESULT_MAX_CHARS} chars)"
if proc.returncode != 0:
output = f"(exit code {proc.returncode})\n{output}"
return output, proc.returncode != 0
def handle_bash_block(block) -> tuple[str, bool]:
if block.input.get("restart") is True:
return "Shell restarted.", False
command = block.input.get("command")
if not isinstance(command, str) or not command:
return "bash error: no command was provided.", True
return run_bash(command)各ワークフローサブタスクは、bashツールを持つ独自の小さなエージェントループになり、メインループと同じエフォートで実行されます。リクエストごとのタイムアウトが各API呼び出しを制限するため、接続が切断されても実行全体が停止するのではなく、1つのサブエージェントが劣化するだけで済みます。
def run_subagent(model: str, prompt: str) -> str:
"""One subagent: a small nested agent loop with the bash tool plus report_findings.
Subagents inherit the main loop's effort level."""
subagent_system = (
"You are one agent in a larger parallel fan-out, assigned a single subtask. "
"Investigate it directly, using bash to check facts rather than guessing, and finish "
"by calling report_findings exactly once. Return findings, not narration."
)
messages = [{"role": "user", "content": prompt}]
for _ in range(MAX_SUBAGENT_TURNS):
with client.messages.stream(
model=model,
max_tokens=64000,
system=subagent_system,
thinking={"type": "adaptive"},
output_config={"effort": EFFORT},
tools=[BASH_TOOL, REPORT_TOOL],
messages=messages,
timeout=REQUEST_TIMEOUT_SECONDS,
) as stream:
response = stream.get_final_message()
messages.append({"role": "assistant", "content": response.content})
if response.stop_reason == "pause_turn":
continue
if response.stop_reason != "tool_use":
text = "".join(block.text for block in response.content if block.type == "text")
if response.stop_reason == "max_tokens":
text += "\n\n(warning: subagent response was truncated at max_tokens)"
return text
tool_results = []
report = None
for block in response.content:
if block.type != "tool_use":
continue
if block.name == "report_findings":
report = json.dumps(block.input, indent=2)
output, is_error = "Findings recorded.", False
elif block.name == "bash":
output, is_error = handle_bash_block(block)
else:
output, is_error = f"unknown tool: {block.name}", True
tool_results.append(
{
"type": "tool_result",
"tool_use_id": block.id,
"content": output,
"is_error": is_error,
}
)
if report is not None:
return report
messages.append({"role": "user", "content": tool_results})
return "(subagent hit the turn limit before finishing)"数十のサブエージェントを生成するファンアウトは、ゼロから再起動するとコストがかかります。小さなコンテンツアドレス型ジャーナルがこれを冪等にします。サブエージェントをディスパッチする前に、そのプロンプトのSHA-256をローカルJSONファイルで検索し、記録された結果が存在すればそれを返します。実行を中断して再実行すると、完了しなかったサブタスクのみが再計算されます。ジャーナルは実行間で重複排除しますが、単一のファンアウトウェーブ内では行いません。最初からやり直すにはジャーナルファイルを削除してください。
_journal_lock = threading.Lock()
def _load_journal() -> dict:
try:
with open(JOURNAL_PATH) as file:
return json.load(file) or {}
except (OSError, json.JSONDecodeError):
return {}
def journaled(prompt: str, compute) -> str:
"""Return a cached result for this exact prompt, or compute and persist it. This
makes the fan-out resumable: interrupt the run, rerun it, and only the subtasks
that never finished are recomputed. Delete the journal file to start fresh."""
key = hashlib.sha256(prompt.encode()).hexdigest()
cached = _load_journal().get(key)
if cached is not None:
print(f"[journal] cache hit for {key[:12]}", file=sys.stderr)
return cached
result = compute()
try:
with _journal_lock: # fan-out writes from many threads
journal = _load_journal()
journal[key] = result
temp = f"{JOURNAL_PATH}.tmp"
with open(temp, "w") as file:
json.dump(journal, file)
os.replace(temp, JOURNAL_PATH) # atomic on POSIX and Windows
except OSError as error: # the journal is best-effort; never discard a computed result
print(f"[journal] write failed: {error}", file=sys.stderr)
return resultファンアウトは最大 MAX_TOTAL_SUBTASKS 個のプロンプトを受け入れ、最大 MAX_CONCURRENT 個を同時実行しながらジャーナルを通して実行し(PHP版では逐次実行)、失敗を分離して、1つの壊れたサブエージェントが実行を終了させるのではなくエラー文字列に劣化するようにします。最初のウェーブが終了すると、2番目のウェーブが同じサブエージェントパスを再利用して各結果の反証を試みます。すべての検証者がソースから主張を再導出し、不確実な場合はデフォルトで反証済みとします。元の結果とその判定の両方がオーケストレーターに返され、オーケストレーターはそれらを総合的に評価できます。
def normalize_subtasks(raw) -> list[str]:
"""Accept the subtasks input in whatever shape the model emits: an array, the array
JSON-encoded as a single string, or a newline-separated list."""
if isinstance(raw, str):
try:
raw = json.loads(raw)
except json.JSONDecodeError:
raw = raw.splitlines() if "\n" in raw else [raw]
if not isinstance(raw, list):
return []
return [task.strip() for task in raw if isinstance(task, str) and task.strip()]
def verify_prompt_for(subtask: str, result: str) -> str:
return (
"Adversarially verify the subagent result below: try to REFUTE it. Re-derive the "
"claims yourself with bash rather than trusting the result, and look for evidence "
"that contradicts them. Default to refuted if uncertain. Call report_findings with "
"summary 'refuted: <why>' or 'confirmed: <why>', citing the file:line or command "
"output that decided it.\n\n"
f"Subtask: {subtask}\n\nResult to verify:\n{result}"
)
def run_workflow(model: str, raw_subtasks) -> tuple[str, bool]:
"""Run subtasks as parallel subagents, then run a second verification wave over
the results, and return both. MAX_TOTAL_SUBTASKS bounds how many the model can
queue; MAX_CONCURRENT bounds how many run at once."""
all_subtasks = normalize_subtasks(raw_subtasks)
subtasks = all_subtasks[:MAX_TOTAL_SUBTASKS]
dropped = len(all_subtasks) - len(subtasks)
if not subtasks:
return "Workflow error: no usable subtasks were provided.", True
print(f"[workflow] fanning out {len(subtasks)} agents", file=sys.stderr)
def run_one(prompt: str) -> str:
try:
return journaled(prompt, lambda: run_subagent(model, prompt))
except Exception as error: # isolation boundary: one bad subagent should not end the run
return f"(subagent failed: {type(error).__name__}: {error})"
with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_CONCURRENT) as pool:
results = list(pool.map(run_one, subtasks))
print(f"[workflow] verifying {len(results)} results", file=sys.stderr)
verify_prompts = [verify_prompt_for(task, result) for task, result in zip(subtasks, results)]
verdicts = list(pool.map(run_one, verify_prompts))
joined = "\n\n".join(
f"[agent {index + 1}: {task}]\n{result}\n\n[verify {index + 1}]\n{verdict}"
for index, (task, result, verdict) in enumerate(zip(subtasks, results, verdicts))
)
if dropped > 0:
joined = (
f"(note: {dropped} subtasks beyond MAX_TOTAL_SUBTASKS={MAX_TOTAL_SUBTASKS} were not "
"run; rerun them in a follow-up Workflow call)\n\n" + joined
)
return joined, Falseエージェントは最初にユーザーのメッセージを追加し、次に送信すべきシステムメッセージ(終了通知、モード開始時の全文、または定期的なリフレッシャー)を追加します。システムメッセージをユーザーターンの後に配置することで、その前にあるキャッシュされたすべてのバイトが変更されずに保たれ、システムメッセージがユーザーターンの後に続くという配置ルールを満たします。
class ModeAgent:
"""An agent loop whose orchestration mode is toggled with mid-conversation system messages."""
def __init__(self, model: str, mode_on: bool = True):
self.model = model
self.mode_on = mode_on
self.messages: list[dict] = []
self._mode_announced = False
self._exit_pending = False
self._turns_since_reminder = 0
def set_mode(self, mode_on: bool) -> None:
"""Turn the mode on or off. The notice is delivered with the next user turn."""
if mode_on == self.mode_on:
return
if not mode_on:
if self._mode_announced:
self._exit_pending = True
else:
self._exit_pending = False
self.mode_on = mode_on
def _due_system_messages(self) -> list[dict]:
"""System messages owed on this turn: an exit notice, the full mode text on entry,
or a one-line refresher every TURNS_BETWEEN_REFRESHERS user turns."""
due = []
if self._exit_pending:
self._exit_pending = False
self._mode_announced = False
due.append({"role": "system", "content": MODE_EXIT})
if self.mode_on:
if not self._mode_announced:
self._mode_announced = True
self._turns_since_reminder = 0
due.append({"role": "system", "content": MODE_ENTER})
elif self._turns_since_reminder >= TURNS_BETWEEN_REFRESHERS:
self._turns_since_reminder = 0
due.append({"role": "system", "content": MODE_REFRESH})
return due
def turn(self, user_input: str) -> str:
# Mid-conversation system messages follow the user turn they apply to, which keeps
# the cached prefix ahead of them untouched.
self.messages.append({"role": "user", "content": user_input})
self.messages.extend(self._due_system_messages())
self._turns_since_reminder += 1
for _ in range(MAX_MAIN_TURNS):
with client.messages.stream(
model=self.model,
max_tokens=64000,
system=SYSTEM_PROMPT, # static for the whole session
thinking={"type": "adaptive"},
output_config={"effort": EFFORT},
tools=[WORKFLOW_TOOL, BASH_TOOL],
messages=self.messages,
timeout=REQUEST_TIMEOUT_SECONDS,
) as stream:
response = stream.get_final_message()
self.messages.append({"role": "assistant", "content": response.content})
if response.stop_reason == "pause_turn":
continue
if response.stop_reason != "tool_use":
text = "".join(block.text for block in response.content if block.type == "text")
if response.stop_reason == "max_tokens":
# Drop the truncated assistant message so later turns don't build on it.
self.messages.pop()
text += "\n\n(warning: response was truncated at max_tokens)"
return text
tool_results = []
for block in response.content:
if block.type != "tool_use":
continue
if block.name == "Workflow":
output, is_error = run_workflow(self.model, block.input.get("subtasks", []))
elif block.name == "bash":
output, is_error = handle_bash_block(block)
else:
output, is_error = f"unknown tool: {block.name}", True
tool_results.append(
{
"type": "tool_result",
"tool_use_id": block.id,
"content": output,
"is_error": is_error,
}
)
self.messages.append({"role": "user", "content": tool_results})
return "(hit the main loop turn limit before finishing)"この例のbashツールは、モデルが書いたコマンドをサンドボックスなしでマシン上で直接実行し、ファンアウトはそれらのエージェントを複数並列で実行します。公開しても問題ないディレクトリと環境で実行し、ローカルでの実験以外の用途に適応させる前にサンドボックス化を追加してください。
if __name__ == "__main__":
task = (
sys.argv[1]
if len(sys.argv) > 1
else "Explore the current directory, then give a thorough review: what it does, "
"code-quality issues, and concrete improvements."
)
agent = ModeAgent(MODEL)
print(agent.turn(task))
agent.set_mode(False)
print(agent.turn("Briefly summarize what you found above, no fan-out needed."))エージェントに作業させたいディレクトリ(たとえばレビュー対象のリポジトリのルート)から例を起動します。
python orchestration_mode.py "Review this repository for flaky tests and propose fixes."モードがオンの場合、モデルはいくつかのbashコマンドで偵察し、指示なしでWorkflowツールをディスパッチし、サブエージェントのレポートを最終回答に統合することが期待されます。些細なリクエストや会話的なリクエストは、リマインダーの指示どおり単独で処理されます。
この例は意図的に小さくしています。実際のワークロード向けのハーネスには、通常以下を追加します。
この例のパターン(モードリマインダー、ツール説明内の継続的な同意、ジャーナリング、検証ウェーブ)はそのまま引き継がれます。より堅牢になるのは、それらを取り巻く実行基盤のみです。
モードリマインダーが使用するメカニズムと、プロンプトキャッシングとの相互作用について。
APIが受け入れるエフォートレベルとその選び方。
ツールの定義、ツール呼び出しの処理、ツール結果について。
この例がローカルで実行するAnthropic定義のbashツール。
Was this page helpful?