Loading...
  • 构建
  • 管理
  • 模型与定价
  • 客户端 SDK
  • API 参考
Search...
⌘K
Log in
批量处理
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...

Solutions

  • AI agents
  • Code modernization
  • Coding
  • Customer support
  • Education
  • Financial services
  • Government
  • Life sciences

Partners

  • Amazon Bedrock
  • Google Cloud's Vertex AI

Learn

  • Blog
  • Courses
  • Use cases
  • Connectors
  • Customer stories
  • Engineering at Anthropic
  • Events
  • Powered by Claude
  • Service partners
  • Startups program

Company

  • Anthropic
  • Careers
  • Economic Futures
  • Research
  • News
  • Responsible Scaling Policy
  • Security and compliance
  • Transparency

Learn

  • Blog
  • Courses
  • Use cases
  • Connectors
  • Customer stories
  • Engineering at Anthropic
  • Events
  • Powered by Claude
  • Service partners
  • Startups program

Help and security

  • Availability
  • Status
  • Support
  • Discord

Terms and policies

  • Privacy policy
  • Responsible disclosure policy
  • Terms of service: Commercial
  • Terms of service: Consumer
  • Usage policy
构建/模型能力

批量处理

使用 Message Batches API 高效处理大量请求

Was this page helpful?

  • Message Batches API 如何工作
  • 如何使用 Message Batches API
  • 列出所有 Message Batches
  • 取消 Message Batch
  • 在 Message Batches 中使用提示缓存

批量处理是一种强大的方法,可以高效地处理大量请求。与其一次处理一个请求并立即获得响应不同,批量处理允许您一次提交多个请求进行异步处理。这种模式特别适用于以下情况:

  • 您需要处理大量数据
  • 不需要立即响应
  • 您想优化成本效率
  • 您正在运行大规模评估或分析

Message Batches API 是 Anthropic 对这种模式的首次实现。

This feature is not eligible for Zero Data Retention (ZDR). Data is retained according to the feature's standard retention policy.


Message Batches API

Message Batches API 是一种强大且经济高效的方式,可以异步处理大量 Messages 请求。这种方法非常适合不需要立即响应的任务,大多数批次在不到 1 小时内完成,同时可以将成本降低 50% 并提高吞吐量。

您可以 直接探索 API 参考,以及阅读本指南。

Message Batches API 如何工作

当您向 Message Batches API 发送请求时:

  1. 系统使用提供的 Messages 请求创建一个新的 Message Batch。
  2. 然后异步处理该批次,每个请求独立处理。
  3. 您可以轮询批次的状态,并在所有请求处理完成后检索结果。

这对于不需要立即结果的批量操作特别有用,例如:

  • 大规模评估:高效处理数千个测试用例。
  • 内容审核:异步分析大量用户生成的内容。
  • 数据分析:为大型数据集生成见解或摘要。
  • 批量内容生成:为各种目的创建大量文本(例如产品描述、文章摘要)。

批次限制

  • Message Batch 限制为 100,000 个 Message 请求或 256 MB 大小,以先达到的为准。
  • 系统尽快处理每个批次,大多数批次在 1 小时内完成。您可以在所有消息完成后或 24 小时后(以先发生者为准)访问批次结果。如果处理在 24 小时内未完成,批次将过期。
  • 批次结果在创建后 29 天内可用。之后,您仍然可以查看 Batch,但其结果将不再可供下载。
  • 批次的范围限定为 Workspace。您可以查看在您的 API 密钥所属的 Workspace 中创建的所有批次(及其结果)。
  • 速率限制适用于 Batches API HTTP 请求和批次内等待处理的请求数。请参阅 Message Batches API 速率限制。此外,处理速度可能会根据当前需求和您的请求量而减慢。在这种情况下,您可能会看到更多请求在 24 小时后过期。
  • 由于高吞吐量和并发处理,批次可能会略微超过您的 Workspace 配置的 支出限制。

支持的模型

所有 活跃模型 都支持 Message Batches API。

可以批处理的内容

任何您可以向 Messages API 发出的请求都可以包含在批次中。这包括:

  • 视觉
  • 工具使用
  • 系统消息
  • 多轮对话
  • 任何测试版功能

由于批次中的每个请求都是独立处理的,您可以在单个批次中混合不同类型的请求。

由于批次处理可能需要超过 5 分钟,请考虑在处理具有共享上下文的批次时使用 1 小时缓存持续时间 和提示缓存以获得更好的缓存命中率。


定价

Batches API 提供显著的成本节省。所有使用费用按标准 API 价格的 50% 收费。

ModelBatch inputBatch output
Claude Opus 4.7$2.50 / MTok$12.50 / MTok
Claude Opus 4.6$2.50 / MTok$12.50 / MTok
Claude Opus 4.5$2.50 / MTok$12.50 / MTok
Claude Opus 4.1$7.50 / MTok$37.50 / MTok
Claude Opus 4$7.50 / MTok$37.50 / MTok
Claude Sonnet 4.6$1.50 / MTok$7.50 / MTok
Claude Sonnet 4.5$1.50 / MTok$7.50 / MTok
Claude Sonnet 4$1.50 / MTok$7.50 / MTok
Claude Sonnet 3.7 (deprecated)

如何使用 Message Batches API

准备并创建您的批次

Message Batch 由创建 Message 的请求列表组成。单个请求的形状包括:

  • 用于标识 Messages 请求的唯一 custom_id。必须是 1 到 64 个字符,仅包含字母数字字符、连字符和下划线(匹配 ^[a-zA-Z0-9_-]{1,64}$)。
  • 一个 params 对象,包含标准 Messages API 参数

您可以通过将此列表传递到 requests 参数来 创建批次:

在此示例中,两个单独的请求被一起批处理以进行异步处理。每个请求都有唯一的 custom_id 并包含您用于 Messages API 调用的标准参数。

使用 Messages API 测试您的批次请求

对每个消息请求的 params 对象的验证是异步执行的,验证错误在整个批次处理完成时返回。您可以通过首先使用 Messages API 验证您的请求形状来确保您正确构建输入。

首次创建批次时,响应的处理状态将为 in_progress。

Output
{
  "id": "msgbatch_01HkcTjaV5uDC8jWR4ZsDV8d",
  "type": "message_batch",
  "processing_status": "in_progress",
  "request_counts": {
    "processing": 2,
    "succeeded": 0,
    "errored": 0,
    "canceled": 0,
    "expired": 0
  },
  "ended_at": null,
  "created_at": "2024-09-24T18:37:24.100435Z",
  "expires_at": "2024-09-25T18:37:24.100435Z",
  "cancel_initiated_at": null,
  "results_url": null
}

跟踪您的批次

Message Batch 的 processing_status 字段指示批次处理所处的阶段。它首先为 in_progress,然后在批次中的所有请求完成处理且结果准备就绪后更新为 ended。您可以通过访问 Console 或使用 检索端点 来监控批次的状态。

轮询 Message Batch 完成

要轮询 Message Batch,您需要其 id,该 ID 在创建批次时的响应中提供,或通过列出批次获得。您可以实现一个轮询循环,定期检查批次状态,直到处理完成:

列出所有 Message Batches

您可以使用 列表端点 列出 Workspace 中的所有 Message Batches。API 支持分页,根据需要自动获取其他页面:

检索批处理结果

批处理结束后,批处理中的每个 Messages 请求都有一个结果。有 4 种结果类型:

结果类型描述
succeeded请求成功。包括消息结果。
errored请求遇到错误,未创建消息。可能的错误包括无效请求和内部服务器错误。这些请求不会被计费。
canceled用户在此请求发送到模型之前取消了批处理。这些请求不会被计费。
expired批处理在此请求发送到模型之前达到了 24 小时过期时间。这些请求不会被计费。

您将看到批处理结果的概览,其中包含批处理的 request_counts,显示有多少请求达到了这四种状态中的每一种。

批处理的结果可在 Message Batch 上的 results_url 属性处下载,如果组织权限允许,也可在 Console 中下载。由于结果的潜在大小,建议流式传输结果而不是一次性下载所有结果。

结果采用 .jsonl 格式,其中每一行都是一个有效的 JSON 对象,代表 Message Batch 中单个请求的结果。对于每个流式传输的结果,您可以根据其 custom_id 和结果类型执行不同的操作。以下是一个示例结果集:

.jsonl file
{"custom_id":"my-second-request","result":{"type":"succeeded","message":{"id":"msg_014VwiXbi91y3JMjcpyGBHX5","type":"message","role":"assistant","model":"claude-opus-4-7","content":[{"type":"text","text":"Hello again! It's nice to see you. How can I assist you today? Is there anything specific you'd like to chat about or any questions you have?"}],"stop_reason":"end_turn","stop_sequence":null,"usage":{"input_tokens":11,"output_tokens":36}}}}
{"custom_id":"my-first-request","result":{"type":"succeeded","message":{"id":"msg_01FqfsLoHwgeFbguDgpz48m7","type":"message","role":"assistant","model":"claude-opus-4-7","content":[{"type":"text","text":"Hello! How can I assist you today? Feel free to ask me any questions or let me know if there's anything you'd like to chat about."}],"stop_reason":"end_turn","stop_sequence":null,"usage":{"input_tokens":10,"output_tokens":34}}}}

如果您的结果有错误,其 result.error 将设置为标准错误形状。

批处理结果可能与输入顺序不匹配

批处理结果可以按任何顺序返回,可能与创建批处理时请求的顺序不匹配。在上面的示例中,第二个批处理请求的结果在第一个之前返回。要正确匹配结果与其对应的请求,请始终使用 custom_id 字段。

取消 Message Batch

您可以使用取消端点取消当前正在处理的 Message Batch。取消后,批处理的 processing_status 将立即变为 canceling。您可以使用上面描述的相同轮询技术来等待取消完成。已取消的批处理最终状态为 ended,可能包含取消前已处理的请求的部分结果。

响应将显示处于 canceling 状态的批处理:

Output
{
  "id": "msgbatch_013Zva2CMHLNnXjNJJKqJ2EF",
  "type": "message_batch",
  "processing_status": "canceling",
  "request_counts": {
    "processing": 2,
    "succeeded": 0,
    "errored": 0,
    "canceled": 0,
    "expired": 0
  },
  "ended_at": null,
  "created_at": "2024-09-24T18:37:24.100435Z",
  "expires_at": "2024-09-25T18:37:24.100435Z",
  "cancel_initiated_at": "2024-09-24T18:39:03.114875Z",
  "results_url": null
}

在 Message Batches 中使用提示缓存

Message Batches API 支持提示缓存,允许您可能降低批处理请求的成本和处理时间。提示缓存和 Message Batches 的定价折扣可以叠加,当同时使用这两个功能时可以提供更大的成本节省。但是,由于批处理请求是异步和并发处理的,缓存命中是尽力而为的基础。用户通常会根据其流量模式体验 30% 到 98% 的缓存命中率。

为了最大化批处理请求中缓存命中的可能性:

  1. 在批处理中的每个 Message 请求中包含相同的 cache_control 块
  2. 维持稳定的请求流,以防止缓存条目在其 5 分钟的生命周期后过期
  3. 构建您的请求以共享尽可能多的缓存内容

在批处理中实现提示缓存的示例:

在此示例中,批处理中的两个请求都包含相同的系统消息和标记有 cache_control 的完整 Pride and Prejudice 文本,以增加缓存命中的可能性。

扩展输出(测试版)

output-300k-2026-03-24 测试版标头将 max_tokens 上限提高到 300,000,用于使用 Claude Opus 4.7、Claude Opus 4.6 或 Claude Sonnet 4.6 的批处理请求。包含该标头以在单个回合中生成远长于标准限制(取决于模型的 64k 到 128k)的输出。

扩展输出仅在 Message Batches API 上可用,不在同步 Messages API 上可用。它在 Claude API 上受支持,在 Amazon Bedrock、Vertex AI 或 Microsoft Foundry 上不可用。

使用扩展输出进行长篇幅生成,例如书籍长度的草稿和技术文档、详尽的结构化数据提取、大型代码生成脚手架和长推理链。

单个 300k 令牌的生成可能需要一个多小时才能完成,因此请根据 24 小时处理窗口计划您的批处理提交。标准批处理定价(标准 API 价格的 50%)适用。

有效批处理的最佳实践

为了充分利用 Batches API:

  • 定期监控批处理状态并为失败的请求实现适当的重试逻辑。
  • 使用有意义的 custom_id 值来轻松匹配结果与请求,因为顺序不保证。
  • 考虑将非常大的数据集分解为多个批处理以获得更好的可管理性。
  • 使用 Messages API 对单个请求形状进行干运行以避免验证错误。

常见问题故障排除

如果遇到意外行为:

  • 验证总批处理请求大小不超过 256 MB。如果请求大小过大,您可能会收到 413 request_too_large 错误。
  • 检查您是否为批处理中的所有请求使用支持的模型。
  • 确保批处理中的每个请求都有唯一的 custom_id。
  • 确保自批处理 created_at(不是处理 ended_at)时间以来已不超过 29 天。如果已超过 29 天,结果将不再可查看。
  • 确认批处理尚未被取消。

请注意,批处理中一个请求的失败不会影响其他请求的处理。


批处理存储和隐私

  • 工作区隔离:批处理在创建它们的工作区内隔离。它们只能由与该工作区关联的 API 密钥或有权限在 Console 中查看工作区批处理的用户访问。

  • 结果可用性:批处理结果在创建后的 29 天内可用,为检索和处理提供充足的时间。


数据保留

批处理存储在创建后最多 29 天内存储请求和响应数据。您可以在处理后随时使用 DELETE /v1/messages/batches/{batch_id} 端点删除消息批处理。要删除正在进行的批处理,请先取消它。异步处理需要在批处理完成和结果检索之前进行服务器端存储输入和输出。

有关所有功能的 ZDR 资格,请参阅 API 和数据保留。

常见问题

$1.50 / MTok
$7.50 / MTok
Claude Haiku 4.5$0.50 / MTok$2.50 / MTok
Claude Haiku 3.5$0.40 / MTok$2 / MTok
Claude Opus 3 (deprecated)$7.50 / MTok$37.50 / MTok
Claude Haiku 3$0.125 / MTok$0.625 / MTok
from anthropic.types.message_create_params import MessageCreateParamsNonStreaming
from anthropic.types.messages.batch_create_params import Request

client = anthropic.Anthropic()

message_batch = client.messages.batches.create(
    requests=[
        Request(
            custom_id="my-first-request",
            params=MessageCreateParamsNonStreaming(
                model="claude-opus-4-7",
                max_tokens=1024,
                messages=[
                    {
                        "role": "user",
                        "content": "Hello, world",
                    }
                ],
            ),
        ),
        Request(
            custom_id="my-second-request",
            params=MessageCreateParamsNonStreaming(
                model="claude-opus-4-7",
                max_tokens=1024,
                messages=[
                    {
                        "role": "user",
                        "content": "Hi again, friend",
                    }
                ],
            ),
        ),
    ]
)

print(message_batch)
import time

client = anthropic.Anthropic()

MESSAGE_BATCH_ID = "msgbatch_01HkcTjaV5uDC8jWR4ZsDV8d"

message_batch = None
while True:
    message_batch = client.messages.batches.retrieve(MESSAGE_BATCH_ID)
    if message_batch.processing_status == "ended":
        break

    print(f"Batch {MESSAGE_BATCH_ID} is still processing...")
    time.sleep(60)
print(message_batch)
client = anthropic.Anthropic()

# Automatically fetches more pages as needed.
for message_batch in client.messages.batches.list(limit=20):
    print(message_batch)
client = anthropic.Anthropic()

# Stream results file in memory-efficient chunks, processing one at a time
for result in client.messages.batches.results(
    "msgbatch_01HkcTjaV5uDC8jWR4ZsDV8d",
):
    match result.result.type:
        case "succeeded":
            print(f"Success! {result.custom_id}")
        case "errored":
            if result.result.error.error.type == "invalid_request_error":
                # Request body must be fixed before re-sending request
                print(f"Validation error {result.custom_id}")
            else:
                # Request can be retried directly
                print(f"Server error {result.custom_id}")
        case "expired":
            print(f"Request expired {result.custom_id}")
client = anthropic.Anthropic()

MESSAGE_BATCH_ID = "msgbatch_01HkcTjaV5uDC8jWR4ZsDV8d"

message_batch = client.messages.batches.cancel(
    MESSAGE_BATCH_ID,
)
print(message_batch)
from anthropic.types.message_create_params import MessageCreateParamsNonStreaming
from anthropic.types.messages.batch_create_params import Request

client = anthropic.Anthropic()

message_batch = client.messages.batches.create(
    requests=[
        Request(
            custom_id="my-first-request",
            params=MessageCreateParamsNonStreaming(
                model="claude-opus-4-7",
                max_tokens=1024,
                system=[
                    {
                        "type": "text",
                        "text": "You are an AI assistant tasked with analyzing literary works. Your goal is to provide insightful commentary on themes, characters, and writing style.\n",
                    },
                    {
                        "type": "text",
                        "text": "<the entire contents of Pride and Prejudice>",
                        "cache_control": {"type": "ephemeral"},
                    },
                ],
                messages=[
                    {
                        "role": "user",
                        "content": "Analyze the major themes in Pride and Prejudice.",
                    }
                ],
            ),
        ),
        Request(
            custom_id="my-second-request",
            params=MessageCreateParamsNonStreaming(
                model="claude-opus-4-7",
                max_tokens=1024,
                system=[
                    {
                        "type": "text",
                        "text": "You are an AI assistant tasked with analyzing literary works. Your goal is to provide insightful commentary on themes, characters, and writing style.\n",
                    },
                    {
                        "type": "text",
                        "text": "<the entire contents of Pride and Prejudice>",
                        "cache_control": {"type": "ephemeral"},
                    },
                ],
                messages=[
                    {
                        "role": "user",
                        "content": "Write a summary of Pride and Prejudice.",
                    }
                ],
            ),
        ),
    ]
)
from anthropic.types.beta.message_create_params import MessageCreateParamsNonStreaming
from anthropic.types.beta.messages.batch_create_params import Request

client = anthropic.Anthropic()

message_batch = client.beta.messages.batches.create(
    betas=["output-300k-2026-03-24"],
    requests=[
        Request(
            custom_id="long-form-request",
            params=MessageCreateParamsNonStreaming(
                model="claude-opus-4-7",
                max_tokens=300_000,
                messages=[
                    {
                        "role": "user",
                        "content": "Write a comprehensive technical guide to building distributed systems, covering architecture patterns, consistency models, fault tolerance, and operational best practices.",
                    }
                ],
            ),
        ),
    ],
)

print(message_batch)