• Pesan
  • Managed Agents
  • Admin
Search...
⌘K
Langkah pertama
Pengenalan ClaudeMulai cepat
Membangun dengan Claude
Ikhtisar fiturMenggunakan Messages APIAlasan berhenti dan fallbackPenolakan dan fallbackKredit fallback
Kemampuan model
Pemikiran diperpanjangPemikiran adaptifUpayaAnggaran tugas (beta)Mode cepat (pratinjau riset)Output terstrukturSitasiStreaming PesanPemrosesan batchHasil pencarianStreaming penolakanDukungan multibahasaEmbeddings
Alat
IkhtisarCara kerja penggunaan alatTutorial: Membangun agen yang menggunakan alatMendefinisikan alatMenangani panggilan alatPenggunaan alat paralelTool Runner (SDK)Penggunaan alat ketatPenggunaan alat dengan caching promptAlat serverPemecahan masalahAlat pencarian webAlat pengambilan webAlat eksekusi kodeAlat penasihatAlat memoriAlat BashAlat penggunaan komputerAlat editor teks
Infrastruktur alat
Referensi alatMengelola konteks alatKombinasi alatPencarian alatPemanggilan alat terprogramStreaming alat terperinci
Manajemen konteks
Jendela konteksPemadatanPengeditan konteksCaching promptPesan sistem di tengah percakapanMembangun mode orkestrasiDiagnostik cache (beta)Penghitungan token
Bekerja dengan file
Files APIDukungan PDFGambar dan visi
Skills
IkhtisarMulai cepatPraktik terbaikSkills untuk enterpriseSkills di API
MCP
Server MCP jarak jauhKonektor MCP
Claude di platform cloud
Amazon BedrockAmazon Bedrock (lama)Claude Platform di AWSMicrosoft FoundryVertex AI
Log in
Membangun mode orkestrasi
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...

Solutions

  • AI agents
  • Code modernization
  • Coding
  • Customer support
  • Education
  • Financial services
  • Government
  • Life sciences

Partners

  • Amazon Bedrock
  • Google Cloud's Vertex AI

Learn

  • Blog
  • Courses
  • Use cases
  • Connectors
  • Customer stories
  • Engineering at Anthropic
  • Events
  • Powered by Claude
  • Service partners
  • Startups program

Company

  • Anthropic
  • Careers
  • Economic Futures
  • Research
  • News
  • Responsible Scaling Policy
  • Security and compliance
  • Transparency

Learn

  • Blog
  • Courses
  • Use cases
  • Connectors
  • Customer stories
  • Engineering at Anthropic
  • Events
  • Powered by Claude
  • Service partners
  • Startups program

Help and security

  • Availability
  • Status
  • Support
  • Discord

Terms and policies

  • Privacy policy
  • Responsible disclosure policy
  • Terms of service: Commercial
  • Terms of service: Consumer
  • Usage policy
Pesan/Manajemen konteks

Membangun mode orkestrasi

Bangun mode tingkat sesi yang memberikan persetujuan tetap untuk fan-out multi-agen, yang diaktifkan dan dinonaktifkan dengan pesan sistem di tengah percakapan.

Was this page helpful?

  • Menyiapkan loop
  • Mendefinisikan pengingat mode
  • Memberikan persetujuan tetap dalam deskripsi alat
  • Menjalankan alat bash secara lokal
  • Menjalankan satu subagen
  • Mencatat hasil dalam jurnal agar pengulangan dapat dilanjutkan
  • Fan out, lalu verifikasi
  • Mengalihkan mode dengan pesan sistem di tengah percakapan
  • Menjalankannya
  • Menuju harness produksi
  • Terkait

Mode orkestrasi adalah sakelar tingkat sesi: ketika aktif, model menerapkan ketelitian maksimum pada setiap permintaan substantif, menjelajahi tugas itu sendiri lalu mendistribusikan pekerjaan ke subagen paralel secara default. Ketika nonaktif, alat orkestrasi yang sama kembali ke opt-in per permintaan.

Mode ini bukan parameter API. Mode ini dibangun sepenuhnya dari bagian-bagian yang terdokumentasi:

  1. Tingkat effort: permintaan dijalankan pada nilai Effort yang terdokumentasi seperti xhigh. Tidak ada tingkat tersembunyi di atas yang tercantum di halaman tersebut.
  2. Pengingat mode: sebuah pesan sistem di tengah percakapan memberi tahu model bahwa mode sedang aktif, dengan penyegar satu baris setiap beberapa giliran dan pemberitahuan keluar ketika mode dimatikan. Field system tingkat atas tidak pernah berubah, sehingga prefiks yang di-cache tetap utuh.
  3. Persetujuan tetap dalam deskripsi alat: deskripsi alat orkestrasi menyatakan bahwa selama mode aktif, model harus menyusun dan menjalankan alur kerja untuk setiap tugas substantif tanpa bertanya terlebih dahulu.

Contoh ini menggunakan pesan sistem di tengah percakapan, yang saat ini hanya tersedia di Claude Opus 4.8. Fan-out itu sendiri melipatgandakan penggunaan token: satu permintaan dapat memunculkan banyak percakapan subagen, jadi gunakan mode ini hanya untuk pekerjaan yang sepadan dengan biayanya.

Menyiapkan loop

Contoh ini adalah satu file. Konstanta-konstanta mengontrol tingkat effort, bentuk fan-out, dan seberapa sering penyegar mode dikirim ulang. MAX_CONCURRENT membatasi berapa banyak subagen yang berjalan pada saat yang sama (port PHP bersifat sekuensial dan mengabaikannya); MAX_TOTAL_SUBTASKS membatasi berapa banyak yang boleh diantrekan model dalam satu panggilan Workflow. Memisahkan keduanya memungkinkan model merencanakan backlog besar tanpa meluncurkan semuanya sekaligus. Pemeriksaan DOC_TEST_MODE membatasi loop menjadi satu giliran ketika variabel lingkungan tersebut disetel, sehingga harness dokumentasi otomatis dapat memvalidasi bahwa file dapat dikompilasi dan selesai dengan cepat tanpa menjalankan orkestrasi penuh; biarkan tidak disetel saat Anda menjalankan contoh ini sendiri.

Mendefinisikan pengingat mode

Pengingat sengaja dibuat singkat. Pengingat ini mengalihkan mode dan menunjuk ke deskripsi alat, tempat instruksi yang lebih berat berada. Teks lengkap dikirim sekali ketika mode diaktifkan, penyegar dikirim ulang hanya setelah beberapa giliran pengguna, dan pemberitahuan keluar dikirim sekali ketika mode dimatikan.

Memberikan persetujuan tetap dalam deskripsi alat

Alat Workflow membawa kontrak perilaku yang sebenarnya: aturan opt-in, persetujuan tetap yang berlaku selama mode aktif, panduan granularitas untuk menentukan ukuran fan-out, dan pola kualitas yang dapat digunakan model (gelombang verifikasi, kritikus kelengkapan, pengurutan multi-fase). Subagen juga mendapatkan alat report_findings sehingga hasilnya kembali sebagai JSON terstruktur alih-alih prosa, dan alat bash adalah alat bash_20250124 yang didefinisikan Anthropic dan dijalankan secara lokal.

Menjalankan alat bash secara lokal

Handler bash menjalankan perintah yang diminta dengan batas waktu, menangkap gabungan stdout dan stderr, dan memotong hasilnya sehingga perintah yang tidak terkendali tidak dapat membanjiri jendela konteks. Perintah dijalankan di direktori tempat Anda meluncurkan contoh ini, jadi mengarahkannya ke sebuah proyek berarti memulainya dari sana; ketika DOC_TEST_MODE disetel, harness justru memberikan bash direktori fixture sementara kecil yang dihapus saat keluar. Tidak ada sandbox di sini: perintah dijalankan dengan izin proses yang meluncurkan contoh ini. Untuk kejelasan, contoh ini menjalankan setiap panggilan dalam subshell baru alih-alih mempertahankan sesi persisten seperti yang dijelaskan kontrak bash_20250124; agen produksi sebaiknya mendukung alat ini dengan shell yang berumur panjang sehingga direktori kerja, lingkungan, dan aksi restart berperilaku sesuai dokumentasi.

Menjalankan satu subagen

Setiap subtugas workflow menjadi loop agen kecilnya sendiri dengan alat bash, berjalan pada tingkat effort yang sama dengan loop utama. Batas waktu per permintaan membatasi setiap panggilan API sehingga koneksi yang terputus hanya menurunkan satu subagen alih-alih menghentikan seluruh proses.

Mencatat hasil dalam jurnal agar pengulangan dapat dilanjutkan

Fan-out yang memunculkan puluhan subagen mahal untuk dimulai ulang dari awal. Jurnal kecil beralamat konten membuatnya idempoten: sebelum mengirimkan subagen, cari SHA-256 dari prompt-nya dalam file JSON lokal, dan kembalikan hasil yang tercatat jika ada. Hentikan proses, jalankan ulang, dan hanya subtugas yang belum pernah selesai yang dihitung ulang. Jurnal melakukan deduplikasi antar proses, bukan dalam satu gelombang fan-out; hapus file jurnal untuk memulai dari awal.

Fan out, lalu verifikasi

Fan-out menerima hingga MAX_TOTAL_SUBTASKS prompt, menjalankannya melalui jurnal dengan paling banyak MAX_CONCURRENT yang berjalan bersamaan (sekuensial pada port PHP), dan mengisolasi kegagalan sehingga satu subagen yang rusak terdegradasi menjadi string error alih-alih mengakhiri proses. Setelah gelombang pertama selesai, gelombang kedua menggunakan kembali jalur subagen yang sama untuk mencoba menyangkal setiap hasil: setiap verifikator menurunkan ulang klaim dari sumbernya, dengan default ke refuted (tersangkal) ketika tidak pasti. Baik hasil asli maupun putusannya dikembalikan ke orkestrator sehingga dapat menimbang keduanya bersama-sama.

Mengalihkan mode dengan pesan sistem di tengah percakapan

Agen menambahkan pesan pengguna terlebih dahulu, lalu pesan sistem apa pun yang sudah waktunya: pemberitahuan keluar, teks mode lengkap saat masuk, atau penyegar berkala. Menempatkan pesan sistem setelah giliran pengguna menjaga setiap byte yang di-cache sebelumnya tetap tidak tersentuh, dan memenuhi aturan penempatan bahwa pesan sistem mengikuti giliran pengguna.

Menjalankannya

Alat bash dalam contoh ini menjalankan perintah yang ditulis model langsung di mesin Anda tanpa sandbox, dan fan-out menjalankan beberapa agen tersebut secara paralel. Jalankan di direktori dan lingkungan yang Anda rasa nyaman untuk diekspos, dan tambahkan sandboxing sebelum mengadaptasinya untuk apa pun di luar eksperimen lokal.

Mulai contoh ini dari direktori tempat Anda ingin agen bekerja, misalnya root dari repositori yang akan ditinjau:

python orchestration_mode.py "Review this repository for flaky tests and propose fixes."

Dengan mode aktif, harapkan model untuk menjelajah dengan beberapa perintah bash, mengirimkan alat Workflow tanpa diminta, dan mensintesis laporan subagen menjadi jawaban akhir. Permintaan sepele atau percakapan biasa tetap dikerjakan sendiri, sesuai instruksi pengingat.

Menuju harness produksi

Contoh ini sengaja dibuat kecil. Harness yang ditujukan untuk beban kerja nyata biasanya akan menambahkan:

  • Skrip orkestrasi dalam sandbox: biarkan model menghasilkan program orkestrasi singkat (percabangan, loop, dan langkah reduce) dan jalankan di dalam interpreter yang terisolasi, alih-alih hanya menerima daftar datar string subtugas.
  • Jurnal yang tahan lama: ganti file JSON lokal dengan penyimpanan yang bertahan setelah proses dimulai ulang dan aman terhadap penulis konkuren di berbagai mesin.
  • Penegakan anggaran: lacak total subagen yang diluncurkan di seluruh sesi, bukan hanya per panggilan Workflow, dan tolak untuk melebihi batas keras sehingga rencana yang tidak terkendali tidak dapat menghabiskan kuota Anda.

Pola-pola dalam contoh ini (pengingat mode, persetujuan tetap dalam deskripsi alat, jurnal, dan gelombang verifikasi) dapat diterapkan tanpa perubahan; hanya substrat eksekusi di sekitarnya yang menjadi lebih tangguh.

Terkait

Pesan sistem di tengah percakapan

Mekanisme yang digunakan pengingat mode, dan bagaimana interaksinya dengan caching prompt.

Effort

Tingkat effort yang diterima API dan cara memilihnya.

import atexit
import concurrent.futures
import hashlib
import json
import os
import shutil
import subprocess
import sys
import tempfile
import threading

import anthropic

client = anthropic.Anthropic()

MODEL = "claude-opus-4-8"
EFFORT = "xhigh"

SYSTEM_PROMPT = "You are a helpful general-purpose agent. Answer the user's request directly."

REQUEST_TIMEOUT_SECONDS = 600
BASH_TIMEOUT_SECONDS = 60
TOOL_RESULT_MAX_CHARS = 8000
MAX_CONCURRENT = 10
DOC_TEST_MODE = bool(os.environ.get("DOC_TEST_MODE"))
MAX_TOTAL_SUBTASKS = 2 if DOC_TEST_MODE else 200
MAX_SUBAGENT_TURNS = 1 if DOC_TEST_MODE else 15
MAX_MAIN_TURNS = 1 if DOC_TEST_MODE else 30
TURNS_BETWEEN_REFRESHERS = 10
JOURNAL_PATH = os.environ.get("ORCH_JOURNAL") or "orchestration_journal.json"
MODE_ENTER = (
    "Orchestration mode is on: optimize for the most exhaustive, correct answer rather than "
    "the fastest one. Use the Workflow tool on every substantive task, sized to the problem's "
    "natural decomposition rather than the maximum the tool allows. See the Workflow tool's "
    "description for standing consent, granularity guidance, and quality patterns. Work solo "
    "only on conversational or trivial turns."
)
MODE_REFRESH = (
    "Orchestration mode is still on. Use the Workflow tool; see its standing consent section."
)
MODE_EXIT = (
    "Orchestration mode is off. The Workflow tool's standard opt-in rule applies again."
)
WORKFLOW_TOOL = {
    "name": "Workflow",
    "description": (
        "Orchestrate a multi-agent workflow: split a large task into independent subtasks "
        "and run them as parallel agents, then collect their results.\n\n"
        "Opt-in: only use this tool when the user explicitly asks for a workflow, or when a "
        "system message confirms that orchestration mode is on.\n\n"
        "Quality patterns: adversarial verification (a second wave of agents checks the first "
        "wave's findings against the source), a completeness critic (one agent hunts for what "
        "the others missed), and multi-phase sequencing (understand, design, implement, and "
        "review as separate workflow calls, reading results between phases). A useful default "
        "is hybrid: scout inline first to discover the work-list, then fan out over it.\n\n"
        "Granularity: scope each subtask to a distinct concern, component, or question rather "
        "than per line or per file section. Scale the count to what the user asked for: a "
        "focused review of a module of a few hundred lines rarely needs more than about ten "
        "subtasks; a broad audit of a large codebase can justify more.\n\n"
        "Standing consent: while a system message confirms orchestration mode is on, that "
        "opt-in is standing. Author and run a workflow for every substantive task by default, "
        "and lean toward verifying findings adversarially. Work solo only on conversational "
        "turns or trivial mechanical edits. When a system message says the mode is off, "
        "revert to the opt-in rule above."
    ),
    "input_schema": {
        "type": "object",
        "properties": {
            "subtasks": {
                "type": "array",
                "items": {"type": "string"},
                "description": "Independent subtask prompts to run as parallel agents",
            }
        },
        "required": ["subtasks"],
    },
}

BASH_TOOL = {"type": "bash_20250124", "name": "bash"}

REPORT_TOOL = {
    "name": "report_findings",
    "description": (
        "Report the final findings for your subtask. Call this exactly once, when you are "
        "done investigating; it ends your task."
    ),
    "input_schema": {
        "type": "object",
        "properties": {
            "summary": {"type": "string", "description": "Two or three sentences of synthesis"},
            "findings": {
                "type": "array",
                "items": {
                    "type": "object",
                    "properties": {
                        "claim": {"type": "string", "description": "The finding, one sentence"},
                        "evidence": {
                            "type": "string",
                            "description": "How it was verified (file, line, or command output)",
                        },
                        "severity": {"type": "string", "enum": ["high", "medium", "low", "info"]},
                    },
                    "required": ["claim", "evidence", "severity"],
                },
            },
        },
        "required": ["summary", "findings"],
    },
}
# Run bash where the example was launched. In DOC_TEST_MODE the docs harness
# points it at a throwaway fixture directory instead, removed on exit.
if DOC_TEST_MODE:
    WORK_DIR = tempfile.mkdtemp(prefix="orchestration-")
    atexit.register(shutil.rmtree, WORK_DIR, ignore_errors=True)
    with open(os.path.join(WORK_DIR, "sample.py"), "w") as fixture:
        fixture.write(
            "def fib(n):\n"
            "    return n if n < 2 else fib(n - 1) + fib(n - 2)\n\n"
            "print(fib(10))\n"
        )
else:
    WORK_DIR = os.getcwd()


def run_bash(command: str) -> tuple[str, bool]:
    """Run a shell command and return (output, is_error). No sandbox: example code only."""
    print(f"[bash] {command}", file=sys.stderr)
    try:
        proc = subprocess.run(
            ["bash", "-c", command],
            cwd=WORK_DIR,
            capture_output=True,
            text=True,
            errors="replace",
            timeout=BASH_TIMEOUT_SECONDS,
        )
    except subprocess.TimeoutExpired:
        return f"command timed out after {BASH_TIMEOUT_SECONDS}s", True
    output = (proc.stdout + proc.stderr).strip() or "(no output)"
    if len(output) > TOOL_RESULT_MAX_CHARS:
        output = output[:TOOL_RESULT_MAX_CHARS] + f"\n(truncated at {TOOL_RESULT_MAX_CHARS} chars)"
    if proc.returncode != 0:
        output = f"(exit code {proc.returncode})\n{output}"
    return output, proc.returncode != 0


def handle_bash_block(block) -> tuple[str, bool]:
    if block.input.get("restart") is True:
        return "Shell restarted.", False
    command = block.input.get("command")
    if not isinstance(command, str) or not command:
        return "bash error: no command was provided.", True
    return run_bash(command)
def run_subagent(model: str, prompt: str) -> str:
    """One subagent: a small nested agent loop with the bash tool plus report_findings.
    Subagents inherit the main loop's effort level."""
    subagent_system = (
        "You are one agent in a larger parallel fan-out, assigned a single subtask. "
        "Investigate it directly, using bash to check facts rather than guessing, and finish "
        "by calling report_findings exactly once. Return findings, not narration."
    )
    messages = [{"role": "user", "content": prompt}]
    for _ in range(MAX_SUBAGENT_TURNS):
        with client.messages.stream(
            model=model,
            max_tokens=64000,
            system=subagent_system,
            thinking={"type": "adaptive"},
            output_config={"effort": EFFORT},
            tools=[BASH_TOOL, REPORT_TOOL],
            messages=messages,
            timeout=REQUEST_TIMEOUT_SECONDS,
        ) as stream:
            response = stream.get_final_message()
        messages.append({"role": "assistant", "content": response.content})
        if response.stop_reason == "pause_turn":
            continue
        if response.stop_reason != "tool_use":
            text = "".join(block.text for block in response.content if block.type == "text")
            if response.stop_reason == "max_tokens":
                text += "\n\n(warning: subagent response was truncated at max_tokens)"
            return text
        tool_results = []
        report = None
        for block in response.content:
            if block.type != "tool_use":
                continue
            if block.name == "report_findings":
                report = json.dumps(block.input, indent=2)
                output, is_error = "Findings recorded.", False
            elif block.name == "bash":
                output, is_error = handle_bash_block(block)
            else:
                output, is_error = f"unknown tool: {block.name}", True
            tool_results.append(
                {
                    "type": "tool_result",
                    "tool_use_id": block.id,
                    "content": output,
                    "is_error": is_error,
                }
            )
        if report is not None:
            return report
        messages.append({"role": "user", "content": tool_results})
    return "(subagent hit the turn limit before finishing)"
_journal_lock = threading.Lock()


def _load_journal() -> dict:
    try:
        with open(JOURNAL_PATH) as file:
            return json.load(file) or {}
    except (OSError, json.JSONDecodeError):
        return {}


def journaled(prompt: str, compute) -> str:
    """Return a cached result for this exact prompt, or compute and persist it. This
    makes the fan-out resumable: interrupt the run, rerun it, and only the subtasks
    that never finished are recomputed. Delete the journal file to start fresh."""
    key = hashlib.sha256(prompt.encode()).hexdigest()
    cached = _load_journal().get(key)
    if cached is not None:
        print(f"[journal] cache hit for {key[:12]}", file=sys.stderr)
        return cached
    result = compute()
    try:
        with _journal_lock:  # fan-out writes from many threads
            journal = _load_journal()
            journal[key] = result
            temp = f"{JOURNAL_PATH}.tmp"
            with open(temp, "w") as file:
                json.dump(journal, file)
            os.replace(temp, JOURNAL_PATH)  # atomic on POSIX and Windows
    except OSError as error:  # the journal is best-effort; never discard a computed result
        print(f"[journal] write failed: {error}", file=sys.stderr)
    return result
def normalize_subtasks(raw) -> list[str]:
    """Accept the subtasks input in whatever shape the model emits: an array, the array
    JSON-encoded as a single string, or a newline-separated list."""
    if isinstance(raw, str):
        try:
            raw = json.loads(raw)
        except json.JSONDecodeError:
            raw = raw.splitlines() if "\n" in raw else [raw]
    if not isinstance(raw, list):
        return []
    return [task.strip() for task in raw if isinstance(task, str) and task.strip()]


def verify_prompt_for(subtask: str, result: str) -> str:
    return (
        "Adversarially verify the subagent result below: try to REFUTE it. Re-derive the "
        "claims yourself with bash rather than trusting the result, and look for evidence "
        "that contradicts them. Default to refuted if uncertain. Call report_findings with "
        "summary 'refuted: <why>' or 'confirmed: <why>', citing the file:line or command "
        "output that decided it.\n\n"
        f"Subtask: {subtask}\n\nResult to verify:\n{result}"
    )


def run_workflow(model: str, raw_subtasks) -> tuple[str, bool]:
    """Run subtasks as parallel subagents, then run a second verification wave over
    the results, and return both. MAX_TOTAL_SUBTASKS bounds how many the model can
    queue; MAX_CONCURRENT bounds how many run at once."""
    all_subtasks = normalize_subtasks(raw_subtasks)
    subtasks = all_subtasks[:MAX_TOTAL_SUBTASKS]
    dropped = len(all_subtasks) - len(subtasks)
    if not subtasks:
        return "Workflow error: no usable subtasks were provided.", True
    print(f"[workflow] fanning out {len(subtasks)} agents", file=sys.stderr)

    def run_one(prompt: str) -> str:
        try:
            return journaled(prompt, lambda: run_subagent(model, prompt))
        except Exception as error:  # isolation boundary: one bad subagent should not end the run
            return f"(subagent failed: {type(error).__name__}: {error})"

    with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_CONCURRENT) as pool:
        results = list(pool.map(run_one, subtasks))
        print(f"[workflow] verifying {len(results)} results", file=sys.stderr)
        verify_prompts = [verify_prompt_for(task, result) for task, result in zip(subtasks, results)]
        verdicts = list(pool.map(run_one, verify_prompts))

    joined = "\n\n".join(
        f"[agent {index + 1}: {task}]\n{result}\n\n[verify {index + 1}]\n{verdict}"
        for index, (task, result, verdict) in enumerate(zip(subtasks, results, verdicts))
    )
    if dropped > 0:
        joined = (
            f"(note: {dropped} subtasks beyond MAX_TOTAL_SUBTASKS={MAX_TOTAL_SUBTASKS} were not "
            "run; rerun them in a follow-up Workflow call)\n\n" + joined
        )
    return joined, False
class ModeAgent:
    """An agent loop whose orchestration mode is toggled with mid-conversation system messages."""

    def __init__(self, model: str, mode_on: bool = True):
        self.model = model
        self.mode_on = mode_on
        self.messages: list[dict] = []
        self._mode_announced = False
        self._exit_pending = False
        self._turns_since_reminder = 0

    def set_mode(self, mode_on: bool) -> None:
        """Turn the mode on or off. The notice is delivered with the next user turn."""
        if mode_on == self.mode_on:
            return
        if not mode_on:
            if self._mode_announced:
                self._exit_pending = True
        else:
            self._exit_pending = False
        self.mode_on = mode_on

    def _due_system_messages(self) -> list[dict]:
        """System messages owed on this turn: an exit notice, the full mode text on entry,
        or a one-line refresher every TURNS_BETWEEN_REFRESHERS user turns."""
        due = []
        if self._exit_pending:
            self._exit_pending = False
            self._mode_announced = False
            due.append({"role": "system", "content": MODE_EXIT})
        if self.mode_on:
            if not self._mode_announced:
                self._mode_announced = True
                self._turns_since_reminder = 0
                due.append({"role": "system", "content": MODE_ENTER})
            elif self._turns_since_reminder >= TURNS_BETWEEN_REFRESHERS:
                self._turns_since_reminder = 0
                due.append({"role": "system", "content": MODE_REFRESH})
        return due

    def turn(self, user_input: str) -> str:
        # Mid-conversation system messages follow the user turn they apply to, which keeps
        # the cached prefix ahead of them untouched.
        self.messages.append({"role": "user", "content": user_input})
        self.messages.extend(self._due_system_messages())
        self._turns_since_reminder += 1

        for _ in range(MAX_MAIN_TURNS):
            with client.messages.stream(
                model=self.model,
                max_tokens=64000,
                system=SYSTEM_PROMPT,  # static for the whole session
                thinking={"type": "adaptive"},
                output_config={"effort": EFFORT},
                tools=[WORKFLOW_TOOL, BASH_TOOL],
                messages=self.messages,
                timeout=REQUEST_TIMEOUT_SECONDS,
            ) as stream:
                response = stream.get_final_message()
            self.messages.append({"role": "assistant", "content": response.content})

            if response.stop_reason == "pause_turn":
                continue
            if response.stop_reason != "tool_use":
                text = "".join(block.text for block in response.content if block.type == "text")
                if response.stop_reason == "max_tokens":
                    # Drop the truncated assistant message so later turns don't build on it.
                    self.messages.pop()
                    text += "\n\n(warning: response was truncated at max_tokens)"
                return text

            tool_results = []
            for block in response.content:
                if block.type != "tool_use":
                    continue
                if block.name == "Workflow":
                    output, is_error = run_workflow(self.model, block.input.get("subtasks", []))
                elif block.name == "bash":
                    output, is_error = handle_bash_block(block)
                else:
                    output, is_error = f"unknown tool: {block.name}", True
                tool_results.append(
                    {
                        "type": "tool_result",
                        "tool_use_id": block.id,
                        "content": output,
                        "is_error": is_error,
                    }
                )
            self.messages.append({"role": "user", "content": tool_results})
        return "(hit the main loop turn limit before finishing)"
if __name__ == "__main__":
    task = (
        sys.argv[1]
        if len(sys.argv) > 1
        else "Explore the current directory, then give a thorough review: what it does, "
        "code-quality issues, and concrete improvements."
    )
    agent = ModeAgent(MODEL)
    print(agent.turn(task))
    agent.set_mode(False)
    print(agent.turn("Briefly summarize what you found above, no fan-out needed."))
Penggunaan alat dengan Claude

Mendefinisikan alat, menangani panggilan alat, dan hasil alat.

Alat bash

Alat bash yang didefinisikan Anthropic yang dijalankan secara lokal oleh contoh ini.