Server-side compaction is the recommended strategy for managing context in long-running conversations and agentic workflows. It handles context management automatically, with minimal integration work.
Compaction extends the effective context length of long-running conversations and tasks by automatically summarizing earlier context as the conversation approaches the context window limit. This makes it especially useful for long-lived, tool-heavy sessions.
Compaction is currently in beta. Include the beta header `compact-2026-01-12` in your API requests to use this feature.
This feature is in beta and is not eligible for Zero Data Retention (ZDR).
Compaction is supported on the following models:
- `claude-opus-4-6`

With compaction enabled, Claude automatically summarizes your conversation once it approaches the configured token threshold, and the API returns the summary as a `compaction` block. In subsequent requests, append the response to your messages; the API automatically discards all message blocks that precede the `compaction` block and continues the conversation from the summary.
Enable compaction by adding the `compact_20260112` strategy to `context_management.edits` in your Messages API request.
| Parameter | Type | Default | Description |
|---|---|---|---|
| `type` | string | required | Must be `"compact_20260112"` |
| `trigger` | object | 150,000 tokens | When compaction triggers. Must be at least 50,000 tokens. |
| `pause_after_compaction` | boolean | `false` | Whether to pause after the compaction summary is generated |
| `instructions` | string | `null` | Custom summarization prompt. When provided, it replaces the default prompt entirely. |
Configure when compaction triggers using the `trigger` parameter.
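As a minimal sketch, the trigger is an object with an `input_tokens` type and a token threshold (the value below is illustrative; it must be at least 50,000):

```json
{
  "context_management": {
    "edits": [
      {
        "type": "compact_20260112",
        "trigger": {"type": "input_tokens", "value": 120000}
      }
    ]
  }
}
```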
By default, compaction uses the following summarization prompt:

```
You have written a partial transcript for the initial task above. Please write a summary of the transcript. The purpose of this summary is to provide continuity so you can continue to make progress towards solving the task in a future context, where the raw history above may not be accessible and will be replaced with this summary. Write down anything that would be helpful, including the state, next steps, learnings etc. You must wrap your summary in a <summary></summary> block.
```

You can replace this prompt by providing custom instructions via the `instructions` parameter. Custom instructions do not supplement the default prompt; they replace it entirely.
Use `pause_after_compaction` to pause the API after the compaction summary is generated. This lets you add extra content blocks (for example, to preserve recent messages or inject specific steering messages) before the API continues its response.
When enabled, the API returns a message with the `compaction` stop reason after generating the compaction block.
When a model works through a long task with many tool-use iterations, total token consumption can grow substantially. You can combine `pause_after_compaction` with a compaction counter to estimate cumulative usage and wind the task down gracefully once a budget is reached:
```python
TRIGGER_THRESHOLD = 100_000
TOTAL_TOKEN_BUDGET = 3_000_000

n_compactions = 0

response = client.beta.messages.create(
    betas=["compact-2026-01-12"],
    model="claude-opus-4-6",
    max_tokens=4096,
    messages=messages,
    context_management={
        "edits": [
            {
                "type": "compact_20260112",
                "trigger": {"type": "input_tokens", "value": TRIGGER_THRESHOLD},
                "pause_after_compaction": True,
            }
        ]
    },
)

if response.stop_reason == "compaction":
    n_compactions += 1
    messages.append({"role": "assistant", "content": response.content})

    # Estimate total tokens consumed; prompt wrap-up if over budget
    if n_compactions * TRIGGER_THRESHOLD >= TOTAL_TOKEN_BUDGET:
        messages.append(
            {
                "role": "user",
                "content": "Please wrap up your current work and summarize the final state.",
            }
        )
```

When compaction triggers, the API returns a `compaction` block at the beginning of the assistant response.
Long-running conversations may go through multiple compactions. The last `compaction` block reflects the final state of the prompt, replacing everything before it with the generated summary.
```json
{
  "content": [
    {
      "type": "compaction",
      "content": "Summary of the conversation: The user requested help building a web scraper..."
    },
    {
      "type": "text",
      "text": "Based on our conversation so far..."
    }
  ]
}
```

You must pass `compaction` blocks back to the API in subsequent requests to continue the conversation with the shortened prompt. The simplest approach is to append the entire response content to your messages.
When the API receives a `compaction` block, all content blocks that precede it are ignored, so you can either keep sending the full message history or trim the earlier blocks from your own copy.
When streaming responses with compaction enabled, compaction blocks stream differently from text blocks: you receive a `content_block_start` event, then a single `content_block_delta` containing the complete summary content (there is no incremental streaming), and finally a `content_block_stop` event.
Compaction works well with prompt caching. You can add a `cache_control` breakpoint on the `compaction` block to cache the summary content. The original, compacted-away content is ignored.
```json
{
  "role": "assistant",
  "content": [
    {
      "type": "compaction",
      "content": "[summary text]",
      "cache_control": {"type": "ephemeral"}
    },
    {
      "type": "text",
      "text": "Based on our conversation..."
    }
  ]
}
```

When compaction occurs, the summary becomes new content that must be written to the cache. Without an additional cache breakpoint, this also invalidates any cached system prompt, which then has to be re-cached along with the compaction summary.
To maximize cache hits, add a `cache_control` breakpoint at the end of your system prompt. This caches the system prompt separately from the conversation, so it stays cached when compaction occurs. The approach is especially beneficial for long system prompts, which remain cached even across multiple compaction events in a single conversation.
Compaction requires an additional sampling step, which affects rate limits and billing. The API returns detailed usage information in the response:
```json
{
  "usage": {
    "input_tokens": 45000,
    "output_tokens": 1234,
    "iterations": [
      {
        "type": "compaction",
        "input_tokens": 180000,
        "output_tokens": 3500
      },
      {
        "type": "message",
        "input_tokens": 23000,
        "output_tokens": 1000
      }
    ]
  }
}
```

The `iterations` array shows usage for each sampling iteration. When compaction occurs, you will see a `compaction` iteration followed by the main `message` iteration. The token counts of the last iteration reflect the effective context size after compaction.
The top-level `input_tokens` and `output_tokens` do not include usage from compaction iterations; they reflect the sum of all non-compaction iterations. To compute the total tokens consumed and billed for a request, sum every entry in the `usage.iterations` array.
If you previously relied on `usage.input_tokens` and `usage.output_tokens` for cost tracking or auditing, you will need to update that logic to aggregate over `usage.iterations` when compaction is enabled. The `iterations` array is only populated when a new compaction is triggered during the request; re-applying a previous `compaction` block incurs no additional compaction cost, and in that case the top-level usage fields remain accurate.
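As a minimal sketch of such tracking logic, the helper below sums the `usage` payload shown above, treating it as a plain dict and falling back to the top-level fields when no new compaction occurred:

```python
def total_tokens(usage: dict) -> tuple[int, int]:
    """Sum input/output tokens across all sampling iterations.

    Falls back to the top-level fields when `iterations` is
    absent or empty (no new compaction during the request).
    """
    iterations = usage.get("iterations") or []
    if not iterations:
        return usage["input_tokens"], usage["output_tokens"]
    return (
        sum(i["input_tokens"] for i in iterations),
        sum(i["output_tokens"] for i in iterations),
    )


# The usage payload from the example response above
usage = {
    "input_tokens": 45000,
    "output_tokens": 1234,
    "iterations": [
        {"type": "compaction", "input_tokens": 180000, "output_tokens": 3500},
        {"type": "message", "input_tokens": 23000, "output_tokens": 1000},
    ],
}
print(total_tokens(usage))  # (203000, 4500)
```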
When using server tools such as web search, the compaction trigger is checked at the start of each sampling iteration. Depending on your trigger threshold and how much output is generated, compaction can occur multiple times within a single request.
The token counting endpoint (`/v1/messages/count_tokens`) applies any existing `compaction` blocks in the prompt but does not trigger new compaction. Use it to check the effective token count after a previous compaction.
The examples below put these pieces together. The last two show a complete long-running conversation using compaction, and a variant that uses `pause_after_compaction` to preserve the last two messages (one user turn and one assistant turn) verbatim instead of summarizing them.

Enabling compaction with the default settings:
```shell
curl https://api.anthropic.com/v1/messages \
  --header "x-api-key: $ANTHROPIC_API_KEY" \
  --header "anthropic-version: 2023-06-01" \
  --header "anthropic-beta: compact-2026-01-12" \
  --header "content-type: application/json" \
  --data \
  '{
    "model": "claude-opus-4-6",
    "max_tokens": 4096,
    "messages": [
      {
        "role": "user",
        "content": "Help me build a website"
      }
    ],
    "context_management": {
      "edits": [
        {
          "type": "compact_20260112"
        }
      ]
    }
  }'
```

Configuring a custom trigger threshold:

```python
response = client.beta.messages.create(
    betas=["compact-2026-01-12"],
    model="claude-opus-4-6",
    max_tokens=4096,
    messages=messages,
    context_management={
        "edits": [
            {
                "type": "compact_20260112",
                "trigger": {"type": "input_tokens", "value": 150000},
            }
        ]
    },
)
```

Providing custom summarization instructions:

```python
response = client.beta.messages.create(
    betas=["compact-2026-01-12"],
    model="claude-opus-4-6",
    max_tokens=4096,
    messages=messages,
    context_management={
        "edits": [
            {
                "type": "compact_20260112",
                "instructions": "Focus on preserving code snippets, variable names, and technical decisions.",
            }
        ]
    },
)
```

Pausing after compaction:

```python
response = client.beta.messages.create(
    betas=["compact-2026-01-12"],
    model="claude-opus-4-6",
    max_tokens=4096,
    messages=messages,
    context_management={
        "edits": [{"type": "compact_20260112", "pause_after_compaction": True}]
    },
)

# Check if compaction triggered a pause
if response.stop_reason == "compaction":
    # Response contains only the compaction block
    messages.append({"role": "assistant", "content": response.content})

    # Continue the request
    response = client.beta.messages.create(
        betas=["compact-2026-01-12"],
        model="claude-opus-4-6",
        max_tokens=4096,
        messages=messages,
        context_management={"edits": [{"type": "compact_20260112"}]},
    )
```

Continuing a conversation after receiving a compaction block:

```python
# After receiving a response with a compaction block
messages.append({"role": "assistant", "content": response.content})

# Continue the conversation
messages.append({"role": "user", "content": "Now add error handling"})

response = client.beta.messages.create(
    betas=["compact-2026-01-12"],
    model="claude-opus-4-6",
    max_tokens=4096,
    messages=messages,
    context_management={"edits": [{"type": "compact_20260112"}]},
)
```

Streaming with compaction enabled:

```python
import anthropic

client = anthropic.Anthropic()

with client.beta.messages.stream(
    betas=["compact-2026-01-12"],
    model="claude-opus-4-6",
    max_tokens=4096,
    messages=messages,
    context_management={"edits": [{"type": "compact_20260112"}]},
) as stream:
    for event in stream:
        if event.type == "content_block_start":
            if event.content_block.type == "compaction":
                print("Compaction started...")
            elif event.content_block.type == "text":
                print("Text response started...")
        elif event.type == "content_block_delta":
            if event.delta.type == "compaction_delta":
                print(f"Compaction complete: {len(event.delta.content)} chars")
            elif event.delta.type == "text_delta":
                print(event.delta.text, end="", flush=True)

# Get the final accumulated message
message = stream.get_final_message()
messages.append({"role": "assistant", "content": message.content})
```

Caching the system prompt separately from the conversation:

```python
response = client.beta.messages.create(
    betas=["compact-2026-01-12"],
    model="claude-opus-4-6",
    max_tokens=4096,
    system=[
        {
            "type": "text",
            "text": "You are a helpful coding assistant...",
            "cache_control": {
                "type": "ephemeral"
            },  # Cache the system prompt separately
        }
    ],
    messages=messages,
    context_management={"edits": [{"type": "compact_20260112"}]},
)
```

Counting tokens with existing compaction blocks applied:

```python
count_response = client.beta.messages.count_tokens(
    betas=["compact-2026-01-12"],
    model="claude-opus-4-6",
    messages=messages,
    context_management={"edits": [{"type": "compact_20260112"}]},
)

print(f"Current tokens: {count_response.input_tokens}")
print(f"Original tokens: {count_response.context_management.original_input_tokens}")
```

A complete long-running conversation using compaction:

```python
import anthropic

client = anthropic.Anthropic()

messages: list[dict] = []


def chat(user_message: str) -> str:
    messages.append({"role": "user", "content": user_message})

    response = client.beta.messages.create(
        betas=["compact-2026-01-12"],
        model="claude-opus-4-6",
        max_tokens=4096,
        messages=messages,
        context_management={
            "edits": [
                {
                    "type": "compact_20260112",
                    "trigger": {"type": "input_tokens", "value": 100000},
                }
            ]
        },
    )

    # Append response (compaction blocks are automatically included)
    messages.append({"role": "assistant", "content": response.content})

    # Return the text content
    return next(block.text for block in response.content if block.type == "text")


# Run a long conversation
print(chat("Help me build a Python web scraper"))
print(chat("Add support for JavaScript-rendered pages"))
print(chat("Now add rate limiting and error handling"))
# ... continue as long as needed
```

Using `pause_after_compaction` to preserve the last two messages (one user and one assistant turn) verbatim instead of summarizing them:

```python
import anthropic
from typing import Any

client = anthropic.Anthropic()

messages: list[dict[str, Any]] = []


def chat(user_message: str) -> str:
    messages.append({"role": "user", "content": user_message})

    response = client.beta.messages.create(
        betas=["compact-2026-01-12"],
        model="claude-opus-4-6",
        max_tokens=4096,
        messages=messages,
        context_management={
            "edits": [
                {
                    "type": "compact_20260112",
                    "trigger": {"type": "input_tokens", "value": 100000},
                    "pause_after_compaction": True,
                }
            ]
        },
    )

    # Check if compaction occurred and paused
    if response.stop_reason == "compaction":
        # Get the compaction block from the response
        compaction_block = response.content[0]

        # Preserve the last 2 messages (1 user + 1 assistant turn)
        # by including them after the compaction block
        preserved_messages = messages[-2:] if len(messages) >= 2 else messages

        # Build new message list: compaction + preserved messages
        new_assistant_content = [compaction_block]
        messages_after_compaction = [
            {"role": "assistant", "content": new_assistant_content}
        ] + preserved_messages

        # Continue the request with the compacted context + preserved messages
        response = client.beta.messages.create(
            betas=["compact-2026-01-12"],
            model="claude-opus-4-6",
            max_tokens=4096,
            messages=messages_after_compaction,
            context_management={"edits": [{"type": "compact_20260112"}]},
        )

        # Update our message list to reflect the compaction
        messages.clear()
        messages.extend(messages_after_compaction)

    # Append the final response
    messages.append({"role": "assistant", "content": response.content})

    # Return the text content
    return next(block.text for block in response.content if block.type == "text")


# Run a long conversation
print(chat("Help me build a Python web scraper"))
print(chat("Add support for JavaScript-rendered pages"))
print(chat("Now add rate limiting and error handling"))
# ... continue as long as needed
```

Explore other strategies for managing conversation context, such as tool result clearing and thinking block clearing.