Was this page helpful?
Ein Orchestrierungsmodus ist ein Schalter auf Sitzungsebene: Wenn er eingeschaltet ist, setzt das Modell maximale Gründlichkeit hinter jede substanzielle Anfrage, erkundet die Aufgabe selbst und verteilt die Arbeit dann standardmäßig per „fan-out" (Auffächerung) an parallele Subagenten. Wenn er ausgeschaltet ist, kehrt dasselbe Orchestrierungs-Tool zum Opt-in pro Anfrage zurück.
Der Modus ist kein API-Parameter. Er ist vollständig aus dokumentierten Bausteinen zusammengesetzt:
xhigh. Es gibt keine versteckte Stufe über den auf dieser Seite aufgeführten.system-Feld auf oberster Ebene ändert sich nie, sodass das gecachte Präfix intakt bleibt.Dieses Beispiel verwendet Systemnachrichten mitten in der Konversation, die derzeit nur auf Claude Opus 4.8 verfügbar sind. Das Fan-out selbst vervielfacht den Token-Verbrauch: Eine einzelne Anfrage kann viele Subagenten-Konversationen auslösen, reserviere den Modus also für Arbeit, die die Kosten rechtfertigt.
Das Beispiel ist eine einzelne Datei. Die Konstanten steuern die Effort-Stufe, die Form des Fan-outs und wie oft die Modus-Auffrischung erneut gesendet wird. MAX_CONCURRENT begrenzt, wie viele Subagenten gleichzeitig laufen (die PHP-Portierung ist sequenziell und ignoriert diesen Wert); MAX_TOTAL_SUBTASKS begrenzt, wie viele das Modell in einem einzelnen Workflow-Aufruf in die Warteschlange stellen darf. Die Trennung der beiden ermöglicht es dem Modell, einen großen Backlog zu planen, ohne alles auf einmal zu starten. Die DOC_TEST_MODE-Prüfung begrenzt die Schleifen auf einen einzelnen Turn, wenn diese Umgebungsvariable gesetzt ist, damit das automatisierte Docs-Harness validieren kann, dass die Datei kompiliert und schnell abschließt, ohne die vollständige Orchestrierung auszuführen; lass sie ungesetzt, wenn du das Beispiel selbst ausführst.
Die Erinnerungen sind absichtlich kurz. Sie schalten den Modus um und verweisen auf die Tool-Beschreibung, in der die umfangreichen Anweisungen stehen. Der vollständige Text wird einmal gesendet, wenn der Modus eingeschaltet wird, die Auffrischung wird erst nach mehreren User-Turns erneut gesendet, und die Ausstiegsnachricht wird einmal gesendet, wenn der Modus ausgeschaltet wird.
Das Workflow-Tool trägt den eigentlichen Verhaltensvertrag: die Opt-in-Regel, die dauerhafte Zustimmung, die gilt, solange der Modus aktiv ist, Granularitätshinweise zur Dimensionierung des Fan-outs und die Qualitätsmuster, auf die das Modell zurückgreifen kann (eine Verifikationswelle, ein Vollständigkeitskritiker, mehrphasige Sequenzierung). Subagenten erhalten außerdem ein report_findings-Tool, damit ihre Ergebnisse als strukturiertes JSON statt als Prosa zurückkommen, und das Bash-Tool ist das von Anthropic definierte bash_20250124-Tool, das lokal ausgeführt wird.
Der Bash-Handler führt den angeforderten Befehl mit einem Timeout aus, erfasst kombiniertes stdout und stderr und kürzt das Ergebnis, damit ein außer Kontrolle geratener Befehl das Kontextfenster nicht überfluten kann. Befehle laufen in dem Verzeichnis, aus dem du das Beispiel startest, also bedeutet das Ausrichten auf ein Projekt, es dort zu starten; wenn DOC_TEST_MODE gesetzt ist, gibt das Harness Bash stattdessen ein kleines Wegwerf-Fixture-Verzeichnis, das beim Beenden entfernt wird. Es gibt hier keine Sandbox: Der Befehl läuft mit den Berechtigungen des Prozesses, der das Beispiel gestartet hat. Der Klarheit halber führt dieses Beispiel jeden Aufruf in einer frischen Subshell aus, anstatt die persistente Sitzung aufrechtzuerhalten, die der bash_20250124-Vertrag beschreibt; ein Produktions-Agent sollte das Tool mit einer langlebigen Shell hinterlegen, damit Arbeitsverzeichnis, Umgebung und die restart-Aktion sich wie dokumentiert verhalten.
Jede Workflow-Teilaufgabe wird zu einer eigenen kleinen Agentenschleife mit dem Bash-Tool, die mit demselben Effort wie die Hauptschleife läuft. Ein Timeout pro Anfrage begrenzt jeden API-Aufruf, sodass eine abgebrochene Verbindung einen Subagenten beeinträchtigt, anstatt den gesamten Lauf zu blockieren.
Ein Fan-out, das Dutzende von Subagenten startet, ist teuer, wenn man es von Grund auf neu starten muss. Ein kleines inhaltsadressiertes Journal macht es idempotent: Bevor ein Subagent gestartet wird, schlage den SHA-256 seines Prompts in einer lokalen JSON-Datei nach und gib das aufgezeichnete Ergebnis zurück, falls eines existiert. Unterbrich den Lauf, führe ihn erneut aus, und nur die Teilaufgaben, die nie abgeschlossen wurden, werden neu berechnet. Das Journal dedupliziert über Läufe hinweg, nicht innerhalb einer einzelnen Fan-out-Welle; lösche die Journal-Datei, um neu zu beginnen.
Das Fan-out akzeptiert bis zu MAX_TOTAL_SUBTASKS Prompts, lässt sie durch das Journal laufen mit höchstens MAX_CONCURRENT gleichzeitig in Bearbeitung (sequenziell in der PHP-Portierung) und isoliert Fehler, sodass ein defekter Subagent zu einem Fehler-String degradiert, anstatt den Lauf zu beenden. Sobald die erste Welle abgeschlossen ist, verwendet eine zweite Welle denselben Subagenten-Pfad, um zu versuchen, jedes Ergebnis zu widerlegen: Jeder Verifizierer leitet die Behauptungen erneut aus der Quelle ab und fällt bei Unsicherheit standardmäßig auf „widerlegt" zurück. Sowohl das ursprüngliche Ergebnis als auch sein Urteil werden an den Orchestrator zurückgegeben, damit er sie zusammen abwägen kann.
Der Agent hängt zuerst die Nachricht des Users an, dann alle fälligen Systemnachrichten: die Ausstiegsnachricht, den vollständigen Modus-Text beim Eintritt oder die periodische Auffrischung. Das Platzieren der Systemnachricht nach dem User-Turn lässt jedes gecachte Byte davor unberührt und erfüllt die Platzierungsregel, dass eine Systemnachricht auf einen User-Turn folgt.
Das Bash-Tool in diesem Beispiel führt vom Modell geschriebene Befehle direkt auf deinem Rechner ohne Sandbox aus, und das Fan-out lässt mehrere dieser Agenten parallel laufen. Führe es in einem Verzeichnis und einer Umgebung aus, die du bedenkenlos freigeben kannst, und füge Sandboxing hinzu, bevor du es für etwas anderes als lokales Experimentieren anpasst.
Starte das Beispiel aus dem Verzeichnis, in dem die Agenten arbeiten sollen, zum Beispiel dem Root eines Repositorys, das überprüft werden soll:
python orchestration_mode.py "Review this repository for flaky tests and propose fixes."Mit eingeschaltetem Modus kannst du erwarten, dass das Modell mit ein paar Bash-Befehlen erkundet, das Workflow-Tool unaufgefordert aufruft und die Subagenten-Berichte zu einer finalen Antwort zusammenfasst. Triviale oder konversationelle Anfragen bleiben solo, wie die Erinnerung anweist.
Dieses Beispiel ist absichtlich klein gehalten. Ein Harness, das für echte Workloads gedacht ist, würde typischerweise Folgendes hinzufügen:
Die Muster in diesem Beispiel (die Modus-Erinnerungen, dauerhafte Zustimmung in der Tool-Beschreibung, Journaling und eine Verifikationswelle) lassen sich unverändert übertragen; nur das Ausführungssubstrat um sie herum wird robuster.
import atexit
import concurrent.futures
import hashlib
import json
import os
import shutil
import subprocess
import sys
import tempfile
import threading
import anthropic
client = anthropic.Anthropic()
MODEL = "claude-opus-4-8"
EFFORT = "xhigh"
SYSTEM_PROMPT = "You are a helpful general-purpose agent. Answer the user's request directly."
REQUEST_TIMEOUT_SECONDS = 600
BASH_TIMEOUT_SECONDS = 60
TOOL_RESULT_MAX_CHARS = 8000
MAX_CONCURRENT = 10
DOC_TEST_MODE = bool(os.environ.get("DOC_TEST_MODE"))
MAX_TOTAL_SUBTASKS = 2 if DOC_TEST_MODE else 200
MAX_SUBAGENT_TURNS = 1 if DOC_TEST_MODE else 15
MAX_MAIN_TURNS = 1 if DOC_TEST_MODE else 30
TURNS_BETWEEN_REFRESHERS = 10
JOURNAL_PATH = os.environ.get("ORCH_JOURNAL") or "orchestration_journal.json"MODE_ENTER = (
"Orchestration mode is on: optimize for the most exhaustive, correct answer rather than "
"the fastest one. Use the Workflow tool on every substantive task, sized to the problem's "
"natural decomposition rather than the maximum the tool allows. See the Workflow tool's "
"description for standing consent, granularity guidance, and quality patterns. Work solo "
"only on conversational or trivial turns."
)
MODE_REFRESH = (
"Orchestration mode is still on. Use the Workflow tool; see its standing consent section."
)
MODE_EXIT = (
"Orchestration mode is off. The Workflow tool's standard opt-in rule applies again."
)WORKFLOW_TOOL = {
"name": "Workflow",
"description": (
"Orchestrate a multi-agent workflow: split a large task into independent subtasks "
"and run them as parallel agents, then collect their results.\n\n"
"Opt-in: only use this tool when the user explicitly asks for a workflow, or when a "
"system message confirms that orchestration mode is on.\n\n"
"Quality patterns: adversarial verification (a second wave of agents checks the first "
"wave's findings against the source), a completeness critic (one agent hunts for what "
"the others missed), and multi-phase sequencing (understand, design, implement, and "
"review as separate workflow calls, reading results between phases). A useful default "
"is hybrid: scout inline first to discover the work-list, then fan out over it.\n\n"
"Granularity: scope each subtask to a distinct concern, component, or question rather "
"than per line or per file section. Scale the count to what the user asked for: a "
"focused review of a module of a few hundred lines rarely needs more than about ten "
"subtasks; a broad audit of a large codebase can justify more.\n\n"
"Standing consent: while a system message confirms orchestration mode is on, that "
"opt-in is standing. Author and run a workflow for every substantive task by default, "
"and lean toward verifying findings adversarially. Work solo only on conversational "
"turns or trivial mechanical edits. When a system message says the mode is off, "
"revert to the opt-in rule above."
),
"input_schema": {
"type": "object",
"properties": {
"subtasks": {
"type": "array",
"items": {"type": "string"},
"description": "Independent subtask prompts to run as parallel agents",
}
},
"required": ["subtasks"],
},
}
BASH_TOOL = {"type": "bash_20250124", "name": "bash"}
REPORT_TOOL = {
"name": "report_findings",
"description": (
"Report the final findings for your subtask. Call this exactly once, when you are "
"done investigating; it ends your task."
),
"input_schema": {
"type": "object",
"properties": {
"summary": {"type": "string", "description": "Two or three sentences of synthesis"},
"findings": {
"type": "array",
"items": {
"type": "object",
"properties": {
"claim": {"type": "string", "description": "The finding, one sentence"},
"evidence": {
"type": "string",
"description": "How it was verified (file, line, or command output)",
},
"severity": {"type": "string", "enum": ["high", "medium", "low", "info"]},
},
"required": ["claim", "evidence", "severity"],
},
},
},
"required": ["summary", "findings"],
},
}# Run bash where the example was launched. In DOC_TEST_MODE the docs harness
# points it at a throwaway fixture directory instead, removed on exit.
if DOC_TEST_MODE:
WORK_DIR = tempfile.mkdtemp(prefix="orchestration-")
atexit.register(shutil.rmtree, WORK_DIR, ignore_errors=True)
with open(os.path.join(WORK_DIR, "sample.py"), "w") as fixture:
fixture.write(
"def fib(n):\n"
" return n if n < 2 else fib(n - 1) + fib(n - 2)\n\n"
"print(fib(10))\n"
)
else:
WORK_DIR = os.getcwd()
def run_bash(command: str) -> tuple[str, bool]:
"""Run a shell command and return (output, is_error). No sandbox: example code only."""
print(f"[bash] {command}", file=sys.stderr)
try:
proc = subprocess.run(
["bash", "-c", command],
cwd=WORK_DIR,
capture_output=True,
text=True,
errors="replace",
timeout=BASH_TIMEOUT_SECONDS,
)
except subprocess.TimeoutExpired:
return f"command timed out after {BASH_TIMEOUT_SECONDS}s", True
output = (proc.stdout + proc.stderr).strip() or "(no output)"
if len(output) > TOOL_RESULT_MAX_CHARS:
output = output[:TOOL_RESULT_MAX_CHARS] + f"\n(truncated at {TOOL_RESULT_MAX_CHARS} chars)"
if proc.returncode != 0:
output = f"(exit code {proc.returncode})\n{output}"
return output, proc.returncode != 0
def handle_bash_block(block) -> tuple[str, bool]:
if block.input.get("restart") is True:
return "Shell restarted.", False
command = block.input.get("command")
if not isinstance(command, str) or not command:
return "bash error: no command was provided.", True
return run_bash(command)def run_subagent(model: str, prompt: str) -> str:
"""One subagent: a small nested agent loop with the bash tool plus report_findings.
Subagents inherit the main loop's effort level."""
subagent_system = (
"You are one agent in a larger parallel fan-out, assigned a single subtask. "
"Investigate it directly, using bash to check facts rather than guessing, and finish "
"by calling report_findings exactly once. Return findings, not narration."
)
messages = [{"role": "user", "content": prompt}]
for _ in range(MAX_SUBAGENT_TURNS):
with client.messages.stream(
model=model,
max_tokens=64000,
system=subagent_system,
thinking={"type": "adaptive"},
output_config={"effort": EFFORT},
tools=[BASH_TOOL, REPORT_TOOL],
messages=messages,
timeout=REQUEST_TIMEOUT_SECONDS,
) as stream:
response = stream.get_final_message()
messages.append({"role": "assistant", "content": response.content})
if response.stop_reason == "pause_turn":
continue
if response.stop_reason != "tool_use":
text = "".join(block.text for block in response.content if block.type == "text")
if response.stop_reason == "max_tokens":
text += "\n\n(warning: subagent response was truncated at max_tokens)"
return text
tool_results = []
report = None
for block in response.content:
if block.type != "tool_use":
continue
if block.name == "report_findings":
report = json.dumps(block.input, indent=2)
output, is_error = "Findings recorded.", False
elif block.name == "bash":
output, is_error = handle_bash_block(block)
else:
output, is_error = f"unknown tool: {block.name}", True
tool_results.append(
{
"type": "tool_result",
"tool_use_id": block.id,
"content": output,
"is_error": is_error,
}
)
if report is not None:
return report
messages.append({"role": "user", "content": tool_results})
return "(subagent hit the turn limit before finishing)"_journal_lock = threading.Lock()
def _load_journal() -> dict:
try:
with open(JOURNAL_PATH) as file:
return json.load(file) or {}
except (OSError, json.JSONDecodeError):
return {}
def journaled(prompt: str, compute) -> str:
"""Return a cached result for this exact prompt, or compute and persist it. This
makes the fan-out resumable: interrupt the run, rerun it, and only the subtasks
that never finished are recomputed. Delete the journal file to start fresh."""
key = hashlib.sha256(prompt.encode()).hexdigest()
cached = _load_journal().get(key)
if cached is not None:
print(f"[journal] cache hit for {key[:12]}", file=sys.stderr)
return cached
result = compute()
try:
with _journal_lock: # fan-out writes from many threads
journal = _load_journal()
journal[key] = result
temp = f"{JOURNAL_PATH}.tmp"
with open(temp, "w") as file:
json.dump(journal, file)
os.replace(temp, JOURNAL_PATH) # atomic on POSIX and Windows
except OSError as error: # the journal is best-effort; never discard a computed result
print(f"[journal] write failed: {error}", file=sys.stderr)
return resultdef normalize_subtasks(raw) -> list[str]:
"""Accept the subtasks input in whatever shape the model emits: an array, the array
JSON-encoded as a single string, or a newline-separated list."""
if isinstance(raw, str):
try:
raw = json.loads(raw)
except json.JSONDecodeError:
raw = raw.splitlines() if "\n" in raw else [raw]
if not isinstance(raw, list):
return []
return [task.strip() for task in raw if isinstance(task, str) and task.strip()]
def verify_prompt_for(subtask: str, result: str) -> str:
return (
"Adversarially verify the subagent result below: try to REFUTE it. Re-derive the "
"claims yourself with bash rather than trusting the result, and look for evidence "
"that contradicts them. Default to refuted if uncertain. Call report_findings with "
"summary 'refuted: <why>' or 'confirmed: <why>', citing the file:line or command "
"output that decided it.\n\n"
f"Subtask: {subtask}\n\nResult to verify:\n{result}"
)
def run_workflow(model: str, raw_subtasks) -> tuple[str, bool]:
"""Run subtasks as parallel subagents, then run a second verification wave over
the results, and return both. MAX_TOTAL_SUBTASKS bounds how many the model can
queue; MAX_CONCURRENT bounds how many run at once."""
all_subtasks = normalize_subtasks(raw_subtasks)
subtasks = all_subtasks[:MAX_TOTAL_SUBTASKS]
dropped = len(all_subtasks) - len(subtasks)
if not subtasks:
return "Workflow error: no usable subtasks were provided.", True
print(f"[workflow] fanning out {len(subtasks)} agents", file=sys.stderr)
def run_one(prompt: str) -> str:
try:
return journaled(prompt, lambda: run_subagent(model, prompt))
except Exception as error: # isolation boundary: one bad subagent should not end the run
return f"(subagent failed: {type(error).__name__}: {error})"
with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_CONCURRENT) as pool:
results = list(pool.map(run_one, subtasks))
print(f"[workflow] verifying {len(results)} results", file=sys.stderr)
verify_prompts = [verify_prompt_for(task, result) for task, result in zip(subtasks, results)]
verdicts = list(pool.map(run_one, verify_prompts))
joined = "\n\n".join(
f"[agent {index + 1}: {task}]\n{result}\n\n[verify {index + 1}]\n{verdict}"
for index, (task, result, verdict) in enumerate(zip(subtasks, results, verdicts))
)
if dropped > 0:
joined = (
f"(note: {dropped} subtasks beyond MAX_TOTAL_SUBTASKS={MAX_TOTAL_SUBTASKS} were not "
"run; rerun them in a follow-up Workflow call)\n\n" + joined
)
return joined, Falseclass ModeAgent:
"""An agent loop whose orchestration mode is toggled with mid-conversation system messages."""
def __init__(self, model: str, mode_on: bool = True):
self.model = model
self.mode_on = mode_on
self.messages: list[dict] = []
self._mode_announced = False
self._exit_pending = False
self._turns_since_reminder = 0
def set_mode(self, mode_on: bool) -> None:
"""Turn the mode on or off. The notice is delivered with the next user turn."""
if mode_on == self.mode_on:
return
if not mode_on:
if self._mode_announced:
self._exit_pending = True
else:
self._exit_pending = False
self.mode_on = mode_on
def _due_system_messages(self) -> list[dict]:
"""System messages owed on this turn: an exit notice, the full mode text on entry,
or a one-line refresher every TURNS_BETWEEN_REFRESHERS user turns."""
due = []
if self._exit_pending:
self._exit_pending = False
self._mode_announced = False
due.append({"role": "system", "content": MODE_EXIT})
if self.mode_on:
if not self._mode_announced:
self._mode_announced = True
self._turns_since_reminder = 0
due.append({"role": "system", "content": MODE_ENTER})
elif self._turns_since_reminder >= TURNS_BETWEEN_REFRESHERS:
self._turns_since_reminder = 0
due.append({"role": "system", "content": MODE_REFRESH})
return due
def turn(self, user_input: str) -> str:
# Mid-conversation system messages follow the user turn they apply to, which keeps
# the cached prefix ahead of them untouched.
self.messages.append({"role": "user", "content": user_input})
self.messages.extend(self._due_system_messages())
self._turns_since_reminder += 1
for _ in range(MAX_MAIN_TURNS):
with client.messages.stream(
model=self.model,
max_tokens=64000,
system=SYSTEM_PROMPT, # static for the whole session
thinking={"type": "adaptive"},
output_config={"effort": EFFORT},
tools=[WORKFLOW_TOOL, BASH_TOOL],
messages=self.messages,
timeout=REQUEST_TIMEOUT_SECONDS,
) as stream:
response = stream.get_final_message()
self.messages.append({"role": "assistant", "content": response.content})
if response.stop_reason == "pause_turn":
continue
if response.stop_reason != "tool_use":
text = "".join(block.text for block in response.content if block.type == "text")
if response.stop_reason == "max_tokens":
# Drop the truncated assistant message so later turns don't build on it.
self.messages.pop()
text += "\n\n(warning: response was truncated at max_tokens)"
return text
tool_results = []
for block in response.content:
if block.type != "tool_use":
continue
if block.name == "Workflow":
output, is_error = run_workflow(self.model, block.input.get("subtasks", []))
elif block.name == "bash":
output, is_error = handle_bash_block(block)
else:
output, is_error = f"unknown tool: {block.name}", True
tool_results.append(
{
"type": "tool_result",
"tool_use_id": block.id,
"content": output,
"is_error": is_error,
}
)
self.messages.append({"role": "user", "content": tool_results})
return "(hit the main loop turn limit before finishing)"if __name__ == "__main__":
task = (
sys.argv[1]
if len(sys.argv) > 1
else "Explore the current directory, then give a thorough review: what it does, "
"code-quality issues, and concrete improvements."
)
agent = ModeAgent(MODEL)
print(agent.turn(task))
agent.set_mode(False)
print(agent.turn("Briefly summarize what you found above, no fan-out needed."))Tools definieren, Tool-Aufrufe handhaben und Tool-Ergebnisse.