• 消息
  • 托管智能体
  • 管理
Search...
⌘K
第一步
Claude 简介快速入门
使用 Claude 构建
功能概览使用消息 API停止原因与回退拒绝与回退回退额度
模型能力
扩展思考自适应思考努力程度任务预算(测试版)快速模式(研究预览)结构化输出引用流式传输消息批处理搜索结果流式传输拒绝多语言支持嵌入
工具
概览工具使用的工作原理教程:构建使用工具的智能体定义工具处理工具调用并行工具使用工具运行器(SDK)严格工具使用工具使用与提示缓存服务器工具故障排除网络搜索工具网页获取工具代码执行工具顾问工具记忆工具Bash 工具计算机使用工具文本编辑器工具
工具基础设施
工具参考管理工具上下文工具组合工具搜索编程式工具调用细粒度工具流式传输
上下文管理
上下文窗口压缩上下文编辑提示缓存对话中系统消息构建编排模式缓存诊断(测试版)令牌计数
处理文件
文件 APIPDF 支持图像与视觉
技能
概览快速入门最佳实践企业技能API 中的技能
MCP
远程 MCP 服务器MCP 连接器
云平台上的 Claude
Amazon BedrockAmazon Bedrock(旧版)AWS 上的 Claude PlatformMicrosoft FoundryVertex AI
Log in
构建编排模式
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...

Solutions

  • AI agents
  • Code modernization
  • Coding
  • Customer support
  • Education
  • Financial services
  • Government
  • Life sciences

Partners

  • Amazon Bedrock
  • Google Cloud's Vertex AI

Learn

  • Blog
  • Courses
  • Use cases
  • Connectors
  • Customer stories
  • Engineering at Anthropic
  • Events
  • Powered by Claude
  • Service partners
  • Startups program

Company

  • Anthropic
  • Careers
  • Economic Futures
  • Research
  • News
  • Responsible Scaling Policy
  • Security and compliance
  • Transparency

Learn

  • Blog
  • Courses
  • Use cases
  • Connectors
  • Customer stories
  • Engineering at Anthropic
  • Events
  • Powered by Claude
  • Service partners
  • Startups program

Help and security

  • Availability
  • Status
  • Support
  • Discord

Terms and policies

  • Privacy policy
  • Responsible disclosure policy
  • Terms of service: Commercial
  • Terms of service: Consumer
  • Usage policy
消息/上下文管理

构建编排模式

构建一个会话级模式,为多智能体扇出授予常设许可,并通过对话中系统消息进行开启和关闭。

"Orchestration mode"(编排模式)是一个会话级开关:当它开启时,模型会对每个实质性请求投入最大程度的彻底性,先自行侦察任务,然后默认将工作扇出到并行的子智能体。当它关闭时,同一个编排工具会恢复为按请求逐次选择启用。

该模式不是一个 API 参数。它完全由已记录在文档中的组件构建而成:

  1. **一个 effort 级别:**请求以文档中记录的 Effort 值(例如 xhigh)运行。不存在高于该页面所列级别的隐藏级别。
  2. **一个模式提醒:**一条对话中系统消息告知模型该模式处于激活状态,每隔几个回合发送一行简短的刷新提醒,并在模式关闭时发送退出通知。顶层 system 字段始终不变,因此缓存的前缀保持完整。
  3. **工具描述中的常设许可:**编排工具的描述声明,当该模式开启时,模型应为每个实质性任务编写并运行工作流,无需事先询问。

本示例使用对话中系统消息,该功能目前仅在 Claude Opus 4.8 上可用。扇出本身会成倍增加令牌使用量:单个请求可能会衍生出许多子智能体对话,因此请将该模式保留用于值得付出此成本的工作。

设置循环

本示例是一个单文件。常量控制 effort 级别、扇出形态以及模式刷新提醒的重发频率。MAX_CONCURRENT 限制同时运行的子智能体数量(PHP 移植版本是顺序执行的,会忽略此值);MAX_TOTAL_SUBTASKS 限制模型在单次 Workflow 调用中可排队的子任务数量。将两者分开可以让模型规划大量待办任务,而不必一次性全部启动。DOC_TEST_MODE 检查会在设置该环境变量时将循环限制为单个回合,以便自动化文档测试工具可以验证文件能够编译并快速完成,而无需运行完整的编排流程;当您自己运行示例时,请保持该变量未设置。

import atexit
import concurrent.futures
import hashlib
import json
import os
import shutil
import subprocess
import sys
import tempfile
import threading

import anthropic

client = anthropic.Anthropic()

MODEL = "claude-opus-4-8"
EFFORT = "xhigh"

SYSTEM_PROMPT = "You are a helpful general-purpose agent. Answer the user's request directly."

REQUEST_TIMEOUT_SECONDS = 600
BASH_TIMEOUT_SECONDS = 60
TOOL_RESULT_MAX_CHARS = 8000
MAX_CONCURRENT = 10
DOC_TEST_MODE = bool(os.environ.get("DOC_TEST_MODE"))
MAX_TOTAL_SUBTASKS = 2 if DOC_TEST_MODE else 200
MAX_SUBAGENT_TURNS = 1 if DOC_TEST_MODE else 15
MAX_MAIN_TURNS = 1 if DOC_TEST_MODE else 30
TURNS_BETWEEN_REFRESHERS = 10
JOURNAL_PATH = os.environ.get("ORCH_JOURNAL") or "orchestration_journal.json"

定义模式提醒

这些提醒刻意保持简短。它们负责切换模式并指向工具描述,而重量级的指令都位于工具描述中。完整文本在模式开启时发送一次,刷新提醒仅在经过几个用户回合后重新发送,退出通知在模式关闭时发送一次。

MODE_ENTER = (
    "Orchestration mode is on: optimize for the most exhaustive, correct answer rather than "
    "the fastest one. Use the Workflow tool on every substantive task, sized to the problem's "
    "natural decomposition rather than the maximum the tool allows. See the Workflow tool's "
    "description for standing consent, granularity guidance, and quality patterns. Work solo "
    "only on conversational or trivial turns."
)
MODE_REFRESH = (
    "Orchestration mode is still on. Use the Workflow tool; see its standing consent section."
)
MODE_EXIT = (
    "Orchestration mode is off. The Workflow tool's standard opt-in rule applies again."
)

在工具描述中授予常设许可

Workflow 工具承载了真正的行为契约:选择启用规则、模式开启期间适用的常设许可、用于确定扇出规模的粒度指导,以及模型可以采用的质量模式(验证波次、完整性评审员、多阶段排序)。子智能体还会获得一个 report_findings 工具,以便它们的结果以结构化 JSON 而非散文形式返回,而 bash 工具是 Anthropic 定义的 bash_20250124 工具,在本地运行。

WORKFLOW_TOOL = {
    "name": "Workflow",
    "description": (
        "Orchestrate a multi-agent workflow: split a large task into independent subtasks "
        "and run them as parallel agents, then collect their results.\n\n"
        "Opt-in: only use this tool when the user explicitly asks for a workflow, or when a "
        "system message confirms that orchestration mode is on.\n\n"
        "Quality patterns: adversarial verification (a second wave of agents checks the first "
        "wave's findings against the source), a completeness critic (one agent hunts for what "
        "the others missed), and multi-phase sequencing (understand, design, implement, and "
        "review as separate workflow calls, reading results between phases). A useful default "
        "is hybrid: scout inline first to discover the work-list, then fan out over it.\n\n"
        "Granularity: scope each subtask to a distinct concern, component, or question rather "
        "than per line or per file section. Scale the count to what the user asked for: a "
        "focused review of a module of a few hundred lines rarely needs more than about ten "
        "subtasks; a broad audit of a large codebase can justify more.\n\n"
        "Standing consent: while a system message confirms orchestration mode is on, that "
        "opt-in is standing. Author and run a workflow for every substantive task by default, "
        "and lean toward verifying findings adversarially. Work solo only on conversational "
        "turns or trivial mechanical edits. When a system message says the mode is off, "
        "revert to the opt-in rule above."
    ),
    "input_schema": {
        "type": "object",
        "properties": {
            "subtasks": {
                "type": "array",
                "items": {"type": "string"},
                "description": "Independent subtask prompts to run as parallel agents",
            }
        },
        "required": ["subtasks"],
    },
}

BASH_TOOL = {"type": "bash_20250124", "name": "bash"}

REPORT_TOOL = {
    "name": "report_findings",
    "description": (
        "Report the final findings for your subtask. Call this exactly once, when you are "
        "done investigating; it ends your task."
    ),
    "input_schema": {
        "type": "object",
        "properties": {
            "summary": {"type": "string", "description": "Two or three sentences of synthesis"},
            "findings": {
                "type": "array",
                "items": {
                    "type": "object",
                    "properties": {
                        "claim": {"type": "string", "description": "The finding, one sentence"},
                        "evidence": {
                            "type": "string",
                            "description": "How it was verified (file, line, or command output)",
                        },
                        "severity": {"type": "string", "enum": ["high", "medium", "low", "info"]},
                    },
                    "required": ["claim", "evidence", "severity"],
                },
            },
        },
        "required": ["summary", "findings"],
    },
}

在本地运行 bash 工具

bash 处理程序以超时限制运行请求的命令,捕获合并后的 stdout 和 stderr,并截断结果,以防失控的命令淹没上下文窗口。命令在您启动示例的目录中运行,因此若要将其指向某个项目,就意味着要在该项目目录中启动它;当设置了 DOC_TEST_MODE 时,测试工具会改为给 bash 提供一个小型的临时固定目录,该目录会在退出时被删除。这里没有沙箱:命令以启动示例的进程的权限运行。为清晰起见,本示例在每次调用时都在一个新的子 shell 中运行,而不是维护 bash_20250124 契约所描述的持久会话;生产环境的智能体应使用长期运行的 shell 来支持该工具,以便工作目录、环境和 restart 操作按文档所述运行。

# Run bash where the example was launched. In DOC_TEST_MODE the docs harness
# points it at a throwaway fixture directory instead, removed on exit.
if DOC_TEST_MODE:
    WORK_DIR = tempfile.mkdtemp(prefix="orchestration-")
    atexit.register(shutil.rmtree, WORK_DIR, ignore_errors=True)
    with open(os.path.join(WORK_DIR, "sample.py"), "w") as fixture:
        fixture.write(
            "def fib(n):\n"
            "    return n if n < 2 else fib(n - 1) + fib(n - 2)\n\n"
            "print(fib(10))\n"
        )
else:
    WORK_DIR = os.getcwd()


def run_bash(command: str) -> tuple[str, bool]:
    """Run a shell command and return (output, is_error). No sandbox: example code only."""
    print(f"[bash] {command}", file=sys.stderr)
    try:
        proc = subprocess.run(
            ["bash", "-c", command],
            cwd=WORK_DIR,
            capture_output=True,
            text=True,
            errors="replace",
            timeout=BASH_TIMEOUT_SECONDS,
        )
    except subprocess.TimeoutExpired:
        return f"command timed out after {BASH_TIMEOUT_SECONDS}s", True
    output = (proc.stdout + proc.stderr).strip() or "(no output)"
    if len(output) > TOOL_RESULT_MAX_CHARS:
        output = output[:TOOL_RESULT_MAX_CHARS] + f"\n(truncated at {TOOL_RESULT_MAX_CHARS} chars)"
    if proc.returncode != 0:
        output = f"(exit code {proc.returncode})\n{output}"
    return output, proc.returncode != 0


def handle_bash_block(block) -> tuple[str, bool]:
    if block.input.get("restart") is True:
        return "Shell restarted.", False
    command = block.input.get("command")
    if not isinstance(command, str) or not command:
        return "bash error: no command was provided.", True
    return run_bash(command)

运行单个子智能体

每个工作流子任务都会成为一个独立的小型智能体循环,配备 bash 工具,并以与主循环相同的 effort 级别运行。每个请求的超时限制约束了每次 API 调用,因此连接断开只会降级单个子智能体,而不会使整个运行停滞。

def run_subagent(model: str, prompt: str) -> str:
    """One subagent: a small nested agent loop with the bash tool plus report_findings.
    Subagents inherit the main loop's effort level."""
    subagent_system = (
        "You are one agent in a larger parallel fan-out, assigned a single subtask. "
        "Investigate it directly, using bash to check facts rather than guessing, and finish "
        "by calling report_findings exactly once. Return findings, not narration."
    )
    messages = [{"role": "user", "content": prompt}]
    for _ in range(MAX_SUBAGENT_TURNS):
        with client.messages.stream(
            model=model,
            max_tokens=64000,
            system=subagent_system,
            thinking={"type": "adaptive"},
            output_config={"effort": EFFORT},
            tools=[BASH_TOOL, REPORT_TOOL],
            messages=messages,
            timeout=REQUEST_TIMEOUT_SECONDS,
        ) as stream:
            response = stream.get_final_message()
        messages.append({"role": "assistant", "content": response.content})
        if response.stop_reason == "pause_turn":
            continue
        if response.stop_reason != "tool_use":
            text = "".join(block.text for block in response.content if block.type == "text")
            if response.stop_reason == "max_tokens":
                text += "\n\n(warning: subagent response was truncated at max_tokens)"
            return text
        tool_results = []
        report = None
        for block in response.content:
            if block.type != "tool_use":
                continue
            if block.name == "report_findings":
                report = json.dumps(block.input, indent=2)
                output, is_error = "Findings recorded.", False
            elif block.name == "bash":
                output, is_error = handle_bash_block(block)
            else:
                output, is_error = f"unknown tool: {block.name}", True
            tool_results.append(
                {
                    "type": "tool_result",
                    "tool_use_id": block.id,
                    "content": output,
                    "is_error": is_error,
                }
            )
        if report is not None:
            return report
        messages.append({"role": "user", "content": tool_results})
    return "(subagent hit the turn limit before finishing)"

记录结果以便重新运行时恢复

衍生出数十个子智能体的扇出从头重启成本很高。一个小型的内容寻址日志使其具有幂等性:在分派子智能体之前,在本地 JSON 文件中查找其提示的 SHA-256 哈希值,如果存在记录的结果则直接返回。中断运行后重新运行,只有从未完成的子任务才会被重新计算。该日志在多次运行之间去重,而不是在单个扇出波次内去重;删除日志文件即可重新开始。

_journal_lock = threading.Lock()


def _load_journal() -> dict:
    try:
        with open(JOURNAL_PATH) as file:
            return json.load(file) or {}
    except (OSError, json.JSONDecodeError):
        return {}


def journaled(prompt: str, compute) -> str:
    """Return a cached result for this exact prompt, or compute and persist it. This
    makes the fan-out resumable: interrupt the run, rerun it, and only the subtasks
    that never finished are recomputed. Delete the journal file to start fresh."""
    key = hashlib.sha256(prompt.encode()).hexdigest()
    cached = _load_journal().get(key)
    if cached is not None:
        print(f"[journal] cache hit for {key[:12]}", file=sys.stderr)
        return cached
    result = compute()
    try:
        with _journal_lock:  # fan-out writes from many threads
            journal = _load_journal()
            journal[key] = result
            temp = f"{JOURNAL_PATH}.tmp"
            with open(temp, "w") as file:
                json.dump(journal, file)
            os.replace(temp, JOURNAL_PATH)  # atomic on POSIX and Windows
    except OSError as error:  # the journal is best-effort; never discard a computed result
        print(f"[journal] write failed: {error}", file=sys.stderr)
    return result

扇出,然后验证

扇出最多接受 MAX_TOTAL_SUBTASKS 个提示,通过日志运行它们,同时最多有 MAX_CONCURRENT 个在执行中(PHP 移植版本为顺序执行),并隔离失败,使单个出错的子智能体降级为一个错误字符串,而不是终止整个运行。第一波完成后,第二波会复用相同的子智能体路径来尝试反驳每个结果:每个验证器都从源头重新推导声明,在不确定时默认判定为已反驳。原始结果及其判定都会返回给编排器,以便它可以综合权衡。

def normalize_subtasks(raw) -> list[str]:
    """Accept the subtasks input in whatever shape the model emits: an array, the array
    JSON-encoded as a single string, or a newline-separated list."""
    if isinstance(raw, str):
        try:
            raw = json.loads(raw)
        except json.JSONDecodeError:
            raw = raw.splitlines() if "\n" in raw else [raw]
    if not isinstance(raw, list):
        return []
    return [task.strip() for task in raw if isinstance(task, str) and task.strip()]


def verify_prompt_for(subtask: str, result: str) -> str:
    return (
        "Adversarially verify the subagent result below: try to REFUTE it. Re-derive the "
        "claims yourself with bash rather than trusting the result, and look for evidence "
        "that contradicts them. Default to refuted if uncertain. Call report_findings with "
        "summary 'refuted: <why>' or 'confirmed: <why>', citing the file:line or command "
        "output that decided it.\n\n"
        f"Subtask: {subtask}\n\nResult to verify:\n{result}"
    )


def run_workflow(model: str, raw_subtasks) -> tuple[str, bool]:
    """Run subtasks as parallel subagents, then run a second verification wave over
    the results, and return both. MAX_TOTAL_SUBTASKS bounds how many the model can
    queue; MAX_CONCURRENT bounds how many run at once."""
    all_subtasks = normalize_subtasks(raw_subtasks)
    subtasks = all_subtasks[:MAX_TOTAL_SUBTASKS]
    dropped = len(all_subtasks) - len(subtasks)
    if not subtasks:
        return "Workflow error: no usable subtasks were provided.", True
    print(f"[workflow] fanning out {len(subtasks)} agents", file=sys.stderr)

    def run_one(prompt: str) -> str:
        try:
            return journaled(prompt, lambda: run_subagent(model, prompt))
        except Exception as error:  # isolation boundary: one bad subagent should not end the run
            return f"(subagent failed: {type(error).__name__}: {error})"

    with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_CONCURRENT) as pool:
        results = list(pool.map(run_one, subtasks))
        print(f"[workflow] verifying {len(results)} results", file=sys.stderr)
        verify_prompts = [verify_prompt_for(task, result) for task, result in zip(subtasks, results)]
        verdicts = list(pool.map(run_one, verify_prompts))

    joined = "\n\n".join(
        f"[agent {index + 1}: {task}]\n{result}\n\n[verify {index + 1}]\n{verdict}"
        for index, (task, result, verdict) in enumerate(zip(subtasks, results, verdicts))
    )
    if dropped > 0:
        joined = (
            f"(note: {dropped} subtasks beyond MAX_TOTAL_SUBTASKS={MAX_TOTAL_SUBTASKS} were not "
            "run; rerun them in a follow-up Workflow call)\n\n" + joined
        )
    return joined, False

使用对话中系统消息切换模式

智能体首先追加用户的消息,然后追加任何到期的系统消息:退出通知、进入时的完整模式文本,或定期刷新提醒。将系统消息放在用户回合之后,可以保持其前面的每个缓存字节不受影响,并满足系统消息必须跟在用户回合之后的放置规则。

class ModeAgent:
    """An agent loop whose orchestration mode is toggled with mid-conversation system messages."""

    def __init__(self, model: str, mode_on: bool = True):
        self.model = model
        self.mode_on = mode_on
        self.messages: list[dict] = []
        self._mode_announced = False
        self._exit_pending = False
        self._turns_since_reminder = 0

    def set_mode(self, mode_on: bool) -> None:
        """Turn the mode on or off. The notice is delivered with the next user turn."""
        if mode_on == self.mode_on:
            return
        if not mode_on:
            if self._mode_announced:
                self._exit_pending = True
        else:
            self._exit_pending = False
        self.mode_on = mode_on

    def _due_system_messages(self) -> list[dict]:
        """System messages owed on this turn: an exit notice, the full mode text on entry,
        or a one-line refresher every TURNS_BETWEEN_REFRESHERS user turns."""
        due = []
        if self._exit_pending:
            self._exit_pending = False
            self._mode_announced = False
            due.append({"role": "system", "content": MODE_EXIT})
        if self.mode_on:
            if not self._mode_announced:
                self._mode_announced = True
                self._turns_since_reminder = 0
                due.append({"role": "system", "content": MODE_ENTER})
            elif self._turns_since_reminder >= TURNS_BETWEEN_REFRESHERS:
                self._turns_since_reminder = 0
                due.append({"role": "system", "content": MODE_REFRESH})
        return due

    def turn(self, user_input: str) -> str:
        # Mid-conversation system messages follow the user turn they apply to, which keeps
        # the cached prefix ahead of them untouched.
        self.messages.append({"role": "user", "content": user_input})
        self.messages.extend(self._due_system_messages())
        self._turns_since_reminder += 1

        for _ in range(MAX_MAIN_TURNS):
            with client.messages.stream(
                model=self.model,
                max_tokens=64000,
                system=SYSTEM_PROMPT,  # static for the whole session
                thinking={"type": "adaptive"},
                output_config={"effort": EFFORT},
                tools=[WORKFLOW_TOOL, BASH_TOOL],
                messages=self.messages,
                timeout=REQUEST_TIMEOUT_SECONDS,
            ) as stream:
                response = stream.get_final_message()
            self.messages.append({"role": "assistant", "content": response.content})

            if response.stop_reason == "pause_turn":
                continue
            if response.stop_reason != "tool_use":
                text = "".join(block.text for block in response.content if block.type == "text")
                if response.stop_reason == "max_tokens":
                    # Drop the truncated assistant message so later turns don't build on it.
                    self.messages.pop()
                    text += "\n\n(warning: response was truncated at max_tokens)"
                return text

            tool_results = []
            for block in response.content:
                if block.type != "tool_use":
                    continue
                if block.name == "Workflow":
                    output, is_error = run_workflow(self.model, block.input.get("subtasks", []))
                elif block.name == "bash":
                    output, is_error = handle_bash_block(block)
                else:
                    output, is_error = f"unknown tool: {block.name}", True
                tool_results.append(
                    {
                        "type": "tool_result",
                        "tool_use_id": block.id,
                        "content": output,
                        "is_error": is_error,
                    }
                )
            self.messages.append({"role": "user", "content": tool_results})
        return "(hit the main loop turn limit before finishing)"

运行它

本示例中的 bash 工具会直接在您的机器上运行模型编写的命令,没有任何沙箱,而且扇出会并行运行多个此类智能体。请在您愿意暴露的目录和环境中运行它,并在将其改编用于本地实验以外的任何用途之前添加沙箱机制。

if __name__ == "__main__":
    task = (
        sys.argv[1]
        if len(sys.argv) > 1
        else "Explore the current directory, then give a thorough review: what it does, "
        "code-quality issues, and concrete improvements."
    )
    agent = ModeAgent(MODEL)
    print(agent.turn(task))
    agent.set_mode(False)
    print(agent.turn("Briefly summarize what you found above, no fan-out needed."))

从您希望智能体工作的目录启动示例,例如要审查的代码仓库的根目录:

python orchestration_mode.py "Review this repository for flaky tests and propose fixes."

当模式开启时,预期模型会先用几个 bash 命令进行侦察,主动分派 Workflow 工具,并将子智能体的报告综合成最终答案。正如提醒所指示的,琐碎或对话性的请求仍由单个智能体处理。

迈向生产级工具框架

本示例刻意保持精简。面向真实工作负载的工具框架通常会添加:

  • **沙箱化的编排脚本:**让模型输出一个简短的编排程序(包含分支、循环和归约步骤),并在隔离的解释器中运行它,而不是仅接受一个扁平的子任务字符串列表。
  • **持久化日志记录:**用一个能够在进程重启后存续、并且在跨机器并发写入时安全的存储来替换本地 JSON 文件。
  • **预算强制执行:**跟踪整个会话中启动的子智能体总数,而不仅仅是每次 Workflow 调用的数量,并拒绝超过硬性上限,以防失控的计划耗尽您的配额。

本示例中的模式(模式提醒、工具描述中的常设许可、日志记录和验证波次)可以原封不动地沿用;只有围绕它们的执行基础设施需要变得更加健壮。

相关内容

对话中系统消息

模式提醒所使用的机制,以及它如何与提示缓存交互。

Effort

API 接受的 effort 级别以及如何选择。

Claude 的工具使用

定义工具、处理工具调用和工具结果。

Bash 工具

本示例在本地执行的 Anthropic 定义的 bash 工具。

Was this page helpful?

  • 设置循环
  • 定义模式提醒
  • 在工具描述中授予常设许可
  • 在本地运行 bash 工具
  • 运行单个子智能体
  • 记录结果以便重新运行时恢复
  • 扇出,然后验证
  • 使用对话中系统消息切换模式
  • 运行它
  • 迈向生产级工具框架
  • 相关内容