• Mensajes
  • Agentes gestionados
  • Administración
Search...
⌘K
Primeros pasos
Introducción a ClaudeInicio rápido
Desarrollar con Claude
Descripción general de funcionesUso de la API de MensajesMotivos de detención y respaldoRechazos y respaldoCrédito de respaldo
Capacidades del modelo
Pensamiento extendidoPensamiento adaptativoEsfuerzoPresupuestos de tareas (beta)Modo rápido (vista previa de investigación)Salidas estructuradasCitasStreaming de mensajesProcesamiento por lotesResultados de búsquedaStreaming de rechazosSoporte multilingüeEmbeddings
Herramientas
Descripción generalCómo funciona el uso de herramientasTutorial: Crear un agente que usa herramientasDefinir herramientasGestionar llamadas a herramientasUso de herramientas en paraleloTool Runner (SDK)Uso de herramientas estrictoUso de herramientas con almacenamiento en caché de promptsHerramientas de servidorSolución de problemasHerramienta de búsqueda webHerramienta de obtención webHerramienta de ejecución de códigoHerramienta de asesorHerramienta de memoriaHerramienta BashHerramienta de uso de computadoraHerramienta de editor de texto
Infraestructura de herramientas
Referencia de herramientasGestionar contexto de herramientasCombinaciones de herramientasBúsqueda de herramientasLlamadas programáticas a herramientasStreaming detallado de herramientas
Gestión de contexto
Ventanas de contextoCompactaciónEdición de contextoAlmacenamiento en caché de promptsMensajes del sistema en mitad de conversaciónCrear un modo de orquestaciónDiagnóstico de caché (beta)Conteo de tokens
Trabajar con archivos
API de archivosCompatibilidad con PDFImágenes y visión
Habilidades
Descripción generalInicio rápidoMejores prácticasHabilidades para empresasHabilidades en la API
MCP
Servidores MCP remotosConector MCP
Claude en plataformas en la nube
Amazon BedrockAmazon Bedrock (heredado)Claude Platform en AWSMicrosoft FoundryVertex AI
Log in
Crear un modo de orquestación
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...

Solutions

  • AI agents
  • Code modernization
  • Coding
  • Customer support
  • Education
  • Financial services
  • Government
  • Life sciences

Partners

  • Amazon Bedrock
  • Google Cloud's Vertex AI

Learn

  • Blog
  • Courses
  • Use cases
  • Connectors
  • Customer stories
  • Engineering at Anthropic
  • Events
  • Powered by Claude
  • Service partners
  • Startups program

Company

  • Anthropic
  • Careers
  • Economic Futures
  • Research
  • News
  • Responsible Scaling Policy
  • Security and compliance
  • Transparency

Learn

  • Blog
  • Courses
  • Use cases
  • Connectors
  • Customer stories
  • Engineering at Anthropic
  • Events
  • Powered by Claude
  • Service partners
  • Startups program

Help and security

  • Availability
  • Status
  • Support
  • Discord

Terms and policies

  • Privacy policy
  • Responsible disclosure policy
  • Terms of service: Commercial
  • Terms of service: Consumer
  • Usage policy
Mensajes/Gestión de contexto

Construir un modo de orquestación

Construye un modo a nivel de sesión que otorga consentimiento permanente para la distribución en paralelo a múltiples agentes, activado y desactivado con mensajes del sistema a mitad de conversación.

Was this page helpful?

  • Configurar el bucle
  • Definir los recordatorios de modo
  • Otorgar consentimiento permanente en la descripción de la herramienta
  • Ejecutar la herramienta bash localmente
  • Ejecutar un subagente
  • Registrar resultados para que las reejecuciones se reanuden
  • Distribuir en paralelo, luego verificar
  • Alternar el modo con mensajes del sistema a mitad de conversación
  • Ejecutarlo
  • Hacia un sistema de producción
  • Relacionado

Un modo de orquestación es un interruptor a nivel de sesión: cuando está activado, el modelo aplica la máxima exhaustividad a cada solicitud sustantiva, explorando primero la tarea por sí mismo y luego distribuyendo el trabajo a subagentes paralelos de forma predeterminada. Cuando está desactivado, la misma herramienta de orquestación vuelve a requerir activación explícita por solicitud.

El modo no es un parámetro de la API. Se construye completamente a partir de piezas documentadas:

  1. Un nivel de esfuerzo: las solicitudes se ejecutan con un valor documentado de Effort como xhigh. No existe ningún nivel oculto por encima de los que aparecen en esa página.
  2. Un recordatorio de modo: un mensaje del sistema a mitad de conversación le indica al modelo que el modo está activo, con un recordatorio de una línea cada varios turnos y un aviso de salida cuando el modo se desactiva. El campo system de nivel superior nunca cambia, por lo que el prefijo almacenado en caché permanece intacto.
  3. Consentimiento permanente en la descripción de la herramienta: la descripción de la herramienta de orquestación indica que, mientras el modo esté activado, el modelo debe crear y ejecutar un flujo de trabajo para cada tarea sustantiva sin preguntar primero.

Este ejemplo usa mensajes del sistema a mitad de conversación, que actualmente solo están disponibles en Claude Opus 4.8. La distribución en paralelo en sí multiplica el uso de tokens: una sola solicitud puede generar muchas conversaciones de subagentes, así que reserva el modo para trabajo que justifique el costo.

Configurar el bucle

El ejemplo es un solo archivo. Las constantes controlan el nivel de esfuerzo, la forma de la distribución en paralelo y la frecuencia con la que se reenvía el recordatorio de modo. MAX_CONCURRENT limita cuántos subagentes se ejecutan al mismo tiempo (la versión de PHP es secuencial y lo ignora); MAX_TOTAL_SUBTASKS limita cuántos puede poner en cola el modelo en una sola llamada a Workflow. Separar ambos permite que el modelo planifique una lista de tareas grande sin lanzarla toda de una vez. La comprobación de DOC_TEST_MODE limita los bucles a un solo turno cuando esa variable de entorno está definida, de modo que el sistema automatizado de pruebas de documentación pueda validar que el archivo compila y termina rápidamente sin ejecutar la orquestación completa; déjala sin definir cuando ejecutes el ejemplo tú mismo.

Definir los recordatorios de modo

Los recordatorios son cortos a propósito. Activan o desactivan el modo y apuntan a la descripción de la herramienta, donde residen las instrucciones detalladas. El texto completo se envía una vez cuando el modo se activa, el recordatorio se reenvía solo después de varios turnos del usuario, y el aviso de salida se envía una vez cuando el modo se desactiva.

Otorgar consentimiento permanente en la descripción de la herramienta

La herramienta Workflow contiene el verdadero contrato de comportamiento: la regla de activación explícita, el consentimiento permanente que aplica mientras el modo está activado, orientación sobre granularidad para dimensionar la distribución en paralelo, y los patrones de calidad a los que el modelo puede recurrir (una ronda de verificación, un crítico de completitud, secuenciación multifase). Los subagentes también reciben una herramienta report_findings para que sus resultados regresen como JSON estructurado en lugar de prosa, y la herramienta bash es la herramienta bash_20250124 definida por Anthropic ejecutada localmente.

Ejecutar la herramienta bash localmente

El manejador de bash ejecuta el comando solicitado con un tiempo límite, captura stdout y stderr combinados, y trunca el resultado para que un comando descontrolado no inunde la ventana de contexto. Los comandos se ejecutan en el directorio desde el que inicias el ejemplo, así que apuntarlo a un proyecto significa iniciarlo allí; cuando DOC_TEST_MODE está definida, el sistema de pruebas en su lugar le da a bash un pequeño directorio de prueba desechable que se elimina al salir. Aquí no hay sandbox: el comando se ejecuta con los permisos del proceso que inició el ejemplo. Para mayor claridad, este ejemplo ejecuta cada llamada en un subshell nuevo en lugar de mantener la sesión persistente que describe el contrato de bash_20250124; un agente de producción debería respaldar la herramienta con un shell de larga duración para que el directorio de trabajo, el entorno y la acción restart se comporten como está documentado.

Ejecutar un subagente

Cada subtarea del flujo de trabajo se convierte en su propio pequeño bucle de agente con la herramienta bash, ejecutándose con el mismo esfuerzo que el bucle principal. Un tiempo límite por solicitud acota cada llamada a la API para que una conexión perdida degrade un subagente en lugar de bloquear toda la ejecución.

Registrar resultados para que las reejecuciones se reanuden

Una distribución en paralelo que genera docenas de subagentes es costosa de reiniciar desde cero. Un pequeño registro direccionado por contenido la hace idempotente: antes de despachar un subagente, busca el SHA-256 de su prompt en un archivo JSON local y devuelve el resultado registrado si existe uno. Interrumpe la ejecución, vuelve a ejecutarla, y solo las subtareas que nunca terminaron se recalculan. El registro deduplica entre ejecuciones, no dentro de una sola ronda de distribución; elimina el archivo de registro para empezar de cero.

Distribuir en paralelo, luego verificar

La distribución en paralelo acepta hasta MAX_TOTAL_SUBTASKS prompts, los ejecuta a través del registro con un máximo de MAX_CONCURRENT en curso (secuencial en la versión de PHP), y aísla los fallos para que un subagente roto se degrade a una cadena de error en lugar de terminar la ejecución. Una vez que termina la primera ronda, una segunda ronda reutiliza la misma ruta de subagente para intentar refutar cada resultado: cada verificador vuelve a derivar las afirmaciones desde la fuente, marcándolas como refutadas por defecto cuando hay incertidumbre. Tanto el resultado original como su veredicto se devuelven al orquestador para que pueda ponderarlos juntos.

Alternar el modo con mensajes del sistema a mitad de conversación

El agente agrega primero el mensaje del usuario, luego cualquier mensaje del sistema que corresponda: el aviso de salida, el texto completo del modo al entrar, o el recordatorio periódico. Colocar el mensaje del sistema después del turno del usuario mantiene intacto cada byte almacenado en caché que lo precede, y satisface la regla de ubicación de que un mensaje del sistema sigue a un turno del usuario.

Ejecutarlo

La herramienta bash en este ejemplo ejecuta comandos escritos por el modelo directamente en tu máquina sin ningún sandbox, y la distribución en paralelo ejecuta varios de esos agentes simultáneamente. Ejecútalo en un directorio y entorno que te sientas cómodo exponiendo, y agrega aislamiento (sandboxing) antes de adaptarlo para cualquier cosa más allá de la experimentación local.

Inicia el ejemplo desde el directorio en el que quieres que trabajen los agentes, por ejemplo la raíz de un repositorio a revisar:

python orchestration_mode.py "Review this repository for flaky tests and propose fixes."

Con el modo activado, espera que el modelo explore con algunos comandos bash, despache la herramienta Workflow sin que se lo pidan, y sintetice los informes de los subagentes en una respuesta final. Las solicitudes triviales o conversacionales permanecen en solitario, como indica el recordatorio.

Hacia un sistema de producción

Este ejemplo es deliberadamente pequeño. Un sistema pensado para cargas de trabajo reales típicamente agregaría:

  • Scripts de orquestación en sandbox: permitir que el modelo emita un programa de orquestación corto (ramificación, bucles y pasos de reducción) y ejecutarlo dentro de un intérprete aislado, en lugar de aceptar solo una lista plana de cadenas de subtareas.
  • Registro duradero: reemplazar el archivo JSON local con un almacén que sobreviva a reinicios del proceso y sea seguro bajo escrituras concurrentes entre máquinas.
  • Control de presupuesto: rastrear el total de subagentes lanzados en toda la sesión, no solo por llamada a Workflow, y rechazar exceder un límite estricto para que un plan descontrolado no pueda agotar tu cuota.

Los patrones de este ejemplo (los recordatorios de modo, el consentimiento permanente en la descripción de la herramienta, el registro y una ronda de verificación) se trasladan sin cambios; solo el sustrato de ejecución a su alrededor se vuelve más robusto.

Relacionado

Mensajes del sistema a mitad de conversación

El mecanismo que usan los recordatorios de modo y cómo interactúa con el almacenamiento en caché de prompts.

Effort

Los niveles de esfuerzo que acepta la API y cómo elegir uno.

import atexit
import concurrent.futures
import hashlib
import json
import os
import shutil
import subprocess
import sys
import tempfile
import threading

import anthropic

client = anthropic.Anthropic()

MODEL = "claude-opus-4-8"
EFFORT = "xhigh"

SYSTEM_PROMPT = "You are a helpful general-purpose agent. Answer the user's request directly."

REQUEST_TIMEOUT_SECONDS = 600
BASH_TIMEOUT_SECONDS = 60
TOOL_RESULT_MAX_CHARS = 8000
MAX_CONCURRENT = 10
DOC_TEST_MODE = bool(os.environ.get("DOC_TEST_MODE"))
MAX_TOTAL_SUBTASKS = 2 if DOC_TEST_MODE else 200
MAX_SUBAGENT_TURNS = 1 if DOC_TEST_MODE else 15
MAX_MAIN_TURNS = 1 if DOC_TEST_MODE else 30
TURNS_BETWEEN_REFRESHERS = 10
JOURNAL_PATH = os.environ.get("ORCH_JOURNAL") or "orchestration_journal.json"
MODE_ENTER = (
    "Orchestration mode is on: optimize for the most exhaustive, correct answer rather than "
    "the fastest one. Use the Workflow tool on every substantive task, sized to the problem's "
    "natural decomposition rather than the maximum the tool allows. See the Workflow tool's "
    "description for standing consent, granularity guidance, and quality patterns. Work solo "
    "only on conversational or trivial turns."
)
MODE_REFRESH = (
    "Orchestration mode is still on. Use the Workflow tool; see its standing consent section."
)
MODE_EXIT = (
    "Orchestration mode is off. The Workflow tool's standard opt-in rule applies again."
)
WORKFLOW_TOOL = {
    "name": "Workflow",
    "description": (
        "Orchestrate a multi-agent workflow: split a large task into independent subtasks "
        "and run them as parallel agents, then collect their results.\n\n"
        "Opt-in: only use this tool when the user explicitly asks for a workflow, or when a "
        "system message confirms that orchestration mode is on.\n\n"
        "Quality patterns: adversarial verification (a second wave of agents checks the first "
        "wave's findings against the source), a completeness critic (one agent hunts for what "
        "the others missed), and multi-phase sequencing (understand, design, implement, and "
        "review as separate workflow calls, reading results between phases). A useful default "
        "is hybrid: scout inline first to discover the work-list, then fan out over it.\n\n"
        "Granularity: scope each subtask to a distinct concern, component, or question rather "
        "than per line or per file section. Scale the count to what the user asked for: a "
        "focused review of a module of a few hundred lines rarely needs more than about ten "
        "subtasks; a broad audit of a large codebase can justify more.\n\n"
        "Standing consent: while a system message confirms orchestration mode is on, that "
        "opt-in is standing. Author and run a workflow for every substantive task by default, "
        "and lean toward verifying findings adversarially. Work solo only on conversational "
        "turns or trivial mechanical edits. When a system message says the mode is off, "
        "revert to the opt-in rule above."
    ),
    "input_schema": {
        "type": "object",
        "properties": {
            "subtasks": {
                "type": "array",
                "items": {"type": "string"},
                "description": "Independent subtask prompts to run as parallel agents",
            }
        },
        "required": ["subtasks"],
    },
}

BASH_TOOL = {"type": "bash_20250124", "name": "bash"}

REPORT_TOOL = {
    "name": "report_findings",
    "description": (
        "Report the final findings for your subtask. Call this exactly once, when you are "
        "done investigating; it ends your task."
    ),
    "input_schema": {
        "type": "object",
        "properties": {
            "summary": {"type": "string", "description": "Two or three sentences of synthesis"},
            "findings": {
                "type": "array",
                "items": {
                    "type": "object",
                    "properties": {
                        "claim": {"type": "string", "description": "The finding, one sentence"},
                        "evidence": {
                            "type": "string",
                            "description": "How it was verified (file, line, or command output)",
                        },
                        "severity": {"type": "string", "enum": ["high", "medium", "low", "info"]},
                    },
                    "required": ["claim", "evidence", "severity"],
                },
            },
        },
        "required": ["summary", "findings"],
    },
}
# Run bash where the example was launched. In DOC_TEST_MODE the docs harness
# points it at a throwaway fixture directory instead, removed on exit.
if DOC_TEST_MODE:
    WORK_DIR = tempfile.mkdtemp(prefix="orchestration-")
    atexit.register(shutil.rmtree, WORK_DIR, ignore_errors=True)
    with open(os.path.join(WORK_DIR, "sample.py"), "w") as fixture:
        fixture.write(
            "def fib(n):\n"
            "    return n if n < 2 else fib(n - 1) + fib(n - 2)\n\n"
            "print(fib(10))\n"
        )
else:
    WORK_DIR = os.getcwd()


def run_bash(command: str) -> tuple[str, bool]:
    """Run a shell command and return (output, is_error). No sandbox: example code only."""
    print(f"[bash] {command}", file=sys.stderr)
    try:
        proc = subprocess.run(
            ["bash", "-c", command],
            cwd=WORK_DIR,
            capture_output=True,
            text=True,
            errors="replace",
            timeout=BASH_TIMEOUT_SECONDS,
        )
    except subprocess.TimeoutExpired:
        return f"command timed out after {BASH_TIMEOUT_SECONDS}s", True
    output = (proc.stdout + proc.stderr).strip() or "(no output)"
    if len(output) > TOOL_RESULT_MAX_CHARS:
        output = output[:TOOL_RESULT_MAX_CHARS] + f"\n(truncated at {TOOL_RESULT_MAX_CHARS} chars)"
    if proc.returncode != 0:
        output = f"(exit code {proc.returncode})\n{output}"
    return output, proc.returncode != 0


def handle_bash_block(block) -> tuple[str, bool]:
    if block.input.get("restart") is True:
        return "Shell restarted.", False
    command = block.input.get("command")
    if not isinstance(command, str) or not command:
        return "bash error: no command was provided.", True
    return run_bash(command)
def run_subagent(model: str, prompt: str) -> str:
    """One subagent: a small nested agent loop with the bash tool plus report_findings.
    Subagents inherit the main loop's effort level."""
    subagent_system = (
        "You are one agent in a larger parallel fan-out, assigned a single subtask. "
        "Investigate it directly, using bash to check facts rather than guessing, and finish "
        "by calling report_findings exactly once. Return findings, not narration."
    )
    messages = [{"role": "user", "content": prompt}]
    for _ in range(MAX_SUBAGENT_TURNS):
        with client.messages.stream(
            model=model,
            max_tokens=64000,
            system=subagent_system,
            thinking={"type": "adaptive"},
            output_config={"effort": EFFORT},
            tools=[BASH_TOOL, REPORT_TOOL],
            messages=messages,
            timeout=REQUEST_TIMEOUT_SECONDS,
        ) as stream:
            response = stream.get_final_message()
        messages.append({"role": "assistant", "content": response.content})
        if response.stop_reason == "pause_turn":
            continue
        if response.stop_reason != "tool_use":
            text = "".join(block.text for block in response.content if block.type == "text")
            if response.stop_reason == "max_tokens":
                text += "\n\n(warning: subagent response was truncated at max_tokens)"
            return text
        tool_results = []
        report = None
        for block in response.content:
            if block.type != "tool_use":
                continue
            if block.name == "report_findings":
                report = json.dumps(block.input, indent=2)
                output, is_error = "Findings recorded.", False
            elif block.name == "bash":
                output, is_error = handle_bash_block(block)
            else:
                output, is_error = f"unknown tool: {block.name}", True
            tool_results.append(
                {
                    "type": "tool_result",
                    "tool_use_id": block.id,
                    "content": output,
                    "is_error": is_error,
                }
            )
        if report is not None:
            return report
        messages.append({"role": "user", "content": tool_results})
    return "(subagent hit the turn limit before finishing)"
_journal_lock = threading.Lock()


def _load_journal() -> dict:
    try:
        with open(JOURNAL_PATH) as file:
            return json.load(file) or {}
    except (OSError, json.JSONDecodeError):
        return {}


def journaled(prompt: str, compute) -> str:
    """Return a cached result for this exact prompt, or compute and persist it. This
    makes the fan-out resumable: interrupt the run, rerun it, and only the subtasks
    that never finished are recomputed. Delete the journal file to start fresh."""
    key = hashlib.sha256(prompt.encode()).hexdigest()
    cached = _load_journal().get(key)
    if cached is not None:
        print(f"[journal] cache hit for {key[:12]}", file=sys.stderr)
        return cached
    result = compute()
    try:
        with _journal_lock:  # fan-out writes from many threads
            journal = _load_journal()
            journal[key] = result
            temp = f"{JOURNAL_PATH}.tmp"
            with open(temp, "w") as file:
                json.dump(journal, file)
            os.replace(temp, JOURNAL_PATH)  # atomic on POSIX and Windows
    except OSError as error:  # the journal is best-effort; never discard a computed result
        print(f"[journal] write failed: {error}", file=sys.stderr)
    return result
def normalize_subtasks(raw) -> list[str]:
    """Accept the subtasks input in whatever shape the model emits: an array, the array
    JSON-encoded as a single string, or a newline-separated list."""
    if isinstance(raw, str):
        try:
            raw = json.loads(raw)
        except json.JSONDecodeError:
            raw = raw.splitlines() if "\n" in raw else [raw]
    if not isinstance(raw, list):
        return []
    return [task.strip() for task in raw if isinstance(task, str) and task.strip()]


def verify_prompt_for(subtask: str, result: str) -> str:
    return (
        "Adversarially verify the subagent result below: try to REFUTE it. Re-derive the "
        "claims yourself with bash rather than trusting the result, and look for evidence "
        "that contradicts them. Default to refuted if uncertain. Call report_findings with "
        "summary 'refuted: <why>' or 'confirmed: <why>', citing the file:line or command "
        "output that decided it.\n\n"
        f"Subtask: {subtask}\n\nResult to verify:\n{result}"
    )


def run_workflow(model: str, raw_subtasks) -> tuple[str, bool]:
    """Run subtasks as parallel subagents, then run a second verification wave over
    the results, and return both. MAX_TOTAL_SUBTASKS bounds how many the model can
    queue; MAX_CONCURRENT bounds how many run at once."""
    all_subtasks = normalize_subtasks(raw_subtasks)
    subtasks = all_subtasks[:MAX_TOTAL_SUBTASKS]
    dropped = len(all_subtasks) - len(subtasks)
    if not subtasks:
        return "Workflow error: no usable subtasks were provided.", True
    print(f"[workflow] fanning out {len(subtasks)} agents", file=sys.stderr)

    def run_one(prompt: str) -> str:
        try:
            return journaled(prompt, lambda: run_subagent(model, prompt))
        except Exception as error:  # isolation boundary: one bad subagent should not end the run
            return f"(subagent failed: {type(error).__name__}: {error})"

    with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_CONCURRENT) as pool:
        results = list(pool.map(run_one, subtasks))
        print(f"[workflow] verifying {len(results)} results", file=sys.stderr)
        verify_prompts = [verify_prompt_for(task, result) for task, result in zip(subtasks, results)]
        verdicts = list(pool.map(run_one, verify_prompts))

    joined = "\n\n".join(
        f"[agent {index + 1}: {task}]\n{result}\n\n[verify {index + 1}]\n{verdict}"
        for index, (task, result, verdict) in enumerate(zip(subtasks, results, verdicts))
    )
    if dropped > 0:
        joined = (
            f"(note: {dropped} subtasks beyond MAX_TOTAL_SUBTASKS={MAX_TOTAL_SUBTASKS} were not "
            "run; rerun them in a follow-up Workflow call)\n\n" + joined
        )
    return joined, False
class ModeAgent:
    """An agent loop whose orchestration mode is toggled with mid-conversation system messages."""

    def __init__(self, model: str, mode_on: bool = True):
        self.model = model
        self.mode_on = mode_on
        self.messages: list[dict] = []
        self._mode_announced = False
        self._exit_pending = False
        self._turns_since_reminder = 0

    def set_mode(self, mode_on: bool) -> None:
        """Turn the mode on or off. The notice is delivered with the next user turn."""
        if mode_on == self.mode_on:
            return
        if not mode_on:
            if self._mode_announced:
                self._exit_pending = True
        else:
            self._exit_pending = False
        self.mode_on = mode_on

    def _due_system_messages(self) -> list[dict]:
        """System messages owed on this turn: an exit notice, the full mode text on entry,
        or a one-line refresher every TURNS_BETWEEN_REFRESHERS user turns."""
        due = []
        if self._exit_pending:
            self._exit_pending = False
            self._mode_announced = False
            due.append({"role": "system", "content": MODE_EXIT})
        if self.mode_on:
            if not self._mode_announced:
                self._mode_announced = True
                self._turns_since_reminder = 0
                due.append({"role": "system", "content": MODE_ENTER})
            elif self._turns_since_reminder >= TURNS_BETWEEN_REFRESHERS:
                self._turns_since_reminder = 0
                due.append({"role": "system", "content": MODE_REFRESH})
        return due

    def turn(self, user_input: str) -> str:
        # Mid-conversation system messages follow the user turn they apply to, which keeps
        # the cached prefix ahead of them untouched.
        self.messages.append({"role": "user", "content": user_input})
        self.messages.extend(self._due_system_messages())
        self._turns_since_reminder += 1

        for _ in range(MAX_MAIN_TURNS):
            with client.messages.stream(
                model=self.model,
                max_tokens=64000,
                system=SYSTEM_PROMPT,  # static for the whole session
                thinking={"type": "adaptive"},
                output_config={"effort": EFFORT},
                tools=[WORKFLOW_TOOL, BASH_TOOL],
                messages=self.messages,
                timeout=REQUEST_TIMEOUT_SECONDS,
            ) as stream:
                response = stream.get_final_message()
            self.messages.append({"role": "assistant", "content": response.content})

            if response.stop_reason == "pause_turn":
                continue
            if response.stop_reason != "tool_use":
                text = "".join(block.text for block in response.content if block.type == "text")
                if response.stop_reason == "max_tokens":
                    # Drop the truncated assistant message so later turns don't build on it.
                    self.messages.pop()
                    text += "\n\n(warning: response was truncated at max_tokens)"
                return text

            tool_results = []
            for block in response.content:
                if block.type != "tool_use":
                    continue
                if block.name == "Workflow":
                    output, is_error = run_workflow(self.model, block.input.get("subtasks", []))
                elif block.name == "bash":
                    output, is_error = handle_bash_block(block)
                else:
                    output, is_error = f"unknown tool: {block.name}", True
                tool_results.append(
                    {
                        "type": "tool_result",
                        "tool_use_id": block.id,
                        "content": output,
                        "is_error": is_error,
                    }
                )
            self.messages.append({"role": "user", "content": tool_results})
        return "(hit the main loop turn limit before finishing)"
if __name__ == "__main__":
    task = (
        sys.argv[1]
        if len(sys.argv) > 1
        else "Explore the current directory, then give a thorough review: what it does, "
        "code-quality issues, and concrete improvements."
    )
    agent = ModeAgent(MODEL)
    print(agent.turn(task))
    agent.set_mode(False)
    print(agent.turn("Briefly summarize what you found above, no fan-out needed."))
Uso de herramientas con Claude

Definir herramientas, manejar llamadas a herramientas y resultados de herramientas.

Herramienta bash

La herramienta bash definida por Anthropic que este ejemplo ejecuta localmente.