Server-side compaction is the recommended strategy for managing context in long-running conversations and agentic workflows. It handles context management automatically with minimal integration work.

Compaction extends the effective context length of long-running conversations and tasks by automatically summarizing older context as the conversation approaches the context window limit. This makes it ideal for tasks whose full history would otherwise exceed the window.
Compaction is currently in beta. Include the beta header compact-2026-01-12 in your API requests to use this feature.
Compaction is supported on the following models:

- claude-opus-4-6

With compaction enabled, Claude automatically summarizes your conversation when it approaches the configured token threshold. The API returns the summary in a compaction block. In subsequent requests, append the response to your messages; the API automatically discards all message blocks that precede the compaction block and continues the conversation from the summary.
Enable compaction by adding the compact_20260112 strategy to context_management.edits in your Messages API request.
| Parameter | Type | Default | Description |
|---|---|---|---|
| type | string | required | Must be "compact_20260112" |
| trigger | object | 150,000 tokens | When compaction triggers. Must be at least 50,000 tokens. |
| pause_after_compaction | boolean | false | Whether to pause after the compaction summary is generated |
| instructions | string | null | Custom summarization prompt. When provided, it fully replaces the default prompt. |
Configure when compaction triggers with the trigger parameter:
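As a minimal sketch, the request payload takes this shape (the 120,000 value is illustrative, not a recommendation; the documented default is 150,000 and the minimum is 50,000):

```python
# Trigger compaction once the prompt reaches 120,000 input tokens.
# This dict is passed as the context_management request parameter.
context_management = {
    "edits": [
        {
            "type": "compact_20260112",
            "trigger": {"type": "input_tokens", "value": 120_000},
        }
    ]
}
```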
By default, compaction uses the following summarization prompt:

You have written a partial transcript for the initial task above. Please write a summary of the transcript. The purpose of this summary is to provide continuity so you can continue to make progress towards solving the task in a future context, where the raw history above may not be accessible and will be replaced with this summary. Write down anything that would be helpful, including the state, next steps, learnings etc. You must wrap your summary in a <summary></summary> block.

You can replace this prompt entirely by providing custom instructions via the instructions parameter. Custom instructions do not supplement the default prompt; they replace it completely:
Use pause_after_compaction to pause the API after the compaction summary is generated. This lets you add extra content blocks (for example, to preserve recent messages or specific instruction-bearing messages) before the API continues its response.
When enabled, the API returns a message with a compaction stop reason after generating the compaction block:
When a model works through a long task with many tool-use iterations, total token consumption can grow significantly. You can combine pause_after_compaction with a compaction counter to estimate cumulative usage and wind the task down gracefully once a budget is reached:
TRIGGER_THRESHOLD = 100_000
TOTAL_TOKEN_BUDGET = 3_000_000

n_compactions = 0

response = client.beta.messages.create(
    betas=["compact-2026-01-12"],
    model="claude-opus-4-6",
    max_tokens=4096,
    messages=messages,
    context_management={
        "edits": [
            {
                "type": "compact_20260112",
                "trigger": {"type": "input_tokens", "value": TRIGGER_THRESHOLD},
                "pause_after_compaction": True,
            }
        ]
    },
)

if response.stop_reason == "compaction":
    n_compactions += 1
    messages.append({"role": "assistant", "content": response.content})

    # Estimate total tokens consumed; prompt wrap-up if over budget
    if n_compactions * TRIGGER_THRESHOLD >= TOTAL_TOKEN_BUDGET:
        messages.append({
            "role": "user",
            "content": "Please wrap up your current work and summarize the final state.",
        })

When compaction is triggered, the API returns a compaction block at the start of the assistant response.
Long-running conversations may trigger multiple compactions. The last compaction block reflects the final state of the prompt, replacing everything before it with the generated summary.
{
  "content": [
    {
      "type": "compaction",
      "content": "Summary of the conversation: The user requested help building a web scraper..."
    },
    {
      "type": "text",
      "text": "Based on our conversation so far..."
    }
  ]
}

You must pass compaction blocks back to the API in subsequent requests to continue the conversation with the shortened prompt. The simplest approach is to append the entire response content to your messages:
When the API receives a compaction block, every content block before it is ignored. You can therefore either keep sending the full message history unchanged or trim the pre-compaction blocks from your locally stored history.
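If you choose to trim, a helper along these lines can do it. This is a hypothetical sketch, not part of the SDK; it assumes messages are plain dicts whose list-valued content may contain a block with "type": "compaction":

```python
def trim_before_compaction(messages: list[dict]) -> list[dict]:
    """Return history starting at the most recent compaction block.

    The API ignores everything before that block anyway, so dropping it
    locally only saves request size and memory.
    """
    for m_idx in range(len(messages) - 1, -1, -1):
        content = messages[m_idx].get("content")
        if not isinstance(content, list):
            continue  # plain string content cannot hold a compaction block
        for b_idx in range(len(content) - 1, -1, -1):
            if content[b_idx].get("type") == "compaction":
                # Keep the containing message from the compaction block on,
                # plus every later message.
                head = {**messages[m_idx], "content": content[b_idx:]}
                return [head] + messages[m_idx + 1:]
    return messages  # no compaction yet; keep everything
```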
When streaming a response with compaction enabled, compaction blocks stream differently from text blocks: you receive a content_block_start event when compaction begins, then a single content_block_delta containing the complete summary content (there is no incremental streaming), and finally a content_block_stop event.
You can add a cache_control breakpoint to a compaction block, which caches the full system prompt together with the summary content. The original pre-compaction content is ignored.
{
  "role": "assistant",
  "content": [
    {
      "type": "compaction",
      "content": "[summary text]",
      "cache_control": {"type": "ephemeral"}
    },
    {
      "type": "text",
      "text": "Based on our conversation..."
    }
  ]
}

Compaction requires an additional sampling step, which counts toward rate limits and billing. The API returns detailed usage information in the response:
{
  "usage": {
    "input_tokens": 45000,
    "output_tokens": 1234,
    "iterations": [
      {
        "type": "compaction",
        "input_tokens": 180000,
        "output_tokens": 3500
      },
      {
        "type": "message",
        "input_tokens": 23000,
        "output_tokens": 1000
      }
    ]
  }
}

The iterations array shows usage for each sampling iteration. When compaction occurs, you will see a compaction iteration followed by the main message iteration. The token counts of the last iteration reflect the effective context size after compaction.
The top-level input_tokens and output_tokens do not include usage from compaction iterations; they reflect the sum of all non-compaction iterations. To compute the total consumed and billed tokens for a request, sum every entry in the usage.iterations array.
If you previously relied on usage.input_tokens and usage.output_tokens for cost tracking or auditing, you will need to update that logic to aggregate across usage.iterations once compaction is enabled. The iterations array is only populated when a new compaction is triggered during the request; re-applying a previous compaction block incurs no additional compaction cost, and in that case the top-level usage fields remain accurate.
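A small aggregation helper can cover both cases. This is a sketch that treats the usage object as a plain dict shaped like the example above; it falls back to the top-level fields when no new compaction was triggered (empty or absent iterations):

```python
def total_tokens(usage: dict) -> tuple[int, int]:
    """Return (input_tokens, output_tokens) billed for the whole request."""
    iterations = usage.get("iterations")
    if iterations:
        # A compaction occurred: bill the sum of every sampling iteration.
        return (
            sum(i["input_tokens"] for i in iterations),
            sum(i["output_tokens"] for i in iterations),
        )
    # No new compaction: the top-level fields are already accurate.
    return usage["input_tokens"], usage["output_tokens"]
```

With the example usage payload above, this returns (203000, 4500) rather than the top-level (45000, 1234).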
When using server tools such as web search, the compaction trigger is checked at the start of each sampling iteration. Depending on your trigger threshold and how much output is generated, compaction can occur multiple times within a single request.
The token counting endpoint (/v1/messages/count_tokens) applies any existing compaction blocks in the prompt but does not trigger new compactions. Use it to check the effective token count after a previous compaction:
Here is a complete example of a long-running conversation using compaction:
Here is an example that uses pause_after_compaction to preserve the last two messages (one user + one assistant turn) verbatim rather than summarized:
curl https://api.anthropic.com/v1/messages \
  --header "x-api-key: $ANTHROPIC_API_KEY" \
  --header "anthropic-version: 2023-06-01" \
  --header "anthropic-beta: compact-2026-01-12" \
  --header "content-type: application/json" \
  --data \
  '{
    "model": "claude-opus-4-6",
    "max_tokens": 4096,
    "messages": [
      {
        "role": "user",
        "content": "Help me build a website"
      }
    ],
    "context_management": {
      "edits": [
        {
          "type": "compact_20260112"
        }
      ]
    }
  }'

response = client.beta.messages.create(
    betas=["compact-2026-01-12"],
    model="claude-opus-4-6",
    max_tokens=4096,
    messages=messages,
    context_management={
        "edits": [
            {
                "type": "compact_20260112",
                "trigger": {
                    "type": "input_tokens",
                    "value": 150000
                }
            }
        ]
    }
)

response = client.beta.messages.create(
    betas=["compact-2026-01-12"],
    model="claude-opus-4-6",
    max_tokens=4096,
    messages=messages,
    context_management={
        "edits": [
            {
                "type": "compact_20260112",
                "instructions": "Focus on preserving code snippets, variable names, and technical decisions."
            }
        ]
    }
)

response = client.beta.messages.create(
    betas=["compact-2026-01-12"],
    model="claude-opus-4-6",
    max_tokens=4096,
    messages=messages,
    context_management={
        "edits": [
            {
                "type": "compact_20260112",
                "pause_after_compaction": True
            }
        ]
    }
)

# Check if compaction triggered a pause
if response.stop_reason == "compaction":
    # Response contains only the compaction block
    messages.append({"role": "assistant", "content": response.content})

    # Continue the request
    response = client.beta.messages.create(
        betas=["compact-2026-01-12"],
        model="claude-opus-4-6",
        max_tokens=4096,
        messages=messages,
        context_management={
            "edits": [{"type": "compact_20260112"}]
        }
    )

# After receiving a response with a compaction block
messages.append({"role": "assistant", "content": response.content})

# Continue the conversation
messages.append({"role": "user", "content": "Now add error handling"})

response = client.beta.messages.create(
    betas=["compact-2026-01-12"],
    model="claude-opus-4-6",
    max_tokens=4096,
    messages=messages,
    context_management={
        "edits": [{"type": "compact_20260112"}]
    }
)

import anthropic
client = anthropic.Anthropic()

with client.beta.messages.stream(
    betas=["compact-2026-01-12"],
    model="claude-opus-4-6",
    max_tokens=4096,
    messages=messages,
    context_management={
        "edits": [{"type": "compact_20260112"}]
    }
) as stream:
    for event in stream:
        if event.type == "content_block_start":
            if event.content_block.type == "compaction":
                print("Compaction started...")
            elif event.content_block.type == "text":
                print("Text response started...")
        elif event.type == "content_block_delta":
            if event.delta.type == "compaction_delta":
                print(f"Compaction complete: {len(event.delta.content)} chars")
            elif event.delta.type == "text_delta":
                print(event.delta.text, end="", flush=True)

# Get the final accumulated message
message = stream.get_final_message()
messages.append({"role": "assistant", "content": message.content})

count_response = client.beta.messages.count_tokens(
    betas=["compact-2026-01-12"],
    model="claude-opus-4-6",
    messages=messages,
    context_management={
        "edits": [{"type": "compact_20260112"}]
    }
)

print(f"Current tokens: {count_response.input_tokens}")
print(f"Original tokens: {count_response.context_management.original_input_tokens}")

import anthropic
client = anthropic.Anthropic()
messages: list[dict] = []

def chat(user_message: str) -> str:
    messages.append({"role": "user", "content": user_message})

    response = client.beta.messages.create(
        betas=["compact-2026-01-12"],
        model="claude-opus-4-6",
        max_tokens=4096,
        messages=messages,
        context_management={
            "edits": [
                {
                    "type": "compact_20260112",
                    "trigger": {"type": "input_tokens", "value": 100000}
                }
            ]
        }
    )

    # Append response (compaction blocks are automatically included)
    messages.append({"role": "assistant", "content": response.content})

    # Return the text content
    return next(
        block.text for block in response.content if block.type == "text"
    )

# Run a long conversation
print(chat("Help me build a Python web scraper"))
print(chat("Add support for JavaScript-rendered pages"))
print(chat("Now add rate limiting and error handling"))
# ... continue as long as needed

import anthropic
from typing import Any

client = anthropic.Anthropic()
messages: list[dict[str, Any]] = []

def chat(user_message: str) -> str:
    messages.append({"role": "user", "content": user_message})

    response = client.beta.messages.create(
        betas=["compact-2026-01-12"],
        model="claude-opus-4-6",
        max_tokens=4096,
        messages=messages,
        context_management={
            "edits": [
                {
                    "type": "compact_20260112",
                    "trigger": {"type": "input_tokens", "value": 100000},
                    "pause_after_compaction": True
                }
            ]
        }
    )

    # Check if compaction occurred and paused
    if response.stop_reason == "compaction":
        # Get the compaction block from the response
        compaction_block = response.content[0]

        # Preserve the last 2 messages (1 user + 1 assistant turn)
        # by including them after the compaction block
        preserved_messages = messages[-2:] if len(messages) >= 2 else messages

        # Build new message list: compaction + preserved messages
        new_assistant_content = [compaction_block]
        messages_after_compaction = [
            {"role": "assistant", "content": new_assistant_content}
        ] + preserved_messages

        # Continue the request with the compacted context + preserved messages
        response = client.beta.messages.create(
            betas=["compact-2026-01-12"],
            model="claude-opus-4-6",
            max_tokens=4096,
            messages=messages_after_compaction,
            context_management={
                "edits": [{"type": "compact_20260112"}]
            }
        )

        # Update our message list to reflect the compaction
        messages.clear()
        messages.extend(messages_after_compaction)

    # Append the final response
    messages.append({"role": "assistant", "content": response.content})

    # Return the text content
    return next(
        block.text for block in response.content if block.type == "text"
    )

# Run a long conversation
print(chat("Help me build a Python web scraper"))
print(chat("Add support for JavaScript-rendered pages"))
print(chat("Now add rate limiting and error handling"))
# ... continue as long as needed

Explore practical examples and implementations in the cookbook.