• Messages
  • Managed Agents
  • 관리자
Search...
⌘K
첫 단계
Claude 소개빠른 시작
Claude로 빌드하기
기능 개요Messages API 사용하기중지 사유 및 폴백거부 및 폴백폴백 크레딧
모델 기능
확장 사고적응형 사고Effort작업 예산 (베타)빠른 모드 (리서치 프리뷰)구조화된 출력인용스트리밍 Messages배치 처리검색 결과스트리밍 거부다국어 지원임베딩
도구
개요도구 사용 작동 방식튜토리얼: 도구 사용 에이전트 빌드하기도구 정의도구 호출 처리병렬 도구 사용Tool Runner (SDK)엄격한 도구 사용프롬프트 캐싱과 함께 도구 사용서버 도구문제 해결웹 검색 도구웹 가져오기 도구코드 실행 도구Advisor 도구메모리 도구Bash 도구컴퓨터 사용 도구텍스트 편집기 도구
도구 인프라
도구 레퍼런스도구 컨텍스트 관리도구 조합도구 검색프로그래밍 방식 도구 호출세분화된 도구 스트리밍
컨텍스트 관리
컨텍스트 윈도우압축컨텍스트 편집프롬프트 캐싱대화 중 시스템 메시지오케스트레이션 모드 빌드하기캐시 진단 (베타)토큰 계산
파일 작업
Files APIPDF 지원이미지 및 비전
스킬
개요빠른 시작모범 사례엔터프라이즈용 스킬API에서의 스킬
MCP
원격 MCP 서버MCP 커넥터
클라우드 플랫폼의 Claude
Amazon BedrockAmazon Bedrock (레거시)AWS의 Claude PlatformMicrosoft FoundryVertex AI
Log in
오케스트레이션 모드 빌드하기
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...

Solutions

  • AI agents
  • Code modernization
  • Coding
  • Customer support
  • Education
  • Financial services
  • Government
  • Life sciences

Partners

  • Amazon Bedrock
  • Google Cloud's Vertex AI

Learn

  • Blog
  • Courses
  • Use cases
  • Connectors
  • Customer stories
  • Engineering at Anthropic
  • Events
  • Powered by Claude
  • Service partners
  • Startups program

Company

  • Anthropic
  • Careers
  • Economic Futures
  • Research
  • News
  • Responsible Scaling Policy
  • Security and compliance
  • Transparency

Learn

  • Blog
  • Courses
  • Use cases
  • Connectors
  • Customer stories
  • Engineering at Anthropic
  • Events
  • Powered by Claude
  • Service partners
  • Startups program

Help and security

  • Availability
  • Status
  • Support
  • Discord

Terms and policies

  • Privacy policy
  • Responsible disclosure policy
  • Terms of service: Commercial
  • Terms of service: Consumer
  • Usage policy
Messages/컨텍스트 관리

오케스트레이션 모드 구축하기

대화 중 시스템 메시지로 켜고 끌 수 있으며, 멀티 에이전트 팬아웃에 대한 상시 동의를 부여하는 세션 수준 모드를 구축합니다.

Was this page helpful?

  • 루프 설정하기
  • 모드 리마인더 정의하기
  • 도구 설명에서 상시 동의 부여하기
  • bash 도구를 로컬에서 실행하기
  • 서브에이전트 하나 실행하기
  • 재실행 시 재개할 수 있도록 결과 저널링하기
  • 팬아웃 후 검증하기
  • 대화 중 시스템 메시지로 모드 전환하기
  • 실행하기
  • 프로덕션 하네스를 향하여
  • 관련 문서

"Orchestration mode"(오케스트레이션 모드)는 세션 수준의 스위치입니다. 이 모드가 켜져 있으면 모델은 모든 실질적인 요청에 최대한의 철저함을 적용하여, 작업 자체를 먼저 탐색한 다음 기본적으로 병렬 서브에이전트에게 작업을 분산(팬아웃)합니다. 모드가 꺼져 있으면 동일한 오케스트레이션 도구가 요청별 옵트인 방식으로 돌아갑니다.

이 모드는 API 매개변수가 아닙니다. 문서화된 구성 요소들만으로 완전히 구축됩니다:

  1. Effort 레벨: 요청은 xhigh와 같은 문서화된 Effort 값으로 실행됩니다. 해당 페이지에 나온 레벨 이상의 숨겨진 레벨은 없습니다.
  2. 모드 리마인더: 대화 중 시스템 메시지가 모델에게 모드가 활성화되어 있음을 알리며, 몇 턴마다 한 줄짜리 리프레셔를 보내고 모드가 꺼질 때 종료 알림을 보냅니다. 최상위 system 필드는 절대 변경되지 않으므로 캐시된 프리픽스가 그대로 유지됩니다.
  3. 도구 설명 내 상시 동의: 오케스트레이션 도구의 설명에는 모드가 켜져 있는 동안 모델이 모든 실질적인 작업에 대해 먼저 묻지 않고 워크플로를 작성하고 실행해야 한다고 명시되어 있습니다.

이 예제는 현재 Claude Opus 4.8에서만 사용할 수 있는 대화 중 시스템 메시지를 사용합니다. 팬아웃 자체가 토큰 사용량을 배가시킵니다. 단일 요청이 여러 서브에이전트 대화를 생성할 수 있으므로, 비용을 정당화할 수 있는 작업에만 이 모드를 사용하세요.

루프 설정하기

이 예제는 단일 파일입니다. 상수들은 effort 레벨, 팬아웃 형태, 그리고 모드 리프레셔가 재전송되는 빈도를 제어합니다. MAX_CONCURRENT는 동시에 실행되는 서브에이전트 수를 제한합니다(PHP 포트는 순차적이므로 이를 무시합니다). MAX_TOTAL_SUBTASKS는 모델이 단일 Workflow 호출에서 대기열에 넣을 수 있는 서브태스크 수를 제한합니다. 이 둘을 분리하면 모델이 한 번에 모두 실행하지 않고도 대규모 백로그를 계획할 수 있습니다. DOC_TEST_MODE 검사는 해당 환경 변수가 설정된 경우 루프를 단일 턴으로 제한하여, 자동화된 문서 테스트 하네스가 전체 오케스트레이션을 실행하지 않고도 파일이 컴파일되고 빠르게 완료되는지 검증할 수 있게 합니다. 예제를 직접 실행할 때는 이 변수를 설정하지 마세요.

모드 리마인더 정의하기

리마인더는 의도적으로 짧습니다. 모드를 전환하고, 무거운 지침이 있는 도구 설명을 가리킵니다. 전체 텍스트는 모드가 켜질 때 한 번 전송되고, 리프레셔는 여러 사용자 턴이 지난 후에만 재전송되며, 종료 알림은 모드가 꺼질 때 한 번 전송됩니다.

도구 설명에서 상시 동의 부여하기

Workflow 도구는 실제 행동 계약을 담고 있습니다. 옵트인 규칙, 모드가 켜져 있는 동안 적용되는 상시 동의, 팬아웃 크기를 조정하기 위한 세분화 지침, 그리고 모델이 활용할 수 있는 품질 패턴(검증 웨이브, 완전성 비평가, 다단계 시퀀싱)이 포함됩니다. 서브에이전트는 또한 report_findings 도구를 받아 결과가 산문이 아닌 구조화된 JSON으로 반환되도록 하며, bash 도구는 로컬에서 실행되는 Anthropic 정의 bash_20250124 도구입니다.

bash 도구를 로컬에서 실행하기

bash 핸들러는 요청된 명령을 타임아웃과 함께 실행하고, stdout과 stderr를 결합하여 캡처하며, 폭주하는 명령이 컨텍스트 윈도우를 넘치게 하지 않도록 결과를 잘라냅니다. 명령은 예제를 실행한 디렉터리에서 실행되므로, 특정 프로젝트를 대상으로 하려면 해당 위치에서 시작해야 합니다. DOC_TEST_MODE가 설정된 경우, 하네스는 대신 종료 시 제거되는 작은 임시 픽스처 디렉터리를 bash에 제공합니다. 여기에는 샌드박스가 없습니다. 명령은 예제를 실행한 프로세스의 권한으로 실행됩니다. 명확성을 위해 이 예제는 bash_20250124 계약이 설명하는 영속적 세션을 유지하는 대신 각 호출을 새로운 서브셸에서 실행합니다. 프로덕션 에이전트는 작업 디렉터리, 환경, restart 액션이 문서화된 대로 동작하도록 장기 실행 셸로 도구를 뒷받침해야 합니다.

서브에이전트 하나 실행하기

각 워크플로 서브태스크는 bash 도구를 가진 자체적인 작은 에이전트 루프가 되며, 메인 루프와 동일한 effort로 실행됩니다. 요청별 타임아웃이 각 API 호출을 제한하므로, 연결이 끊어져도 전체 실행이 멈추는 대신 하나의 서브에이전트만 성능이 저하됩니다.

재실행 시 재개할 수 있도록 결과 저널링하기

수십 개의 서브에이전트를 생성하는 팬아웃은 처음부터 다시 시작하기에 비용이 많이 듭니다. 작은 콘텐츠 주소 지정 저널이 이를 멱등적으로 만듭니다. 서브에이전트를 디스패치하기 전에 로컬 JSON 파일에서 해당 프롬프트의 SHA-256을 조회하고, 기록된 결과가 있으면 이를 반환합니다. 실행을 중단하고 다시 실행하면 완료되지 않은 서브태스크만 다시 계산됩니다. 저널은 단일 팬아웃 웨이브 내에서가 아니라 실행 간에 중복을 제거합니다. 새로 시작하려면 저널 파일을 삭제하세요.

팬아웃 후 검증하기

팬아웃은 최대 MAX_TOTAL_SUBTASKS개의 프롬프트를 받아 저널을 통해 실행하며, 동시에 최대 MAX_CONCURRENT개가 진행됩니다(PHP 포트에서는 순차적). 실패를 격리하여 하나의 서브에이전트가 고장 나도 실행이 종료되는 대신 오류 문자열로 저하됩니다. 첫 번째 웨이브가 완료되면, 두 번째 웨이브가 동일한 서브에이전트 경로를 재사용하여 각 결과를 반박하려고 시도합니다. 모든 검증자는 소스에서 주장을 다시 도출하며, 불확실한 경우 기본적으로 반박됨으로 처리합니다. 원본 결과와 그 판정이 모두 오케스트레이터에 반환되어 함께 평가할 수 있습니다.

대화 중 시스템 메시지로 모드 전환하기

에이전트는 먼저 사용자의 메시지를 추가한 다음, 전송해야 할 시스템 메시지(종료 알림, 진입 시 전체 모드 텍스트, 또는 주기적 리프레셔)를 추가합니다. 시스템 메시지를 사용자 턴 뒤에 배치하면 그 앞의 모든 캐시된 바이트가 그대로 유지되며, 시스템 메시지가 사용자 턴 뒤에 와야 한다는 배치 규칙을 충족합니다.

실행하기

이 예제의 bash 도구는 모델이 작성한 명령을 샌드박스 없이 사용자의 머신에서 직접 실행하며, 팬아웃은 이러한 에이전트 여러 개를 병렬로 실행합니다. 노출해도 괜찮은 디렉터리와 환경에서 실행하고, 로컬 실험 이상의 용도로 적용하기 전에 샌드박싱을 추가하세요.

에이전트가 작업할 디렉터리(예: 검토할 리포지토리의 루트)에서 예제를 시작하세요:

python orchestration_mode.py "Review this repository for flaky tests and propose fixes."

모드가 켜져 있으면 모델이 몇 가지 bash 명령으로 탐색하고, 요청 없이 Workflow 도구를 디스패치하며, 서브에이전트 보고서를 최종 답변으로 종합할 것으로 예상됩니다. 사소하거나 대화적인 요청은 리마인더의 지시대로 단독으로 처리됩니다.

프로덕션 하네스를 향하여

이 예제는 의도적으로 작게 만들어졌습니다. 실제 워크로드를 위한 하네스는 일반적으로 다음을 추가합니다:

  • 샌드박스화된 오케스트레이션 스크립트: 단순한 서브태스크 문자열 목록만 받는 대신, 모델이 짧은 오케스트레이션 프로그램(분기, 루프, 리듀스 단계)을 생성하고 이를 격리된 인터프리터 내에서 실행하도록 합니다.
  • 내구성 있는 저널링: 로컬 JSON 파일을 프로세스 재시작 후에도 유지되고 여러 머신에서 동시 쓰기에 안전한 저장소로 교체합니다.
  • 예산 집행: Workflow 호출당이 아니라 전체 세션에서 실행된 총 서브에이전트 수를 추적하고, 폭주하는 계획이 할당량을 소진하지 못하도록 하드 캡 초과를 거부합니다.

이 예제의 패턴(모드 리마인더, 도구 설명 내 상시 동의, 저널링, 검증 웨이브)은 변경 없이 그대로 적용됩니다. 이를 둘러싼 실행 기반만 더 견고해질 뿐입니다.

관련 문서

대화 중 시스템 메시지

모드 리마인더가 사용하는 메커니즘과 프롬프트 캐싱과의 상호작용 방식입니다.

Effort

API가 허용하는 effort 레벨과 선택 방법입니다.

import atexit
import concurrent.futures
import hashlib
import json
import os
import shutil
import subprocess
import sys
import tempfile
import threading

import anthropic

client = anthropic.Anthropic()

MODEL = "claude-opus-4-8"
EFFORT = "xhigh"

SYSTEM_PROMPT = "You are a helpful general-purpose agent. Answer the user's request directly."

REQUEST_TIMEOUT_SECONDS = 600
BASH_TIMEOUT_SECONDS = 60
TOOL_RESULT_MAX_CHARS = 8000
MAX_CONCURRENT = 10
DOC_TEST_MODE = bool(os.environ.get("DOC_TEST_MODE"))
MAX_TOTAL_SUBTASKS = 2 if DOC_TEST_MODE else 200
MAX_SUBAGENT_TURNS = 1 if DOC_TEST_MODE else 15
MAX_MAIN_TURNS = 1 if DOC_TEST_MODE else 30
TURNS_BETWEEN_REFRESHERS = 10
JOURNAL_PATH = os.environ.get("ORCH_JOURNAL") or "orchestration_journal.json"
MODE_ENTER = (
    "Orchestration mode is on: optimize for the most exhaustive, correct answer rather than "
    "the fastest one. Use the Workflow tool on every substantive task, sized to the problem's "
    "natural decomposition rather than the maximum the tool allows. See the Workflow tool's "
    "description for standing consent, granularity guidance, and quality patterns. Work solo "
    "only on conversational or trivial turns."
)
MODE_REFRESH = (
    "Orchestration mode is still on. Use the Workflow tool; see its standing consent section."
)
MODE_EXIT = (
    "Orchestration mode is off. The Workflow tool's standard opt-in rule applies again."
)
WORKFLOW_TOOL = {
    "name": "Workflow",
    "description": (
        "Orchestrate a multi-agent workflow: split a large task into independent subtasks "
        "and run them as parallel agents, then collect their results.\n\n"
        "Opt-in: only use this tool when the user explicitly asks for a workflow, or when a "
        "system message confirms that orchestration mode is on.\n\n"
        "Quality patterns: adversarial verification (a second wave of agents checks the first "
        "wave's findings against the source), a completeness critic (one agent hunts for what "
        "the others missed), and multi-phase sequencing (understand, design, implement, and "
        "review as separate workflow calls, reading results between phases). A useful default "
        "is hybrid: scout inline first to discover the work-list, then fan out over it.\n\n"
        "Granularity: scope each subtask to a distinct concern, component, or question rather "
        "than per line or per file section. Scale the count to what the user asked for: a "
        "focused review of a module of a few hundred lines rarely needs more than about ten "
        "subtasks; a broad audit of a large codebase can justify more.\n\n"
        "Standing consent: while a system message confirms orchestration mode is on, that "
        "opt-in is standing. Author and run a workflow for every substantive task by default, "
        "and lean toward verifying findings adversarially. Work solo only on conversational "
        "turns or trivial mechanical edits. When a system message says the mode is off, "
        "revert to the opt-in rule above."
    ),
    "input_schema": {
        "type": "object",
        "properties": {
            "subtasks": {
                "type": "array",
                "items": {"type": "string"},
                "description": "Independent subtask prompts to run as parallel agents",
            }
        },
        "required": ["subtasks"],
    },
}

BASH_TOOL = {"type": "bash_20250124", "name": "bash"}

REPORT_TOOL = {
    "name": "report_findings",
    "description": (
        "Report the final findings for your subtask. Call this exactly once, when you are "
        "done investigating; it ends your task."
    ),
    "input_schema": {
        "type": "object",
        "properties": {
            "summary": {"type": "string", "description": "Two or three sentences of synthesis"},
            "findings": {
                "type": "array",
                "items": {
                    "type": "object",
                    "properties": {
                        "claim": {"type": "string", "description": "The finding, one sentence"},
                        "evidence": {
                            "type": "string",
                            "description": "How it was verified (file, line, or command output)",
                        },
                        "severity": {"type": "string", "enum": ["high", "medium", "low", "info"]},
                    },
                    "required": ["claim", "evidence", "severity"],
                },
            },
        },
        "required": ["summary", "findings"],
    },
}
# Run bash where the example was launched. In DOC_TEST_MODE the docs harness
# points it at a throwaway fixture directory instead, removed on exit.
if DOC_TEST_MODE:
    WORK_DIR = tempfile.mkdtemp(prefix="orchestration-")
    atexit.register(shutil.rmtree, WORK_DIR, ignore_errors=True)
    with open(os.path.join(WORK_DIR, "sample.py"), "w") as fixture:
        fixture.write(
            "def fib(n):\n"
            "    return n if n < 2 else fib(n - 1) + fib(n - 2)\n\n"
            "print(fib(10))\n"
        )
else:
    WORK_DIR = os.getcwd()


def run_bash(command: str) -> tuple[str, bool]:
    """Run a shell command and return (output, is_error). No sandbox: example code only."""
    print(f"[bash] {command}", file=sys.stderr)
    try:
        proc = subprocess.run(
            ["bash", "-c", command],
            cwd=WORK_DIR,
            capture_output=True,
            text=True,
            errors="replace",
            timeout=BASH_TIMEOUT_SECONDS,
        )
    except subprocess.TimeoutExpired:
        return f"command timed out after {BASH_TIMEOUT_SECONDS}s", True
    output = (proc.stdout + proc.stderr).strip() or "(no output)"
    if len(output) > TOOL_RESULT_MAX_CHARS:
        output = output[:TOOL_RESULT_MAX_CHARS] + f"\n(truncated at {TOOL_RESULT_MAX_CHARS} chars)"
    if proc.returncode != 0:
        output = f"(exit code {proc.returncode})\n{output}"
    return output, proc.returncode != 0


def handle_bash_block(block) -> tuple[str, bool]:
    if block.input.get("restart") is True:
        return "Shell restarted.", False
    command = block.input.get("command")
    if not isinstance(command, str) or not command:
        return "bash error: no command was provided.", True
    return run_bash(command)
def run_subagent(model: str, prompt: str) -> str:
    """One subagent: a small nested agent loop with the bash tool plus report_findings.
    Subagents inherit the main loop's effort level."""
    subagent_system = (
        "You are one agent in a larger parallel fan-out, assigned a single subtask. "
        "Investigate it directly, using bash to check facts rather than guessing, and finish "
        "by calling report_findings exactly once. Return findings, not narration."
    )
    messages = [{"role": "user", "content": prompt}]
    for _ in range(MAX_SUBAGENT_TURNS):
        with client.messages.stream(
            model=model,
            max_tokens=64000,
            system=subagent_system,
            thinking={"type": "adaptive"},
            output_config={"effort": EFFORT},
            tools=[BASH_TOOL, REPORT_TOOL],
            messages=messages,
            timeout=REQUEST_TIMEOUT_SECONDS,
        ) as stream:
            response = stream.get_final_message()
        messages.append({"role": "assistant", "content": response.content})
        if response.stop_reason == "pause_turn":
            continue
        if response.stop_reason != "tool_use":
            text = "".join(block.text for block in response.content if block.type == "text")
            if response.stop_reason == "max_tokens":
                text += "\n\n(warning: subagent response was truncated at max_tokens)"
            return text
        tool_results = []
        report = None
        for block in response.content:
            if block.type != "tool_use":
                continue
            if block.name == "report_findings":
                report = json.dumps(block.input, indent=2)
                output, is_error = "Findings recorded.", False
            elif block.name == "bash":
                output, is_error = handle_bash_block(block)
            else:
                output, is_error = f"unknown tool: {block.name}", True
            tool_results.append(
                {
                    "type": "tool_result",
                    "tool_use_id": block.id,
                    "content": output,
                    "is_error": is_error,
                }
            )
        if report is not None:
            return report
        messages.append({"role": "user", "content": tool_results})
    return "(subagent hit the turn limit before finishing)"
_journal_lock = threading.Lock()


def _load_journal() -> dict:
    try:
        with open(JOURNAL_PATH) as file:
            return json.load(file) or {}
    except (OSError, json.JSONDecodeError):
        return {}


def journaled(prompt: str, compute) -> str:
    """Return a cached result for this exact prompt, or compute and persist it. This
    makes the fan-out resumable: interrupt the run, rerun it, and only the subtasks
    that never finished are recomputed. Delete the journal file to start fresh."""
    key = hashlib.sha256(prompt.encode()).hexdigest()
    cached = _load_journal().get(key)
    if cached is not None:
        print(f"[journal] cache hit for {key[:12]}", file=sys.stderr)
        return cached
    result = compute()
    try:
        with _journal_lock:  # fan-out writes from many threads
            journal = _load_journal()
            journal[key] = result
            temp = f"{JOURNAL_PATH}.tmp"
            with open(temp, "w") as file:
                json.dump(journal, file)
            os.replace(temp, JOURNAL_PATH)  # atomic on POSIX and Windows
    except OSError as error:  # the journal is best-effort; never discard a computed result
        print(f"[journal] write failed: {error}", file=sys.stderr)
    return result
def normalize_subtasks(raw) -> list[str]:
    """Accept the subtasks input in whatever shape the model emits: an array, the array
    JSON-encoded as a single string, or a newline-separated list."""
    if isinstance(raw, str):
        try:
            raw = json.loads(raw)
        except json.JSONDecodeError:
            raw = raw.splitlines() if "\n" in raw else [raw]
    if not isinstance(raw, list):
        return []
    return [task.strip() for task in raw if isinstance(task, str) and task.strip()]


def verify_prompt_for(subtask: str, result: str) -> str:
    return (
        "Adversarially verify the subagent result below: try to REFUTE it. Re-derive the "
        "claims yourself with bash rather than trusting the result, and look for evidence "
        "that contradicts them. Default to refuted if uncertain. Call report_findings with "
        "summary 'refuted: <why>' or 'confirmed: <why>', citing the file:line or command "
        "output that decided it.\n\n"
        f"Subtask: {subtask}\n\nResult to verify:\n{result}"
    )


def run_workflow(model: str, raw_subtasks) -> tuple[str, bool]:
    """Run subtasks as parallel subagents, then run a second verification wave over
    the results, and return both. MAX_TOTAL_SUBTASKS bounds how many the model can
    queue; MAX_CONCURRENT bounds how many run at once."""
    all_subtasks = normalize_subtasks(raw_subtasks)
    subtasks = all_subtasks[:MAX_TOTAL_SUBTASKS]
    dropped = len(all_subtasks) - len(subtasks)
    if not subtasks:
        return "Workflow error: no usable subtasks were provided.", True
    print(f"[workflow] fanning out {len(subtasks)} agents", file=sys.stderr)

    def run_one(prompt: str) -> str:
        try:
            return journaled(prompt, lambda: run_subagent(model, prompt))
        except Exception as error:  # isolation boundary: one bad subagent should not end the run
            return f"(subagent failed: {type(error).__name__}: {error})"

    with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_CONCURRENT) as pool:
        results = list(pool.map(run_one, subtasks))
        print(f"[workflow] verifying {len(results)} results", file=sys.stderr)
        verify_prompts = [verify_prompt_for(task, result) for task, result in zip(subtasks, results)]
        verdicts = list(pool.map(run_one, verify_prompts))

    joined = "\n\n".join(
        f"[agent {index + 1}: {task}]\n{result}\n\n[verify {index + 1}]\n{verdict}"
        for index, (task, result, verdict) in enumerate(zip(subtasks, results, verdicts))
    )
    if dropped > 0:
        joined = (
            f"(note: {dropped} subtasks beyond MAX_TOTAL_SUBTASKS={MAX_TOTAL_SUBTASKS} were not "
            "run; rerun them in a follow-up Workflow call)\n\n" + joined
        )
    return joined, False
class ModeAgent:
    """An agent loop whose orchestration mode is toggled with mid-conversation system messages."""

    def __init__(self, model: str, mode_on: bool = True):
        self.model = model
        self.mode_on = mode_on
        self.messages: list[dict] = []
        self._mode_announced = False
        self._exit_pending = False
        self._turns_since_reminder = 0

    def set_mode(self, mode_on: bool) -> None:
        """Turn the mode on or off. The notice is delivered with the next user turn."""
        if mode_on == self.mode_on:
            return
        if not mode_on:
            if self._mode_announced:
                self._exit_pending = True
        else:
            self._exit_pending = False
        self.mode_on = mode_on

    def _due_system_messages(self) -> list[dict]:
        """System messages owed on this turn: an exit notice, the full mode text on entry,
        or a one-line refresher every TURNS_BETWEEN_REFRESHERS user turns."""
        due = []
        if self._exit_pending:
            self._exit_pending = False
            self._mode_announced = False
            due.append({"role": "system", "content": MODE_EXIT})
        if self.mode_on:
            if not self._mode_announced:
                self._mode_announced = True
                self._turns_since_reminder = 0
                due.append({"role": "system", "content": MODE_ENTER})
            elif self._turns_since_reminder >= TURNS_BETWEEN_REFRESHERS:
                self._turns_since_reminder = 0
                due.append({"role": "system", "content": MODE_REFRESH})
        return due

    def turn(self, user_input: str) -> str:
        # Mid-conversation system messages follow the user turn they apply to, which keeps
        # the cached prefix ahead of them untouched.
        self.messages.append({"role": "user", "content": user_input})
        self.messages.extend(self._due_system_messages())
        self._turns_since_reminder += 1

        for _ in range(MAX_MAIN_TURNS):
            with client.messages.stream(
                model=self.model,
                max_tokens=64000,
                system=SYSTEM_PROMPT,  # static for the whole session
                thinking={"type": "adaptive"},
                output_config={"effort": EFFORT},
                tools=[WORKFLOW_TOOL, BASH_TOOL],
                messages=self.messages,
                timeout=REQUEST_TIMEOUT_SECONDS,
            ) as stream:
                response = stream.get_final_message()
            self.messages.append({"role": "assistant", "content": response.content})

            if response.stop_reason == "pause_turn":
                continue
            if response.stop_reason != "tool_use":
                text = "".join(block.text for block in response.content if block.type == "text")
                if response.stop_reason == "max_tokens":
                    # Drop the truncated assistant message so later turns don't build on it.
                    self.messages.pop()
                    text += "\n\n(warning: response was truncated at max_tokens)"
                return text

            tool_results = []
            for block in response.content:
                if block.type != "tool_use":
                    continue
                if block.name == "Workflow":
                    output, is_error = run_workflow(self.model, block.input.get("subtasks", []))
                elif block.name == "bash":
                    output, is_error = handle_bash_block(block)
                else:
                    output, is_error = f"unknown tool: {block.name}", True
                tool_results.append(
                    {
                        "type": "tool_result",
                        "tool_use_id": block.id,
                        "content": output,
                        "is_error": is_error,
                    }
                )
            self.messages.append({"role": "user", "content": tool_results})
        return "(hit the main loop turn limit before finishing)"
if __name__ == "__main__":
    task = (
        sys.argv[1]
        if len(sys.argv) > 1
        else "Explore the current directory, then give a thorough review: what it does, "
        "code-quality issues, and concrete improvements."
    )
    agent = ModeAgent(MODEL)
    print(agent.turn(task))
    agent.set_mode(False)
    print(agent.turn("Briefly summarize what you found above, no fan-out needed."))
Claude와 함께하는 도구 사용

도구 정의, 도구 호출 처리, 도구 결과에 대한 내용입니다.

Bash 도구

이 예제가 로컬에서 실행하는 Anthropic 정의 bash 도구입니다.