• 訊息
  • 託管代理
  • 管理
Search...
⌘K
第一步
Claude 簡介快速入門
使用 Claude 進行建構
功能概覽使用 Messages API停止原因與備援拒絕與備援備援額度
模型能力
擴展思考自適應思考努力程度任務預算(測試版)快速模式(研究預覽)結構化輸出引用串流訊息批次處理搜尋結果串流拒絕多語言支援嵌入
工具
概覽工具使用的運作方式教學:建構使用工具的代理定義工具處理工具呼叫平行工具使用工具執行器(SDK)嚴格工具使用搭配提示快取的工具使用伺服器工具疑難排解網頁搜尋工具網頁擷取工具程式碼執行工具顧問工具記憶體工具Bash 工具電腦使用工具文字編輯器工具
工具基礎架構
工具參考管理工具上下文工具組合工具搜尋程式化工具呼叫細粒度工具串流
上下文管理
上下文視窗壓縮上下文編輯提示快取對話中系統訊息建構協調模式快取診斷(測試版)Token 計數
處理檔案
Files APIPDF 支援圖片與視覺
技能
概覽快速入門最佳實務企業技能API 中的技能
MCP
遠端 MCP 伺服器MCP 連接器
雲端平台上的 Claude
Amazon BedrockAmazon Bedrock(舊版)AWS 上的 Claude PlatformMicrosoft FoundryVertex AI
Log in
建構協調模式
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...

Solutions

  • AI agents
  • Code modernization
  • Coding
  • Customer support
  • Education
  • Financial services
  • Government
  • Life sciences

Partners

  • Amazon Bedrock
  • Google Cloud's Vertex AI

Learn

  • Blog
  • Courses
  • Use cases
  • Connectors
  • Customer stories
  • Engineering at Anthropic
  • Events
  • Powered by Claude
  • Service partners
  • Startups program

Company

  • Anthropic
  • Careers
  • Economic Futures
  • Research
  • News
  • Responsible Scaling Policy
  • Security and compliance
  • Transparency

Learn

  • Blog
  • Courses
  • Use cases
  • Connectors
  • Customer stories
  • Engineering at Anthropic
  • Events
  • Powered by Claude
  • Service partners
  • Startups program

Help and security

  • Availability
  • Status
  • Support
  • Discord

Terms and policies

  • Privacy policy
  • Responsible disclosure policy
  • Terms of service: Commercial
  • Terms of service: Consumer
  • Usage policy
訊息/上下文管理

建立協調模式

建立一個工作階段層級的模式,授予多代理扇出的常設同意權,並透過對話中系統訊息來開啟和關閉。

Was this page helpful?

  • 設定迴圈
  • 定義模式提醒
  • 在工具描述中授予常設同意
  • 在本機執行 bash 工具
  • 執行單一子代理
  • 記錄結果以便重新執行時續接
  • 扇出,然後驗證
  • 使用對話中系統訊息切換模式
  • 執行
  • 邁向正式環境的執行框架
  • 相關資源

「Orchestration mode」(協調模式)是一個工作階段層級的開關:當它開啟時,模型會對每個實質性請求投入最大程度的徹底性,先自行偵察任務,然後預設將工作扇出給平行的子代理。當它關閉時,同一個協調工具會回到逐請求選擇加入的方式。

此模式並非 API 參數。它完全由已記載的元件組成:

  1. 努力等級: 請求以已記載的 Effort(努力)值執行,例如 xhigh。該頁面所列等級之上沒有任何隱藏等級。
  2. 模式提醒: 一則對話中系統訊息告知模型該模式已啟用,每隔數個回合會有一行簡短的複習提醒,並在模式關閉時發送退出通知。頂層的 system 欄位從不變更,因此快取的前綴保持完整。
  3. 工具描述中的常設同意: 協調工具的描述聲明,當模式開啟時,模型應為每個實質性任務編寫並執行工作流程,無需事先詢問。

此範例使用對話中系統訊息,目前僅在 Claude Opus 4.8 上可用。扇出本身會倍增 token 使用量:單一請求可能衍生許多子代理對話,因此請將此模式保留給值得付出該成本的工作。

設定迴圈

此範例是單一檔案。常數控制努力等級、扇出形態,以及模式複習提醒重新發送的頻率。MAX_CONCURRENT 限制同時執行的子代理數量(PHP 移植版本為循序執行,會忽略此設定);MAX_TOTAL_SUBTASKS 限制模型在單次 Workflow 呼叫中可排入佇列的數量。將兩者分開可讓模型規劃大量待辦項目,而不必一次全部啟動。DOC_TEST_MODE 檢查會在設定該環境變數時將迴圈限制為單一回合,以便自動化文件測試工具能驗證檔案可編譯並快速完成,而無需執行完整的協調流程;自行執行範例時請勿設定此變數。

import atexit
import concurrent.futures
import hashlib
import json
import os
import shutil
import subprocess
import sys
import tempfile
import threading

import anthropic

client = anthropic.Anthropic()

MODEL = "claude-opus-4-8"
EFFORT = "xhigh"

SYSTEM_PROMPT = "You are a helpful general-purpose agent. Answer the user's request directly."

REQUEST_TIMEOUT_SECONDS = 600
BASH_TIMEOUT_SECONDS = 60
TOOL_RESULT_MAX_CHARS = 8000
MAX_CONCURRENT = 10
DOC_TEST_MODE = bool(os.environ.get("DOC_TEST_MODE"))
MAX_TOTAL_SUBTASKS = 2 if DOC_TEST_MODE else 200
MAX_SUBAGENT_TURNS = 1 if DOC_TEST_MODE else 15
MAX_MAIN_TURNS = 1 if DOC_TEST_MODE else 30
TURNS_BETWEEN_REFRESHERS = 10
JOURNAL_PATH = os.environ.get("ORCH_JOURNAL") or "orchestration_journal.json"

定義模式提醒

這些提醒刻意保持簡短。它們切換模式並指向工具描述,重量級的指示都放在那裡。完整文字在模式開啟時發送一次,複習提醒僅在數個使用者回合後重新發送,退出通知則在模式關閉時發送一次。

在工具描述中授予常設同意

Workflow 工具承載真正的行為契約:選擇加入規則、模式開啟期間適用的常設同意、調整扇出規模的粒度指引,以及模型可採用的品質模式(驗證波次、完整性評審、多階段排序)。子代理也會獲得一個 report_findings 工具,使其結果以結構化 JSON 而非散文形式回傳,而 bash 工具則是在本機執行的 Anthropic 定義的 bash_20250124 工具。

在本機執行 bash 工具

bash 處理器會以逾時限制執行所請求的命令,擷取合併的 stdout 和 stderr,並截斷結果,以免失控的命令淹沒上下文視窗。命令會在您啟動範例的目錄中執行,因此若要將其指向某個專案,就表示要從該處啟動;當設定了 DOC_TEST_MODE 時,測試工具會改為提供一個小型的臨時測試目錄給 bash,並在結束時移除。此處沒有沙箱:命令會以啟動範例之程序的權限執行。為求清晰,此範例在全新的子 shell 中執行每次呼叫,而非維護 bash_20250124 契約所描述的持久工作階段;正式環境的代理應以長期執行的 shell 支援此工具,使工作目錄、環境和 restart 動作的行為符合文件所述。

執行單一子代理

每個工作流程子任務都會成為自己的小型代理迴圈,配備 bash 工具,並以與主迴圈相同的努力等級執行。每個請求的逾時限制會約束每次 API 呼叫,因此連線中斷只會降級單一子代理,而不會使整個執行停滯。

記錄結果以便重新執行時續接

衍生數十個子代理的扇出若要從頭重新啟動,成本相當高昂。一個小型的內容定址日誌可使其具有冪等性:在派遣子代理之前,先在本機 JSON 檔案中查詢其提示的 SHA-256,若已有記錄的結果則直接回傳。中斷執行後重新執行,只有從未完成的子任務會被重新計算。此日誌會跨執行去重,而非在單一扇出波次內去重;刪除日誌檔案即可重新開始。

扇出,然後驗證

扇出最多接受 MAX_TOTAL_SUBTASKS 個提示,透過日誌執行它們,同時進行中的數量最多為 MAX_CONCURRENT(PHP 移植版本為循序執行),並隔離失敗,使單一故障的子代理降級為錯誤字串,而非結束整個執行。第一波完成後,第二波會重複使用相同的子代理路徑,嘗試反駁每個結果:每個驗證器都會從來源重新推導主張,不確定時預設為已反駁。原始結果及其判定都會回傳給協調器,以便一併權衡。

使用對話中系統訊息切換模式

代理會先附加使用者的訊息,然後附加任何到期的系統訊息:退出通知、進入時的完整模式文字,或定期的複習提醒。將系統訊息放在使用者回合之後,可使其前方的每個快取位元組保持不變,並滿足系統訊息須跟隨使用者回合的放置規則。

執行

此範例中的 bash 工具會直接在您的機器上執行模型編寫的命令,沒有任何沙箱,且扇出會平行執行數個此類代理。請在您願意暴露的目錄和環境中執行,並在將其改編用於本機實驗以外的任何用途之前,先加入沙箱機制。

從您希望代理工作的目錄啟動範例,例如要審查的儲存庫根目錄:

python orchestration_mode.py "Review this repository for flaky tests and propose fixes."

模式開啟時,預期模型會先用幾個 bash 命令進行偵察,主動派遣 Workflow 工具,並將子代理報告綜合成最終答案。瑣碎或對話性的請求則保持單獨處理,如提醒所指示。

邁向正式環境的執行框架

此範例刻意保持精簡。針對實際工作負載的執行框架通常會加入:

  • 沙箱化的協調指令碼: 讓模型產出簡短的協調程式(分支、迴圈和歸約步驟),並在隔離的直譯器內執行,而非僅接受子任務字串的扁平清單。
  • 持久化日誌: 將本機 JSON 檔案替換為能在程序重新啟動後存續、且在跨機器並行寫入下仍安全的儲存機制。
  • 預算強制執行: 追蹤整個工作階段中啟動的子代理總數,而非僅限於每次 Workflow 呼叫,並拒絕超過硬性上限,以免失控的計畫耗盡您的配額。

此範例中的模式(模式提醒、工具描述中的常設同意、日誌記錄,以及驗證波次)可原封不動地沿用;只有圍繞它們的執行基礎架構會變得更加穩健。

相關資源

對話中系統訊息

模式提醒所使用的機制,以及它如何與提示快取互動。

Effort

API 接受的努力等級以及如何選擇。

MODE_ENTER = (
    "Orchestration mode is on: optimize for the most exhaustive, correct answer rather than "
    "the fastest one. Use the Workflow tool on every substantive task, sized to the problem's "
    "natural decomposition rather than the maximum the tool allows. See the Workflow tool's "
    "description for standing consent, granularity guidance, and quality patterns. Work solo "
    "only on conversational or trivial turns."
)
MODE_REFRESH = (
    "Orchestration mode is still on. Use the Workflow tool; see its standing consent section."
)
MODE_EXIT = (
    "Orchestration mode is off. The Workflow tool's standard opt-in rule applies again."
)
WORKFLOW_TOOL = {
    "name": "Workflow",
    "description": (
        "Orchestrate a multi-agent workflow: split a large task into independent subtasks "
        "and run them as parallel agents, then collect their results.\n\n"
        "Opt-in: only use this tool when the user explicitly asks for a workflow, or when a "
        "system message confirms that orchestration mode is on.\n\n"
        "Quality patterns: adversarial verification (a second wave of agents checks the first "
        "wave's findings against the source), a completeness critic (one agent hunts for what "
        "the others missed), and multi-phase sequencing (understand, design, implement, and "
        "review as separate workflow calls, reading results between phases). A useful default "
        "is hybrid: scout inline first to discover the work-list, then fan out over it.\n\n"
        "Granularity: scope each subtask to a distinct concern, component, or question rather "
        "than per line or per file section. Scale the count to what the user asked for: a "
        "focused review of a module of a few hundred lines rarely needs more than about ten "
        "subtasks; a broad audit of a large codebase can justify more.\n\n"
        "Standing consent: while a system message confirms orchestration mode is on, that "
        "opt-in is standing. Author and run a workflow for every substantive task by default, "
        "and lean toward verifying findings adversarially. Work solo only on conversational "
        "turns or trivial mechanical edits. When a system message says the mode is off, "
        "revert to the opt-in rule above."
    ),
    "input_schema": {
        "type": "object",
        "properties": {
            "subtasks": {
                "type": "array",
                "items": {"type": "string"},
                "description": "Independent subtask prompts to run as parallel agents",
            }
        },
        "required": ["subtasks"],
    },
}

BASH_TOOL = {"type": "bash_20250124", "name": "bash"}

REPORT_TOOL = {
    "name": "report_findings",
    "description": (
        "Report the final findings for your subtask. Call this exactly once, when you are "
        "done investigating; it ends your task."
    ),
    "input_schema": {
        "type": "object",
        "properties": {
            "summary": {"type": "string", "description": "Two or three sentences of synthesis"},
            "findings": {
                "type": "array",
                "items": {
                    "type": "object",
                    "properties": {
                        "claim": {"type": "string", "description": "The finding, one sentence"},
                        "evidence": {
                            "type": "string",
                            "description": "How it was verified (file, line, or command output)",
                        },
                        "severity": {"type": "string", "enum": ["high", "medium", "low", "info"]},
                    },
                    "required": ["claim", "evidence", "severity"],
                },
            },
        },
        "required": ["summary", "findings"],
    },
}
# Run bash where the example was launched. In DOC_TEST_MODE the docs harness
# points it at a throwaway fixture directory instead, removed on exit.
if DOC_TEST_MODE:
    WORK_DIR = tempfile.mkdtemp(prefix="orchestration-")
    atexit.register(shutil.rmtree, WORK_DIR, ignore_errors=True)
    with open(os.path.join(WORK_DIR, "sample.py"), "w") as fixture:
        fixture.write(
            "def fib(n):\n"
            "    return n if n < 2 else fib(n - 1) + fib(n - 2)\n\n"
            "print(fib(10))\n"
        )
else:
    WORK_DIR = os.getcwd()


def run_bash(command: str) -> tuple[str, bool]:
    """Run a shell command and return (output, is_error). No sandbox: example code only."""
    print(f"[bash] {command}", file=sys.stderr)
    try:
        proc = subprocess.run(
            ["bash", "-c", command],
            cwd=WORK_DIR,
            capture_output=True,
            text=True,
            errors="replace",
            timeout=BASH_TIMEOUT_SECONDS,
        )
    except subprocess.TimeoutExpired:
        return f"command timed out after {BASH_TIMEOUT_SECONDS}s", True
    output = (proc.stdout + proc.stderr).strip() or "(no output)"
    if len(output) > TOOL_RESULT_MAX_CHARS:
        output = output[:TOOL_RESULT_MAX_CHARS] + f"\n(truncated at {TOOL_RESULT_MAX_CHARS} chars)"
    if proc.returncode != 0:
        output = f"(exit code {proc.returncode})\n{output}"
    return output, proc.returncode != 0


def handle_bash_block(block) -> tuple[str, bool]:
    if block.input.get("restart") is True:
        return "Shell restarted.", False
    command = block.input.get("command")
    if not isinstance(command, str) or not command:
        return "bash error: no command was provided.", True
    return run_bash(command)
def run_subagent(model: str, prompt: str) -> str:
    """One subagent: a small nested agent loop with the bash tool plus report_findings.
    Subagents inherit the main loop's effort level."""
    subagent_system = (
        "You are one agent in a larger parallel fan-out, assigned a single subtask. "
        "Investigate it directly, using bash to check facts rather than guessing, and finish "
        "by calling report_findings exactly once. Return findings, not narration."
    )
    messages = [{"role": "user", "content": prompt}]
    for _ in range(MAX_SUBAGENT_TURNS):
        with client.messages.stream(
            model=model,
            max_tokens=64000,
            system=subagent_system,
            thinking={"type": "adaptive"},
            output_config={"effort": EFFORT},
            tools=[BASH_TOOL, REPORT_TOOL],
            messages=messages,
            timeout=REQUEST_TIMEOUT_SECONDS,
        ) as stream:
            response = stream.get_final_message()
        messages.append({"role": "assistant", "content": response.content})
        if response.stop_reason == "pause_turn":
            continue
        if response.stop_reason != "tool_use":
            text = "".join(block.text for block in response.content if block.type == "text")
            if response.stop_reason == "max_tokens":
                text += "\n\n(warning: subagent response was truncated at max_tokens)"
            return text
        tool_results = []
        report = None
        for block in response.content:
            if block.type != "tool_use":
                continue
            if block.name == "report_findings":
                report = json.dumps(block.input, indent=2)
                output, is_error = "Findings recorded.", False
            elif block.name == "bash":
                output, is_error = handle_bash_block(block)
            else:
                output, is_error = f"unknown tool: {block.name}", True
            tool_results.append(
                {
                    "type": "tool_result",
                    "tool_use_id": block.id,
                    "content": output,
                    "is_error": is_error,
                }
            )
        if report is not None:
            return report
        messages.append({"role": "user", "content": tool_results})
    return "(subagent hit the turn limit before finishing)"
_journal_lock = threading.Lock()


def _load_journal() -> dict:
    try:
        with open(JOURNAL_PATH) as file:
            return json.load(file) or {}
    except (OSError, json.JSONDecodeError):
        return {}


def journaled(prompt: str, compute) -> str:
    """Return a cached result for this exact prompt, or compute and persist it. This
    makes the fan-out resumable: interrupt the run, rerun it, and only the subtasks
    that never finished are recomputed. Delete the journal file to start fresh."""
    key = hashlib.sha256(prompt.encode()).hexdigest()
    cached = _load_journal().get(key)
    if cached is not None:
        print(f"[journal] cache hit for {key[:12]}", file=sys.stderr)
        return cached
    result = compute()
    try:
        with _journal_lock:  # fan-out writes from many threads
            journal = _load_journal()
            journal[key] = result
            temp = f"{JOURNAL_PATH}.tmp"
            with open(temp, "w") as file:
                json.dump(journal, file)
            os.replace(temp, JOURNAL_PATH)  # atomic on POSIX and Windows
    except OSError as error:  # the journal is best-effort; never discard a computed result
        print(f"[journal] write failed: {error}", file=sys.stderr)
    return result
def normalize_subtasks(raw) -> list[str]:
    """Accept the subtasks input in whatever shape the model emits: an array, the array
    JSON-encoded as a single string, or a newline-separated list."""
    if isinstance(raw, str):
        try:
            raw = json.loads(raw)
        except json.JSONDecodeError:
            raw = raw.splitlines() if "\n" in raw else [raw]
    if not isinstance(raw, list):
        return []
    return [task.strip() for task in raw if isinstance(task, str) and task.strip()]


def verify_prompt_for(subtask: str, result: str) -> str:
    return (
        "Adversarially verify the subagent result below: try to REFUTE it. Re-derive the "
        "claims yourself with bash rather than trusting the result, and look for evidence "
        "that contradicts them. Default to refuted if uncertain. Call report_findings with "
        "summary 'refuted: <why>' or 'confirmed: <why>', citing the file:line or command "
        "output that decided it.\n\n"
        f"Subtask: {subtask}\n\nResult to verify:\n{result}"
    )


def run_workflow(model: str, raw_subtasks) -> tuple[str, bool]:
    """Run subtasks as parallel subagents, then run a second verification wave over
    the results, and return both. MAX_TOTAL_SUBTASKS bounds how many the model can
    queue; MAX_CONCURRENT bounds how many run at once."""
    all_subtasks = normalize_subtasks(raw_subtasks)
    subtasks = all_subtasks[:MAX_TOTAL_SUBTASKS]
    dropped = len(all_subtasks) - len(subtasks)
    if not subtasks:
        return "Workflow error: no usable subtasks were provided.", True
    print(f"[workflow] fanning out {len(subtasks)} agents", file=sys.stderr)

    def run_one(prompt: str) -> str:
        try:
            return journaled(prompt, lambda: run_subagent(model, prompt))
        except Exception as error:  # isolation boundary: one bad subagent should not end the run
            return f"(subagent failed: {type(error).__name__}: {error})"

    with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_CONCURRENT) as pool:
        results = list(pool.map(run_one, subtasks))
        print(f"[workflow] verifying {len(results)} results", file=sys.stderr)
        verify_prompts = [verify_prompt_for(task, result) for task, result in zip(subtasks, results)]
        verdicts = list(pool.map(run_one, verify_prompts))

    joined = "\n\n".join(
        f"[agent {index + 1}: {task}]\n{result}\n\n[verify {index + 1}]\n{verdict}"
        for index, (task, result, verdict) in enumerate(zip(subtasks, results, verdicts))
    )
    if dropped > 0:
        joined = (
            f"(note: {dropped} subtasks beyond MAX_TOTAL_SUBTASKS={MAX_TOTAL_SUBTASKS} were not "
            "run; rerun them in a follow-up Workflow call)\n\n" + joined
        )
    return joined, False
class ModeAgent:
    """An agent loop whose orchestration mode is toggled with mid-conversation system messages."""

    def __init__(self, model: str, mode_on: bool = True):
        self.model = model
        self.mode_on = mode_on
        self.messages: list[dict] = []
        self._mode_announced = False
        self._exit_pending = False
        self._turns_since_reminder = 0

    def set_mode(self, mode_on: bool) -> None:
        """Turn the mode on or off. The notice is delivered with the next user turn."""
        if mode_on == self.mode_on:
            return
        if not mode_on:
            if self._mode_announced:
                self._exit_pending = True
        else:
            self._exit_pending = False
        self.mode_on = mode_on

    def _due_system_messages(self) -> list[dict]:
        """System messages owed on this turn: an exit notice, the full mode text on entry,
        or a one-line refresher every TURNS_BETWEEN_REFRESHERS user turns."""
        due = []
        if self._exit_pending:
            self._exit_pending = False
            self._mode_announced = False
            due.append({"role": "system", "content": MODE_EXIT})
        if self.mode_on:
            if not self._mode_announced:
                self._mode_announced = True
                self._turns_since_reminder = 0
                due.append({"role": "system", "content": MODE_ENTER})
            elif self._turns_since_reminder >= TURNS_BETWEEN_REFRESHERS:
                self._turns_since_reminder = 0
                due.append({"role": "system", "content": MODE_REFRESH})
        return due

    def turn(self, user_input: str) -> str:
        # Mid-conversation system messages follow the user turn they apply to, which keeps
        # the cached prefix ahead of them untouched.
        self.messages.append({"role": "user", "content": user_input})
        self.messages.extend(self._due_system_messages())
        self._turns_since_reminder += 1

        for _ in range(MAX_MAIN_TURNS):
            with client.messages.stream(
                model=self.model,
                max_tokens=64000,
                system=SYSTEM_PROMPT,  # static for the whole session
                thinking={"type": "adaptive"},
                output_config={"effort": EFFORT},
                tools=[WORKFLOW_TOOL, BASH_TOOL],
                messages=self.messages,
                timeout=REQUEST_TIMEOUT_SECONDS,
            ) as stream:
                response = stream.get_final_message()
            self.messages.append({"role": "assistant", "content": response.content})

            if response.stop_reason == "pause_turn":
                continue
            if response.stop_reason != "tool_use":
                text = "".join(block.text for block in response.content if block.type == "text")
                if response.stop_reason == "max_tokens":
                    # Drop the truncated assistant message so later turns don't build on it.
                    self.messages.pop()
                    text += "\n\n(warning: response was truncated at max_tokens)"
                return text

            tool_results = []
            for block in response.content:
                if block.type != "tool_use":
                    continue
                if block.name == "Workflow":
                    output, is_error = run_workflow(self.model, block.input.get("subtasks", []))
                elif block.name == "bash":
                    output, is_error = handle_bash_block(block)
                else:
                    output, is_error = f"unknown tool: {block.name}", True
                tool_results.append(
                    {
                        "type": "tool_result",
                        "tool_use_id": block.id,
                        "content": output,
                        "is_error": is_error,
                    }
                )
            self.messages.append({"role": "user", "content": tool_results})
        return "(hit the main loop turn limit before finishing)"
if __name__ == "__main__":
    task = (
        sys.argv[1]
        if len(sys.argv) > 1
        else "Explore the current directory, then give a thorough review: what it does, "
        "code-quality issues, and concrete improvements."
    )
    agent = ModeAgent(MODEL)
    print(agent.turn(task))
    agent.set_mode(False)
    print(agent.turn("Briefly summarize what you found above, no fan-out needed."))
Claude 的工具使用

定義工具、處理工具呼叫和工具結果。

Bash 工具

此範例在本機執行的 Anthropic 定義的 bash 工具。