Cookbook
Agent Patterns
View on GitHub

Async multi-agent orchestration

Two async multi-agent patterns — a fixed N-agent team with peer messaging through a shared hub, and dynamically spawned async subagents — reduced to their bare messaging and lifecycle mechanics.

Paul Chen
Paul Chen
@paulchen-go
Published on June 8, 2026
Was this page helpful?

Async Multi-Agent Orchestration with Claude

This cookbook shows the shape of the two multi-agent orchestration patterns behind the multi-agent results in the Claude Opus 4.8 system card: a fixed N-agent team and async subagents. There is no domain task here — just the messaging and subagent mechanics, so you can see exactly which tools fire and in what order, then drop your own tools and task in.

Everything runs on the public Anthropic Python SDK + asyncio, with any API key, typically in under thirty seconds.

Setup

python
%pip install -qU anthropic
python
import asyncio
import itertools
from collections import Counter, defaultdict
 
import anthropic
 
MODEL = "claude-opus-4-8"  # swap for the newest Claude model available to you
client = anthropic.AsyncAnthropic()  # reads ANTHROPIC_API_KEY from env

Shared building blocks

The message hub

Every agent gets an inbox list and an asyncio.Event for blocking waits. drain empties an inbox and returns the structured messages; render formats them as text for appending to a tool result.

python
class Hub:
    def __init__(self):
        self.inbox: dict[str, list[dict]] = defaultdict(list)
        self.event: dict[str, asyncio.Event] = defaultdict(asyncio.Event)
        self.status: dict[str, str] = {}
        self._ids = itertools.count(1)
 
    def register(self, name: str):
        self.status[name] = "active"
        _ = self.inbox[name], self.event[name]
 
    def new_name(self, prefix="helper") -> str:
        n = f"{prefix}{next(self._ids)}"
        self.register(n)
        return n
 
    def post(self, sender: str, recipients: list[str], content: str) -> list[str]:
        delivered = []
        for rid in recipients:
            if rid in self.status:
                self.inbox[rid].append({"from": sender, "content": content})
                self.event[rid].set()
                delivered.append(rid)
        return delivered
 
    def drain(self, name: str) -> list[dict]:
        msgs, self.inbox[name] = self.inbox[name], []
        self.event[name] = asyncio.Event()
        return msgs
 
    @staticmethod
    def render(msgs: list[dict]) -> str:
        if not msgs:
            return ""
        body = "\n".join(
            f'<agent-message from="{m["from"]}">\n{m["content"]}\n</agent-message>' for m in msgs
        )
        return f"\n\n[Messages received while you were working:]\n{body}"

Messaging tools

Every agent in both patterns gets these two.

python
SEND_MESSAGE = {
    "name": "send_message",
    "description": (
        "Send a message to one or more other agents. It will appear appended to their next tool "
        "result. This is the ONLY way to reach other agents — plain text in your turn goes nowhere."
    ),
    "input_schema": {
        "type": "object",
        "properties": {
            "recipient_ids": {"type": "array", "items": {"type": "string"}, "minItems": 1},
            "content": {"type": "string"},
        },
        "required": ["recipient_ids", "content"],
    },
}
WAIT_FOR_MESSAGE = {
    "name": "wait_for_message",
    "description": (
        "Block until another agent messages you. Note: messages also arrive automatically appended "
        "to the result of ANY other tool call, so only use this when you have nothing else to do."
    ),
    "input_schema": {"type": "object", "properties": {}},
}
BASE_TOOLS = [SEND_MESSAGE, WAIT_FOR_MESSAGE]

The base agent loop

One function runs any agent: a standard tool-use loop that dispatches send_message / wait_for_message against the hub, routes any extra tools through extra_dispatch, and appends the drained inbox to the last tool result — so agents never poll; messages arrive inline. Each tool call and inbox delivery is printed live so you can watch the orchestration unfold; a summary trace prints at the end.

python
TRACE: dict[str, list[str]] = defaultdict(list)
 
 
def _snip(s, n=60):
    s = str(s).replace("\n", " ")
    return s if len(s) <= n else s[:n] + "…"
 
 
async def run_agent(
    hub: Hub,
    name: str,
    system: str,
    first_user_turn: str,
    tools: list = None,
    extra_dispatch=None,
    max_turns: int = 20,
) -> str:
    tools = tools or BASE_TOOLS
    extra_dispatch = extra_dispatch or {}
    messages = [{"role": "user", "content": first_user_turn}]
 
    try:
        for _ in range(max_turns):
            resp = await client.messages.create(
                model=MODEL,
                max_tokens=2048,
                system=system,
                tools=tools,
                messages=messages,
            )
            messages.append({"role": "assistant", "content": resp.content})
 
            if resp.stop_reason == "end_turn":
                hub.status[name] = "done"
                return "".join(getattr(b, "text", "") for b in resp.content)
            if resp.stop_reason != "tool_use":
                raise RuntimeError(f"unexpected stop_reason: {resp.stop_reason}")
 
            results = []
            for block in resp.content:
                if block.type != "tool_use":
                    continue
                TRACE[name].append(block.name)
                if block.name == "send_message":
                    rids = block.input["recipient_ids"]
                    delivered = hub.post(name, rids, block.input["content"])
                    unknown = [r for r in rids if r not in delivered]
                    out = f"delivered to {delivered}" + (f"; unknown: {unknown}" if unknown else "")
                elif block.name == "wait_for_message":
                    hub.status[name] = "idling"
                    try:
                        await asyncio.wait_for(hub.event[name].wait(), timeout=60)
                        out = "woke: new messages"
                    except TimeoutError:
                        out = "woke: 60s timeout"
                    hub.status[name] = "active"
                elif block.name in extra_dispatch:
                    out = await extra_dispatch[block.name](block)
                else:
                    out = f"error: no dispatch for {block.name}"
                print(f"  [{name}] {block.name}({_snip(block.input)}) → {_snip(out)}")
                results.append({"type": "tool_result", "tool_use_id": block.id, "content": out})
 
            inbox = hub.drain(name)
            for m in inbox:
                print(f"  [{name}] ← received from {m['from']}: {_snip(m['content'])}")
            if results:
                results[-1]["content"] += hub.render(inbox)  # ← the key line
            messages.append({"role": "user", "content": results})
 
        hub.status[name] = "done"
        return f"[{name} hit max_turns={max_turns}]"
    except Exception:
        hub.status[name] = "crashed"
        raise
 
 
def print_trace():
    for agent in sorted(TRACE):
        counts = Counter(TRACE[agent])
        print(f"  {agent}: " + ", ".join(f"{n}×{t}" for t, n in counts.most_common()))
    TRACE.clear()

Part 1 · Fixed N-Agent Team

Three agents — one lead and two helpers — each given a one-line persona. All they do is introduce themselves to each other via send_message; the lead writes a one-sentence summary and ends the run. No domain tool: this is purely the messaging path.

python
TEAM_SYSTEM = "You are {name}, one of 3 agents working together (peers: {peers})."
 
TASKS = {
    "lead": "You are the lead. Introduce yourself to the others. Once everyone has introduced "
    "themselves, finish with a one-sentence summary of the team.",
    "helper1": "You are a backend engineer named Ada. Introduce yourself to the others, then wait "
    "for their replies.",
    "helper2": "You are a designer named Bo. Introduce yourself to the others, then wait for their "
    "replies.",
}
 
 
async def run_team() -> str:
    hub = Hub()
    names = list(TASKS)
    for n in names:
        hub.register(n)
 
    helper_tasks = [
        asyncio.create_task(
            run_agent(
                hub,
                n,
                system=TEAM_SYSTEM.format(name=n, peers=[p for p in names if p != n]),
                first_user_turn=TASKS[n],
            )
        )
        for n in names[1:]
    ]
    try:
        return await run_agent(
            hub,
            "lead",
            system=TEAM_SYSTEM.format(name="lead", peers=names[1:]),
            first_user_turn=TASKS["lead"],
        )
    finally:
        for t in helper_tasks:
            t.cancel()
        await asyncio.gather(*helper_tasks, return_exceptions=True)
python
answer = await run_team()
print(f"\n[lead final answer]\n{answer}\n\nTool-call summary:")
print_trace()
[helper1] send_message({'recipient_ids': ['lead', 'helper2'], 'content': "Hi all, I…) → delivered to ['lead', 'helper2']
  [helper2] send_message({'recipient_ids': ['lead', 'helper1'], 'content': "Hi everyo…) → delivered to ['lead', 'helper1']
  [helper2] ← received from helper1: Hi all, I'm Ada, a backend engineer on the team. I focus on …
  [lead] send_message({'recipient_ids': ['helper1', 'helper2'], 'content': "Hi tea…) → delivered to ['helper1', 'helper2']
  [lead] ← received from helper1: Hi all, I'm Ada, a backend engineer on the team. I focus on …
  [lead] ← received from helper2: Hi everyone! I'm Bo, the designer on the team. Looking forwa…
  [helper1] wait_for_message({}) → woke: new messages
  [helper1] ← received from helper2: Hi everyone! I'm Bo, the designer on the team. Looking forwa…
  [helper1] ← received from lead: Hi team, I'm lead, the coordinator for our group. Looking fo…
  [helper2] wait_for_message({}) → woke: new messages
  [helper2] ← received from lead: Hi team, I'm lead, the coordinator for our group. Looking fo…
  [lead] send_message({'recipient_ids': ['helper1', 'helper2'], 'content': "Thanks…) → delivered to ['helper1', 'helper2']

[lead final answer]
Everyone has introduced themselves, and I've shared the summary.

**Team summary:** We're a three-person team with me (lead) coordinating, Ada handling backend engineering (APIs, databases, architecture), and Bo driving design — a well-rounded crew ready to build great things together.

Tool-call summary:
  helper1: 1×send_message, 1×wait_for_message
  helper2: 1×send_message, 1×wait_for_message
  lead: 2×send_message

Part 2 · Async Subagents (dynamic spawn)

Now give the lead subagent tools and a trivial client-side sleep(seconds) tool. The lead spawns three helpers; each sleeps N seconds, send_messages "done" to the lead, then wait_for_messages for further instructions. The spawn call returns immediately; the lead checks get_status while they run, collects their reports via wait_for_message, then kill_subagents all three to dismiss them. No domain logic — just the full spawn / status / collect / kill path.

python
SLEEP = {
    "name": "sleep",
    "description": "Sleep for the given number of seconds, then return.",
    "input_schema": {
        "type": "object",
        "properties": {"seconds": {"type": "integer", "minimum": 0, "maximum": 10}},
        "required": ["seconds"],
    },
}
SUBAGENT_TOOLS = [
    {
        "name": "create_subagents",
        "description": "Spawn helper subagents. Returns immediately — helpers run concurrently in the "
        "background in parallel with you. Each gets base_instruction, optionally + "
        "per_subagent_instructions[i].",
        "input_schema": {
            "type": "object",
            "properties": {
                "base_instruction": {"type": "string"},
                "per_subagent_instructions": {
                    "type": "array",
                    "items": {"type": "string"},
                    "maxItems": 10,
                },
            },
            "required": ["base_instruction"],
        },
    },
    {
        "name": "get_status",
        "description": "Status of every helper (active / idling / done / crashed).",
        "input_schema": {"type": "object", "properties": {}},
    },
    {
        "name": "kill_subagents",
        "description": "Cancel running helpers you no longer need.",
        "input_schema": {
            "type": "object",
            "properties": {
                "subagent_ids": {"type": "array", "items": {"type": "string"}, "minItems": 1}
            },
            "required": ["subagent_ids"],
        },
    },
]
 
 
async def dispatch_sleep(block) -> str:
    s = int(block.input["seconds"])
    await asyncio.sleep(s)
    return f"slept {s}s"
python
async def run_spawn_lead() -> str:
    hub = Hub()
    hub.register("lead")
    helpers: dict[str, asyncio.Task] = {}
 
    async def _create(block):
        base = block.input["base_instruction"]
        per = block.input.get("per_subagent_instructions") or [""]
        spawned = []
        for suffix in per:
            h = hub.new_name()
            helpers[h] = asyncio.create_task(
                run_agent(
                    hub,
                    h,
                    system=f"You are {h}, a helper.",
                    first_user_turn=f"{base}\n\n{suffix}".strip(),
                    tools=[SLEEP, *BASE_TOOLS],
                    extra_dispatch={"sleep": dispatch_sleep},
                )
            )
            spawned.append(h)
        return f"spawned: {', '.join(spawned)}"
 
    async def _status(block):
        return "\n".join(f"{n}: {s}" for n, s in hub.status.items() if n != "lead") or "(none)"
 
    async def _kill(block):
        killed_ids, to_await = [], []
        for sid in block.input["subagent_ids"]:
            if sid in helpers and not helpers[sid].done():
                helpers[sid].cancel()
                hub.status[sid] = "done"
                to_await.append(helpers.pop(sid))
                killed_ids.append(sid)
        await asyncio.gather(*to_await, return_exceptions=True)
        return f"cancelled: {', '.join(killed_ids)}" if killed_ids else "no matching active helpers"
 
    try:
        return await run_agent(
            hub,
            "lead",
            system="You are the lead.",
            first_user_turn=(
                "Spawn three helper agents. Instruct each to sleep a different number of seconds "
                "(1, 2, 3), report back to you 'done, slept Ns', then wait for your further "
                "instructions. After that, check that they're running, collect all three reports, "
                "then dismiss all three helpers. Finish with a one-line summary."
            ),
            tools=[*SUBAGENT_TOOLS, *BASE_TOOLS],
            extra_dispatch={
                "create_subagents": _create,
                "get_status": _status,
                "kill_subagents": _kill,
            },
        )
    finally:
        for t in helpers.values():
            t.cancel()
        await asyncio.gather(*helpers.values(), return_exceptions=True)
python
answer = await run_spawn_lead()
print(f"\n[lead final answer]\n{answer}\n\nTool-call summary:")
print_trace()
[lead] create_subagents({'base_instruction': 'You are a helper agent. Follow your sp…) → spawned: helper1, helper2, helper3
  [lead] get_status({}) → helper1: active helper2: active helper3: active
  [helper1] sleep({'seconds': 1}) → slept 1s
  [helper2] sleep({'seconds': 2}) → slept 2s
  [helper1] send_message({'recipient_ids': ['lead'], 'content': 'done, slept 1s'}) → delivered to ['lead']
  [lead] wait_for_message({}) → woke: new messages
  [lead] ← received from helper1: done, slept 1s
  [helper3] sleep({'seconds': 3}) → slept 3s
  [helper2] send_message({'recipient_ids': ['lead'], 'content': 'done, slept 2s'}) → delivered to ['lead']
  [lead] wait_for_message({}) → woke: new messages
  [lead] ← received from helper2: done, slept 2s
  [helper3] send_message({'recipient_ids': ['lead'], 'content': 'done, slept 3s'}) → delivered to ['lead']
  [lead] wait_for_message({}) → woke: new messages
  [lead] ← received from helper3: done, slept 3s
  [lead] kill_subagents({'subagent_ids': ['helper1', 'helper2', 'helper3']}) → cancelled: helper1, helper2, helper3

[lead final answer]
Summary: All 3 helpers were spawned and confirmed active, each reported back successfully ("done, slept 1s/2s/3s"), and all three were then dismissed.

Tool-call summary:
  helper1: 1×sleep, 1×send_message, 1×wait_for_message
  helper2: 1×sleep, 1×send_message, 1×wait_for_message
  helper3: 1×sleep, 1×send_message, 1×wait_for_message
  lead: 3×wait_for_message, 1×create_subagents, 1×get_status, 1×kill_subagents

Next steps

Swap in your own domain tools — add them to tools and a handler to extra_dispatch. The Hub, run_agent, and the two team runners above are all you need; the rest is your task and your tools.

See the tool use guide for the full tool-definition reference.

Was this page helpful?