Um modo de orquestração é um interruptor em nível de sessão: quando está ativado, o modelo aplica o máximo de rigor a cada solicitação substantiva, explorando a tarefa por conta própria e então distribuindo o trabalho para subagentes paralelos por padrão. Quando está desativado, a mesma ferramenta de orquestração volta a exigir adesão explícita por solicitação.
O modo não é um parâmetro da API. Ele é construído inteiramente a partir de peças documentadas:
xhigh. Não há nenhum nível oculto acima dos que estão naquela página.system de nível superior nunca muda, então o prefixo em cache permanece intacto.Este exemplo usa mensagens de sistema no meio da conversa, que atualmente estão disponíveis apenas no Claude Opus 4.8. A distribuição de trabalho em si multiplica o uso de tokens: uma única solicitação pode gerar muitas conversas de subagentes, então reserve o modo para trabalhos que justifiquem o custo.
O exemplo é um único arquivo. As constantes controlam o nível de esforço, o formato da distribuição de trabalho e com que frequência o lembrete de modo é reenviado. MAX_CONCURRENT limita quantos subagentes são executados ao mesmo tempo (a versão em PHP é sequencial e ignora isso); MAX_TOTAL_SUBTASKS limita quantos o modelo pode enfileirar em uma única chamada de Workflow. Separar os dois permite que o modelo planeje um grande backlog sem iniciá-lo todo de uma vez. A verificação DOC_TEST_MODE limita os loops a um único turno quando essa variável de ambiente está definida, para que o harness automatizado de documentação possa validar que o arquivo compila e termina rapidamente sem executar a orquestração completa; deixe-a indefinida ao executar o exemplo você mesmo.
import atexit
import concurrent.futures
import hashlib
import json
import os
import shutil
import subprocess
import sys
import tempfile
import threading
import anthropic
client = anthropic.Anthropic()
MODEL = "claude-opus-4-8"
EFFORT = "xhigh"
SYSTEM_PROMPT = "You are a helpful general-purpose agent. Answer the user's request directly."
REQUEST_TIMEOUT_SECONDS = 600
BASH_TIMEOUT_SECONDS = 60
TOOL_RESULT_MAX_CHARS = 8000
MAX_CONCURRENT = 10
DOC_TEST_MODE = bool(os.environ.get("DOC_TEST_MODE"))
MAX_TOTAL_SUBTASKS = 2 if DOC_TEST_MODE else 200
MAX_SUBAGENT_TURNS = 1 if DOC_TEST_MODE else 15
MAX_MAIN_TURNS = 1 if DOC_TEST_MODE else 30
TURNS_BETWEEN_REFRESHERS = 10
JOURNAL_PATH = os.environ.get("ORCH_JOURNAL") or "orchestration_journal.json"Os lembretes são curtos de propósito. Eles alternam o modo e apontam para a descrição da ferramenta, onde estão as instruções mais pesadas. O texto completo é enviado uma vez quando o modo é ativado, o lembrete é reenviado apenas após vários turnos do usuário, e o aviso de saída é enviado uma vez quando o modo é desativado.
MODE_ENTER = (
"Orchestration mode is on: optimize for the most exhaustive, correct answer rather than "
"the fastest one. Use the Workflow tool on every substantive task, sized to the problem's "
"natural decomposition rather than the maximum the tool allows. See the Workflow tool's "
"description for standing consent, granularity guidance, and quality patterns. Work solo "
"only on conversational or trivial turns."
)
MODE_REFRESH = (
"Orchestration mode is still on. Use the Workflow tool; see its standing consent section."
)
MODE_EXIT = (
"Orchestration mode is off. The Workflow tool's standard opt-in rule applies again."
)A ferramenta Workflow carrega o verdadeiro contrato comportamental: a regra de adesão explícita, o consentimento permanente que se aplica enquanto o modo está ativado, orientação de granularidade para dimensionar a distribuição de trabalho, e os padrões de qualidade que o modelo pode usar (uma onda de verificação, um crítico de completude, sequenciamento multifásico). Os subagentes também recebem uma ferramenta report_findings para que seus resultados retornem como JSON estruturado em vez de prosa, e a ferramenta bash é a ferramenta bash_20250124 definida pela Anthropic, executada localmente.
WORKFLOW_TOOL = {
"name": "Workflow",
"description": (
"Orchestrate a multi-agent workflow: split a large task into independent subtasks "
"and run them as parallel agents, then collect their results.\n\n"
"Opt-in: only use this tool when the user explicitly asks for a workflow, or when a "
"system message confirms that orchestration mode is on.\n\n"
"Quality patterns: adversarial verification (a second wave of agents checks the first "
"wave's findings against the source), a completeness critic (one agent hunts for what "
"the others missed), and multi-phase sequencing (understand, design, implement, and "
"review as separate workflow calls, reading results between phases). A useful default "
"is hybrid: scout inline first to discover the work-list, then fan out over it.\n\n"
"Granularity: scope each subtask to a distinct concern, component, or question rather "
"than per line or per file section. Scale the count to what the user asked for: a "
"focused review of a module of a few hundred lines rarely needs more than about ten "
"subtasks; a broad audit of a large codebase can justify more.\n\n"
"Standing consent: while a system message confirms orchestration mode is on, that "
"opt-in is standing. Author and run a workflow for every substantive task by default, "
"and lean toward verifying findings adversarially. Work solo only on conversational "
"turns or trivial mechanical edits. When a system message says the mode is off, "
"revert to the opt-in rule above."
),
"input_schema": {
"type": "object",
"properties": {
"subtasks": {
"type": "array",
"items": {"type": "string"},
"description": "Independent subtask prompts to run as parallel agents",
}
},
"required": ["subtasks"],
},
}
BASH_TOOL = {"type": "bash_20250124", "name": "bash"}
REPORT_TOOL = {
"name": "report_findings",
"description": (
"Report the final findings for your subtask. Call this exactly once, when you are "
"done investigating; it ends your task."
),
"input_schema": {
"type": "object",
"properties": {
"summary": {"type": "string", "description": "Two or three sentences of synthesis"},
"findings": {
"type": "array",
"items": {
"type": "object",
"properties": {
"claim": {"type": "string", "description": "The finding, one sentence"},
"evidence": {
"type": "string",
"description": "How it was verified (file, line, or command output)",
},
"severity": {"type": "string", "enum": ["high", "medium", "low", "info"]},
},
"required": ["claim", "evidence", "severity"],
},
},
},
"required": ["summary", "findings"],
},
}O handler do bash executa o comando solicitado com um timeout, captura stdout e stderr combinados, e trunca o resultado para que um comando descontrolado não inunde a janela de contexto. Os comandos são executados no diretório de onde você inicia o exemplo, então apontá-lo para um projeto significa iniciá-lo lá; quando DOC_TEST_MODE está definido, o harness em vez disso fornece ao bash um pequeno diretório de fixture descartável que é removido na saída. Não há sandbox aqui: o comando é executado com as permissões do processo que iniciou o exemplo. Para maior clareza, este exemplo executa cada chamada em um subshell novo em vez de manter a sessão persistente que o contrato bash_20250124 descreve; um agente de produção deve apoiar a ferramenta com um shell de longa duração para que o diretório de trabalho, o ambiente e a ação restart se comportem conforme documentado.
# Run bash where the example was launched. In DOC_TEST_MODE the docs harness
# points it at a throwaway fixture directory instead, removed on exit.
if DOC_TEST_MODE:
WORK_DIR = tempfile.mkdtemp(prefix="orchestration-")
atexit.register(shutil.rmtree, WORK_DIR, ignore_errors=True)
with open(os.path.join(WORK_DIR, "sample.py"), "w") as fixture:
fixture.write(
"def fib(n):\n"
" return n if n < 2 else fib(n - 1) + fib(n - 2)\n\n"
"print(fib(10))\n"
)
else:
WORK_DIR = os.getcwd()
def run_bash(command: str) -> tuple[str, bool]:
"""Run a shell command and return (output, is_error). No sandbox: example code only."""
print(f"[bash] {command}", file=sys.stderr)
try:
proc = subprocess.run(
["bash", "-c", command],
cwd=WORK_DIR,
capture_output=True,
text=True,
errors="replace",
timeout=BASH_TIMEOUT_SECONDS,
)
except subprocess.TimeoutExpired:
return f"command timed out after {BASH_TIMEOUT_SECONDS}s", True
output = (proc.stdout + proc.stderr).strip() or "(no output)"
if len(output) > TOOL_RESULT_MAX_CHARS:
output = output[:TOOL_RESULT_MAX_CHARS] + f"\n(truncated at {TOOL_RESULT_MAX_CHARS} chars)"
if proc.returncode != 0:
output = f"(exit code {proc.returncode})\n{output}"
return output, proc.returncode != 0
def handle_bash_block(block) -> tuple[str, bool]:
if block.input.get("restart") is True:
return "Shell restarted.", False
command = block.input.get("command")
if not isinstance(command, str) or not command:
return "bash error: no command was provided.", True
return run_bash(command)Cada subtarefa do fluxo de trabalho se torna seu próprio pequeno loop de agente com a ferramenta bash, executando no mesmo nível de esforço que o loop principal. Um timeout por solicitação limita cada chamada de API para que uma conexão perdida degrade um subagente em vez de travar toda a execução.
def run_subagent(model: str, prompt: str) -> str:
"""One subagent: a small nested agent loop with the bash tool plus report_findings.
Subagents inherit the main loop's effort level."""
subagent_system = (
"You are one agent in a larger parallel fan-out, assigned a single subtask. "
"Investigate it directly, using bash to check facts rather than guessing, and finish "
"by calling report_findings exactly once. Return findings, not narration."
)
messages = [{"role": "user", "content": prompt}]
for _ in range(MAX_SUBAGENT_TURNS):
with client.messages.stream(
model=model,
max_tokens=64000,
system=subagent_system,
thinking={"type": "adaptive"},
output_config={"effort": EFFORT},
tools=[BASH_TOOL, REPORT_TOOL],
messages=messages,
timeout=REQUEST_TIMEOUT_SECONDS,
) as stream:
response = stream.get_final_message()
messages.append({"role": "assistant", "content": response.content})
if response.stop_reason == "pause_turn":
continue
if response.stop_reason != "tool_use":
text = "".join(block.text for block in response.content if block.type == "text")
if response.stop_reason == "max_tokens":
text += "\n\n(warning: subagent response was truncated at max_tokens)"
return text
tool_results = []
report = None
for block in response.content:
if block.type != "tool_use":
continue
if block.name == "report_findings":
report = json.dumps(block.input, indent=2)
output, is_error = "Findings recorded.", False
elif block.name == "bash":
output, is_error = handle_bash_block(block)
else:
output, is_error = f"unknown tool: {block.name}", True
tool_results.append(
{
"type": "tool_result",
"tool_use_id": block.id,
"content": output,
"is_error": is_error,
}
)
if report is not None:
return report
messages.append({"role": "user", "content": tool_results})
return "(subagent hit the turn limit before finishing)"Uma distribuição de trabalho que gera dezenas de subagentes é cara para reiniciar do zero. Um pequeno journal endereçado por conteúdo torna isso idempotente: antes de despachar um subagente, procure o SHA-256 do seu prompt em um arquivo JSON local e retorne o resultado registrado se existir um. Interrompa a execução, execute-a novamente, e apenas as subtarefas que nunca terminaram são recomputadas. O journal deduplica entre execuções, não dentro de uma única onda de distribuição; exclua o arquivo de journal para começar do zero.
_journal_lock = threading.Lock()
def _load_journal() -> dict:
try:
with open(JOURNAL_PATH) as file:
return json.load(file) or {}
except (OSError, json.JSONDecodeError):
return {}
def journaled(prompt: str, compute) -> str:
"""Return a cached result for this exact prompt, or compute and persist it. This
makes the fan-out resumable: interrupt the run, rerun it, and only the subtasks
that never finished are recomputed. Delete the journal file to start fresh."""
key = hashlib.sha256(prompt.encode()).hexdigest()
cached = _load_journal().get(key)
if cached is not None:
print(f"[journal] cache hit for {key[:12]}", file=sys.stderr)
return cached
result = compute()
try:
with _journal_lock: # fan-out writes from many threads
journal = _load_journal()
journal[key] = result
temp = f"{JOURNAL_PATH}.tmp"
with open(temp, "w") as file:
json.dump(journal, file)
os.replace(temp, JOURNAL_PATH) # atomic on POSIX and Windows
except OSError as error: # the journal is best-effort; never discard a computed result
print(f"[journal] write failed: {error}", file=sys.stderr)
return resultA distribuição de trabalho aceita até MAX_TOTAL_SUBTASKS prompts, executa-os através do journal com no máximo MAX_CONCURRENT em andamento (sequencial na versão em PHP), e isola falhas para que um subagente quebrado degrade para uma string de erro em vez de encerrar a execução. Assim que a primeira onda termina, uma segunda onda reutiliza o mesmo caminho de subagente para tentar refutar cada resultado: cada verificador rederiva as afirmações a partir da fonte, assumindo refutado por padrão quando incerto. Tanto o resultado original quanto seu veredito são retornados ao orquestrador para que ele possa ponderá-los juntos.
def normalize_subtasks(raw) -> list[str]:
"""Accept the subtasks input in whatever shape the model emits: an array, the array
JSON-encoded as a single string, or a newline-separated list."""
if isinstance(raw, str):
try:
raw = json.loads(raw)
except json.JSONDecodeError:
raw = raw.splitlines() if "\n" in raw else [raw]
if not isinstance(raw, list):
return []
return [task.strip() for task in raw if isinstance(task, str) and task.strip()]
def verify_prompt_for(subtask: str, result: str) -> str:
return (
"Adversarially verify the subagent result below: try to REFUTE it. Re-derive the "
"claims yourself with bash rather than trusting the result, and look for evidence "
"that contradicts them. Default to refuted if uncertain. Call report_findings with "
"summary 'refuted: <why>' or 'confirmed: <why>', citing the file:line or command "
"output that decided it.\n\n"
f"Subtask: {subtask}\n\nResult to verify:\n{result}"
)
def run_workflow(model: str, raw_subtasks) -> tuple[str, bool]:
"""Run subtasks as parallel subagents, then run a second verification wave over
the results, and return both. MAX_TOTAL_SUBTASKS bounds how many the model can
queue; MAX_CONCURRENT bounds how many run at once."""
all_subtasks = normalize_subtasks(raw_subtasks)
subtasks = all_subtasks[:MAX_TOTAL_SUBTASKS]
dropped = len(all_subtasks) - len(subtasks)
if not subtasks:
return "Workflow error: no usable subtasks were provided.", True
print(f"[workflow] fanning out {len(subtasks)} agents", file=sys.stderr)
def run_one(prompt: str) -> str:
try:
return journaled(prompt, lambda: run_subagent(model, prompt))
except Exception as error: # isolation boundary: one bad subagent should not end the run
return f"(subagent failed: {type(error).__name__}: {error})"
with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_CONCURRENT) as pool:
results = list(pool.map(run_one, subtasks))
print(f"[workflow] verifying {len(results)} results", file=sys.stderr)
verify_prompts = [verify_prompt_for(task, result) for task, result in zip(subtasks, results)]
verdicts = list(pool.map(run_one, verify_prompts))
joined = "\n\n".join(
f"[agent {index + 1}: {task}]\n{result}\n\n[verify {index + 1}]\n{verdict}"
for index, (task, result, verdict) in enumerate(zip(subtasks, results, verdicts))
)
if dropped > 0:
joined = (
f"(note: {dropped} subtasks beyond MAX_TOTAL_SUBTASKS={MAX_TOTAL_SUBTASKS} were not "
"run; rerun them in a follow-up Workflow call)\n\n" + joined
)
return joined, FalseO agente anexa primeiro a mensagem do usuário, depois quaisquer mensagens de sistema que estejam pendentes: o aviso de saída, o texto completo do modo na entrada, ou o lembrete periódico. Colocar a mensagem de sistema após o turno do usuário mantém intacto cada byte em cache à frente dela, e satisfaz a regra de posicionamento de que uma mensagem de sistema segue um turno do usuário.
class ModeAgent:
"""An agent loop whose orchestration mode is toggled with mid-conversation system messages."""
def __init__(self, model: str, mode_on: bool = True):
self.model = model
self.mode_on = mode_on
self.messages: list[dict] = []
self._mode_announced = False
self._exit_pending = False
self._turns_since_reminder = 0
def set_mode(self, mode_on: bool) -> None:
"""Turn the mode on or off. The notice is delivered with the next user turn."""
if mode_on == self.mode_on:
return
if not mode_on:
if self._mode_announced:
self._exit_pending = True
else:
self._exit_pending = False
self.mode_on = mode_on
def _due_system_messages(self) -> list[dict]:
"""System messages owed on this turn: an exit notice, the full mode text on entry,
or a one-line refresher every TURNS_BETWEEN_REFRESHERS user turns."""
due = []
if self._exit_pending:
self._exit_pending = False
self._mode_announced = False
due.append({"role": "system", "content": MODE_EXIT})
if self.mode_on:
if not self._mode_announced:
self._mode_announced = True
self._turns_since_reminder = 0
due.append({"role": "system", "content": MODE_ENTER})
elif self._turns_since_reminder >= TURNS_BETWEEN_REFRESHERS:
self._turns_since_reminder = 0
due.append({"role": "system", "content": MODE_REFRESH})
return due
def turn(self, user_input: str) -> str:
# Mid-conversation system messages follow the user turn they apply to, which keeps
# the cached prefix ahead of them untouched.
self.messages.append({"role": "user", "content": user_input})
self.messages.extend(self._due_system_messages())
self._turns_since_reminder += 1
for _ in range(MAX_MAIN_TURNS):
with client.messages.stream(
model=self.model,
max_tokens=64000,
system=SYSTEM_PROMPT, # static for the whole session
thinking={"type": "adaptive"},
output_config={"effort": EFFORT},
tools=[WORKFLOW_TOOL, BASH_TOOL],
messages=self.messages,
timeout=REQUEST_TIMEOUT_SECONDS,
) as stream:
response = stream.get_final_message()
self.messages.append({"role": "assistant", "content": response.content})
if response.stop_reason == "pause_turn":
continue
if response.stop_reason != "tool_use":
text = "".join(block.text for block in response.content if block.type == "text")
if response.stop_reason == "max_tokens":
# Drop the truncated assistant message so later turns don't build on it.
self.messages.pop()
text += "\n\n(warning: response was truncated at max_tokens)"
return text
tool_results = []
for block in response.content:
if block.type != "tool_use":
continue
if block.name == "Workflow":
output, is_error = run_workflow(self.model, block.input.get("subtasks", []))
elif block.name == "bash":
output, is_error = handle_bash_block(block)
else:
output, is_error = f"unknown tool: {block.name}", True
tool_results.append(
{
"type": "tool_result",
"tool_use_id": block.id,
"content": output,
"is_error": is_error,
}
)
self.messages.append({"role": "user", "content": tool_results})
return "(hit the main loop turn limit before finishing)"A ferramenta bash neste exemplo executa comandos escritos pelo modelo diretamente na sua máquina sem sandbox, e a distribuição de trabalho executa vários desses agentes em paralelo. Execute-o em um diretório e ambiente que você esteja confortável em expor, e adicione sandboxing antes de adaptá-lo para qualquer coisa além de experimentação local.
if __name__ == "__main__":
task = (
sys.argv[1]
if len(sys.argv) > 1
else "Explore the current directory, then give a thorough review: what it does, "
"code-quality issues, and concrete improvements."
)
agent = ModeAgent(MODEL)
print(agent.turn(task))
agent.set_mode(False)
print(agent.turn("Briefly summarize what you found above, no fan-out needed."))Inicie o exemplo a partir do diretório em que você quer que os agentes trabalhem, por exemplo a raiz de um repositório a ser revisado:
python orchestration_mode.py "Review this repository for flaky tests and propose fixes."Com o modo ativado, espere que o modelo explore com alguns comandos bash, despache a ferramenta Workflow sem ser solicitado, e sintetize os relatórios dos subagentes em uma resposta final. Solicitações triviais ou conversacionais permanecem solo, conforme o lembrete instrui.
Este exemplo é deliberadamente pequeno. Um harness destinado a cargas de trabalho reais normalmente adicionaria:
Os padrões neste exemplo (os lembretes de modo, consentimento permanente na descrição da ferramenta, journaling e uma onda de verificação) são transferidos sem alterações; apenas o substrato de execução ao redor deles fica mais robusto.
O mecanismo que os lembretes de modo usam, e como ele interage com o cache de prompt.
Os níveis de esforço que a API aceita e como escolher um.
Definir ferramentas, lidar com chamadas de ferramentas e resultados de ferramentas.
A ferramenta bash definida pela Anthropic que este exemplo executa localmente.
Was this page helpful?