• Messaggi
  • Agenti gestiti
  • Amministrazione
Search...
⌘K
Primi passi
Introduzione a ClaudeGuida rapida
Sviluppare con Claude
Panoramica delle funzionalitàUtilizzo dell'API MessagesMotivi di interruzione e fallbackRifiuti e fallbackCredito di fallback
Capacità del modello
Pensiero estesoPensiero adattivoSforzoBudget delle attività (beta)Modalità veloce (anteprima di ricerca)Output strutturatiCitazioniStreaming dei messaggiElaborazione batchRisultati di ricercaStreaming dei rifiutiSupporto multilingueEmbedding
Strumenti
PanoramicaCome funziona l'uso degli strumentiTutorial: Creare un agente che usa strumentiDefinire gli strumentiGestire le chiamate agli strumentiUso degli strumenti in paralleloTool Runner (SDK)Uso degli strumenti rigorosoUso degli strumenti con cache dei promptStrumenti serverRisoluzione dei problemiStrumento di ricerca webStrumento di recupero webStrumento di esecuzione del codiceStrumento consulenteStrumento di memoriaStrumento BashStrumento di uso del computerStrumento editor di testo
Infrastruttura degli strumenti
Riferimento degli strumentiGestire il contesto degli strumentiCombinazioni di strumentiRicerca di strumentiChiamata programmatica degli strumentiStreaming granulare degli strumenti
Gestione del contesto
Finestre di contestoCompattazioneModifica del contestoCache dei promptMessaggi di sistema a metà conversazioneCreare una modalità di orchestrazioneDiagnostica della cache (beta)Conteggio dei token
Lavorare con i file
API FilesSupporto PDFImmagini e visione
Skill
PanoramicaGuida rapidaBest practiceSkill per le aziendeSkill nell'API
MCP
Server MCP remotiConnettore MCP
Claude su piattaforme cloud
Amazon BedrockAmazon Bedrock (legacy)Claude Platform su AWSMicrosoft FoundryVertex AI
Log in
Creare una modalità di orchestrazione
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...

Solutions

  • AI agents
  • Code modernization
  • Coding
  • Customer support
  • Education
  • Financial services
  • Government
  • Life sciences

Partners

  • Amazon Bedrock
  • Google Cloud's Vertex AI

Learn

  • Blog
  • Courses
  • Use cases
  • Connectors
  • Customer stories
  • Engineering at Anthropic
  • Events
  • Powered by Claude
  • Service partners
  • Startups program

Company

  • Anthropic
  • Careers
  • Economic Futures
  • Research
  • News
  • Responsible Scaling Policy
  • Security and compliance
  • Transparency

Learn

  • Blog
  • Courses
  • Use cases
  • Connectors
  • Customer stories
  • Engineering at Anthropic
  • Events
  • Powered by Claude
  • Service partners
  • Startups program

Help and security

  • Availability
  • Status
  • Support
  • Discord

Terms and policies

  • Privacy policy
  • Responsible disclosure policy
  • Terms of service: Commercial
  • Terms of service: Consumer
  • Usage policy
Messaggi/Gestione del contesto

Creare una modalità di orchestrazione

Crea una modalità a livello di sessione che concede un consenso permanente per il fan-out multi-agente, attivata e disattivata tramite messaggi di sistema a metà conversazione.

Was this page helpful?

  • Configurare il loop
  • Definire i promemoria di modalità
  • Concedere il consenso permanente nella descrizione dello strumento
  • Eseguire lo strumento bash localmente
  • Eseguire un singolo subagente
  • Registrare i risultati in un journal per riprendere le riesecuzioni
  • Distribuire, poi verificare
  • Attivare e disattivare la modalità con messaggi di sistema a metà conversazione
  • Eseguirlo
  • Verso un harness di produzione
  • Correlati

Una modalità di orchestrazione è un interruttore a livello di sessione: quando è attiva, il modello applica la massima accuratezza a ogni richiesta sostanziale, esplorando prima il compito stesso e poi distribuendo il lavoro a subagenti paralleli per impostazione predefinita. Quando è disattivata, lo stesso strumento di orchestrazione torna a richiedere un'adesione esplicita per ogni singola richiesta.

La modalità non è un parametro dell'API. È costruita interamente a partire da componenti documentati:

  1. Un livello di effort: le richieste vengono eseguite a un valore di Effort documentato, come xhigh. Non esiste alcun livello nascosto oltre a quelli indicati in quella pagina.
  2. Un promemoria di modalità: un messaggio di sistema a metà conversazione comunica al modello che la modalità è attiva, con un breve promemoria di una riga ogni alcuni turni e un avviso di uscita quando la modalità viene disattivata. Il campo system di primo livello non cambia mai, quindi il prefisso memorizzato nella cache rimane intatto.
  3. Consenso permanente nella descrizione dello strumento: la descrizione dello strumento di orchestrazione specifica che, mentre la modalità è attiva, il modello deve creare ed eseguire un workflow per ogni compito sostanziale senza chiedere prima.

Questo esempio utilizza messaggi di sistema a metà conversazione, attualmente disponibili solo su Claude Opus 4.8. Il "fan-out" (distribuzione parallela) stesso moltiplica l'utilizzo di token: una singola richiesta può generare molte conversazioni di subagenti, quindi riserva questa modalità a lavori che ne giustifichino il costo.

Configurare il loop

L'esempio è un singolo file. Le costanti controllano il livello di effort, la forma del fan-out e la frequenza con cui viene reinviato il promemoria di modalità. MAX_CONCURRENT limita il numero di subagenti eseguiti contemporaneamente (il port PHP è sequenziale e lo ignora); MAX_TOTAL_SUBTASKS limita il numero di subagenti che il modello può mettere in coda in una singola chiamata Workflow. Separare i due valori consente al modello di pianificare un ampio backlog senza avviarlo tutto in una volta. Il controllo DOC_TEST_MODE limita i loop a un singolo turno quando quella variabile d'ambiente è impostata, in modo che l'harness automatizzato della documentazione possa verificare che il file compili e termini rapidamente senza eseguire l'orchestrazione completa; lasciala non impostata quando esegui l'esempio autonomamente.

Definire i promemoria di modalità

I promemoria sono volutamente brevi. Attivano o disattivano la modalità e rimandano alla descrizione dello strumento, dove risiedono le istruzioni più corpose. Il testo completo viene inviato una volta quando la modalità si attiva, il promemoria viene reinviato solo dopo diversi turni dell'utente e l'avviso di uscita viene inviato una volta quando la modalità si disattiva.

Concedere il consenso permanente nella descrizione dello strumento

Lo strumento Workflow contiene il vero contratto comportamentale: la regola di opt-in, il consenso permanente che si applica mentre la modalità è attiva, le indicazioni sulla granularità per dimensionare il fan-out e i pattern di qualità a cui il modello può ricorrere (un'ondata di verifica, un critico di completezza, sequenziamento multi-fase). I subagenti ricevono anche uno strumento report_findings in modo che i loro risultati tornino come JSON strutturato anziché come prosa, e lo strumento bash è lo strumento bash_20250124 definito da Anthropic eseguito localmente.

Eseguire lo strumento bash localmente

Il gestore bash esegue il comando richiesto con un timeout, cattura stdout e stderr combinati e tronca il risultato in modo che un comando fuori controllo non possa saturare la finestra di contesto. I comandi vengono eseguiti nella directory da cui avvii l'esempio, quindi per puntarlo a un progetto devi avviarlo da lì; quando DOC_TEST_MODE è impostata, l'harness fornisce invece a bash una piccola directory fixture usa e getta che viene rimossa all'uscita. Qui non c'è alcuna sandbox: il comando viene eseguito con i permessi del processo che ha avviato l'esempio. Per chiarezza, questo esempio esegue ogni chiamata in una subshell nuova anziché mantenere la sessione persistente descritta dal contratto bash_20250124; un agente di produzione dovrebbe supportare lo strumento con una shell a lunga durata in modo che la directory di lavoro, l'ambiente e l'azione restart si comportino come documentato.

Eseguire un singolo subagente

Ogni sottocompito del workflow diventa un proprio piccolo loop agente con lo strumento bash, eseguito allo stesso livello di effort del loop principale. Un timeout per richiesta limita ogni chiamata API in modo che una connessione interrotta degradi un singolo subagente invece di bloccare l'intera esecuzione.

Registrare i risultati in un journal per riprendere le riesecuzioni

Un fan-out che genera decine di subagenti è costoso da riavviare da zero. Un piccolo journal indirizzato per contenuto lo rende idempotente: prima di inviare un subagente, cerca lo SHA-256 del suo prompt in un file JSON locale e restituisce il risultato registrato se ne esiste uno. Interrompi l'esecuzione, rieseguila, e solo i sottocompiti che non sono mai terminati vengono ricalcolati. Il journal deduplica tra esecuzioni diverse, non all'interno di una singola ondata di fan-out; elimina il file journal per ricominciare da capo.

Distribuire, poi verificare

Il fan-out accetta fino a MAX_TOTAL_SUBTASKS prompt, li esegue attraverso il journal con al massimo MAX_CONCURRENT in esecuzione simultanea (sequenziale nel port PHP) e isola i fallimenti in modo che un subagente difettoso degradi a una stringa di errore invece di terminare l'esecuzione. Una volta terminata la prima ondata, una seconda ondata riutilizza lo stesso percorso dei subagenti per tentare di confutare ogni risultato: ogni verificatore rideriva le affermazioni dalla fonte, considerandole confutate per impostazione predefinita in caso di incertezza. Sia il risultato originale che il relativo verdetto vengono restituiti all'orchestratore in modo che possa valutarli insieme.

Attivare e disattivare la modalità con messaggi di sistema a metà conversazione

L'agente aggiunge prima il messaggio dell'utente, poi eventuali messaggi di sistema dovuti: l'avviso di uscita, il testo completo della modalità all'ingresso o il promemoria periodico. Posizionare il messaggio di sistema dopo il turno dell'utente mantiene intatto ogni byte memorizzato nella cache che lo precede e soddisfa la regola di posizionamento secondo cui un messaggio di sistema segue un turno dell'utente.

Eseguirlo

Lo strumento bash in questo esempio esegue comandi scritti dal modello direttamente sulla tua macchina senza alcuna sandbox, e il fan-out esegue diversi di questi agenti in parallelo. Eseguilo in una directory e in un ambiente che sei disposto a esporre, e aggiungi il sandboxing prima di adattarlo a qualsiasi uso che vada oltre la sperimentazione locale.

Avvia l'esempio dalla directory in cui vuoi che gli agenti lavorino, ad esempio la root di un repository da esaminare:

python orchestration_mode.py "Review this repository for flaky tests and propose fixes."

Con la modalità attiva, aspettati che il modello esplori con alcuni comandi bash, invochi lo strumento Workflow senza che gli venga richiesto e sintetizzi i report dei subagenti in una risposta finale. Le richieste banali o conversazionali rimangono gestite in autonomia, come indicato dal promemoria.

Verso un harness di produzione

Questo esempio è volutamente ridotto. Un harness pensato per carichi di lavoro reali aggiungerebbe tipicamente:

  • Script di orchestrazione in sandbox: consenti al modello di emettere un breve programma di orchestrazione (con ramificazioni, loop e passaggi di riduzione) ed eseguilo all'interno di un interprete isolato, anziché accettare solo un elenco piatto di stringhe di sottocompiti.
  • Journaling durevole: sostituisci il file JSON locale con uno store che sopravviva ai riavvii del processo e sia sicuro in presenza di scritture concorrenti su più macchine.
  • Applicazione del budget: tieni traccia del totale dei subagenti avviati nell'intera sessione, non solo per singola chiamata Workflow, e rifiuta di superare un limite rigido in modo che un piano fuori controllo non possa esaurire la tua quota.

I pattern di questo esempio (i promemoria di modalità, il consenso permanente nella descrizione dello strumento, il journaling e l'ondata di verifica) si trasferiscono invariati; solo il substrato di esecuzione che li circonda diventa più robusto.

Correlati

Messaggi di sistema a metà conversazione

Il meccanismo utilizzato dai promemoria di modalità e come interagisce con la cache dei prompt.

Effort

I livelli di effort accettati dall'API e come sceglierne uno.

import atexit
import concurrent.futures
import hashlib
import json
import os
import shutil
import subprocess
import sys
import tempfile
import threading

import anthropic

client = anthropic.Anthropic()

MODEL = "claude-opus-4-8"
EFFORT = "xhigh"

SYSTEM_PROMPT = "You are a helpful general-purpose agent. Answer the user's request directly."

REQUEST_TIMEOUT_SECONDS = 600
BASH_TIMEOUT_SECONDS = 60
TOOL_RESULT_MAX_CHARS = 8000
MAX_CONCURRENT = 10
DOC_TEST_MODE = bool(os.environ.get("DOC_TEST_MODE"))
MAX_TOTAL_SUBTASKS = 2 if DOC_TEST_MODE else 200
MAX_SUBAGENT_TURNS = 1 if DOC_TEST_MODE else 15
MAX_MAIN_TURNS = 1 if DOC_TEST_MODE else 30
TURNS_BETWEEN_REFRESHERS = 10
JOURNAL_PATH = os.environ.get("ORCH_JOURNAL") or "orchestration_journal.json"
MODE_ENTER = (
    "Orchestration mode is on: optimize for the most exhaustive, correct answer rather than "
    "the fastest one. Use the Workflow tool on every substantive task, sized to the problem's "
    "natural decomposition rather than the maximum the tool allows. See the Workflow tool's "
    "description for standing consent, granularity guidance, and quality patterns. Work solo "
    "only on conversational or trivial turns."
)
MODE_REFRESH = (
    "Orchestration mode is still on. Use the Workflow tool; see its standing consent section."
)
MODE_EXIT = (
    "Orchestration mode is off. The Workflow tool's standard opt-in rule applies again."
)
WORKFLOW_TOOL = {
    "name": "Workflow",
    "description": (
        "Orchestrate a multi-agent workflow: split a large task into independent subtasks "
        "and run them as parallel agents, then collect their results.\n\n"
        "Opt-in: only use this tool when the user explicitly asks for a workflow, or when a "
        "system message confirms that orchestration mode is on.\n\n"
        "Quality patterns: adversarial verification (a second wave of agents checks the first "
        "wave's findings against the source), a completeness critic (one agent hunts for what "
        "the others missed), and multi-phase sequencing (understand, design, implement, and "
        "review as separate workflow calls, reading results between phases). A useful default "
        "is hybrid: scout inline first to discover the work-list, then fan out over it.\n\n"
        "Granularity: scope each subtask to a distinct concern, component, or question rather "
        "than per line or per file section. Scale the count to what the user asked for: a "
        "focused review of a module of a few hundred lines rarely needs more than about ten "
        "subtasks; a broad audit of a large codebase can justify more.\n\n"
        "Standing consent: while a system message confirms orchestration mode is on, that "
        "opt-in is standing. Author and run a workflow for every substantive task by default, "
        "and lean toward verifying findings adversarially. Work solo only on conversational "
        "turns or trivial mechanical edits. When a system message says the mode is off, "
        "revert to the opt-in rule above."
    ),
    "input_schema": {
        "type": "object",
        "properties": {
            "subtasks": {
                "type": "array",
                "items": {"type": "string"},
                "description": "Independent subtask prompts to run as parallel agents",
            }
        },
        "required": ["subtasks"],
    },
}

BASH_TOOL = {"type": "bash_20250124", "name": "bash"}

REPORT_TOOL = {
    "name": "report_findings",
    "description": (
        "Report the final findings for your subtask. Call this exactly once, when you are "
        "done investigating; it ends your task."
    ),
    "input_schema": {
        "type": "object",
        "properties": {
            "summary": {"type": "string", "description": "Two or three sentences of synthesis"},
            "findings": {
                "type": "array",
                "items": {
                    "type": "object",
                    "properties": {
                        "claim": {"type": "string", "description": "The finding, one sentence"},
                        "evidence": {
                            "type": "string",
                            "description": "How it was verified (file, line, or command output)",
                        },
                        "severity": {"type": "string", "enum": ["high", "medium", "low", "info"]},
                    },
                    "required": ["claim", "evidence", "severity"],
                },
            },
        },
        "required": ["summary", "findings"],
    },
}
# Run bash where the example was launched. In DOC_TEST_MODE the docs harness
# points it at a throwaway fixture directory instead, removed on exit.
if DOC_TEST_MODE:
    WORK_DIR = tempfile.mkdtemp(prefix="orchestration-")
    atexit.register(shutil.rmtree, WORK_DIR, ignore_errors=True)
    with open(os.path.join(WORK_DIR, "sample.py"), "w") as fixture:
        fixture.write(
            "def fib(n):\n"
            "    return n if n < 2 else fib(n - 1) + fib(n - 2)\n\n"
            "print(fib(10))\n"
        )
else:
    WORK_DIR = os.getcwd()


def run_bash(command: str) -> tuple[str, bool]:
    """Run a shell command and return (output, is_error). No sandbox: example code only."""
    print(f"[bash] {command}", file=sys.stderr)
    try:
        proc = subprocess.run(
            ["bash", "-c", command],
            cwd=WORK_DIR,
            capture_output=True,
            text=True,
            errors="replace",
            timeout=BASH_TIMEOUT_SECONDS,
        )
    except subprocess.TimeoutExpired:
        return f"command timed out after {BASH_TIMEOUT_SECONDS}s", True
    output = (proc.stdout + proc.stderr).strip() or "(no output)"
    if len(output) > TOOL_RESULT_MAX_CHARS:
        output = output[:TOOL_RESULT_MAX_CHARS] + f"\n(truncated at {TOOL_RESULT_MAX_CHARS} chars)"
    if proc.returncode != 0:
        output = f"(exit code {proc.returncode})\n{output}"
    return output, proc.returncode != 0


def handle_bash_block(block) -> tuple[str, bool]:
    if block.input.get("restart") is True:
        return "Shell restarted.", False
    command = block.input.get("command")
    if not isinstance(command, str) or not command:
        return "bash error: no command was provided.", True
    return run_bash(command)
def run_subagent(model: str, prompt: str) -> str:
    """One subagent: a small nested agent loop with the bash tool plus report_findings.
    Subagents inherit the main loop's effort level."""
    subagent_system = (
        "You are one agent in a larger parallel fan-out, assigned a single subtask. "
        "Investigate it directly, using bash to check facts rather than guessing, and finish "
        "by calling report_findings exactly once. Return findings, not narration."
    )
    messages = [{"role": "user", "content": prompt}]
    for _ in range(MAX_SUBAGENT_TURNS):
        with client.messages.stream(
            model=model,
            max_tokens=64000,
            system=subagent_system,
            thinking={"type": "adaptive"},
            output_config={"effort": EFFORT},
            tools=[BASH_TOOL, REPORT_TOOL],
            messages=messages,
            timeout=REQUEST_TIMEOUT_SECONDS,
        ) as stream:
            response = stream.get_final_message()
        messages.append({"role": "assistant", "content": response.content})
        if response.stop_reason == "pause_turn":
            continue
        if response.stop_reason != "tool_use":
            text = "".join(block.text for block in response.content if block.type == "text")
            if response.stop_reason == "max_tokens":
                text += "\n\n(warning: subagent response was truncated at max_tokens)"
            return text
        tool_results = []
        report = None
        for block in response.content:
            if block.type != "tool_use":
                continue
            if block.name == "report_findings":
                report = json.dumps(block.input, indent=2)
                output, is_error = "Findings recorded.", False
            elif block.name == "bash":
                output, is_error = handle_bash_block(block)
            else:
                output, is_error = f"unknown tool: {block.name}", True
            tool_results.append(
                {
                    "type": "tool_result",
                    "tool_use_id": block.id,
                    "content": output,
                    "is_error": is_error,
                }
            )
        if report is not None:
            return report
        messages.append({"role": "user", "content": tool_results})
    return "(subagent hit the turn limit before finishing)"
_journal_lock = threading.Lock()


def _load_journal() -> dict:
    try:
        with open(JOURNAL_PATH) as file:
            return json.load(file) or {}
    except (OSError, json.JSONDecodeError):
        return {}


def journaled(prompt: str, compute) -> str:
    """Return a cached result for this exact prompt, or compute and persist it. This
    makes the fan-out resumable: interrupt the run, rerun it, and only the subtasks
    that never finished are recomputed. Delete the journal file to start fresh."""
    key = hashlib.sha256(prompt.encode()).hexdigest()
    cached = _load_journal().get(key)
    if cached is not None:
        print(f"[journal] cache hit for {key[:12]}", file=sys.stderr)
        return cached
    result = compute()
    try:
        with _journal_lock:  # fan-out writes from many threads
            journal = _load_journal()
            journal[key] = result
            temp = f"{JOURNAL_PATH}.tmp"
            with open(temp, "w") as file:
                json.dump(journal, file)
            os.replace(temp, JOURNAL_PATH)  # atomic on POSIX and Windows
    except OSError as error:  # the journal is best-effort; never discard a computed result
        print(f"[journal] write failed: {error}", file=sys.stderr)
    return result
def normalize_subtasks(raw) -> list[str]:
    """Accept the subtasks input in whatever shape the model emits: an array, the array
    JSON-encoded as a single string, or a newline-separated list."""
    if isinstance(raw, str):
        try:
            raw = json.loads(raw)
        except json.JSONDecodeError:
            raw = raw.splitlines() if "\n" in raw else [raw]
    if not isinstance(raw, list):
        return []
    return [task.strip() for task in raw if isinstance(task, str) and task.strip()]


def verify_prompt_for(subtask: str, result: str) -> str:
    return (
        "Adversarially verify the subagent result below: try to REFUTE it. Re-derive the "
        "claims yourself with bash rather than trusting the result, and look for evidence "
        "that contradicts them. Default to refuted if uncertain. Call report_findings with "
        "summary 'refuted: <why>' or 'confirmed: <why>', citing the file:line or command "
        "output that decided it.\n\n"
        f"Subtask: {subtask}\n\nResult to verify:\n{result}"
    )


def run_workflow(model: str, raw_subtasks) -> tuple[str, bool]:
    """Run subtasks as parallel subagents, then run a second verification wave over
    the results, and return both. MAX_TOTAL_SUBTASKS bounds how many the model can
    queue; MAX_CONCURRENT bounds how many run at once."""
    all_subtasks = normalize_subtasks(raw_subtasks)
    subtasks = all_subtasks[:MAX_TOTAL_SUBTASKS]
    dropped = len(all_subtasks) - len(subtasks)
    if not subtasks:
        return "Workflow error: no usable subtasks were provided.", True
    print(f"[workflow] fanning out {len(subtasks)} agents", file=sys.stderr)

    def run_one(prompt: str) -> str:
        try:
            return journaled(prompt, lambda: run_subagent(model, prompt))
        except Exception as error:  # isolation boundary: one bad subagent should not end the run
            return f"(subagent failed: {type(error).__name__}: {error})"

    with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_CONCURRENT) as pool:
        results = list(pool.map(run_one, subtasks))
        print(f"[workflow] verifying {len(results)} results", file=sys.stderr)
        verify_prompts = [verify_prompt_for(task, result) for task, result in zip(subtasks, results)]
        verdicts = list(pool.map(run_one, verify_prompts))

    joined = "\n\n".join(
        f"[agent {index + 1}: {task}]\n{result}\n\n[verify {index + 1}]\n{verdict}"
        for index, (task, result, verdict) in enumerate(zip(subtasks, results, verdicts))
    )
    if dropped > 0:
        joined = (
            f"(note: {dropped} subtasks beyond MAX_TOTAL_SUBTASKS={MAX_TOTAL_SUBTASKS} were not "
            "run; rerun them in a follow-up Workflow call)\n\n" + joined
        )
    return joined, False
class ModeAgent:
    """An agent loop whose orchestration mode is toggled with mid-conversation system messages."""

    def __init__(self, model: str, mode_on: bool = True):
        self.model = model
        self.mode_on = mode_on
        self.messages: list[dict] = []
        self._mode_announced = False
        self._exit_pending = False
        self._turns_since_reminder = 0

    def set_mode(self, mode_on: bool) -> None:
        """Turn the mode on or off. The notice is delivered with the next user turn."""
        if mode_on == self.mode_on:
            return
        if not mode_on:
            if self._mode_announced:
                self._exit_pending = True
        else:
            self._exit_pending = False
        self.mode_on = mode_on

    def _due_system_messages(self) -> list[dict]:
        """System messages owed on this turn: an exit notice, the full mode text on entry,
        or a one-line refresher every TURNS_BETWEEN_REFRESHERS user turns."""
        due = []
        if self._exit_pending:
            self._exit_pending = False
            self._mode_announced = False
            due.append({"role": "system", "content": MODE_EXIT})
        if self.mode_on:
            if not self._mode_announced:
                self._mode_announced = True
                self._turns_since_reminder = 0
                due.append({"role": "system", "content": MODE_ENTER})
            elif self._turns_since_reminder >= TURNS_BETWEEN_REFRESHERS:
                self._turns_since_reminder = 0
                due.append({"role": "system", "content": MODE_REFRESH})
        return due

    def turn(self, user_input: str) -> str:
        # Mid-conversation system messages follow the user turn they apply to, which keeps
        # the cached prefix ahead of them untouched.
        self.messages.append({"role": "user", "content": user_input})
        self.messages.extend(self._due_system_messages())
        self._turns_since_reminder += 1

        for _ in range(MAX_MAIN_TURNS):
            with client.messages.stream(
                model=self.model,
                max_tokens=64000,
                system=SYSTEM_PROMPT,  # static for the whole session
                thinking={"type": "adaptive"},
                output_config={"effort": EFFORT},
                tools=[WORKFLOW_TOOL, BASH_TOOL],
                messages=self.messages,
                timeout=REQUEST_TIMEOUT_SECONDS,
            ) as stream:
                response = stream.get_final_message()
            self.messages.append({"role": "assistant", "content": response.content})

            if response.stop_reason == "pause_turn":
                continue
            if response.stop_reason != "tool_use":
                text = "".join(block.text for block in response.content if block.type == "text")
                if response.stop_reason == "max_tokens":
                    # Drop the truncated assistant message so later turns don't build on it.
                    self.messages.pop()
                    text += "\n\n(warning: response was truncated at max_tokens)"
                return text

            tool_results = []
            for block in response.content:
                if block.type != "tool_use":
                    continue
                if block.name == "Workflow":
                    output, is_error = run_workflow(self.model, block.input.get("subtasks", []))
                elif block.name == "bash":
                    output, is_error = handle_bash_block(block)
                else:
                    output, is_error = f"unknown tool: {block.name}", True
                tool_results.append(
                    {
                        "type": "tool_result",
                        "tool_use_id": block.id,
                        "content": output,
                        "is_error": is_error,
                    }
                )
            self.messages.append({"role": "user", "content": tool_results})
        return "(hit the main loop turn limit before finishing)"
if __name__ == "__main__":
    task = (
        sys.argv[1]
        if len(sys.argv) > 1
        else "Explore the current directory, then give a thorough review: what it does, "
        "code-quality issues, and concrete improvements."
    )
    agent = ModeAgent(MODEL)
    print(agent.turn(task))
    agent.set_mode(False)
    print(agent.turn("Briefly summarize what you found above, no fan-out needed."))
Uso degli strumenti con Claude

Definire strumenti, gestire le chiamate agli strumenti e i risultati degli strumenti.

Strumento bash

Lo strumento bash definito da Anthropic che questo esempio esegue localmente.