• Messages
  • Agents gérés
  • Administration
Search...
⌘K
Premiers pas
Introduction à ClaudeDémarrage rapide
Développer avec Claude
Aperçu des fonctionnalitésUtilisation de l'API MessagesRaisons d'arrêt et repliRefus et repliCrédit de repli
Capacités du modèle
Réflexion étendueRéflexion adaptativeEffortBudgets de tâches (bêta)Mode rapide (aperçu de recherche)Sorties structuréesCitationsMessages en streamingTraitement par lotsRésultats de rechercheRefus en streamingPrise en charge multilingueEmbeddings
Outils
AperçuFonctionnement de l'utilisation d'outilsTutoriel : Créer un agent utilisant des outilsDéfinir des outilsGérer les appels d'outilsUtilisation d'outils en parallèleTool Runner (SDK)Utilisation d'outils stricteUtilisation d'outils avec mise en cache des promptsOutils serveurDépannageOutil de recherche webOutil de récupération webOutil d'exécution de codeOutil conseillerOutil de mémoireOutil BashOutil d'utilisation de l'ordinateurOutil d'éditeur de texte
Infrastructure des outils
Référence des outilsGérer le contexte des outilsCombinaisons d'outilsRecherche d'outilsAppel d'outils programmatiqueStreaming d'outils granulaire
Gestion du contexte
Fenêtres de contexteCompactageÉdition du contexteMise en cache des promptsMessages système en cours de conversationCréer un mode d'orchestrationDiagnostics de cache (bêta)Comptage de tokens
Travailler avec des fichiers
API FilesPrise en charge des PDFImages et vision
Compétences
AperçuDémarrage rapideBonnes pratiquesCompétences pour l'entrepriseCompétences dans l'API
MCP
Serveurs MCP distantsConnecteur MCP
Claude sur les plateformes cloud
Amazon BedrockAmazon Bedrock (ancienne version)Claude Platform sur AWSMicrosoft FoundryVertex AI
Log in
Créer un mode d'orchestration
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...

Solutions

  • AI agents
  • Code modernization
  • Coding
  • Customer support
  • Education
  • Financial services
  • Government
  • Life sciences

Partners

  • Amazon Bedrock
  • Google Cloud's Vertex AI

Learn

  • Blog
  • Courses
  • Use cases
  • Connectors
  • Customer stories
  • Engineering at Anthropic
  • Events
  • Powered by Claude
  • Service partners
  • Startups program

Company

  • Anthropic
  • Careers
  • Economic Futures
  • Research
  • News
  • Responsible Scaling Policy
  • Security and compliance
  • Transparency

Learn

  • Blog
  • Courses
  • Use cases
  • Connectors
  • Customer stories
  • Engineering at Anthropic
  • Events
  • Powered by Claude
  • Service partners
  • Startups program

Help and security

  • Availability
  • Status
  • Support
  • Discord

Terms and policies

  • Privacy policy
  • Responsible disclosure policy
  • Terms of service: Commercial
  • Terms of service: Consumer
  • Usage policy
Messages/Gestion du contexte

Créer un mode d'orchestration

Créez un mode au niveau de la session qui accorde un consentement permanent pour la répartition multi-agents, activé et désactivé à l'aide de messages système en cours de conversation.

Was this page helpful?

  • Configurer la boucle
  • Définir les rappels de mode
  • Accorder un consentement permanent dans la description de l'outil
  • Exécuter l'outil bash localement
  • Exécuter un sous-agent
  • Journaliser les résultats pour que les réexécutions reprennent
  • Répartir, puis vérifier
  • Basculer le mode avec des messages système en cours de conversation
  • Exécuter l'exemple
  • Vers un harnais de production
  • Voir aussi

Un mode d'orchestration est un commutateur au niveau de la session : lorsqu'il est activé, le modèle applique une exhaustivité maximale à chaque requête substantielle, en explorant d'abord la tâche elle-même, puis en répartissant le travail vers des sous-agents parallèles par défaut. Lorsqu'il est désactivé, le même outil d'orchestration revient à un fonctionnement sur demande explicite, requête par requête.

Ce mode n'est pas un paramètre d'API. Il est entièrement construit à partir d'éléments documentés :

  1. Un niveau d'effort : les requêtes s'exécutent à une valeur Effort documentée telle que xhigh. Il n'existe aucun niveau caché au-delà de ceux indiqués sur cette page.
  2. Un rappel de mode : un message système en cours de conversation indique au modèle que le mode est actif, avec un rappel d'une ligne tous les quelques tours et un avis de sortie lorsque le mode est désactivé. Le champ system de niveau supérieur ne change jamais, de sorte que le préfixe mis en cache reste intact.
  3. Un consentement permanent dans la description de l'outil : la description de l'outil d'orchestration indique que tant que le mode est activé, le modèle doit concevoir et exécuter un workflow pour chaque tâche substantielle sans demander au préalable.

Cet exemple utilise des messages système en cours de conversation, qui sont actuellement disponibles uniquement sur Claude Opus 4.8. La répartition elle-même multiplie l'utilisation de tokens : une seule requête peut engendrer de nombreuses conversations de sous-agents, réservez donc ce mode aux travaux qui en justifient le coût.

Configurer la boucle

L'exemple tient dans un seul fichier. Les constantes contrôlent le niveau d'effort, la forme de la répartition et la fréquence à laquelle le rappel de mode est renvoyé. MAX_CONCURRENT limite le nombre de sous-agents s'exécutant simultanément (le portage PHP est séquentiel et l'ignore) ; MAX_TOTAL_SUBTASKS limite le nombre que le modèle peut mettre en file d'attente dans un seul appel Workflow. Séparer les deux permet au modèle de planifier un grand backlog sans tout lancer d'un coup. La vérification DOC_TEST_MODE limite les boucles à un seul tour lorsque cette variable d'environnement est définie, afin que le harnais de documentation automatisé puisse valider que le fichier compile et se termine rapidement sans exécuter l'orchestration complète ; laissez-la non définie lorsque vous exécutez l'exemple vous-même.

Définir les rappels de mode

Les rappels sont volontairement courts. Ils basculent le mode et renvoient vers la description de l'outil, où se trouvent les instructions détaillées. Le texte complet est envoyé une fois lorsque le mode s'active, le rappel n'est renvoyé qu'après plusieurs tours utilisateur, et l'avis de sortie est envoyé une fois lorsque le mode se désactive.

Accorder un consentement permanent dans la description de l'outil

L'outil Workflow porte le véritable contrat comportemental : la règle d'activation explicite, le consentement permanent qui s'applique tant que le mode est activé, des conseils de granularité pour dimensionner la répartition, et les schémas de qualité auxquels le modèle peut recourir (une vague de vérification, un critique d'exhaustivité, un séquençage multi-phases). Les sous-agents disposent également d'un outil report_findings afin que leurs résultats reviennent sous forme de JSON structuré plutôt que de prose, et l'outil bash est l'outil bash_20250124 défini par Anthropic, exécuté localement.

Exécuter l'outil bash localement

Le gestionnaire bash exécute la commande demandée avec un délai d'expiration, capture la sortie combinée stdout et stderr, et tronque le résultat afin qu'une commande incontrôlée ne puisse pas saturer la fenêtre de contexte. Les commandes s'exécutent dans le répertoire depuis lequel vous lancez l'exemple, donc le pointer vers un projet signifie le démarrer depuis ce répertoire ; lorsque DOC_TEST_MODE est défini, le harnais fournit à la place à bash un petit répertoire de fixture jetable qui est supprimé à la sortie. Il n'y a pas de sandbox ici : la commande s'exécute avec les permissions du processus qui a lancé l'exemple. Pour plus de clarté, cet exemple exécute chaque appel dans un sous-shell neuf plutôt que de maintenir la session persistante décrite par le contrat bash_20250124 ; un agent de production devrait adosser l'outil à un shell de longue durée afin que le répertoire de travail, l'environnement et l'action restart se comportent comme documenté.

Exécuter un sous-agent

Chaque sous-tâche de workflow devient sa propre petite boucle d'agent avec l'outil bash, s'exécutant au même niveau d'effort que la boucle principale. Un délai d'expiration par requête borne chaque appel d'API afin qu'une connexion perdue dégrade un seul sous-agent au lieu de bloquer l'exécution entière.

Journaliser les résultats pour que les réexécutions reprennent

Une répartition qui engendre des dizaines de sous-agents est coûteuse à redémarrer de zéro. Un petit journal adressé par contenu la rend idempotente : avant de dispatcher un sous-agent, recherchez le SHA-256 de son prompt dans un fichier JSON local, et renvoyez le résultat enregistré s'il en existe un. Interrompez l'exécution, relancez-la, et seules les sous-tâches qui n'ont jamais abouti sont recalculées. Le journal déduplique entre les exécutions, pas au sein d'une même vague de répartition ; supprimez le fichier journal pour repartir de zéro.

Répartir, puis vérifier

La répartition accepte jusqu'à MAX_TOTAL_SUBTASKS prompts, les fait passer par le journal avec au plus MAX_CONCURRENT en cours d'exécution (séquentiel dans le portage PHP), et isole les échecs afin qu'un sous-agent défaillant se dégrade en une chaîne d'erreur au lieu de mettre fin à l'exécution. Une fois la première vague terminée, une seconde vague réutilise le même chemin de sous-agent pour tenter de réfuter chaque résultat : chaque vérificateur redérive les affirmations à partir de la source, en considérant par défaut qu'elles sont réfutées en cas d'incertitude. Le résultat original et son verdict sont tous deux renvoyés à l'orchestrateur afin qu'il puisse les pondérer ensemble.

Basculer le mode avec des messages système en cours de conversation

L'agent ajoute d'abord le message de l'utilisateur, puis tout message système attendu : l'avis de sortie, le texte complet du mode à l'entrée, ou le rappel périodique. Placer le message système après le tour utilisateur laisse intact chaque octet mis en cache qui le précède, et satisfait la règle de placement selon laquelle un message système suit un tour utilisateur.

Exécuter l'exemple

L'outil bash de cet exemple exécute des commandes écrites par le modèle directement sur votre machine sans aucun sandbox, et la répartition exécute plusieurs de ces agents en parallèle. Exécutez-le dans un répertoire et un environnement que vous êtes à l'aise d'exposer, et ajoutez un sandboxing avant de l'adapter à quoi que ce soit au-delà de l'expérimentation locale.

Démarrez l'exemple depuis le répertoire dans lequel vous souhaitez que les agents travaillent, par exemple la racine d'un dépôt à examiner :

python orchestration_mode.py "Review this repository for flaky tests and propose fixes."

Avec le mode activé, attendez-vous à ce que le modèle explore avec quelques commandes bash, dispatche l'outil Workflow sans y être invité, et synthétise les rapports des sous-agents en une réponse finale. Les requêtes triviales ou conversationnelles restent traitées en solo, comme l'indique le rappel.

Vers un harnais de production

Cet exemple est délibérément minimal. Un harnais destiné à des charges de travail réelles ajouterait généralement :

  • Des scripts d'orchestration en sandbox : laisser le modèle émettre un court programme d'orchestration (branchements, boucles et étapes de réduction) et l'exécuter dans un interpréteur isolé, plutôt que d'accepter uniquement une liste plate de chaînes de sous-tâches.
  • Une journalisation durable : remplacer le fichier JSON local par un stockage qui survit aux redémarrages de processus et qui est sûr en cas d'écritures concurrentes sur plusieurs machines.
  • Une application de budget : suivre le nombre total de sous-agents lancés sur l'ensemble de la session, pas seulement par appel Workflow, et refuser de dépasser un plafond strict afin qu'un plan incontrôlé ne puisse pas épuiser votre quota.

Les schémas de cet exemple (les rappels de mode, le consentement permanent dans la description de l'outil, la journalisation et une vague de vérification) se transposent tels quels ; seul le substrat d'exécution qui les entoure devient plus robuste.

Voir aussi

Messages système en cours de conversation

Le mécanisme utilisé par les rappels de mode, et son interaction avec la mise en cache des prompts.

Effort

Les niveaux d'effort acceptés par l'API et comment en choisir un.

import atexit
import concurrent.futures
import hashlib
import json
import os
import shutil
import subprocess
import sys
import tempfile
import threading

import anthropic

client = anthropic.Anthropic()

MODEL = "claude-opus-4-8"
EFFORT = "xhigh"

SYSTEM_PROMPT = "You are a helpful general-purpose agent. Answer the user's request directly."

REQUEST_TIMEOUT_SECONDS = 600
BASH_TIMEOUT_SECONDS = 60
TOOL_RESULT_MAX_CHARS = 8000
MAX_CONCURRENT = 10
DOC_TEST_MODE = bool(os.environ.get("DOC_TEST_MODE"))
MAX_TOTAL_SUBTASKS = 2 if DOC_TEST_MODE else 200
MAX_SUBAGENT_TURNS = 1 if DOC_TEST_MODE else 15
MAX_MAIN_TURNS = 1 if DOC_TEST_MODE else 30
TURNS_BETWEEN_REFRESHERS = 10
JOURNAL_PATH = os.environ.get("ORCH_JOURNAL") or "orchestration_journal.json"
MODE_ENTER = (
    "Orchestration mode is on: optimize for the most exhaustive, correct answer rather than "
    "the fastest one. Use the Workflow tool on every substantive task, sized to the problem's "
    "natural decomposition rather than the maximum the tool allows. See the Workflow tool's "
    "description for standing consent, granularity guidance, and quality patterns. Work solo "
    "only on conversational or trivial turns."
)
MODE_REFRESH = (
    "Orchestration mode is still on. Use the Workflow tool; see its standing consent section."
)
MODE_EXIT = (
    "Orchestration mode is off. The Workflow tool's standard opt-in rule applies again."
)
WORKFLOW_TOOL = {
    "name": "Workflow",
    "description": (
        "Orchestrate a multi-agent workflow: split a large task into independent subtasks "
        "and run them as parallel agents, then collect their results.\n\n"
        "Opt-in: only use this tool when the user explicitly asks for a workflow, or when a "
        "system message confirms that orchestration mode is on.\n\n"
        "Quality patterns: adversarial verification (a second wave of agents checks the first "
        "wave's findings against the source), a completeness critic (one agent hunts for what "
        "the others missed), and multi-phase sequencing (understand, design, implement, and "
        "review as separate workflow calls, reading results between phases). A useful default "
        "is hybrid: scout inline first to discover the work-list, then fan out over it.\n\n"
        "Granularity: scope each subtask to a distinct concern, component, or question rather "
        "than per line or per file section. Scale the count to what the user asked for: a "
        "focused review of a module of a few hundred lines rarely needs more than about ten "
        "subtasks; a broad audit of a large codebase can justify more.\n\n"
        "Standing consent: while a system message confirms orchestration mode is on, that "
        "opt-in is standing. Author and run a workflow for every substantive task by default, "
        "and lean toward verifying findings adversarially. Work solo only on conversational "
        "turns or trivial mechanical edits. When a system message says the mode is off, "
        "revert to the opt-in rule above."
    ),
    "input_schema": {
        "type": "object",
        "properties": {
            "subtasks": {
                "type": "array",
                "items": {"type": "string"},
                "description": "Independent subtask prompts to run as parallel agents",
            }
        },
        "required": ["subtasks"],
    },
}

BASH_TOOL = {"type": "bash_20250124", "name": "bash"}

REPORT_TOOL = {
    "name": "report_findings",
    "description": (
        "Report the final findings for your subtask. Call this exactly once, when you are "
        "done investigating; it ends your task."
    ),
    "input_schema": {
        "type": "object",
        "properties": {
            "summary": {"type": "string", "description": "Two or three sentences of synthesis"},
            "findings": {
                "type": "array",
                "items": {
                    "type": "object",
                    "properties": {
                        "claim": {"type": "string", "description": "The finding, one sentence"},
                        "evidence": {
                            "type": "string",
                            "description": "How it was verified (file, line, or command output)",
                        },
                        "severity": {"type": "string", "enum": ["high", "medium", "low", "info"]},
                    },
                    "required": ["claim", "evidence", "severity"],
                },
            },
        },
        "required": ["summary", "findings"],
    },
}
# Run bash where the example was launched. In DOC_TEST_MODE the docs harness
# points it at a throwaway fixture directory instead, removed on exit.
if DOC_TEST_MODE:
    WORK_DIR = tempfile.mkdtemp(prefix="orchestration-")
    atexit.register(shutil.rmtree, WORK_DIR, ignore_errors=True)
    with open(os.path.join(WORK_DIR, "sample.py"), "w") as fixture:
        fixture.write(
            "def fib(n):\n"
            "    return n if n < 2 else fib(n - 1) + fib(n - 2)\n\n"
            "print(fib(10))\n"
        )
else:
    WORK_DIR = os.getcwd()


def run_bash(command: str) -> tuple[str, bool]:
    """Run a shell command and return (output, is_error). No sandbox: example code only."""
    print(f"[bash] {command}", file=sys.stderr)
    try:
        proc = subprocess.run(
            ["bash", "-c", command],
            cwd=WORK_DIR,
            capture_output=True,
            text=True,
            errors="replace",
            timeout=BASH_TIMEOUT_SECONDS,
        )
    except subprocess.TimeoutExpired:
        return f"command timed out after {BASH_TIMEOUT_SECONDS}s", True
    output = (proc.stdout + proc.stderr).strip() or "(no output)"
    if len(output) > TOOL_RESULT_MAX_CHARS:
        output = output[:TOOL_RESULT_MAX_CHARS] + f"\n(truncated at {TOOL_RESULT_MAX_CHARS} chars)"
    if proc.returncode != 0:
        output = f"(exit code {proc.returncode})\n{output}"
    return output, proc.returncode != 0


def handle_bash_block(block) -> tuple[str, bool]:
    if block.input.get("restart") is True:
        return "Shell restarted.", False
    command = block.input.get("command")
    if not isinstance(command, str) or not command:
        return "bash error: no command was provided.", True
    return run_bash(command)
def run_subagent(model: str, prompt: str) -> str:
    """One subagent: a small nested agent loop with the bash tool plus report_findings.
    Subagents inherit the main loop's effort level."""
    subagent_system = (
        "You are one agent in a larger parallel fan-out, assigned a single subtask. "
        "Investigate it directly, using bash to check facts rather than guessing, and finish "
        "by calling report_findings exactly once. Return findings, not narration."
    )
    messages = [{"role": "user", "content": prompt}]
    for _ in range(MAX_SUBAGENT_TURNS):
        with client.messages.stream(
            model=model,
            max_tokens=64000,
            system=subagent_system,
            thinking={"type": "adaptive"},
            output_config={"effort": EFFORT},
            tools=[BASH_TOOL, REPORT_TOOL],
            messages=messages,
            timeout=REQUEST_TIMEOUT_SECONDS,
        ) as stream:
            response = stream.get_final_message()
        messages.append({"role": "assistant", "content": response.content})
        if response.stop_reason == "pause_turn":
            continue
        if response.stop_reason != "tool_use":
            text = "".join(block.text for block in response.content if block.type == "text")
            if response.stop_reason == "max_tokens":
                text += "\n\n(warning: subagent response was truncated at max_tokens)"
            return text
        tool_results = []
        report = None
        for block in response.content:
            if block.type != "tool_use":
                continue
            if block.name == "report_findings":
                report = json.dumps(block.input, indent=2)
                output, is_error = "Findings recorded.", False
            elif block.name == "bash":
                output, is_error = handle_bash_block(block)
            else:
                output, is_error = f"unknown tool: {block.name}", True
            tool_results.append(
                {
                    "type": "tool_result",
                    "tool_use_id": block.id,
                    "content": output,
                    "is_error": is_error,
                }
            )
        if report is not None:
            return report
        messages.append({"role": "user", "content": tool_results})
    return "(subagent hit the turn limit before finishing)"
_journal_lock = threading.Lock()


def _load_journal() -> dict:
    try:
        with open(JOURNAL_PATH) as file:
            return json.load(file) or {}
    except (OSError, json.JSONDecodeError):
        return {}


def journaled(prompt: str, compute) -> str:
    """Return a cached result for this exact prompt, or compute and persist it. This
    makes the fan-out resumable: interrupt the run, rerun it, and only the subtasks
    that never finished are recomputed. Delete the journal file to start fresh."""
    key = hashlib.sha256(prompt.encode()).hexdigest()
    cached = _load_journal().get(key)
    if cached is not None:
        print(f"[journal] cache hit for {key[:12]}", file=sys.stderr)
        return cached
    result = compute()
    try:
        with _journal_lock:  # fan-out writes from many threads
            journal = _load_journal()
            journal[key] = result
            temp = f"{JOURNAL_PATH}.tmp"
            with open(temp, "w") as file:
                json.dump(journal, file)
            os.replace(temp, JOURNAL_PATH)  # atomic on POSIX and Windows
    except OSError as error:  # the journal is best-effort; never discard a computed result
        print(f"[journal] write failed: {error}", file=sys.stderr)
    return result
def normalize_subtasks(raw) -> list[str]:
    """Accept the subtasks input in whatever shape the model emits: an array, the array
    JSON-encoded as a single string, or a newline-separated list."""
    if isinstance(raw, str):
        try:
            raw = json.loads(raw)
        except json.JSONDecodeError:
            raw = raw.splitlines() if "\n" in raw else [raw]
    if not isinstance(raw, list):
        return []
    return [task.strip() for task in raw if isinstance(task, str) and task.strip()]


def verify_prompt_for(subtask: str, result: str) -> str:
    return (
        "Adversarially verify the subagent result below: try to REFUTE it. Re-derive the "
        "claims yourself with bash rather than trusting the result, and look for evidence "
        "that contradicts them. Default to refuted if uncertain. Call report_findings with "
        "summary 'refuted: <why>' or 'confirmed: <why>', citing the file:line or command "
        "output that decided it.\n\n"
        f"Subtask: {subtask}\n\nResult to verify:\n{result}"
    )


def run_workflow(model: str, raw_subtasks) -> tuple[str, bool]:
    """Run subtasks as parallel subagents, then run a second verification wave over
    the results, and return both. MAX_TOTAL_SUBTASKS bounds how many the model can
    queue; MAX_CONCURRENT bounds how many run at once."""
    all_subtasks = normalize_subtasks(raw_subtasks)
    subtasks = all_subtasks[:MAX_TOTAL_SUBTASKS]
    dropped = len(all_subtasks) - len(subtasks)
    if not subtasks:
        return "Workflow error: no usable subtasks were provided.", True
    print(f"[workflow] fanning out {len(subtasks)} agents", file=sys.stderr)

    def run_one(prompt: str) -> str:
        try:
            return journaled(prompt, lambda: run_subagent(model, prompt))
        except Exception as error:  # isolation boundary: one bad subagent should not end the run
            return f"(subagent failed: {type(error).__name__}: {error})"

    with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_CONCURRENT) as pool:
        results = list(pool.map(run_one, subtasks))
        print(f"[workflow] verifying {len(results)} results", file=sys.stderr)
        verify_prompts = [verify_prompt_for(task, result) for task, result in zip(subtasks, results)]
        verdicts = list(pool.map(run_one, verify_prompts))

    joined = "\n\n".join(
        f"[agent {index + 1}: {task}]\n{result}\n\n[verify {index + 1}]\n{verdict}"
        for index, (task, result, verdict) in enumerate(zip(subtasks, results, verdicts))
    )
    if dropped > 0:
        joined = (
            f"(note: {dropped} subtasks beyond MAX_TOTAL_SUBTASKS={MAX_TOTAL_SUBTASKS} were not "
            "run; rerun them in a follow-up Workflow call)\n\n" + joined
        )
    return joined, False
class ModeAgent:
    """An agent loop whose orchestration mode is toggled with mid-conversation system messages."""

    def __init__(self, model: str, mode_on: bool = True):
        self.model = model
        self.mode_on = mode_on
        self.messages: list[dict] = []
        self._mode_announced = False
        self._exit_pending = False
        self._turns_since_reminder = 0

    def set_mode(self, mode_on: bool) -> None:
        """Turn the mode on or off. The notice is delivered with the next user turn."""
        if mode_on == self.mode_on:
            return
        if not mode_on:
            if self._mode_announced:
                self._exit_pending = True
        else:
            self._exit_pending = False
        self.mode_on = mode_on

    def _due_system_messages(self) -> list[dict]:
        """System messages owed on this turn: an exit notice, the full mode text on entry,
        or a one-line refresher every TURNS_BETWEEN_REFRESHERS user turns."""
        due = []
        if self._exit_pending:
            self._exit_pending = False
            self._mode_announced = False
            due.append({"role": "system", "content": MODE_EXIT})
        if self.mode_on:
            if not self._mode_announced:
                self._mode_announced = True
                self._turns_since_reminder = 0
                due.append({"role": "system", "content": MODE_ENTER})
            elif self._turns_since_reminder >= TURNS_BETWEEN_REFRESHERS:
                self._turns_since_reminder = 0
                due.append({"role": "system", "content": MODE_REFRESH})
        return due

    def turn(self, user_input: str) -> str:
        # Mid-conversation system messages follow the user turn they apply to, which keeps
        # the cached prefix ahead of them untouched.
        self.messages.append({"role": "user", "content": user_input})
        self.messages.extend(self._due_system_messages())
        self._turns_since_reminder += 1

        for _ in range(MAX_MAIN_TURNS):
            with client.messages.stream(
                model=self.model,
                max_tokens=64000,
                system=SYSTEM_PROMPT,  # static for the whole session
                thinking={"type": "adaptive"},
                output_config={"effort": EFFORT},
                tools=[WORKFLOW_TOOL, BASH_TOOL],
                messages=self.messages,
                timeout=REQUEST_TIMEOUT_SECONDS,
            ) as stream:
                response = stream.get_final_message()
            self.messages.append({"role": "assistant", "content": response.content})

            if response.stop_reason == "pause_turn":
                continue
            if response.stop_reason != "tool_use":
                text = "".join(block.text for block in response.content if block.type == "text")
                if response.stop_reason == "max_tokens":
                    # Drop the truncated assistant message so later turns don't build on it.
                    self.messages.pop()
                    text += "\n\n(warning: response was truncated at max_tokens)"
                return text

            tool_results = []
            for block in response.content:
                if block.type != "tool_use":
                    continue
                if block.name == "Workflow":
                    output, is_error = run_workflow(self.model, block.input.get("subtasks", []))
                elif block.name == "bash":
                    output, is_error = handle_bash_block(block)
                else:
                    output, is_error = f"unknown tool: {block.name}", True
                tool_results.append(
                    {
                        "type": "tool_result",
                        "tool_use_id": block.id,
                        "content": output,
                        "is_error": is_error,
                    }
                )
            self.messages.append({"role": "user", "content": tool_results})
        return "(hit the main loop turn limit before finishing)"
if __name__ == "__main__":
    task = (
        sys.argv[1]
        if len(sys.argv) > 1
        else "Explore the current directory, then give a thorough review: what it does, "
        "code-quality issues, and concrete improvements."
    )
    agent = ModeAgent(MODEL)
    print(agent.turn(task))
    agent.set_mode(False)
    print(agent.turn("Briefly summarize what you found above, no fan-out needed."))
Utilisation d'outils avec Claude

Définir des outils, gérer les appels d'outils et les résultats d'outils.

Outil bash

L'outil bash défini par Anthropic que cet exemple exécute localement.