• Сообщения
  • Управляемые агенты
  • Администрирование
Search...
⌘K
Первые шаги
Знакомство с ClaudeБыстрый старт
Разработка с Claude
Обзор возможностейИспользование Messages APIПричины остановки и резервный вариантОтказы и резервный вариантРезервный кредит
Возможности модели
Расширенное мышлениеАдаптивное мышлениеУсилиеБюджеты задач (бета)Быстрый режим (исследовательская предварительная версия)Структурированные выходные данныеЦитированиеПотоковая передача сообщенийПакетная обработкаРезультаты поискаПотоковая передача отказовМногоязычная поддержкаЭмбеддинги
Инструменты
ОбзорКак работает использование инструментовРуководство: создание агента с использованием инструментовОпределение инструментовОбработка вызовов инструментовПараллельное использование инструментовTool Runner (SDK)Строгое использование инструментовИспользование инструментов с кэшированием подсказокСерверные инструментыУстранение неполадокИнструмент веб-поискаИнструмент загрузки веб-страницИнструмент выполнения кодаИнструмент советникаИнструмент памятиИнструмент BashИнструмент использования компьютераИнструмент текстового редактора
Инфраструктура инструментов
Справочник по инструментамУправление контекстом инструментовКомбинации инструментовПоиск инструментовПрограммный вызов инструментовДетальная потоковая передача инструментов
Управление контекстом
Контекстные окнаСжатиеРедактирование контекстаКэширование подсказокСистемные сообщения в середине разговораСоздание режима оркестрацииДиагностика кэша (бета)Подсчёт токенов
Работа с файлами
Files APIПоддержка PDFИзображения и компьютерное зрение
Навыки
ОбзорБыстрый стартРекомендацииНавыки для предприятийНавыки в API
MCP
Удалённые серверы MCPКоннектор MCP
Claude на облачных платформах
Amazon BedrockAmazon Bedrock (устаревшая версия)Claude Platform на AWSMicrosoft FoundryVertex AI
Log in
Создание режима оркестрации
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...

Solutions

  • AI agents
  • Code modernization
  • Coding
  • Customer support
  • Education
  • Financial services
  • Government
  • Life sciences

Partners

  • Amazon Bedrock
  • Google Cloud's Vertex AI

Learn

  • Blog
  • Courses
  • Use cases
  • Connectors
  • Customer stories
  • Engineering at Anthropic
  • Events
  • Powered by Claude
  • Service partners
  • Startups program

Company

  • Anthropic
  • Careers
  • Economic Futures
  • Research
  • News
  • Responsible Scaling Policy
  • Security and compliance
  • Transparency

Learn

  • Blog
  • Courses
  • Use cases
  • Connectors
  • Customer stories
  • Engineering at Anthropic
  • Events
  • Powered by Claude
  • Service partners
  • Startups program

Help and security

  • Availability
  • Status
  • Support
  • Discord

Terms and policies

  • Privacy policy
  • Responsible disclosure policy
  • Terms of service: Commercial
  • Terms of service: Consumer
  • Usage policy
Сообщения/Управление контекстом

Создание режима оркестрации

Создайте режим уровня сессии, который предоставляет постоянное согласие на распараллеливание работы между несколькими агентами и включается и выключается с помощью системных сообщений в середине разговора.

Was this page helpful?

  • Настройка цикла
  • Определение напоминаний о режиме
  • Предоставление постоянного согласия в описании инструмента
  • Локальный запуск инструмента bash
  • Запуск одного субагента
  • Журналирование результатов для возобновления повторных запусков
  • Распараллеливание, затем верификация
  • Переключение режима с помощью системных сообщений в середине разговора
  • Запуск
  • На пути к производственной обвязке
  • Связанные материалы

Режим оркестрации — это переключатель уровня сессии: когда он включён, модель прикладывает максимальную тщательность к каждому содержательному запросу, сначала самостоятельно исследуя задачу, а затем по умолчанию распределяя работу между параллельными субагентами. Когда он выключен, тот же инструмент оркестрации возвращается к режиму явного согласия для каждого отдельного запроса.

Этот режим не является параметром API. Он полностью собран из задокументированных компонентов:

  1. Уровень усилий: запросы выполняются с задокументированным значением Effort, например xhigh. Никакого скрытого уровня выше тех, что указаны на той странице, не существует.
  2. Напоминание о режиме: системное сообщение в середине разговора сообщает модели, что режим активен, с однострочным напоминанием каждые несколько ходов и уведомлением о выходе при выключении режима. Поле system верхнего уровня никогда не меняется, поэтому кэшированный префикс остаётся нетронутым.
  3. Постоянное согласие в описании инструмента: в описании инструмента оркестрации указано, что пока режим включён, модель должна составлять и запускать рабочий процесс для каждой содержательной задачи, не спрашивая предварительного разрешения.

В этом примере используются системные сообщения в середине разговора, которые в настоящее время доступны только в Claude Opus 4.8. Само распараллеливание многократно увеличивает расход токенов: один запрос может породить множество разговоров с субагентами, поэтому приберегите этот режим для работы, которая оправдывает такие затраты.

Настройка цикла

Пример представляет собой один файл. Константы управляют уровнем усилий, формой распараллеливания и частотой повторной отправки напоминания о режиме. MAX_CONCURRENT ограничивает количество одновременно работающих субагентов (порт на PHP выполняется последовательно и игнорирует эту константу); MAX_TOTAL_SUBTASKS ограничивает количество подзадач, которые модель может поставить в очередь за один вызов Workflow. Разделение этих двух параметров позволяет модели планировать большой объём работы, не запуская всё сразу. Проверка DOC_TEST_MODE ограничивает циклы одним ходом, когда установлена эта переменная окружения, чтобы автоматизированная тестовая обвязка документации могла проверить, что файл компилируется и быстро завершается, не запуская полную оркестрацию; оставьте её неустановленной, когда запускаете пример самостоятельно.

Определение напоминаний о режиме

Напоминания намеренно короткие. Они переключают режим и указывают на описание инструмента, где находятся основные инструкции. Полный текст отправляется один раз при включении режима, напоминание повторно отправляется только после нескольких ходов пользователя, а уведомление о выходе отправляется один раз при выключении режима.

Предоставление постоянного согласия в описании инструмента

Инструмент Workflow содержит настоящий поведенческий контракт: правило явного согласия, постоянное согласие, действующее при включённом режиме, рекомендации по гранулярности для определения размера распараллеливания и шаблоны качества, к которым модель может прибегнуть (волна верификации, критик полноты, многофазная последовательность). Субагенты также получают инструмент report_findings, чтобы их результаты возвращались в виде структурированного JSON, а не прозы, а инструмент bash — это определённый Anthropic инструмент bash_20250124, запускаемый локально.

Локальный запуск инструмента bash

Обработчик bash запускает запрошенную команду с тайм-аутом, захватывает объединённые stdout и stderr и усекает результат, чтобы вышедшая из-под контроля команда не переполнила контекстное окно. Команды выполняются в каталоге, из которого вы запускаете пример, поэтому, чтобы направить его на проект, нужно запустить его оттуда; когда установлена переменная DOC_TEST_MODE, тестовая обвязка вместо этого предоставляет bash небольшой временный каталог-фикстуру, который удаляется при выходе. Здесь нет песочницы: команда выполняется с правами процесса, запустившего пример. Для наглядности этот пример запускает каждый вызов в новой подоболочке, а не поддерживает постоянную сессию, описанную в контракте bash_20250124; производственный агент должен обеспечивать инструмент долгоживущей оболочкой, чтобы рабочий каталог, окружение и действие restart вели себя так, как задокументировано.

Запуск одного субагента

Каждая подзадача рабочего процесса становится собственным небольшим агентным циклом с инструментом bash, работающим на том же уровне усилий, что и основной цикл. Тайм-аут для каждого запроса ограничивает каждый вызов API, так что разорванное соединение ухудшает работу одного субагента, а не останавливает весь запуск.

Журналирование результатов для возобновления повторных запусков

Распараллеливание, порождающее десятки субагентов, дорого перезапускать с нуля. Небольшой журнал с адресацией по содержимому делает его идемпотентным: перед отправкой субагента найдите SHA-256 его подсказки в локальном JSON-файле и верните записанный результат, если он существует. Прервите запуск, запустите его снова — и пересчитаны будут только те подзадачи, которые так и не завершились. Журнал устраняет дубликаты между запусками, а не внутри одной волны распараллеливания; удалите файл журнала, чтобы начать с чистого листа.

Распараллеливание, затем верификация

Распараллеливание принимает до MAX_TOTAL_SUBTASKS подсказок, прогоняет их через журнал с не более чем MAX_CONCURRENT одновременно выполняющимися (последовательно в порте на PHP) и изолирует сбои, так что один сломанный субагент деградирует до строки с ошибкой, а не завершает весь запуск. После завершения первой волны вторая волна повторно использует тот же путь субагента, чтобы попытаться опровергнуть каждый результат: каждый верификатор заново выводит утверждения из источника, по умолчанию считая их опровергнутыми при неопределённости. И исходный результат, и вердикт по нему возвращаются оркестратору, чтобы он мог взвесить их вместе.

Переключение режима с помощью системных сообщений в середине разговора

Агент сначала добавляет сообщение пользователя, затем любые системные сообщения, которые должны быть отправлены: уведомление о выходе, полный текст режима при входе или периодическое напоминание. Размещение системного сообщения после хода пользователя оставляет нетронутым каждый кэшированный байт перед ним и удовлетворяет правилу размещения, согласно которому системное сообщение следует за ходом пользователя.

Запуск

Инструмент bash в этом примере запускает написанные моделью команды непосредственно на вашей машине без песочницы, а распараллеливание запускает несколько таких агентов одновременно. Запускайте его в каталоге и окружении, которые вам не жалко раскрыть, и добавьте изоляцию в песочнице, прежде чем адаптировать его для чего-либо, кроме локальных экспериментов.

Запустите пример из каталога, в котором вы хотите, чтобы агенты работали, например из корня репозитория для проверки:

python orchestration_mode.py "Review this repository for flaky tests and propose fixes."

При включённом режиме ожидайте, что модель проведёт разведку с помощью нескольких команд bash, без запроса вызовет инструмент Workflow и синтезирует отчёты субагентов в итоговый ответ. Тривиальные или разговорные запросы обрабатываются в одиночку, как предписывает напоминание.

На пути к производственной обвязке

Этот пример намеренно небольшой. Обвязка, предназначенная для реальных рабочих нагрузок, обычно включала бы дополнительно:

  • Скрипты оркестрации в песочнице: позвольте модели выдавать короткую программу оркестрации (ветвление, циклы и шаги свёртки) и запускайте её внутри изолированного интерпретатора, вместо того чтобы принимать только плоский список строк подзадач.
  • Надёжное журналирование: замените локальный JSON-файл хранилищем, которое переживает перезапуски процесса и безопасно при одновременной записи с нескольких машин.
  • Контроль бюджета: отслеживайте общее количество запущенных субагентов за всю сессию, а не только за один вызов Workflow, и отказывайтесь превышать жёсткий лимит, чтобы вышедший из-под контроля план не мог исчерпать вашу квоту.

Шаблоны из этого примера (напоминания о режиме, постоянное согласие в описании инструмента, журналирование и волна верификации) переносятся без изменений; более надёжным становится только исполнительный субстрат вокруг них.

Связанные материалы

Системные сообщения в середине разговора

Механизм, который используют напоминания о режиме, и его взаимодействие с кэшированием подсказок.

Effort

Уровни усилий, которые принимает API, и как выбрать подходящий.

import atexit
import concurrent.futures
import hashlib
import json
import os
import shutil
import subprocess
import sys
import tempfile
import threading

import anthropic

client = anthropic.Anthropic()

MODEL = "claude-opus-4-8"
EFFORT = "xhigh"

SYSTEM_PROMPT = "You are a helpful general-purpose agent. Answer the user's request directly."

REQUEST_TIMEOUT_SECONDS = 600
BASH_TIMEOUT_SECONDS = 60
TOOL_RESULT_MAX_CHARS = 8000
MAX_CONCURRENT = 10
DOC_TEST_MODE = bool(os.environ.get("DOC_TEST_MODE"))
MAX_TOTAL_SUBTASKS = 2 if DOC_TEST_MODE else 200
MAX_SUBAGENT_TURNS = 1 if DOC_TEST_MODE else 15
MAX_MAIN_TURNS = 1 if DOC_TEST_MODE else 30
TURNS_BETWEEN_REFRESHERS = 10
JOURNAL_PATH = os.environ.get("ORCH_JOURNAL") or "orchestration_journal.json"
MODE_ENTER = (
    "Orchestration mode is on: optimize for the most exhaustive, correct answer rather than "
    "the fastest one. Use the Workflow tool on every substantive task, sized to the problem's "
    "natural decomposition rather than the maximum the tool allows. See the Workflow tool's "
    "description for standing consent, granularity guidance, and quality patterns. Work solo "
    "only on conversational or trivial turns."
)
MODE_REFRESH = (
    "Orchestration mode is still on. Use the Workflow tool; see its standing consent section."
)
MODE_EXIT = (
    "Orchestration mode is off. The Workflow tool's standard opt-in rule applies again."
)
WORKFLOW_TOOL = {
    "name": "Workflow",
    "description": (
        "Orchestrate a multi-agent workflow: split a large task into independent subtasks "
        "and run them as parallel agents, then collect their results.\n\n"
        "Opt-in: only use this tool when the user explicitly asks for a workflow, or when a "
        "system message confirms that orchestration mode is on.\n\n"
        "Quality patterns: adversarial verification (a second wave of agents checks the first "
        "wave's findings against the source), a completeness critic (one agent hunts for what "
        "the others missed), and multi-phase sequencing (understand, design, implement, and "
        "review as separate workflow calls, reading results between phases). A useful default "
        "is hybrid: scout inline first to discover the work-list, then fan out over it.\n\n"
        "Granularity: scope each subtask to a distinct concern, component, or question rather "
        "than per line or per file section. Scale the count to what the user asked for: a "
        "focused review of a module of a few hundred lines rarely needs more than about ten "
        "subtasks; a broad audit of a large codebase can justify more.\n\n"
        "Standing consent: while a system message confirms orchestration mode is on, that "
        "opt-in is standing. Author and run a workflow for every substantive task by default, "
        "and lean toward verifying findings adversarially. Work solo only on conversational "
        "turns or trivial mechanical edits. When a system message says the mode is off, "
        "revert to the opt-in rule above."
    ),
    "input_schema": {
        "type": "object",
        "properties": {
            "subtasks": {
                "type": "array",
                "items": {"type": "string"},
                "description": "Independent subtask prompts to run as parallel agents",
            }
        },
        "required": ["subtasks"],
    },
}

BASH_TOOL = {"type": "bash_20250124", "name": "bash"}

REPORT_TOOL = {
    "name": "report_findings",
    "description": (
        "Report the final findings for your subtask. Call this exactly once, when you are "
        "done investigating; it ends your task."
    ),
    "input_schema": {
        "type": "object",
        "properties": {
            "summary": {"type": "string", "description": "Two or three sentences of synthesis"},
            "findings": {
                "type": "array",
                "items": {
                    "type": "object",
                    "properties": {
                        "claim": {"type": "string", "description": "The finding, one sentence"},
                        "evidence": {
                            "type": "string",
                            "description": "How it was verified (file, line, or command output)",
                        },
                        "severity": {"type": "string", "enum": ["high", "medium", "low", "info"]},
                    },
                    "required": ["claim", "evidence", "severity"],
                },
            },
        },
        "required": ["summary", "findings"],
    },
}
# Run bash where the example was launched. In DOC_TEST_MODE the docs harness
# points it at a throwaway fixture directory instead, removed on exit.
if DOC_TEST_MODE:
    WORK_DIR = tempfile.mkdtemp(prefix="orchestration-")
    atexit.register(shutil.rmtree, WORK_DIR, ignore_errors=True)
    with open(os.path.join(WORK_DIR, "sample.py"), "w") as fixture:
        fixture.write(
            "def fib(n):\n"
            "    return n if n < 2 else fib(n - 1) + fib(n - 2)\n\n"
            "print(fib(10))\n"
        )
else:
    WORK_DIR = os.getcwd()


def run_bash(command: str) -> tuple[str, bool]:
    """Run a shell command and return (output, is_error). No sandbox: example code only."""
    print(f"[bash] {command}", file=sys.stderr)
    try:
        proc = subprocess.run(
            ["bash", "-c", command],
            cwd=WORK_DIR,
            capture_output=True,
            text=True,
            errors="replace",
            timeout=BASH_TIMEOUT_SECONDS,
        )
    except subprocess.TimeoutExpired:
        return f"command timed out after {BASH_TIMEOUT_SECONDS}s", True
    output = (proc.stdout + proc.stderr).strip() or "(no output)"
    if len(output) > TOOL_RESULT_MAX_CHARS:
        output = output[:TOOL_RESULT_MAX_CHARS] + f"\n(truncated at {TOOL_RESULT_MAX_CHARS} chars)"
    if proc.returncode != 0:
        output = f"(exit code {proc.returncode})\n{output}"
    return output, proc.returncode != 0


def handle_bash_block(block) -> tuple[str, bool]:
    if block.input.get("restart") is True:
        return "Shell restarted.", False
    command = block.input.get("command")
    if not isinstance(command, str) or not command:
        return "bash error: no command was provided.", True
    return run_bash(command)
def run_subagent(model: str, prompt: str) -> str:
    """One subagent: a small nested agent loop with the bash tool plus report_findings.
    Subagents inherit the main loop's effort level."""
    subagent_system = (
        "You are one agent in a larger parallel fan-out, assigned a single subtask. "
        "Investigate it directly, using bash to check facts rather than guessing, and finish "
        "by calling report_findings exactly once. Return findings, not narration."
    )
    messages = [{"role": "user", "content": prompt}]
    for _ in range(MAX_SUBAGENT_TURNS):
        with client.messages.stream(
            model=model,
            max_tokens=64000,
            system=subagent_system,
            thinking={"type": "adaptive"},
            output_config={"effort": EFFORT},
            tools=[BASH_TOOL, REPORT_TOOL],
            messages=messages,
            timeout=REQUEST_TIMEOUT_SECONDS,
        ) as stream:
            response = stream.get_final_message()
        messages.append({"role": "assistant", "content": response.content})
        if response.stop_reason == "pause_turn":
            continue
        if response.stop_reason != "tool_use":
            text = "".join(block.text for block in response.content if block.type == "text")
            if response.stop_reason == "max_tokens":
                text += "\n\n(warning: subagent response was truncated at max_tokens)"
            return text
        tool_results = []
        report = None
        for block in response.content:
            if block.type != "tool_use":
                continue
            if block.name == "report_findings":
                report = json.dumps(block.input, indent=2)
                output, is_error = "Findings recorded.", False
            elif block.name == "bash":
                output, is_error = handle_bash_block(block)
            else:
                output, is_error = f"unknown tool: {block.name}", True
            tool_results.append(
                {
                    "type": "tool_result",
                    "tool_use_id": block.id,
                    "content": output,
                    "is_error": is_error,
                }
            )
        if report is not None:
            return report
        messages.append({"role": "user", "content": tool_results})
    return "(subagent hit the turn limit before finishing)"
_journal_lock = threading.Lock()


def _load_journal() -> dict:
    try:
        with open(JOURNAL_PATH) as file:
            return json.load(file) or {}
    except (OSError, json.JSONDecodeError):
        return {}


def journaled(prompt: str, compute) -> str:
    """Return a cached result for this exact prompt, or compute and persist it. This
    makes the fan-out resumable: interrupt the run, rerun it, and only the subtasks
    that never finished are recomputed. Delete the journal file to start fresh."""
    key = hashlib.sha256(prompt.encode()).hexdigest()
    cached = _load_journal().get(key)
    if cached is not None:
        print(f"[journal] cache hit for {key[:12]}", file=sys.stderr)
        return cached
    result = compute()
    try:
        with _journal_lock:  # fan-out writes from many threads
            journal = _load_journal()
            journal[key] = result
            temp = f"{JOURNAL_PATH}.tmp"
            with open(temp, "w") as file:
                json.dump(journal, file)
            os.replace(temp, JOURNAL_PATH)  # atomic on POSIX and Windows
    except OSError as error:  # the journal is best-effort; never discard a computed result
        print(f"[journal] write failed: {error}", file=sys.stderr)
    return result
def normalize_subtasks(raw) -> list[str]:
    """Accept the subtasks input in whatever shape the model emits: an array, the array
    JSON-encoded as a single string, or a newline-separated list."""
    if isinstance(raw, str):
        try:
            raw = json.loads(raw)
        except json.JSONDecodeError:
            raw = raw.splitlines() if "\n" in raw else [raw]
    if not isinstance(raw, list):
        return []
    return [task.strip() for task in raw if isinstance(task, str) and task.strip()]


def verify_prompt_for(subtask: str, result: str) -> str:
    return (
        "Adversarially verify the subagent result below: try to REFUTE it. Re-derive the "
        "claims yourself with bash rather than trusting the result, and look for evidence "
        "that contradicts them. Default to refuted if uncertain. Call report_findings with "
        "summary 'refuted: <why>' or 'confirmed: <why>', citing the file:line or command "
        "output that decided it.\n\n"
        f"Subtask: {subtask}\n\nResult to verify:\n{result}"
    )


def run_workflow(model: str, raw_subtasks) -> tuple[str, bool]:
    """Run subtasks as parallel subagents, then run a second verification wave over
    the results, and return both. MAX_TOTAL_SUBTASKS bounds how many the model can
    queue; MAX_CONCURRENT bounds how many run at once."""
    all_subtasks = normalize_subtasks(raw_subtasks)
    subtasks = all_subtasks[:MAX_TOTAL_SUBTASKS]
    dropped = len(all_subtasks) - len(subtasks)
    if not subtasks:
        return "Workflow error: no usable subtasks were provided.", True
    print(f"[workflow] fanning out {len(subtasks)} agents", file=sys.stderr)

    def run_one(prompt: str) -> str:
        try:
            return journaled(prompt, lambda: run_subagent(model, prompt))
        except Exception as error:  # isolation boundary: one bad subagent should not end the run
            return f"(subagent failed: {type(error).__name__}: {error})"

    with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_CONCURRENT) as pool:
        results = list(pool.map(run_one, subtasks))
        print(f"[workflow] verifying {len(results)} results", file=sys.stderr)
        verify_prompts = [verify_prompt_for(task, result) for task, result in zip(subtasks, results)]
        verdicts = list(pool.map(run_one, verify_prompts))

    joined = "\n\n".join(
        f"[agent {index + 1}: {task}]\n{result}\n\n[verify {index + 1}]\n{verdict}"
        for index, (task, result, verdict) in enumerate(zip(subtasks, results, verdicts))
    )
    if dropped > 0:
        joined = (
            f"(note: {dropped} subtasks beyond MAX_TOTAL_SUBTASKS={MAX_TOTAL_SUBTASKS} were not "
            "run; rerun them in a follow-up Workflow call)\n\n" + joined
        )
    return joined, False
class ModeAgent:
    """An agent loop whose orchestration mode is toggled with mid-conversation system messages."""

    def __init__(self, model: str, mode_on: bool = True):
        self.model = model
        self.mode_on = mode_on
        self.messages: list[dict] = []
        self._mode_announced = False
        self._exit_pending = False
        self._turns_since_reminder = 0

    def set_mode(self, mode_on: bool) -> None:
        """Turn the mode on or off. The notice is delivered with the next user turn."""
        if mode_on == self.mode_on:
            return
        if not mode_on:
            if self._mode_announced:
                self._exit_pending = True
        else:
            self._exit_pending = False
        self.mode_on = mode_on

    def _due_system_messages(self) -> list[dict]:
        """System messages owed on this turn: an exit notice, the full mode text on entry,
        or a one-line refresher every TURNS_BETWEEN_REFRESHERS user turns."""
        due = []
        if self._exit_pending:
            self._exit_pending = False
            self._mode_announced = False
            due.append({"role": "system", "content": MODE_EXIT})
        if self.mode_on:
            if not self._mode_announced:
                self._mode_announced = True
                self._turns_since_reminder = 0
                due.append({"role": "system", "content": MODE_ENTER})
            elif self._turns_since_reminder >= TURNS_BETWEEN_REFRESHERS:
                self._turns_since_reminder = 0
                due.append({"role": "system", "content": MODE_REFRESH})
        return due

    def turn(self, user_input: str) -> str:
        # Mid-conversation system messages follow the user turn they apply to, which keeps
        # the cached prefix ahead of them untouched.
        self.messages.append({"role": "user", "content": user_input})
        self.messages.extend(self._due_system_messages())
        self._turns_since_reminder += 1

        for _ in range(MAX_MAIN_TURNS):
            with client.messages.stream(
                model=self.model,
                max_tokens=64000,
                system=SYSTEM_PROMPT,  # static for the whole session
                thinking={"type": "adaptive"},
                output_config={"effort": EFFORT},
                tools=[WORKFLOW_TOOL, BASH_TOOL],
                messages=self.messages,
                timeout=REQUEST_TIMEOUT_SECONDS,
            ) as stream:
                response = stream.get_final_message()
            self.messages.append({"role": "assistant", "content": response.content})

            if response.stop_reason == "pause_turn":
                continue
            if response.stop_reason != "tool_use":
                text = "".join(block.text for block in response.content if block.type == "text")
                if response.stop_reason == "max_tokens":
                    # Drop the truncated assistant message so later turns don't build on it.
                    self.messages.pop()
                    text += "\n\n(warning: response was truncated at max_tokens)"
                return text

            tool_results = []
            for block in response.content:
                if block.type != "tool_use":
                    continue
                if block.name == "Workflow":
                    output, is_error = run_workflow(self.model, block.input.get("subtasks", []))
                elif block.name == "bash":
                    output, is_error = handle_bash_block(block)
                else:
                    output, is_error = f"unknown tool: {block.name}", True
                tool_results.append(
                    {
                        "type": "tool_result",
                        "tool_use_id": block.id,
                        "content": output,
                        "is_error": is_error,
                    }
                )
            self.messages.append({"role": "user", "content": tool_results})
        return "(hit the main loop turn limit before finishing)"
if __name__ == "__main__":
    task = (
        sys.argv[1]
        if len(sys.argv) > 1
        else "Explore the current directory, then give a thorough review: what it does, "
        "code-quality issues, and concrete improvements."
    )
    agent = ModeAgent(MODEL)
    print(agent.turn(task))
    agent.set_mode(False)
    print(agent.turn("Briefly summarize what you found above, no fan-out needed."))
Использование инструментов с Claude

Определение инструментов, обработка вызовов инструментов и результатов инструментов.

Инструмент bash

Определённый Anthropic инструмент bash, который этот пример выполняет локально.