• メッセージ
  • マネージドエージェント
  • 管理
Search...
⌘K
はじめに
Claude入門クイックスタート
Claudeで構築する
機能の概要Messages APIの使用停止理由とフォールバック拒否とフォールバックフォールバッククレジット
モデルの機能
拡張思考適応型思考エフォートタスクバジェット(ベータ版)高速モード(リサーチプレビュー)構造化出力引用メッセージのストリーミングバッチ処理検索結果拒否のストリーミング多言語サポート埋め込み
ツール
概要ツール使用の仕組みチュートリアル:ツール使用エージェントの構築ツールの定義ツール呼び出しの処理並列ツール使用ツールランナー(SDK)厳密なツール使用プロンプトキャッシングを使ったツール使用サーバーツールトラブルシューティングウェブ検索ツールウェブ取得ツールコード実行ツールアドバイザーツールメモリツールBashツールコンピュータ使用ツールテキストエディタツール
ツールインフラストラクチャ
ツールリファレンスツールコンテキストの管理ツールの組み合わせツール検索プログラムによるツール呼び出しきめ細かいツールストリーミング
コンテキスト管理
コンテキストウィンドウコンパクションコンテキスト編集プロンプトキャッシング会話途中のシステムメッセージオーケストレーションモードの構築キャッシュ診断(ベータ版)トークンカウント
ファイルの操作
Files APIPDFサポート画像とビジョン
スキル
概要クイックスタートベストプラクティスエンタープライズ向けスキルAPIでのスキル
MCP
リモートMCPサーバーMCPコネクタ
クラウドプラットフォーム上のClaude
Amazon BedrockAmazon Bedrock(レガシー)AWS上のClaude PlatformMicrosoft FoundryVertex AI
Log in
オーケストレーションモードの構築
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...

Solutions

  • AI agents
  • Code modernization
  • Coding
  • Customer support
  • Education
  • Financial services
  • Government
  • Life sciences

Partners

  • Amazon Bedrock
  • Google Cloud's Vertex AI

Learn

  • Blog
  • Courses
  • Use cases
  • Connectors
  • Customer stories
  • Engineering at Anthropic
  • Events
  • Powered by Claude
  • Service partners
  • Startups program

Company

  • Anthropic
  • Careers
  • Economic Futures
  • Research
  • News
  • Responsible Scaling Policy
  • Security and compliance
  • Transparency

Learn

  • Blog
  • Courses
  • Use cases
  • Connectors
  • Customer stories
  • Engineering at Anthropic
  • Events
  • Powered by Claude
  • Service partners
  • Startups program

Help and security

  • Availability
  • Status
  • Support
  • Discord

Terms and policies

  • Privacy policy
  • Responsible disclosure policy
  • Terms of service: Commercial
  • Terms of service: Consumer
  • Usage policy
メッセージ/コンテキスト管理

オーケストレーションモードを構築する

マルチエージェントのファンアウトに対する継続的な同意を付与するセッションレベルのモードを構築します。このモードは会話途中のシステムメッセージでオン・オフを切り替えます。

オーケストレーションモードはセッションレベルのスイッチです。オンのとき、モデルはすべての実質的なリクエストに対して最大限の徹底性を発揮し、タスク自体を偵察してから、デフォルトで並列サブエージェントに作業を「fan-out」(ファンアウト)します。オフのとき、同じオーケストレーションツールはリクエストごとのオプトインに戻ります。

このモードはAPIパラメータではありません。ドキュメント化された要素のみから構築されています。

  1. エフォートレベル: リクエストは、xhigh などのドキュメント化された Effort 値で実行されます。そのページに記載されているもの以上の隠しレベルはありません。
  2. モードリマインダー: 会話途中のシステムメッセージがモデルにモードがアクティブであることを伝え、数ターンごとに1行のリフレッシャーを送り、モードがオフになったときに終了通知を送ります。トップレベルの system フィールドは変更されないため、キャッシュされたプレフィックスはそのまま維持されます。
  3. ツール説明内の継続的な同意: オーケストレーションツールの説明には、モードがオンの間、モデルは事前に確認することなく、すべての実質的なタスクに対してワークフローを作成して実行すべきであると記載されています。

この例では会話途中のシステムメッセージを使用しており、これは現在 Claude Opus 4.8 でのみ利用可能です。ファンアウト自体がトークン使用量を倍増させます。1つのリクエストが多数のサブエージェント会話を生成する可能性があるため、そのコストに見合う作業にのみこのモードを使用してください。

ループをセットアップする

この例は単一ファイルです。定数はエフォートレベル、ファンアウトの形状、モードリフレッシャーを再送信する頻度を制御します。MAX_CONCURRENT は同時に実行されるサブエージェントの数を制限します(PHP版は逐次実行のためこれを無視します)。MAX_TOTAL_SUBTASKS は、モデルが1回のWorkflow呼び出しでキューに入れられるサブタスクの数を制限します。この2つを分けることで、モデルは大きなバックログを一度にすべて起動することなく計画できます。DOC_TEST_MODE チェックは、その環境変数が設定されている場合にループを1ターンに制限します。これにより、自動ドキュメントハーネスがオーケストレーション全体を実行せずにファイルがコンパイルされ迅速に終了することを検証できます。自分で例を実行する場合は未設定のままにしてください。

import atexit
import concurrent.futures
import hashlib
import json
import os
import shutil
import subprocess
import sys
import tempfile
import threading

import anthropic

client = anthropic.Anthropic()

MODEL = "claude-opus-4-8"
EFFORT = "xhigh"

SYSTEM_PROMPT = "You are a helpful general-purpose agent. Answer the user's request directly."

REQUEST_TIMEOUT_SECONDS = 600
BASH_TIMEOUT_SECONDS = 60
TOOL_RESULT_MAX_CHARS = 8000
MAX_CONCURRENT = 10
DOC_TEST_MODE = bool(os.environ.get("DOC_TEST_MODE"))
MAX_TOTAL_SUBTASKS = 2 if DOC_TEST_MODE else 200
MAX_SUBAGENT_TURNS = 1 if DOC_TEST_MODE else 15
MAX_MAIN_TURNS = 1 if DOC_TEST_MODE else 30
TURNS_BETWEEN_REFRESHERS = 10
JOURNAL_PATH = os.environ.get("ORCH_JOURNAL") or "orchestration_journal.json"

モードリマインダーを定義する

リマインダーは意図的に短くしています。モードを切り替え、重要な指示が記載されているツール説明を指し示すだけです。モードがオンになったときに全文が1回送信され、リフレッシャーは数回のユーザーターンの後にのみ再送信され、モードがオフになったときに終了通知が1回送信されます。

MODE_ENTER = (
    "Orchestration mode is on: optimize for the most exhaustive, correct answer rather than "
    "the fastest one. Use the Workflow tool on every substantive task, sized to the problem's "
    "natural decomposition rather than the maximum the tool allows. See the Workflow tool's "
    "description for standing consent, granularity guidance, and quality patterns. Work solo "
    "only on conversational or trivial turns."
)
MODE_REFRESH = (
    "Orchestration mode is still on. Use the Workflow tool; see its standing consent section."
)
MODE_EXIT = (
    "Orchestration mode is off. The Workflow tool's standard opt-in rule applies again."
)

ツール説明で継続的な同意を付与する

Workflowツールが実際の動作契約を担います。オプトインルール、モードがオンの間に適用される継続的な同意、ファンアウトのサイズを決めるための粒度ガイダンス、モデルが活用できる品質パターン(検証ウェーブ、完全性クリティック、マルチフェーズシーケンシング)などです。サブエージェントには report_findings ツールも与えられ、結果が散文ではなく構造化されたJSONとして返されます。bashツールは、ローカルで実行されるAnthropic定義の bash_20250124 ツールです。

WORKFLOW_TOOL = {
    "name": "Workflow",
    "description": (
        "Orchestrate a multi-agent workflow: split a large task into independent subtasks "
        "and run them as parallel agents, then collect their results.\n\n"
        "Opt-in: only use this tool when the user explicitly asks for a workflow, or when a "
        "system message confirms that orchestration mode is on.\n\n"
        "Quality patterns: adversarial verification (a second wave of agents checks the first "
        "wave's findings against the source), a completeness critic (one agent hunts for what "
        "the others missed), and multi-phase sequencing (understand, design, implement, and "
        "review as separate workflow calls, reading results between phases). A useful default "
        "is hybrid: scout inline first to discover the work-list, then fan out over it.\n\n"
        "Granularity: scope each subtask to a distinct concern, component, or question rather "
        "than per line or per file section. Scale the count to what the user asked for: a "
        "focused review of a module of a few hundred lines rarely needs more than about ten "
        "subtasks; a broad audit of a large codebase can justify more.\n\n"
        "Standing consent: while a system message confirms orchestration mode is on, that "
        "opt-in is standing. Author and run a workflow for every substantive task by default, "
        "and lean toward verifying findings adversarially. Work solo only on conversational "
        "turns or trivial mechanical edits. When a system message says the mode is off, "
        "revert to the opt-in rule above."
    ),
    "input_schema": {
        "type": "object",
        "properties": {
            "subtasks": {
                "type": "array",
                "items": {"type": "string"},
                "description": "Independent subtask prompts to run as parallel agents",
            }
        },
        "required": ["subtasks"],
    },
}

BASH_TOOL = {"type": "bash_20250124", "name": "bash"}

REPORT_TOOL = {
    "name": "report_findings",
    "description": (
        "Report the final findings for your subtask. Call this exactly once, when you are "
        "done investigating; it ends your task."
    ),
    "input_schema": {
        "type": "object",
        "properties": {
            "summary": {"type": "string", "description": "Two or three sentences of synthesis"},
            "findings": {
                "type": "array",
                "items": {
                    "type": "object",
                    "properties": {
                        "claim": {"type": "string", "description": "The finding, one sentence"},
                        "evidence": {
                            "type": "string",
                            "description": "How it was verified (file, line, or command output)",
                        },
                        "severity": {"type": "string", "enum": ["high", "medium", "low", "info"]},
                    },
                    "required": ["claim", "evidence", "severity"],
                },
            },
        },
        "required": ["summary", "findings"],
    },
}

bashツールをローカルで実行する

bashハンドラーは、要求されたコマンドをタイムアウト付きで実行し、stdoutとstderrを結合してキャプチャし、暴走したコマンドがコンテキストウィンドウを溢れさせないように結果を切り詰めます。コマンドは例を起動したディレクトリで実行されるため、プロジェクトを対象にする場合はそこから起動してください。DOC_TEST_MODE が設定されている場合、ハーネスは代わりに小さな使い捨てのフィクスチャディレクトリをbashに与え、終了時に削除します。ここにはサンドボックスはありません。コマンドは例を起動したプロセスの権限で実行されます。わかりやすさのため、この例では bash_20250124 契約が記述する永続セッションを維持するのではなく、各呼び出しを新しいサブシェルで実行します。本番エージェントでは、作業ディレクトリ、環境、restart アクションがドキュメントどおりに動作するように、長寿命のシェルでツールをバックアップすべきです。

# Run bash where the example was launched. In DOC_TEST_MODE the docs harness
# points it at a throwaway fixture directory instead, removed on exit.
if DOC_TEST_MODE:
    WORK_DIR = tempfile.mkdtemp(prefix="orchestration-")
    atexit.register(shutil.rmtree, WORK_DIR, ignore_errors=True)
    with open(os.path.join(WORK_DIR, "sample.py"), "w") as fixture:
        fixture.write(
            "def fib(n):\n"
            "    return n if n < 2 else fib(n - 1) + fib(n - 2)\n\n"
            "print(fib(10))\n"
        )
else:
    WORK_DIR = os.getcwd()


def run_bash(command: str) -> tuple[str, bool]:
    """Run a shell command and return (output, is_error). No sandbox: example code only."""
    print(f"[bash] {command}", file=sys.stderr)
    try:
        proc = subprocess.run(
            ["bash", "-c", command],
            cwd=WORK_DIR,
            capture_output=True,
            text=True,
            errors="replace",
            timeout=BASH_TIMEOUT_SECONDS,
        )
    except subprocess.TimeoutExpired:
        return f"command timed out after {BASH_TIMEOUT_SECONDS}s", True
    output = (proc.stdout + proc.stderr).strip() or "(no output)"
    if len(output) > TOOL_RESULT_MAX_CHARS:
        output = output[:TOOL_RESULT_MAX_CHARS] + f"\n(truncated at {TOOL_RESULT_MAX_CHARS} chars)"
    if proc.returncode != 0:
        output = f"(exit code {proc.returncode})\n{output}"
    return output, proc.returncode != 0


def handle_bash_block(block) -> tuple[str, bool]:
    if block.input.get("restart") is True:
        return "Shell restarted.", False
    command = block.input.get("command")
    if not isinstance(command, str) or not command:
        return "bash error: no command was provided.", True
    return run_bash(command)

1つのサブエージェントを実行する

各ワークフローサブタスクは、bashツールを持つ独自の小さなエージェントループになり、メインループと同じエフォートで実行されます。リクエストごとのタイムアウトが各API呼び出しを制限するため、接続が切断されても実行全体が停止するのではなく、1つのサブエージェントが劣化するだけで済みます。

def run_subagent(model: str, prompt: str) -> str:
    """One subagent: a small nested agent loop with the bash tool plus report_findings.
    Subagents inherit the main loop's effort level."""
    subagent_system = (
        "You are one agent in a larger parallel fan-out, assigned a single subtask. "
        "Investigate it directly, using bash to check facts rather than guessing, and finish "
        "by calling report_findings exactly once. Return findings, not narration."
    )
    messages = [{"role": "user", "content": prompt}]
    for _ in range(MAX_SUBAGENT_TURNS):
        with client.messages.stream(
            model=model,
            max_tokens=64000,
            system=subagent_system,
            thinking={"type": "adaptive"},
            output_config={"effort": EFFORT},
            tools=[BASH_TOOL, REPORT_TOOL],
            messages=messages,
            timeout=REQUEST_TIMEOUT_SECONDS,
        ) as stream:
            response = stream.get_final_message()
        messages.append({"role": "assistant", "content": response.content})
        if response.stop_reason == "pause_turn":
            continue
        if response.stop_reason != "tool_use":
            text = "".join(block.text for block in response.content if block.type == "text")
            if response.stop_reason == "max_tokens":
                text += "\n\n(warning: subagent response was truncated at max_tokens)"
            return text
        tool_results = []
        report = None
        for block in response.content:
            if block.type != "tool_use":
                continue
            if block.name == "report_findings":
                report = json.dumps(block.input, indent=2)
                output, is_error = "Findings recorded.", False
            elif block.name == "bash":
                output, is_error = handle_bash_block(block)
            else:
                output, is_error = f"unknown tool: {block.name}", True
            tool_results.append(
                {
                    "type": "tool_result",
                    "tool_use_id": block.id,
                    "content": output,
                    "is_error": is_error,
                }
            )
        if report is not None:
            return report
        messages.append({"role": "user", "content": tool_results})
    return "(subagent hit the turn limit before finishing)"

結果をジャーナルに記録して再実行時に再開する

数十のサブエージェントを生成するファンアウトは、ゼロから再起動するとコストがかかります。小さなコンテンツアドレス型ジャーナルがこれを冪等にします。サブエージェントをディスパッチする前に、そのプロンプトのSHA-256をローカルJSONファイルで検索し、記録された結果が存在すればそれを返します。実行を中断して再実行すると、完了しなかったサブタスクのみが再計算されます。ジャーナルは実行間で重複排除しますが、単一のファンアウトウェーブ内では行いません。最初からやり直すにはジャーナルファイルを削除してください。

_journal_lock = threading.Lock()


def _load_journal() -> dict:
    try:
        with open(JOURNAL_PATH) as file:
            return json.load(file) or {}
    except (OSError, json.JSONDecodeError):
        return {}


def journaled(prompt: str, compute) -> str:
    """Return a cached result for this exact prompt, or compute and persist it. This
    makes the fan-out resumable: interrupt the run, rerun it, and only the subtasks
    that never finished are recomputed. Delete the journal file to start fresh."""
    key = hashlib.sha256(prompt.encode()).hexdigest()
    cached = _load_journal().get(key)
    if cached is not None:
        print(f"[journal] cache hit for {key[:12]}", file=sys.stderr)
        return cached
    result = compute()
    try:
        with _journal_lock:  # fan-out writes from many threads
            journal = _load_journal()
            journal[key] = result
            temp = f"{JOURNAL_PATH}.tmp"
            with open(temp, "w") as file:
                json.dump(journal, file)
            os.replace(temp, JOURNAL_PATH)  # atomic on POSIX and Windows
    except OSError as error:  # the journal is best-effort; never discard a computed result
        print(f"[journal] write failed: {error}", file=sys.stderr)
    return result

ファンアウトしてから検証する

ファンアウトは最大 MAX_TOTAL_SUBTASKS 個のプロンプトを受け入れ、最大 MAX_CONCURRENT 個を同時実行しながらジャーナルを通して実行し(PHP版では逐次実行)、失敗を分離して、1つの壊れたサブエージェントが実行を終了させるのではなくエラー文字列に劣化するようにします。最初のウェーブが終了すると、2番目のウェーブが同じサブエージェントパスを再利用して各結果の反証を試みます。すべての検証者がソースから主張を再導出し、不確実な場合はデフォルトで反証済みとします。元の結果とその判定の両方がオーケストレーターに返され、オーケストレーターはそれらを総合的に評価できます。

def normalize_subtasks(raw) -> list[str]:
    """Accept the subtasks input in whatever shape the model emits: an array, the array
    JSON-encoded as a single string, or a newline-separated list."""
    if isinstance(raw, str):
        try:
            raw = json.loads(raw)
        except json.JSONDecodeError:
            raw = raw.splitlines() if "\n" in raw else [raw]
    if not isinstance(raw, list):
        return []
    return [task.strip() for task in raw if isinstance(task, str) and task.strip()]


def verify_prompt_for(subtask: str, result: str) -> str:
    return (
        "Adversarially verify the subagent result below: try to REFUTE it. Re-derive the "
        "claims yourself with bash rather than trusting the result, and look for evidence "
        "that contradicts them. Default to refuted if uncertain. Call report_findings with "
        "summary 'refuted: <why>' or 'confirmed: <why>', citing the file:line or command "
        "output that decided it.\n\n"
        f"Subtask: {subtask}\n\nResult to verify:\n{result}"
    )


def run_workflow(model: str, raw_subtasks) -> tuple[str, bool]:
    """Run subtasks as parallel subagents, then run a second verification wave over
    the results, and return both. MAX_TOTAL_SUBTASKS bounds how many the model can
    queue; MAX_CONCURRENT bounds how many run at once."""
    all_subtasks = normalize_subtasks(raw_subtasks)
    subtasks = all_subtasks[:MAX_TOTAL_SUBTASKS]
    dropped = len(all_subtasks) - len(subtasks)
    if not subtasks:
        return "Workflow error: no usable subtasks were provided.", True
    print(f"[workflow] fanning out {len(subtasks)} agents", file=sys.stderr)

    def run_one(prompt: str) -> str:
        try:
            return journaled(prompt, lambda: run_subagent(model, prompt))
        except Exception as error:  # isolation boundary: one bad subagent should not end the run
            return f"(subagent failed: {type(error).__name__}: {error})"

    with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_CONCURRENT) as pool:
        results = list(pool.map(run_one, subtasks))
        print(f"[workflow] verifying {len(results)} results", file=sys.stderr)
        verify_prompts = [verify_prompt_for(task, result) for task, result in zip(subtasks, results)]
        verdicts = list(pool.map(run_one, verify_prompts))

    joined = "\n\n".join(
        f"[agent {index + 1}: {task}]\n{result}\n\n[verify {index + 1}]\n{verdict}"
        for index, (task, result, verdict) in enumerate(zip(subtasks, results, verdicts))
    )
    if dropped > 0:
        joined = (
            f"(note: {dropped} subtasks beyond MAX_TOTAL_SUBTASKS={MAX_TOTAL_SUBTASKS} were not "
            "run; rerun them in a follow-up Workflow call)\n\n" + joined
        )
    return joined, False

会話途中のシステムメッセージでモードを切り替える

エージェントは最初にユーザーのメッセージを追加し、次に送信すべきシステムメッセージ(終了通知、モード開始時の全文、または定期的なリフレッシャー)を追加します。システムメッセージをユーザーターンの後に配置することで、その前にあるキャッシュされたすべてのバイトが変更されずに保たれ、システムメッセージがユーザーターンの後に続くという配置ルールを満たします。

class ModeAgent:
    """An agent loop whose orchestration mode is toggled with mid-conversation system messages."""

    def __init__(self, model: str, mode_on: bool = True):
        self.model = model
        self.mode_on = mode_on
        self.messages: list[dict] = []
        self._mode_announced = False
        self._exit_pending = False
        self._turns_since_reminder = 0

    def set_mode(self, mode_on: bool) -> None:
        """Turn the mode on or off. The notice is delivered with the next user turn."""
        if mode_on == self.mode_on:
            return
        if not mode_on:
            if self._mode_announced:
                self._exit_pending = True
        else:
            self._exit_pending = False
        self.mode_on = mode_on

    def _due_system_messages(self) -> list[dict]:
        """System messages owed on this turn: an exit notice, the full mode text on entry,
        or a one-line refresher every TURNS_BETWEEN_REFRESHERS user turns."""
        due = []
        if self._exit_pending:
            self._exit_pending = False
            self._mode_announced = False
            due.append({"role": "system", "content": MODE_EXIT})
        if self.mode_on:
            if not self._mode_announced:
                self._mode_announced = True
                self._turns_since_reminder = 0
                due.append({"role": "system", "content": MODE_ENTER})
            elif self._turns_since_reminder >= TURNS_BETWEEN_REFRESHERS:
                self._turns_since_reminder = 0
                due.append({"role": "system", "content": MODE_REFRESH})
        return due

    def turn(self, user_input: str) -> str:
        # Mid-conversation system messages follow the user turn they apply to, which keeps
        # the cached prefix ahead of them untouched.
        self.messages.append({"role": "user", "content": user_input})
        self.messages.extend(self._due_system_messages())
        self._turns_since_reminder += 1

        for _ in range(MAX_MAIN_TURNS):
            with client.messages.stream(
                model=self.model,
                max_tokens=64000,
                system=SYSTEM_PROMPT,  # static for the whole session
                thinking={"type": "adaptive"},
                output_config={"effort": EFFORT},
                tools=[WORKFLOW_TOOL, BASH_TOOL],
                messages=self.messages,
                timeout=REQUEST_TIMEOUT_SECONDS,
            ) as stream:
                response = stream.get_final_message()
            self.messages.append({"role": "assistant", "content": response.content})

            if response.stop_reason == "pause_turn":
                continue
            if response.stop_reason != "tool_use":
                text = "".join(block.text for block in response.content if block.type == "text")
                if response.stop_reason == "max_tokens":
                    # Drop the truncated assistant message so later turns don't build on it.
                    self.messages.pop()
                    text += "\n\n(warning: response was truncated at max_tokens)"
                return text

            tool_results = []
            for block in response.content:
                if block.type != "tool_use":
                    continue
                if block.name == "Workflow":
                    output, is_error = run_workflow(self.model, block.input.get("subtasks", []))
                elif block.name == "bash":
                    output, is_error = handle_bash_block(block)
                else:
                    output, is_error = f"unknown tool: {block.name}", True
                tool_results.append(
                    {
                        "type": "tool_result",
                        "tool_use_id": block.id,
                        "content": output,
                        "is_error": is_error,
                    }
                )
            self.messages.append({"role": "user", "content": tool_results})
        return "(hit the main loop turn limit before finishing)"

実行する

この例のbashツールは、モデルが書いたコマンドをサンドボックスなしでマシン上で直接実行し、ファンアウトはそれらのエージェントを複数並列で実行します。公開しても問題ないディレクトリと環境で実行し、ローカルでの実験以外の用途に適応させる前にサンドボックス化を追加してください。

if __name__ == "__main__":
    task = (
        sys.argv[1]
        if len(sys.argv) > 1
        else "Explore the current directory, then give a thorough review: what it does, "
        "code-quality issues, and concrete improvements."
    )
    agent = ModeAgent(MODEL)
    print(agent.turn(task))
    agent.set_mode(False)
    print(agent.turn("Briefly summarize what you found above, no fan-out needed."))

エージェントに作業させたいディレクトリ(たとえばレビュー対象のリポジトリのルート)から例を起動します。

python orchestration_mode.py "Review this repository for flaky tests and propose fixes."

モードがオンの場合、モデルはいくつかのbashコマンドで偵察し、指示なしでWorkflowツールをディスパッチし、サブエージェントのレポートを最終回答に統合することが期待されます。些細なリクエストや会話的なリクエストは、リマインダーの指示どおり単独で処理されます。

本番ハーネスに向けて

この例は意図的に小さくしています。実際のワークロード向けのハーネスには、通常以下を追加します。

  • サンドボックス化されたオーケストレーションスクリプト: サブタスク文字列のフラットなリストのみを受け入れるのではなく、モデルに短いオーケストレーションプログラム(分岐、ループ、リデュースステップ)を出力させ、隔離されたインタープリター内で実行します。
  • 永続的なジャーナリング: ローカルJSONファイルを、プロセス再起動後も存続し、複数マシン間での同時書き込みに対して安全なストアに置き換えます。
  • 予算の強制: Workflow呼び出しごとだけでなく、セッション全体で起動されたサブエージェントの総数を追跡し、暴走した計画がクォータを使い果たさないようにハードキャップを超えることを拒否します。

この例のパターン(モードリマインダー、ツール説明内の継続的な同意、ジャーナリング、検証ウェーブ)はそのまま引き継がれます。より堅牢になるのは、それらを取り巻く実行基盤のみです。

関連情報

会話途中のシステムメッセージ

モードリマインダーが使用するメカニズムと、プロンプトキャッシングとの相互作用について。

Effort

APIが受け入れるエフォートレベルとその選び方。

Claudeでのツール使用

ツールの定義、ツール呼び出しの処理、ツール結果について。

Bashツール

この例がローカルで実行するAnthropic定義のbashツール。

Was this page helpful?

  • ループをセットアップする
  • モードリマインダーを定義する
  • ツール説明で継続的な同意を付与する
  • bashツールをローカルで実行する
  • 1つのサブエージェントを実行する
  • 結果をジャーナルに記録して再実行時に再開する
  • ファンアウトしてから検証する
  • 会話途中のシステムメッセージでモードを切り替える
  • 実行する
  • 本番ハーネスに向けて
  • 関連情報