• Nachrichten
  • Managed Agents
  • Admin
Search...
⌘K
Erste Schritte
Einführung in ClaudeSchnellstart
Entwickeln mit Claude
FunktionsübersichtVerwendung der Messages APIStoppgründe und FallbackAblehnungen und FallbackFallback-Guthaben
Modellfähigkeiten
Erweitertes DenkenAdaptives DenkenAufwandAufgabenbudgets (Beta)Schnellmodus (Forschungsvorschau)Strukturierte AusgabenZitateStreaming von NachrichtenBatch-VerarbeitungSuchergebnisseStreaming von AblehnungenMehrsprachige UnterstützungEmbeddings
Tools
ÜbersichtWie Tool-Nutzung funktioniertTutorial: Einen Tool-nutzenden Agenten erstellenTools definierenTool-Aufrufe verarbeitenParallele Tool-NutzungTool Runner (SDK)Strikte Tool-NutzungTool-Nutzung mit Prompt-CachingServer-ToolsFehlerbehebungWebsuche-ToolWeb-Fetch-ToolCodeausführungs-ToolAdvisor-ToolMemory-ToolBash-ToolComputer-Use-ToolTexteditor-Tool
Tool-Infrastruktur
Tool-ReferenzTool-Kontext verwaltenTool-KombinationenTool-SucheProgrammatischer Tool-AufrufFeingranulares Tool-Streaming
Kontextverwaltung
KontextfensterKompaktierungKontextbearbeitungPrompt-CachingSystemnachrichten während der KonversationEinen Orchestrierungsmodus erstellenCache-Diagnose (Beta)Token-Zählung
Arbeiten mit Dateien
Files APIPDF-UnterstützungBilder und Vision
Skills
ÜbersichtSchnellstartBest PracticesSkills für UnternehmenSkills in der API
MCP
Remote-MCP-ServerMCP-Connector
Claude auf Cloud-Plattformen
Amazon BedrockAmazon Bedrock (Legacy)Claude Platform auf AWSMicrosoft FoundryVertex AI
Log in
Einen Orchestrierungsmodus erstellen
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...

Solutions

  • AI agents
  • Code modernization
  • Coding
  • Customer support
  • Education
  • Financial services
  • Government
  • Life sciences

Partners

  • Amazon Bedrock
  • Google Cloud's Vertex AI

Learn

  • Blog
  • Courses
  • Use cases
  • Connectors
  • Customer stories
  • Engineering at Anthropic
  • Events
  • Powered by Claude
  • Service partners
  • Startups program

Company

  • Anthropic
  • Careers
  • Economic Futures
  • Research
  • News
  • Responsible Scaling Policy
  • Security and compliance
  • Transparency

Learn

  • Blog
  • Courses
  • Use cases
  • Connectors
  • Customer stories
  • Engineering at Anthropic
  • Events
  • Powered by Claude
  • Service partners
  • Startups program

Help and security

  • Availability
  • Status
  • Support
  • Discord

Terms and policies

  • Privacy policy
  • Responsible disclosure policy
  • Terms of service: Commercial
  • Terms of service: Consumer
  • Usage policy
Nachrichten/Kontextverwaltung

Einen Orchestrierungsmodus erstellen

Erstelle einen Modus auf Sitzungsebene, der eine dauerhafte Zustimmung für Multi-Agent-Fan-out erteilt und mit Systemnachrichten mitten in der Konversation ein- und ausgeschaltet wird.

Was this page helpful?

  • Die Schleife einrichten
  • Die Modus-Erinnerungen definieren
  • Dauerhafte Zustimmung in der Tool-Beschreibung erteilen
  • Das Bash-Tool lokal ausführen
  • Einen Subagenten ausführen
  • Ergebnisse protokollieren, damit Wiederholungen fortgesetzt werden
  • Auffächern, dann verifizieren
  • Den Modus mit Systemnachrichten mitten in der Konversation umschalten
  • Ausführen
  • Auf dem Weg zu einem Produktions-Harness
  • Verwandte Themen

Ein Orchestrierungsmodus ist ein Schalter auf Sitzungsebene: Wenn er eingeschaltet ist, setzt das Modell maximale Gründlichkeit hinter jede substanzielle Anfrage, erkundet die Aufgabe selbst und verteilt die Arbeit dann standardmäßig per „fan-out" (Auffächerung) an parallele Subagenten. Wenn er ausgeschaltet ist, kehrt dasselbe Orchestrierungs-Tool zum Opt-in pro Anfrage zurück.

Der Modus ist kein API-Parameter. Er ist vollständig aus dokumentierten Bausteinen zusammengesetzt:

  1. Eine Effort-Stufe: Anfragen laufen mit einem dokumentierten Effort-Wert wie xhigh. Es gibt keine versteckte Stufe über den auf dieser Seite aufgeführten.
  2. Eine Modus-Erinnerung: Eine Systemnachricht mitten in der Konversation teilt dem Modell mit, dass der Modus aktiv ist, mit einer einzeiligen Auffrischung alle paar Turns und einer Ausstiegsnachricht, wenn der Modus ausgeschaltet wird. Das system-Feld auf oberster Ebene ändert sich nie, sodass das gecachte Präfix intakt bleibt.
  3. Dauerhafte Zustimmung in der Tool-Beschreibung: Die Beschreibung des Orchestrierungs-Tools besagt, dass das Modell, solange der Modus aktiv ist, für jede substanzielle Aufgabe einen Workflow erstellen und ausführen soll, ohne vorher zu fragen.

Dieses Beispiel verwendet Systemnachrichten mitten in der Konversation, die derzeit nur auf Claude Opus 4.8 verfügbar sind. Das Fan-out selbst vervielfacht den Token-Verbrauch: Eine einzelne Anfrage kann viele Subagenten-Konversationen auslösen, reserviere den Modus also für Arbeit, die die Kosten rechtfertigt.

Die Schleife einrichten

Das Beispiel ist eine einzelne Datei. Die Konstanten steuern die Effort-Stufe, die Form des Fan-outs und wie oft die Modus-Auffrischung erneut gesendet wird. MAX_CONCURRENT begrenzt, wie viele Subagenten gleichzeitig laufen (die PHP-Portierung ist sequenziell und ignoriert diesen Wert); MAX_TOTAL_SUBTASKS begrenzt, wie viele das Modell in einem einzelnen Workflow-Aufruf in die Warteschlange stellen darf. Die Trennung der beiden ermöglicht es dem Modell, einen großen Backlog zu planen, ohne alles auf einmal zu starten. Die DOC_TEST_MODE-Prüfung begrenzt die Schleifen auf einen einzelnen Turn, wenn diese Umgebungsvariable gesetzt ist, damit das automatisierte Docs-Harness validieren kann, dass die Datei kompiliert und schnell abschließt, ohne die vollständige Orchestrierung auszuführen; lass sie ungesetzt, wenn du das Beispiel selbst ausführst.

Die Modus-Erinnerungen definieren

Die Erinnerungen sind absichtlich kurz. Sie schalten den Modus um und verweisen auf die Tool-Beschreibung, in der die umfangreichen Anweisungen stehen. Der vollständige Text wird einmal gesendet, wenn der Modus eingeschaltet wird, die Auffrischung wird erst nach mehreren User-Turns erneut gesendet, und die Ausstiegsnachricht wird einmal gesendet, wenn der Modus ausgeschaltet wird.

Dauerhafte Zustimmung in der Tool-Beschreibung erteilen

Das Workflow-Tool trägt den eigentlichen Verhaltensvertrag: die Opt-in-Regel, die dauerhafte Zustimmung, die gilt, solange der Modus aktiv ist, Granularitätshinweise zur Dimensionierung des Fan-outs und die Qualitätsmuster, auf die das Modell zurückgreifen kann (eine Verifikationswelle, ein Vollständigkeitskritiker, mehrphasige Sequenzierung). Subagenten erhalten außerdem ein report_findings-Tool, damit ihre Ergebnisse als strukturiertes JSON statt als Prosa zurückkommen, und das Bash-Tool ist das von Anthropic definierte bash_20250124-Tool, das lokal ausgeführt wird.

Das Bash-Tool lokal ausführen

Der Bash-Handler führt den angeforderten Befehl mit einem Timeout aus, erfasst kombiniertes stdout und stderr und kürzt das Ergebnis, damit ein außer Kontrolle geratener Befehl das Kontextfenster nicht überfluten kann. Befehle laufen in dem Verzeichnis, aus dem du das Beispiel startest, also bedeutet das Ausrichten auf ein Projekt, es dort zu starten; wenn DOC_TEST_MODE gesetzt ist, gibt das Harness Bash stattdessen ein kleines Wegwerf-Fixture-Verzeichnis, das beim Beenden entfernt wird. Es gibt hier keine Sandbox: Der Befehl läuft mit den Berechtigungen des Prozesses, der das Beispiel gestartet hat. Der Klarheit halber führt dieses Beispiel jeden Aufruf in einer frischen Subshell aus, anstatt die persistente Sitzung aufrechtzuerhalten, die der bash_20250124-Vertrag beschreibt; ein Produktions-Agent sollte das Tool mit einer langlebigen Shell hinterlegen, damit Arbeitsverzeichnis, Umgebung und die restart-Aktion sich wie dokumentiert verhalten.

Einen Subagenten ausführen

Jede Workflow-Teilaufgabe wird zu einer eigenen kleinen Agentenschleife mit dem Bash-Tool, die mit demselben Effort wie die Hauptschleife läuft. Ein Timeout pro Anfrage begrenzt jeden API-Aufruf, sodass eine abgebrochene Verbindung einen Subagenten beeinträchtigt, anstatt den gesamten Lauf zu blockieren.

Ergebnisse protokollieren, damit Wiederholungen fortgesetzt werden

Ein Fan-out, das Dutzende von Subagenten startet, ist teuer, wenn man es von Grund auf neu starten muss. Ein kleines inhaltsadressiertes Journal macht es idempotent: Bevor ein Subagent gestartet wird, schlage den SHA-256 seines Prompts in einer lokalen JSON-Datei nach und gib das aufgezeichnete Ergebnis zurück, falls eines existiert. Unterbrich den Lauf, führe ihn erneut aus, und nur die Teilaufgaben, die nie abgeschlossen wurden, werden neu berechnet. Das Journal dedupliziert über Läufe hinweg, nicht innerhalb einer einzelnen Fan-out-Welle; lösche die Journal-Datei, um neu zu beginnen.

Auffächern, dann verifizieren

Das Fan-out akzeptiert bis zu MAX_TOTAL_SUBTASKS Prompts, lässt sie durch das Journal laufen mit höchstens MAX_CONCURRENT gleichzeitig in Bearbeitung (sequenziell in der PHP-Portierung) und isoliert Fehler, sodass ein defekter Subagent zu einem Fehler-String degradiert, anstatt den Lauf zu beenden. Sobald die erste Welle abgeschlossen ist, verwendet eine zweite Welle denselben Subagenten-Pfad, um zu versuchen, jedes Ergebnis zu widerlegen: Jeder Verifizierer leitet die Behauptungen erneut aus der Quelle ab und fällt bei Unsicherheit standardmäßig auf „widerlegt" zurück. Sowohl das ursprüngliche Ergebnis als auch sein Urteil werden an den Orchestrator zurückgegeben, damit er sie zusammen abwägen kann.

Den Modus mit Systemnachrichten mitten in der Konversation umschalten

Der Agent hängt zuerst die Nachricht des Users an, dann alle fälligen Systemnachrichten: die Ausstiegsnachricht, den vollständigen Modus-Text beim Eintritt oder die periodische Auffrischung. Das Platzieren der Systemnachricht nach dem User-Turn lässt jedes gecachte Byte davor unberührt und erfüllt die Platzierungsregel, dass eine Systemnachricht auf einen User-Turn folgt.

Ausführen

Das Bash-Tool in diesem Beispiel führt vom Modell geschriebene Befehle direkt auf deinem Rechner ohne Sandbox aus, und das Fan-out lässt mehrere dieser Agenten parallel laufen. Führe es in einem Verzeichnis und einer Umgebung aus, die du bedenkenlos freigeben kannst, und füge Sandboxing hinzu, bevor du es für etwas anderes als lokales Experimentieren anpasst.

Starte das Beispiel aus dem Verzeichnis, in dem die Agenten arbeiten sollen, zum Beispiel dem Root eines Repositorys, das überprüft werden soll:

python orchestration_mode.py "Review this repository for flaky tests and propose fixes."

Mit eingeschaltetem Modus kannst du erwarten, dass das Modell mit ein paar Bash-Befehlen erkundet, das Workflow-Tool unaufgefordert aufruft und die Subagenten-Berichte zu einer finalen Antwort zusammenfasst. Triviale oder konversationelle Anfragen bleiben solo, wie die Erinnerung anweist.

Auf dem Weg zu einem Produktions-Harness

Dieses Beispiel ist absichtlich klein gehalten. Ein Harness, das für echte Workloads gedacht ist, würde typischerweise Folgendes hinzufügen:

  • Sandboxed Orchestrierungsskripte: Lass das Modell ein kurzes Orchestrierungsprogramm ausgeben (Verzweigungen, Schleifen und Reduce-Schritte) und führe es in einem isolierten Interpreter aus, anstatt nur eine flache Liste von Teilaufgaben-Strings zu akzeptieren.
  • Dauerhaftes Journaling: Ersetze die lokale JSON-Datei durch einen Speicher, der Prozessneustarts überlebt und bei gleichzeitigen Schreibzugriffen über mehrere Maschinen hinweg sicher ist.
  • Budget-Durchsetzung: Verfolge die Gesamtzahl der gestarteten Subagenten über die gesamte Sitzung hinweg, nicht nur pro Workflow-Aufruf, und verweigere das Überschreiten einer harten Obergrenze, damit ein außer Kontrolle geratener Plan dein Kontingent nicht erschöpfen kann.

Die Muster in diesem Beispiel (die Modus-Erinnerungen, dauerhafte Zustimmung in der Tool-Beschreibung, Journaling und eine Verifikationswelle) lassen sich unverändert übertragen; nur das Ausführungssubstrat um sie herum wird robuster.

Verwandte Themen

Systemnachrichten mitten in der Konversation

Der Mechanismus, den die Modus-Erinnerungen verwenden, und wie er mit Prompt-Caching interagiert.

Effort

Die Effort-Stufen, die die API akzeptiert, und wie du eine auswählst.

import atexit
import concurrent.futures
import hashlib
import json
import os
import shutil
import subprocess
import sys
import tempfile
import threading

import anthropic

client = anthropic.Anthropic()

MODEL = "claude-opus-4-8"
EFFORT = "xhigh"

SYSTEM_PROMPT = "You are a helpful general-purpose agent. Answer the user's request directly."

REQUEST_TIMEOUT_SECONDS = 600
BASH_TIMEOUT_SECONDS = 60
TOOL_RESULT_MAX_CHARS = 8000
MAX_CONCURRENT = 10
DOC_TEST_MODE = bool(os.environ.get("DOC_TEST_MODE"))
MAX_TOTAL_SUBTASKS = 2 if DOC_TEST_MODE else 200
MAX_SUBAGENT_TURNS = 1 if DOC_TEST_MODE else 15
MAX_MAIN_TURNS = 1 if DOC_TEST_MODE else 30
TURNS_BETWEEN_REFRESHERS = 10
JOURNAL_PATH = os.environ.get("ORCH_JOURNAL") or "orchestration_journal.json"
MODE_ENTER = (
    "Orchestration mode is on: optimize for the most exhaustive, correct answer rather than "
    "the fastest one. Use the Workflow tool on every substantive task, sized to the problem's "
    "natural decomposition rather than the maximum the tool allows. See the Workflow tool's "
    "description for standing consent, granularity guidance, and quality patterns. Work solo "
    "only on conversational or trivial turns."
)
MODE_REFRESH = (
    "Orchestration mode is still on. Use the Workflow tool; see its standing consent section."
)
MODE_EXIT = (
    "Orchestration mode is off. The Workflow tool's standard opt-in rule applies again."
)
WORKFLOW_TOOL = {
    "name": "Workflow",
    "description": (
        "Orchestrate a multi-agent workflow: split a large task into independent subtasks "
        "and run them as parallel agents, then collect their results.\n\n"
        "Opt-in: only use this tool when the user explicitly asks for a workflow, or when a "
        "system message confirms that orchestration mode is on.\n\n"
        "Quality patterns: adversarial verification (a second wave of agents checks the first "
        "wave's findings against the source), a completeness critic (one agent hunts for what "
        "the others missed), and multi-phase sequencing (understand, design, implement, and "
        "review as separate workflow calls, reading results between phases). A useful default "
        "is hybrid: scout inline first to discover the work-list, then fan out over it.\n\n"
        "Granularity: scope each subtask to a distinct concern, component, or question rather "
        "than per line or per file section. Scale the count to what the user asked for: a "
        "focused review of a module of a few hundred lines rarely needs more than about ten "
        "subtasks; a broad audit of a large codebase can justify more.\n\n"
        "Standing consent: while a system message confirms orchestration mode is on, that "
        "opt-in is standing. Author and run a workflow for every substantive task by default, "
        "and lean toward verifying findings adversarially. Work solo only on conversational "
        "turns or trivial mechanical edits. When a system message says the mode is off, "
        "revert to the opt-in rule above."
    ),
    "input_schema": {
        "type": "object",
        "properties": {
            "subtasks": {
                "type": "array",
                "items": {"type": "string"},
                "description": "Independent subtask prompts to run as parallel agents",
            }
        },
        "required": ["subtasks"],
    },
}

BASH_TOOL = {"type": "bash_20250124", "name": "bash"}

REPORT_TOOL = {
    "name": "report_findings",
    "description": (
        "Report the final findings for your subtask. Call this exactly once, when you are "
        "done investigating; it ends your task."
    ),
    "input_schema": {
        "type": "object",
        "properties": {
            "summary": {"type": "string", "description": "Two or three sentences of synthesis"},
            "findings": {
                "type": "array",
                "items": {
                    "type": "object",
                    "properties": {
                        "claim": {"type": "string", "description": "The finding, one sentence"},
                        "evidence": {
                            "type": "string",
                            "description": "How it was verified (file, line, or command output)",
                        },
                        "severity": {"type": "string", "enum": ["high", "medium", "low", "info"]},
                    },
                    "required": ["claim", "evidence", "severity"],
                },
            },
        },
        "required": ["summary", "findings"],
    },
}
# Run bash where the example was launched. In DOC_TEST_MODE the docs harness
# points it at a throwaway fixture directory instead, removed on exit.
if DOC_TEST_MODE:
    WORK_DIR = tempfile.mkdtemp(prefix="orchestration-")
    atexit.register(shutil.rmtree, WORK_DIR, ignore_errors=True)
    with open(os.path.join(WORK_DIR, "sample.py"), "w") as fixture:
        fixture.write(
            "def fib(n):\n"
            "    return n if n < 2 else fib(n - 1) + fib(n - 2)\n\n"
            "print(fib(10))\n"
        )
else:
    WORK_DIR = os.getcwd()


def run_bash(command: str) -> tuple[str, bool]:
    """Run a shell command and return (output, is_error). No sandbox: example code only."""
    print(f"[bash] {command}", file=sys.stderr)
    try:
        proc = subprocess.run(
            ["bash", "-c", command],
            cwd=WORK_DIR,
            capture_output=True,
            text=True,
            errors="replace",
            timeout=BASH_TIMEOUT_SECONDS,
        )
    except subprocess.TimeoutExpired:
        return f"command timed out after {BASH_TIMEOUT_SECONDS}s", True
    output = (proc.stdout + proc.stderr).strip() or "(no output)"
    if len(output) > TOOL_RESULT_MAX_CHARS:
        output = output[:TOOL_RESULT_MAX_CHARS] + f"\n(truncated at {TOOL_RESULT_MAX_CHARS} chars)"
    if proc.returncode != 0:
        output = f"(exit code {proc.returncode})\n{output}"
    return output, proc.returncode != 0


def handle_bash_block(block) -> tuple[str, bool]:
    if block.input.get("restart") is True:
        return "Shell restarted.", False
    command = block.input.get("command")
    if not isinstance(command, str) or not command:
        return "bash error: no command was provided.", True
    return run_bash(command)
def run_subagent(model: str, prompt: str) -> str:
    """One subagent: a small nested agent loop with the bash tool plus report_findings.
    Subagents inherit the main loop's effort level."""
    subagent_system = (
        "You are one agent in a larger parallel fan-out, assigned a single subtask. "
        "Investigate it directly, using bash to check facts rather than guessing, and finish "
        "by calling report_findings exactly once. Return findings, not narration."
    )
    messages = [{"role": "user", "content": prompt}]
    for _ in range(MAX_SUBAGENT_TURNS):
        with client.messages.stream(
            model=model,
            max_tokens=64000,
            system=subagent_system,
            thinking={"type": "adaptive"},
            output_config={"effort": EFFORT},
            tools=[BASH_TOOL, REPORT_TOOL],
            messages=messages,
            timeout=REQUEST_TIMEOUT_SECONDS,
        ) as stream:
            response = stream.get_final_message()
        messages.append({"role": "assistant", "content": response.content})
        if response.stop_reason == "pause_turn":
            continue
        if response.stop_reason != "tool_use":
            text = "".join(block.text for block in response.content if block.type == "text")
            if response.stop_reason == "max_tokens":
                text += "\n\n(warning: subagent response was truncated at max_tokens)"
            return text
        tool_results = []
        report = None
        for block in response.content:
            if block.type != "tool_use":
                continue
            if block.name == "report_findings":
                report = json.dumps(block.input, indent=2)
                output, is_error = "Findings recorded.", False
            elif block.name == "bash":
                output, is_error = handle_bash_block(block)
            else:
                output, is_error = f"unknown tool: {block.name}", True
            tool_results.append(
                {
                    "type": "tool_result",
                    "tool_use_id": block.id,
                    "content": output,
                    "is_error": is_error,
                }
            )
        if report is not None:
            return report
        messages.append({"role": "user", "content": tool_results})
    return "(subagent hit the turn limit before finishing)"
_journal_lock = threading.Lock()


def _load_journal() -> dict:
    try:
        with open(JOURNAL_PATH) as file:
            return json.load(file) or {}
    except (OSError, json.JSONDecodeError):
        return {}


def journaled(prompt: str, compute) -> str:
    """Return a cached result for this exact prompt, or compute and persist it. This
    makes the fan-out resumable: interrupt the run, rerun it, and only the subtasks
    that never finished are recomputed. Delete the journal file to start fresh."""
    key = hashlib.sha256(prompt.encode()).hexdigest()
    cached = _load_journal().get(key)
    if cached is not None:
        print(f"[journal] cache hit for {key[:12]}", file=sys.stderr)
        return cached
    result = compute()
    try:
        with _journal_lock:  # fan-out writes from many threads
            journal = _load_journal()
            journal[key] = result
            temp = f"{JOURNAL_PATH}.tmp"
            with open(temp, "w") as file:
                json.dump(journal, file)
            os.replace(temp, JOURNAL_PATH)  # atomic on POSIX and Windows
    except OSError as error:  # the journal is best-effort; never discard a computed result
        print(f"[journal] write failed: {error}", file=sys.stderr)
    return result
def normalize_subtasks(raw) -> list[str]:
    """Accept the subtasks input in whatever shape the model emits: an array, the array
    JSON-encoded as a single string, or a newline-separated list."""
    if isinstance(raw, str):
        try:
            raw = json.loads(raw)
        except json.JSONDecodeError:
            raw = raw.splitlines() if "\n" in raw else [raw]
    if not isinstance(raw, list):
        return []
    return [task.strip() for task in raw if isinstance(task, str) and task.strip()]


def verify_prompt_for(subtask: str, result: str) -> str:
    return (
        "Adversarially verify the subagent result below: try to REFUTE it. Re-derive the "
        "claims yourself with bash rather than trusting the result, and look for evidence "
        "that contradicts them. Default to refuted if uncertain. Call report_findings with "
        "summary 'refuted: <why>' or 'confirmed: <why>', citing the file:line or command "
        "output that decided it.\n\n"
        f"Subtask: {subtask}\n\nResult to verify:\n{result}"
    )


def run_workflow(model: str, raw_subtasks) -> tuple[str, bool]:
    """Run subtasks as parallel subagents, then run a second verification wave over
    the results, and return both. MAX_TOTAL_SUBTASKS bounds how many the model can
    queue; MAX_CONCURRENT bounds how many run at once."""
    all_subtasks = normalize_subtasks(raw_subtasks)
    subtasks = all_subtasks[:MAX_TOTAL_SUBTASKS]
    dropped = len(all_subtasks) - len(subtasks)
    if not subtasks:
        return "Workflow error: no usable subtasks were provided.", True
    print(f"[workflow] fanning out {len(subtasks)} agents", file=sys.stderr)

    def run_one(prompt: str) -> str:
        try:
            return journaled(prompt, lambda: run_subagent(model, prompt))
        except Exception as error:  # isolation boundary: one bad subagent should not end the run
            return f"(subagent failed: {type(error).__name__}: {error})"

    with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_CONCURRENT) as pool:
        results = list(pool.map(run_one, subtasks))
        print(f"[workflow] verifying {len(results)} results", file=sys.stderr)
        verify_prompts = [verify_prompt_for(task, result) for task, result in zip(subtasks, results)]
        verdicts = list(pool.map(run_one, verify_prompts))

    joined = "\n\n".join(
        f"[agent {index + 1}: {task}]\n{result}\n\n[verify {index + 1}]\n{verdict}"
        for index, (task, result, verdict) in enumerate(zip(subtasks, results, verdicts))
    )
    if dropped > 0:
        joined = (
            f"(note: {dropped} subtasks beyond MAX_TOTAL_SUBTASKS={MAX_TOTAL_SUBTASKS} were not "
            "run; rerun them in a follow-up Workflow call)\n\n" + joined
        )
    return joined, False
class ModeAgent:
    """An agent loop whose orchestration mode is toggled with mid-conversation system messages."""

    def __init__(self, model: str, mode_on: bool = True):
        self.model = model
        self.mode_on = mode_on
        self.messages: list[dict] = []
        self._mode_announced = False
        self._exit_pending = False
        self._turns_since_reminder = 0

    def set_mode(self, mode_on: bool) -> None:
        """Turn the mode on or off. The notice is delivered with the next user turn."""
        if mode_on == self.mode_on:
            return
        if not mode_on:
            if self._mode_announced:
                self._exit_pending = True
        else:
            self._exit_pending = False
        self.mode_on = mode_on

    def _due_system_messages(self) -> list[dict]:
        """System messages owed on this turn: an exit notice, the full mode text on entry,
        or a one-line refresher every TURNS_BETWEEN_REFRESHERS user turns."""
        due = []
        if self._exit_pending:
            self._exit_pending = False
            self._mode_announced = False
            due.append({"role": "system", "content": MODE_EXIT})
        if self.mode_on:
            if not self._mode_announced:
                self._mode_announced = True
                self._turns_since_reminder = 0
                due.append({"role": "system", "content": MODE_ENTER})
            elif self._turns_since_reminder >= TURNS_BETWEEN_REFRESHERS:
                self._turns_since_reminder = 0
                due.append({"role": "system", "content": MODE_REFRESH})
        return due

    def turn(self, user_input: str) -> str:
        # Mid-conversation system messages follow the user turn they apply to, which keeps
        # the cached prefix ahead of them untouched.
        self.messages.append({"role": "user", "content": user_input})
        self.messages.extend(self._due_system_messages())
        self._turns_since_reminder += 1

        for _ in range(MAX_MAIN_TURNS):
            with client.messages.stream(
                model=self.model,
                max_tokens=64000,
                system=SYSTEM_PROMPT,  # static for the whole session
                thinking={"type": "adaptive"},
                output_config={"effort": EFFORT},
                tools=[WORKFLOW_TOOL, BASH_TOOL],
                messages=self.messages,
                timeout=REQUEST_TIMEOUT_SECONDS,
            ) as stream:
                response = stream.get_final_message()
            self.messages.append({"role": "assistant", "content": response.content})

            if response.stop_reason == "pause_turn":
                continue
            if response.stop_reason != "tool_use":
                text = "".join(block.text for block in response.content if block.type == "text")
                if response.stop_reason == "max_tokens":
                    # Drop the truncated assistant message so later turns don't build on it.
                    self.messages.pop()
                    text += "\n\n(warning: response was truncated at max_tokens)"
                return text

            tool_results = []
            for block in response.content:
                if block.type != "tool_use":
                    continue
                if block.name == "Workflow":
                    output, is_error = run_workflow(self.model, block.input.get("subtasks", []))
                elif block.name == "bash":
                    output, is_error = handle_bash_block(block)
                else:
                    output, is_error = f"unknown tool: {block.name}", True
                tool_results.append(
                    {
                        "type": "tool_result",
                        "tool_use_id": block.id,
                        "content": output,
                        "is_error": is_error,
                    }
                )
            self.messages.append({"role": "user", "content": tool_results})
        return "(hit the main loop turn limit before finishing)"
if __name__ == "__main__":
    task = (
        sys.argv[1]
        if len(sys.argv) > 1
        else "Explore the current directory, then give a thorough review: what it does, "
        "code-quality issues, and concrete improvements."
    )
    agent = ModeAgent(MODEL)
    print(agent.turn(task))
    agent.set_mode(False)
    print(agent.turn("Briefly summarize what you found above, no fan-out needed."))
Tool-Nutzung mit Claude

Tools definieren, Tool-Aufrufe handhaben und Tool-Ergebnisse.

Bash-Tool

Das von Anthropic definierte Bash-Tool, das dieses Beispiel lokal ausführt.