Was this page helpful?
Un modo de orquestación es un interruptor a nivel de sesión: cuando está activado, el modelo aplica la máxima exhaustividad a cada solicitud sustantiva, explorando primero la tarea por sí mismo y luego distribuyendo el trabajo a subagentes paralelos de forma predeterminada. Cuando está desactivado, la misma herramienta de orquestación vuelve a requerir activación explícita por solicitud.
El modo no es un parámetro de la API. Se construye completamente a partir de piezas documentadas:
xhigh. No existe ningún nivel oculto por encima de los que aparecen en esa página.system de nivel superior nunca cambia, por lo que el prefijo almacenado en caché permanece intacto.Este ejemplo usa mensajes del sistema a mitad de conversación, que actualmente solo están disponibles en Claude Opus 4.8. La distribución en paralelo en sí multiplica el uso de tokens: una sola solicitud puede generar muchas conversaciones de subagentes, así que reserva el modo para trabajo que justifique el costo.
El ejemplo es un solo archivo. Las constantes controlan el nivel de esfuerzo, la forma de la distribución en paralelo y la frecuencia con la que se reenvía el recordatorio de modo. MAX_CONCURRENT limita cuántos subagentes se ejecutan al mismo tiempo (la versión de PHP es secuencial y lo ignora); MAX_TOTAL_SUBTASKS limita cuántos puede poner en cola el modelo en una sola llamada a Workflow. Separar ambos permite que el modelo planifique una lista de tareas grande sin lanzarla toda de una vez. La comprobación de DOC_TEST_MODE limita los bucles a un solo turno cuando esa variable de entorno está definida, de modo que el sistema automatizado de pruebas de documentación pueda validar que el archivo compila y termina rápidamente sin ejecutar la orquestación completa; déjala sin definir cuando ejecutes el ejemplo tú mismo.
Los recordatorios son cortos a propósito. Activan o desactivan el modo y apuntan a la descripción de la herramienta, donde residen las instrucciones detalladas. El texto completo se envía una vez cuando el modo se activa, el recordatorio se reenvía solo después de varios turnos del usuario, y el aviso de salida se envía una vez cuando el modo se desactiva.
La herramienta Workflow contiene el verdadero contrato de comportamiento: la regla de activación explícita, el consentimiento permanente que aplica mientras el modo está activado, orientación sobre granularidad para dimensionar la distribución en paralelo, y los patrones de calidad a los que el modelo puede recurrir (una ronda de verificación, un crítico de completitud, secuenciación multifase). Los subagentes también reciben una herramienta report_findings para que sus resultados regresen como JSON estructurado en lugar de prosa, y la herramienta bash es la herramienta bash_20250124 definida por Anthropic ejecutada localmente.
El manejador de bash ejecuta el comando solicitado con un tiempo límite, captura stdout y stderr combinados, y trunca el resultado para que un comando descontrolado no inunde la ventana de contexto. Los comandos se ejecutan en el directorio desde el que inicias el ejemplo, así que apuntarlo a un proyecto significa iniciarlo allí; cuando DOC_TEST_MODE está definida, el sistema de pruebas en su lugar le da a bash un pequeño directorio de prueba desechable que se elimina al salir. Aquí no hay sandbox: el comando se ejecuta con los permisos del proceso que inició el ejemplo. Para mayor claridad, este ejemplo ejecuta cada llamada en un subshell nuevo en lugar de mantener la sesión persistente que describe el contrato de bash_20250124; un agente de producción debería respaldar la herramienta con un shell de larga duración para que el directorio de trabajo, el entorno y la acción restart se comporten como está documentado.
Cada subtarea del flujo de trabajo se convierte en su propio pequeño bucle de agente con la herramienta bash, ejecutándose con el mismo esfuerzo que el bucle principal. Un tiempo límite por solicitud acota cada llamada a la API para que una conexión perdida degrade un subagente en lugar de bloquear toda la ejecución.
Una distribución en paralelo que genera docenas de subagentes es costosa de reiniciar desde cero. Un pequeño registro direccionado por contenido la hace idempotente: antes de despachar un subagente, busca el SHA-256 de su prompt en un archivo JSON local y devuelve el resultado registrado si existe uno. Interrumpe la ejecución, vuelve a ejecutarla, y solo las subtareas que nunca terminaron se recalculan. El registro deduplica entre ejecuciones, no dentro de una sola ronda de distribución; elimina el archivo de registro para empezar de cero.
La distribución en paralelo acepta hasta MAX_TOTAL_SUBTASKS prompts, los ejecuta a través del registro con un máximo de MAX_CONCURRENT en curso (secuencial en la versión de PHP), y aísla los fallos para que un subagente roto se degrade a una cadena de error en lugar de terminar la ejecución. Una vez que termina la primera ronda, una segunda ronda reutiliza la misma ruta de subagente para intentar refutar cada resultado: cada verificador vuelve a derivar las afirmaciones desde la fuente, marcándolas como refutadas por defecto cuando hay incertidumbre. Tanto el resultado original como su veredicto se devuelven al orquestador para que pueda ponderarlos juntos.
El agente agrega primero el mensaje del usuario, luego cualquier mensaje del sistema que corresponda: el aviso de salida, el texto completo del modo al entrar, o el recordatorio periódico. Colocar el mensaje del sistema después del turno del usuario mantiene intacto cada byte almacenado en caché que lo precede, y satisface la regla de ubicación de que un mensaje del sistema sigue a un turno del usuario.
La herramienta bash en este ejemplo ejecuta comandos escritos por el modelo directamente en tu máquina sin ningún sandbox, y la distribución en paralelo ejecuta varios de esos agentes simultáneamente. Ejecútalo en un directorio y entorno que te sientas cómodo exponiendo, y agrega aislamiento (sandboxing) antes de adaptarlo para cualquier cosa más allá de la experimentación local.
Inicia el ejemplo desde el directorio en el que quieres que trabajen los agentes, por ejemplo la raíz de un repositorio a revisar:
python orchestration_mode.py "Review this repository for flaky tests and propose fixes."Con el modo activado, espera que el modelo explore con algunos comandos bash, despache la herramienta Workflow sin que se lo pidan, y sintetice los informes de los subagentes en una respuesta final. Las solicitudes triviales o conversacionales permanecen en solitario, como indica el recordatorio.
Este ejemplo es deliberadamente pequeño. Un sistema pensado para cargas de trabajo reales típicamente agregaría:
Los patrones de este ejemplo (los recordatorios de modo, el consentimiento permanente en la descripción de la herramienta, el registro y una ronda de verificación) se trasladan sin cambios; solo el sustrato de ejecución a su alrededor se vuelve más robusto.
import atexit
import concurrent.futures
import hashlib
import json
import os
import shutil
import subprocess
import sys
import tempfile
import threading
import anthropic
client = anthropic.Anthropic()
MODEL = "claude-opus-4-8"
EFFORT = "xhigh"
SYSTEM_PROMPT = "You are a helpful general-purpose agent. Answer the user's request directly."
REQUEST_TIMEOUT_SECONDS = 600
BASH_TIMEOUT_SECONDS = 60
TOOL_RESULT_MAX_CHARS = 8000
MAX_CONCURRENT = 10
DOC_TEST_MODE = bool(os.environ.get("DOC_TEST_MODE"))
MAX_TOTAL_SUBTASKS = 2 if DOC_TEST_MODE else 200
MAX_SUBAGENT_TURNS = 1 if DOC_TEST_MODE else 15
MAX_MAIN_TURNS = 1 if DOC_TEST_MODE else 30
TURNS_BETWEEN_REFRESHERS = 10
JOURNAL_PATH = os.environ.get("ORCH_JOURNAL") or "orchestration_journal.json"MODE_ENTER = (
"Orchestration mode is on: optimize for the most exhaustive, correct answer rather than "
"the fastest one. Use the Workflow tool on every substantive task, sized to the problem's "
"natural decomposition rather than the maximum the tool allows. See the Workflow tool's "
"description for standing consent, granularity guidance, and quality patterns. Work solo "
"only on conversational or trivial turns."
)
MODE_REFRESH = (
"Orchestration mode is still on. Use the Workflow tool; see its standing consent section."
)
MODE_EXIT = (
"Orchestration mode is off. The Workflow tool's standard opt-in rule applies again."
)WORKFLOW_TOOL = {
"name": "Workflow",
"description": (
"Orchestrate a multi-agent workflow: split a large task into independent subtasks "
"and run them as parallel agents, then collect their results.\n\n"
"Opt-in: only use this tool when the user explicitly asks for a workflow, or when a "
"system message confirms that orchestration mode is on.\n\n"
"Quality patterns: adversarial verification (a second wave of agents checks the first "
"wave's findings against the source), a completeness critic (one agent hunts for what "
"the others missed), and multi-phase sequencing (understand, design, implement, and "
"review as separate workflow calls, reading results between phases). A useful default "
"is hybrid: scout inline first to discover the work-list, then fan out over it.\n\n"
"Granularity: scope each subtask to a distinct concern, component, or question rather "
"than per line or per file section. Scale the count to what the user asked for: a "
"focused review of a module of a few hundred lines rarely needs more than about ten "
"subtasks; a broad audit of a large codebase can justify more.\n\n"
"Standing consent: while a system message confirms orchestration mode is on, that "
"opt-in is standing. Author and run a workflow for every substantive task by default, "
"and lean toward verifying findings adversarially. Work solo only on conversational "
"turns or trivial mechanical edits. When a system message says the mode is off, "
"revert to the opt-in rule above."
),
"input_schema": {
"type": "object",
"properties": {
"subtasks": {
"type": "array",
"items": {"type": "string"},
"description": "Independent subtask prompts to run as parallel agents",
}
},
"required": ["subtasks"],
},
}
BASH_TOOL = {"type": "bash_20250124", "name": "bash"}
REPORT_TOOL = {
"name": "report_findings",
"description": (
"Report the final findings for your subtask. Call this exactly once, when you are "
"done investigating; it ends your task."
),
"input_schema": {
"type": "object",
"properties": {
"summary": {"type": "string", "description": "Two or three sentences of synthesis"},
"findings": {
"type": "array",
"items": {
"type": "object",
"properties": {
"claim": {"type": "string", "description": "The finding, one sentence"},
"evidence": {
"type": "string",
"description": "How it was verified (file, line, or command output)",
},
"severity": {"type": "string", "enum": ["high", "medium", "low", "info"]},
},
"required": ["claim", "evidence", "severity"],
},
},
},
"required": ["summary", "findings"],
},
}# Run bash where the example was launched. In DOC_TEST_MODE the docs harness
# points it at a throwaway fixture directory instead, removed on exit.
if DOC_TEST_MODE:
WORK_DIR = tempfile.mkdtemp(prefix="orchestration-")
atexit.register(shutil.rmtree, WORK_DIR, ignore_errors=True)
with open(os.path.join(WORK_DIR, "sample.py"), "w") as fixture:
fixture.write(
"def fib(n):\n"
" return n if n < 2 else fib(n - 1) + fib(n - 2)\n\n"
"print(fib(10))\n"
)
else:
WORK_DIR = os.getcwd()
def run_bash(command: str) -> tuple[str, bool]:
"""Run a shell command and return (output, is_error). No sandbox: example code only."""
print(f"[bash] {command}", file=sys.stderr)
try:
proc = subprocess.run(
["bash", "-c", command],
cwd=WORK_DIR,
capture_output=True,
text=True,
errors="replace",
timeout=BASH_TIMEOUT_SECONDS,
)
except subprocess.TimeoutExpired:
return f"command timed out after {BASH_TIMEOUT_SECONDS}s", True
output = (proc.stdout + proc.stderr).strip() or "(no output)"
if len(output) > TOOL_RESULT_MAX_CHARS:
output = output[:TOOL_RESULT_MAX_CHARS] + f"\n(truncated at {TOOL_RESULT_MAX_CHARS} chars)"
if proc.returncode != 0:
output = f"(exit code {proc.returncode})\n{output}"
return output, proc.returncode != 0
def handle_bash_block(block) -> tuple[str, bool]:
if block.input.get("restart") is True:
return "Shell restarted.", False
command = block.input.get("command")
if not isinstance(command, str) or not command:
return "bash error: no command was provided.", True
return run_bash(command)def run_subagent(model: str, prompt: str) -> str:
"""One subagent: a small nested agent loop with the bash tool plus report_findings.
Subagents inherit the main loop's effort level."""
subagent_system = (
"You are one agent in a larger parallel fan-out, assigned a single subtask. "
"Investigate it directly, using bash to check facts rather than guessing, and finish "
"by calling report_findings exactly once. Return findings, not narration."
)
messages = [{"role": "user", "content": prompt}]
for _ in range(MAX_SUBAGENT_TURNS):
with client.messages.stream(
model=model,
max_tokens=64000,
system=subagent_system,
thinking={"type": "adaptive"},
output_config={"effort": EFFORT},
tools=[BASH_TOOL, REPORT_TOOL],
messages=messages,
timeout=REQUEST_TIMEOUT_SECONDS,
) as stream:
response = stream.get_final_message()
messages.append({"role": "assistant", "content": response.content})
if response.stop_reason == "pause_turn":
continue
if response.stop_reason != "tool_use":
text = "".join(block.text for block in response.content if block.type == "text")
if response.stop_reason == "max_tokens":
text += "\n\n(warning: subagent response was truncated at max_tokens)"
return text
tool_results = []
report = None
for block in response.content:
if block.type != "tool_use":
continue
if block.name == "report_findings":
report = json.dumps(block.input, indent=2)
output, is_error = "Findings recorded.", False
elif block.name == "bash":
output, is_error = handle_bash_block(block)
else:
output, is_error = f"unknown tool: {block.name}", True
tool_results.append(
{
"type": "tool_result",
"tool_use_id": block.id,
"content": output,
"is_error": is_error,
}
)
if report is not None:
return report
messages.append({"role": "user", "content": tool_results})
return "(subagent hit the turn limit before finishing)"_journal_lock = threading.Lock()
def _load_journal() -> dict:
try:
with open(JOURNAL_PATH) as file:
return json.load(file) or {}
except (OSError, json.JSONDecodeError):
return {}
def journaled(prompt: str, compute) -> str:
"""Return a cached result for this exact prompt, or compute and persist it. This
makes the fan-out resumable: interrupt the run, rerun it, and only the subtasks
that never finished are recomputed. Delete the journal file to start fresh."""
key = hashlib.sha256(prompt.encode()).hexdigest()
cached = _load_journal().get(key)
if cached is not None:
print(f"[journal] cache hit for {key[:12]}", file=sys.stderr)
return cached
result = compute()
try:
with _journal_lock: # fan-out writes from many threads
journal = _load_journal()
journal[key] = result
temp = f"{JOURNAL_PATH}.tmp"
with open(temp, "w") as file:
json.dump(journal, file)
os.replace(temp, JOURNAL_PATH) # atomic on POSIX and Windows
except OSError as error: # the journal is best-effort; never discard a computed result
print(f"[journal] write failed: {error}", file=sys.stderr)
return resultdef normalize_subtasks(raw) -> list[str]:
"""Accept the subtasks input in whatever shape the model emits: an array, the array
JSON-encoded as a single string, or a newline-separated list."""
if isinstance(raw, str):
try:
raw = json.loads(raw)
except json.JSONDecodeError:
raw = raw.splitlines() if "\n" in raw else [raw]
if not isinstance(raw, list):
return []
return [task.strip() for task in raw if isinstance(task, str) and task.strip()]
def verify_prompt_for(subtask: str, result: str) -> str:
return (
"Adversarially verify the subagent result below: try to REFUTE it. Re-derive the "
"claims yourself with bash rather than trusting the result, and look for evidence "
"that contradicts them. Default to refuted if uncertain. Call report_findings with "
"summary 'refuted: <why>' or 'confirmed: <why>', citing the file:line or command "
"output that decided it.\n\n"
f"Subtask: {subtask}\n\nResult to verify:\n{result}"
)
def run_workflow(model: str, raw_subtasks) -> tuple[str, bool]:
"""Run subtasks as parallel subagents, then run a second verification wave over
the results, and return both. MAX_TOTAL_SUBTASKS bounds how many the model can
queue; MAX_CONCURRENT bounds how many run at once."""
all_subtasks = normalize_subtasks(raw_subtasks)
subtasks = all_subtasks[:MAX_TOTAL_SUBTASKS]
dropped = len(all_subtasks) - len(subtasks)
if not subtasks:
return "Workflow error: no usable subtasks were provided.", True
print(f"[workflow] fanning out {len(subtasks)} agents", file=sys.stderr)
def run_one(prompt: str) -> str:
try:
return journaled(prompt, lambda: run_subagent(model, prompt))
except Exception as error: # isolation boundary: one bad subagent should not end the run
return f"(subagent failed: {type(error).__name__}: {error})"
with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_CONCURRENT) as pool:
results = list(pool.map(run_one, subtasks))
print(f"[workflow] verifying {len(results)} results", file=sys.stderr)
verify_prompts = [verify_prompt_for(task, result) for task, result in zip(subtasks, results)]
verdicts = list(pool.map(run_one, verify_prompts))
joined = "\n\n".join(
f"[agent {index + 1}: {task}]\n{result}\n\n[verify {index + 1}]\n{verdict}"
for index, (task, result, verdict) in enumerate(zip(subtasks, results, verdicts))
)
if dropped > 0:
joined = (
f"(note: {dropped} subtasks beyond MAX_TOTAL_SUBTASKS={MAX_TOTAL_SUBTASKS} were not "
"run; rerun them in a follow-up Workflow call)\n\n" + joined
)
return joined, Falseclass ModeAgent:
"""An agent loop whose orchestration mode is toggled with mid-conversation system messages."""
def __init__(self, model: str, mode_on: bool = True):
self.model = model
self.mode_on = mode_on
self.messages: list[dict] = []
self._mode_announced = False
self._exit_pending = False
self._turns_since_reminder = 0
def set_mode(self, mode_on: bool) -> None:
"""Turn the mode on or off. The notice is delivered with the next user turn."""
if mode_on == self.mode_on:
return
if not mode_on:
if self._mode_announced:
self._exit_pending = True
else:
self._exit_pending = False
self.mode_on = mode_on
def _due_system_messages(self) -> list[dict]:
"""System messages owed on this turn: an exit notice, the full mode text on entry,
or a one-line refresher every TURNS_BETWEEN_REFRESHERS user turns."""
due = []
if self._exit_pending:
self._exit_pending = False
self._mode_announced = False
due.append({"role": "system", "content": MODE_EXIT})
if self.mode_on:
if not self._mode_announced:
self._mode_announced = True
self._turns_since_reminder = 0
due.append({"role": "system", "content": MODE_ENTER})
elif self._turns_since_reminder >= TURNS_BETWEEN_REFRESHERS:
self._turns_since_reminder = 0
due.append({"role": "system", "content": MODE_REFRESH})
return due
def turn(self, user_input: str) -> str:
# Mid-conversation system messages follow the user turn they apply to, which keeps
# the cached prefix ahead of them untouched.
self.messages.append({"role": "user", "content": user_input})
self.messages.extend(self._due_system_messages())
self._turns_since_reminder += 1
for _ in range(MAX_MAIN_TURNS):
with client.messages.stream(
model=self.model,
max_tokens=64000,
system=SYSTEM_PROMPT, # static for the whole session
thinking={"type": "adaptive"},
output_config={"effort": EFFORT},
tools=[WORKFLOW_TOOL, BASH_TOOL],
messages=self.messages,
timeout=REQUEST_TIMEOUT_SECONDS,
) as stream:
response = stream.get_final_message()
self.messages.append({"role": "assistant", "content": response.content})
if response.stop_reason == "pause_turn":
continue
if response.stop_reason != "tool_use":
text = "".join(block.text for block in response.content if block.type == "text")
if response.stop_reason == "max_tokens":
# Drop the truncated assistant message so later turns don't build on it.
self.messages.pop()
text += "\n\n(warning: response was truncated at max_tokens)"
return text
tool_results = []
for block in response.content:
if block.type != "tool_use":
continue
if block.name == "Workflow":
output, is_error = run_workflow(self.model, block.input.get("subtasks", []))
elif block.name == "bash":
output, is_error = handle_bash_block(block)
else:
output, is_error = f"unknown tool: {block.name}", True
tool_results.append(
{
"type": "tool_result",
"tool_use_id": block.id,
"content": output,
"is_error": is_error,
}
)
self.messages.append({"role": "user", "content": tool_results})
return "(hit the main loop turn limit before finishing)"if __name__ == "__main__":
task = (
sys.argv[1]
if len(sys.argv) > 1
else "Explore the current directory, then give a thorough review: what it does, "
"code-quality issues, and concrete improvements."
)
agent = ModeAgent(MODEL)
print(agent.turn(task))
agent.set_mode(False)
print(agent.turn("Briefly summarize what you found above, no fan-out needed."))Definir herramientas, manejar llamadas a herramientas y resultados de herramientas.