Usage & Cost Admin API Cookbook
A practical guide to programmatically accessing your Claude API usage and cost data
What You Can Do
Usage Tracking:
- Monitor token consumption (uncached input, output, cache creation/reads)
- Track usage across models, workspaces, and API keys
- Analyze cache efficiency and server tool usage
Cost Analysis:
- Retrieve detailed cost breakdowns by service type
- Monitor spending trends across workspaces
- Generate reports for finance and chargeback scenarios
Common Use Cases:
- Usage Monitoring: Track consumption patterns and optimize costs
- Cost Attribution: Allocate expenses across teams/projects by workspace
- Cache Analysis: Measure and improve cache efficiency
- Financial Reporting: Generate executive summaries and budget reports
API Overview
Two main endpoints:
- Messages Usage API: Token-level usage data with flexible grouping
- Cost API: Financial data in USD with service breakdowns
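Both endpoints sit under the same organizations base URL used by the client later in this cookbook. As a quick reference, here is a small sketch of the paths queried throughout (the values simply mirror what the code below uses):
```python
# Quick reference: the two endpoint paths used throughout this cookbook,
# relative to the organizations base URL (see the client class below).
BASE_URL = "https://api.anthropic.com/v1/organizations"

ENDPOINTS = {
    "usage": f"{BASE_URL}/usage_report/messages",  # token-level usage buckets
    "cost": f"{BASE_URL}/cost_report",             # daily cost buckets in USD
}
```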
Prerequisites & Security
- Admin API Key: Get one from the Claude Console (format: sk-ant-admin...)
- Security: Store keys in environment variables, rotate them regularly, and never commit them to version control
```python
import os
from datetime import datetime, time, timedelta
from typing import Any
import requests
class AnthropicAdminAPI:
"""Secure wrapper for Anthropic Admin API endpoints."""
def __init__(self, api_key: str | None = None):
self.api_key = api_key or os.getenv("ANTHROPIC_ADMIN_API_KEY")
if not self.api_key:
raise ValueError(
"Admin API key required. Set ANTHROPIC_ADMIN_API_KEY environment variable."
)
if not self.api_key.startswith("sk-ant-admin"):
raise ValueError("Invalid Admin API key format.")
self.base_url = "https://api.anthropic.com/v1/organizations"
self.headers = {
"anthropic-version": "2023-06-01",
"x-api-key": self.api_key,
"Content-Type": "application/json",
}
def _make_request(self, endpoint: str, params: dict[str, Any]) -> dict[str, Any]:
"""Make authenticated request with basic error handling."""
url = f"{self.base_url}/{endpoint}"
try:
response = requests.get(url, headers=self.headers, params=params, timeout=30)
response.raise_for_status()
return response.json()
except requests.exceptions.HTTPError as e:
if response.status_code == 401:
raise ValueError("Invalid API key or insufficient permissions") from e
elif response.status_code == 429:
raise requests.exceptions.RequestException(
"Rate limit exceeded - try again later"
) from e
else:
raise requests.exceptions.RequestException(f"API error: {e}") from e
# Test connection
def test_connection():
try:
client = AnthropicAdminAPI()
# Simple test query - snap to start of day to align with bucket boundaries
params = {
"starting_at": (
datetime.combine(datetime.utcnow(), time.min) - timedelta(days=1)
).strftime("%Y-%m-%dT%H:%M:%SZ"),
"ending_at": datetime.combine(datetime.utcnow(), time.min).strftime(
"%Y-%m-%dT%H:%M:%SZ"
),
"bucket_width": "1d",
"limit": 1,
}
client._make_request("usage_report/messages", params)
print("✅ Connection successful!")
return client
except Exception as e:
print(f"❌ Connection failed: {e}")
return None
client = test_connection()
```
Basic Usage & Cost Tracking
Understanding Usage Data
The Messages Usage API returns token consumption in time buckets: fixed intervals (hourly or daily, for example) that each contain aggregated usage results.
Key Metrics:
- uncached_input_tokens: New input tokens (prompts, system messages)
- output_tokens: Claude's responses
- cache_creation: Tokens written to the prompt cache for reuse (ephemeral 5-minute and 1-hour entries)
- cache_read_input_tokens: Previously cached tokens read back instead of being reprocessed
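To make these metrics concrete, here is roughly what one time bucket looks like. This sketch includes only the fields read in this cookbook and the numbers are made up, so treat the official API reference as the source of truth.
```python
# Illustrative shape of a Messages Usage API response, based only on the
# fields this cookbook reads. Numbers are made up; consult the official API
# reference for the authoritative schema.
example_usage_response = {
    "data": [
        {
            "starting_at": "2025-01-01T00:00:00Z",  # start of this time bucket
            "results": [
                {
                    "uncached_input_tokens": 12345,
                    "output_tokens": 6789,
                    "cache_creation": {
                        "ephemeral_5m_input_tokens": 1000,
                        "ephemeral_1h_input_tokens": 0,
                    },
                    "cache_read_input_tokens": 4200,
                    "server_tool_use": {"web_search_requests": 3},
                    # "model", "service_tier", and "workspace_id" appear here
                    # when you pass the corresponding group_by[] keys
                }
            ],
        }
    ],
    "has_more": False,  # True when more pages are available
    "next_page": None,  # cursor to pass back as the "page" parameter
}
```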
Basic Usage Query
```python
def get_daily_usage(client, days_back=7):
"""Get usage data for the last N days."""
end_time = datetime.combine(datetime.utcnow(), time.min)
start_time = end_time - timedelta(days=days_back)
params = {
"starting_at": start_time.strftime("%Y-%m-%dT%H:%M:%SZ"),
"ending_at": end_time.strftime("%Y-%m-%dT%H:%M:%SZ"),
"bucket_width": "1d",
"limit": days_back,
}
return client._make_request("usage_report/messages", params)
def analyze_usage_data(response):
"""Process and display usage data."""
if not response or not response.get("data"):
print("No usage data found.")
return
total_uncached_input = total_output = total_cache_creation = 0
total_cache_reads = total_web_searches = 0
daily_data = []
for bucket in response["data"]:
date = bucket["starting_at"][:10]
# Sum all results in bucket
bucket_uncached = bucket_output = bucket_cache_creation = 0
bucket_cache_reads = bucket_web_searches = 0
for result in bucket["results"]:
bucket_uncached += result.get("uncached_input_tokens", 0)
bucket_output += result.get("output_tokens", 0)
cache_creation = result.get("cache_creation", {})
bucket_cache_creation += cache_creation.get(
"ephemeral_1h_input_tokens", 0
) + cache_creation.get("ephemeral_5m_input_tokens", 0)
bucket_cache_reads += result.get("cache_read_input_tokens", 0)
server_tools = result.get("server_tool_use", {})
bucket_web_searches += server_tools.get("web_search_requests", 0)
daily_data.append(
{
"date": date,
"uncached_input_tokens": bucket_uncached,
"output_tokens": bucket_output,
"cache_creation": bucket_cache_creation,
"cache_reads": bucket_cache_reads,
"web_searches": bucket_web_searches,
"total_tokens": bucket_uncached + bucket_output,
}
)
# Add to totals
total_uncached_input += bucket_uncached
total_output += bucket_output
total_cache_creation += bucket_cache_creation
total_cache_reads += bucket_cache_reads
total_web_searches += bucket_web_searches
# Calculate cache efficiency
total_input_tokens = total_uncached_input + total_cache_creation + total_cache_reads
cache_efficiency = (
(total_cache_reads / total_input_tokens * 100) if total_input_tokens > 0 else 0
)
# Display summary
print("📊 Usage Summary:")
print(f"Uncached input tokens: {total_uncached_input:,}")
print(f"Output tokens: {total_output:,}")
print(f"Cache creation: {total_cache_creation:,}")
print(f"Cache reads: {total_cache_reads:,}")
print(f"Cache efficiency: {cache_efficiency:.1f}%")
print(f"Web searches: {total_web_searches:,}")
return daily_data
# Example usage
if client:
usage_response = get_daily_usage(client, days_back=7)
    daily_usage = analyze_usage_data(usage_response)
```
```
📊 Usage Summary:
Uncached input tokens: 267,751
Output tokens: 2,848,746
Cache creation: 0
Cache reads: 0
Cache efficiency: 0.0%
Web searches: 0
```
Basic Cost Tracking
Note: Priority Tier costs use a different billing model and will never appear in the cost endpoint. You can track Priority Tier usage in the usage endpoint, but not costs.
```python
def get_daily_costs(client, days_back=7):
"""Get cost data for the last N days."""
end_time = datetime.combine(datetime.utcnow(), time.min)
start_time = end_time - timedelta(days=days_back)
params = {
"starting_at": start_time.strftime("%Y-%m-%dT%H:%M:%SZ"),
"ending_at": end_time.strftime("%Y-%m-%dT%H:%M:%SZ"),
"bucket_width": "1d", # Only 1d supported for cost API
"limit": min(days_back, 31), # Max 31 days per request
}
return client._make_request("cost_report", params)
def analyze_cost_data(response):
"""Process and display cost data."""
if not response or not response.get("data"):
print("No cost data found.")
return
total_cost_minor_units = 0
daily_costs = []
for bucket in response["data"]:
date = bucket["starting_at"][:10]
# Sum all costs in this bucket
bucket_cost = 0
for result in bucket["results"]:
# Convert string amounts to float if needed
amount = result.get("amount", 0)
if isinstance(amount, str):
try:
amount = float(amount)
except (ValueError, TypeError):
amount = 0
bucket_cost += amount
daily_costs.append(
{
"date": date,
"cost_minor_units": bucket_cost,
"cost_usd": bucket_cost / 100, # Convert to dollars
}
)
total_cost_minor_units += bucket_cost
total_cost_usd = total_cost_minor_units / 100
print("💰 Cost Summary:")
print(f"Total cost: ${total_cost_usd:.4f}")
print(f"Average daily cost: ${total_cost_usd / len(daily_costs):.4f}")
return daily_costs
# Example usage
if client:
cost_response = get_daily_costs(client, days_back=7)
    daily_costs = analyze_cost_data(cost_response)
```
```
💰 Cost Summary:
Total cost: $83.7574
Average daily cost: $11.9653
```
Grouping, Filtering & Pagination
Time Granularity Options
Usage API supports three granularities:
- 1m (1 minute): High-resolution analysis, max 1440 buckets per request
- 1h (1 hour): Medium-resolution, max 168 buckets per request
- 1d (1 day): Daily analysis, max 31 buckets per request
Cost API supports:
- 1d (1 day): Only option available, max 31 buckets per request
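Because the per-request bucket maximums differ by granularity, a small helper can clamp the limit parameter automatically. This is a minimal sketch whose mapping is taken from the limits listed above, so verify it against the current API docs.
```python
# Clamp the `limit` parameter to the documented per-request bucket maximums.
# The mapping mirrors the lists above (usage API: 1m/1h/1d; cost API: 1d only).
MAX_BUCKETS = {"1m": 1440, "1h": 168, "1d": 31}

def bucket_params(bucket_width: str, desired_buckets: int) -> dict:
    """Return bucket_width and a limit clamped to the per-request maximum."""
    if bucket_width not in MAX_BUCKETS:
        raise ValueError(f"Unsupported bucket_width: {bucket_width!r}")
    return {
        "bucket_width": bucket_width,
        "limit": min(desired_buckets, MAX_BUCKETS[bucket_width]),
    }

# Example: hourly usage for two weeks is capped at 168 buckets per request,
# so the remainder has to come from pagination.
print(bucket_params("1h", 24 * 14))  # {'bucket_width': '1h', 'limit': 168}
```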
Grouping and Filtering
```python
def get_usage_by_model(client, days_back=7):
"""Get usage data grouped by model, handling pagination automatically."""
end_time = datetime.combine(datetime.utcnow(), time.min)
start_time = end_time - timedelta(days=days_back)
params = {
"starting_at": start_time.strftime("%Y-%m-%dT%H:%M:%SZ"),
"ending_at": end_time.strftime("%Y-%m-%dT%H:%M:%SZ"),
"group_by[]": ["model"],
"bucket_width": "1d",
}
# Aggregate across all pages of data
model_usage = {}
page_count = 0
max_pages = 10 # Reasonable limit to avoid infinite loops
try:
next_page = None
while page_count < max_pages:
current_params = params.copy()
if next_page:
current_params["page"] = next_page
response = client._make_request("usage_report/messages", current_params)
page_count += 1
# Process this page's data
for bucket in response.get("data", []):
for result in bucket.get("results", []):
model = result.get("model", "Unknown")
uncached = result.get("uncached_input_tokens", 0)
output = result.get("output_tokens", 0)
cache_creation = result.get("cache_creation", {})
cache_creation_tokens = cache_creation.get(
"ephemeral_1h_input_tokens", 0
) + cache_creation.get("ephemeral_5m_input_tokens", 0)
cache_reads = result.get("cache_read_input_tokens", 0)
tokens = uncached + output + cache_creation_tokens + cache_reads
if model not in model_usage:
model_usage[model] = 0
model_usage[model] += tokens
# Check if there's more data
if not response.get("has_more", False):
break
next_page = response.get("next_page")
if not next_page:
break
except Exception as e:
print(f"❌ Error retrieving usage data: {e}")
return {}
# Display results
print("📊 Usage by Model:")
if not model_usage:
print(f" No usage data found in the last {days_back} days")
print(" 💡 Try increasing the time range or check if you have recent API usage")
else:
for model, tokens in sorted(model_usage.items(), key=lambda x: x[1], reverse=True):
print(f" {model}: {tokens:,} tokens")
return model_usage
def filter_usage_example(client):
"""Example of filtering usage data."""
params = {
"starting_at": (datetime.combine(datetime.utcnow(), time.min) - timedelta(days=7)).strftime(
"%Y-%m-%dT%H:%M:%SZ"
),
"ending_at": datetime.combine(datetime.utcnow(), time.min).strftime("%Y-%m-%dT%H:%M:%SZ"),
"models[]": ["claude-sonnet-4-5"], # Filter to specific model
"service_tiers[]": ["standard"], # Filter to standard tier
"bucket_width": "1d",
}
response = client._make_request("usage_report/messages", params)
print(f"Found {len(response.get('data', []))} days of filtered usage data")
return response
# Example usage
if client:
model_usage = get_usage_by_model(client, days_back=14)
    filtered_usage = filter_usage_example(client)
```
```
📊 Usage by Model:
  claude-3-5-haiku-20241022: 995,781 tokens
  claude-sonnet-4-5: 861,880 tokens
  claude-opus-4-1: 394,646 tokens
  claude-sonnet-4-5: 356,766 tokens
  claude-opus-4-20250514: 308,223 tokens
  claude-opus-4-1: 199,201 tokens
Found 7 days of filtered usage data
```
Pagination for Large Datasets
```python
def fetch_all_usage_data(client, params, max_pages=10):
"""Fetch all paginated usage data."""
all_data = []
page_count = 0
next_page = None
print("📥 Fetching paginated data...")
while page_count < max_pages:
current_params = params.copy()
if next_page:
current_params["page"] = next_page
try:
response = client._make_request("usage_report/messages", current_params)
if not response or not response.get("data"):
break
page_data = response["data"]
all_data.extend(page_data)
page_count += 1
print(f" Page {page_count}: {len(page_data)} time buckets")
if not response.get("has_more", False):
print(f"✅ Complete: Retrieved all data in {page_count} pages")
break
next_page = response.get("next_page")
if not next_page:
break
except Exception as e:
print(f"❌ Error on page {page_count + 1}: {e}")
break
print(f"📊 Total retrieved: {len(all_data)} time buckets")
return all_data
def large_dataset_example(client, days_back=3):
"""Example of handling a large dataset with pagination."""
# Use recent time range to ensure we have data
start_time = datetime.combine(datetime.utcnow(), time.min) - timedelta(days=days_back)
end_time = datetime.combine(datetime.utcnow(), time.min)
params = {
"starting_at": start_time.strftime("%Y-%m-%dT%H:%M:%SZ"),
"ending_at": end_time.strftime("%Y-%m-%dT%H:%M:%SZ"),
"bucket_width": "1h", # Hourly data for more buckets
"group_by[]": ["model"],
"limit": 24, # One day per page
}
all_buckets = fetch_all_usage_data(client, params, max_pages=5)
# Process the large dataset
if all_buckets:
total_tokens = sum(
sum(
result.get("uncached_input_tokens", 0) + result.get("output_tokens", 0)
for result in bucket["results"]
)
for bucket in all_buckets
)
print(f"📈 Total tokens across all data: {total_tokens:,}")
return all_buckets
# Example usage - use shorter time range to find recent data
if client:
    large_dataset = large_dataset_example(client, days_back=3)
```
```
📥 Fetching paginated data...
  Page 1: 24 time buckets
  Page 2: 24 time buckets
  Page 3: 24 time buckets
✅ Complete: Retrieved all data in 3 pages
📊 Total retrieved: 72 time buckets
📈 Total tokens across all data: 1,336,287
```
Simple Data Export
CSV Export for External Analysis
```python
import csv
def export_usage_to_csv(client, output_file="usage_data.csv", days_back=30):
"""Export usage data to CSV for external analysis."""
end_time = datetime.combine(datetime.utcnow(), time.min)
start_time = end_time - timedelta(days=days_back)
params = {
"starting_at": start_time.strftime("%Y-%m-%dT%H:%M:%SZ"),
"ending_at": end_time.strftime("%Y-%m-%dT%H:%M:%SZ"),
"group_by[]": ["model", "service_tier", "workspace_id"],
"bucket_width": "1d",
}
try:
# Collect all data across pages
rows = []
page_count = 0
max_pages = 20 # Allow more pages for export
next_page = None
while page_count < max_pages:
current_params = params.copy()
if next_page:
current_params["page"] = next_page
response = client._make_request("usage_report/messages", current_params)
page_count += 1
# Process this page's data
for bucket in response.get("data", []):
date = bucket["starting_at"][:10]
for result in bucket["results"]:
rows.append(
{
"date": date,
"model": result.get("model", ""),
"service_tier": result.get("service_tier", ""),
"workspace_id": result.get("workspace_id", ""),
"uncached_input_tokens": result.get("uncached_input_tokens", 0),
"output_tokens": result.get("output_tokens", 0),
"cache_creation_tokens": (
result.get("cache_creation", {}).get("ephemeral_1h_input_tokens", 0)
+ result.get("cache_creation", {}).get(
"ephemeral_5m_input_tokens", 0
)
),
"cache_read_tokens": result.get("cache_read_input_tokens", 0),
"web_search_requests": result.get("server_tool_use", {}).get(
"web_search_requests", 0
),
}
)
# Check if there's more data
if not response.get("has_more", False):
break
next_page = response.get("next_page")
if not next_page:
break
# Write CSV
if rows:
with open(output_file, "w", newline="") as csvfile:
writer = csv.DictWriter(csvfile, fieldnames=rows[0].keys())
writer.writeheader()
writer.writerows(rows)
print(f"✅ Exported {len(rows)} rows to {output_file}")
else:
print(f"No usage data to export for the last {days_back} days")
print("💡 Try increasing days_back or check if you have recent API usage")
except Exception as e:
print(f"❌ Export failed: {e}")
def export_costs_to_csv(client, output_file="cost_data.csv", days_back=30):
"""Export cost data to CSV."""
end_time = datetime.combine(datetime.utcnow(), time.min)
start_time = end_time - timedelta(days=days_back)
params = {
"starting_at": start_time.strftime("%Y-%m-%dT%H:%M:%SZ"),
"ending_at": end_time.strftime("%Y-%m-%dT%H:%M:%SZ"),
"group_by[]": ["workspace_id", "description"],
}
try:
# Collect all data across pages
rows = []
page_count = 0
max_pages = 20
next_page = None
while page_count < max_pages:
current_params = params.copy()
if next_page:
current_params["page"] = next_page
response = client._make_request("cost_report", current_params)
page_count += 1
# Process this page's data
for bucket in response.get("data", []):
date = bucket["starting_at"][:10]
for result in bucket["results"]:
# Handle both string and numeric amounts
amount = result.get("amount", 0)
if isinstance(amount, str):
try:
amount = float(amount)
except (ValueError, TypeError):
amount = 0
rows.append(
{
"date": date,
"workspace_id": result.get(
"workspace_id", ""
), # null for default workspace
"description": result.get("description", ""),
"currency": result.get("currency", "USD"),
"amount_usd": amount / 100,
}
)
# Check if there's more data
if not response.get("has_more", False):
break
next_page = response.get("next_page")
if not next_page:
break
if rows:
with open(output_file, "w", newline="") as csvfile:
writer = csv.DictWriter(csvfile, fieldnames=rows[0].keys())
writer.writeheader()
writer.writerows(rows)
print(f"✅ Exported {len(rows)} cost records to {output_file}")
else:
print(f"No cost data to export for the last {days_back} days")
print("💡 Try increasing days_back or check if you have recent API usage")
except Exception as e:
print(f"❌ Cost export failed: {e}")
# Example usage
if client:
export_usage_to_csv(client, "my_usage_data.csv", days_back=14)
    export_costs_to_csv(client, "my_cost_data.csv", days_back=14)
```
```
✅ Exported 36 rows to my_usage_data.csv
✅ Exported 72 cost records to my_cost_data.csv
```
Wrapping Up
This cookbook covers the essential patterns for working with the Usage & Cost Admin API:
- Basic queries for usage and cost data
- Grouping and filtering for detailed analysis
- Pagination for large datasets
- Cost description parsing for categorization (see the sketch after this list)
- Common gotchas and how to avoid them
- Simple CSV export for external tools
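Cost description parsing is listed above but not shown in this section; the sketch below groups the per-row dicts built inside export_costs_to_csv (or rows read back from the CSV it writes) by keyword matching on the free-text description field. The keyword list is an assumption about how descriptions are worded, so inspect your own data and adjust the categories.
```python
# Hedged sketch: bucket cost rows by keywords in the free-text `description`
# field. The keyword checks are illustrative assumptions, not an official
# taxonomy -- adjust them after inspecting real descriptions.
from collections import defaultdict

def categorize_costs(rows: list[dict]) -> dict[str, float]:
    """Roughly categorize exported cost rows by description keywords."""
    categories: dict[str, float] = defaultdict(float)
    for row in rows:
        desc = (row.get("description") or "").lower()
        if "cache" in desc:
            key = "cache"
        elif "output" in desc:
            key = "output tokens"
        elif "input" in desc:
            key = "input tokens"
        elif "web search" in desc:
            key = "web search"
        else:
            key = "other"
        categories[key] += float(row.get("amount_usd", 0) or 0)
    return dict(categories)
```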
Next Steps
- Check the official API documentation for the latest field definitions
- Test your integration with small date ranges first
- Consider data retention needs for your use case
- Monitor for new API features that may enhance your analysis
Important Notes
- Field names and available options may evolve as the API matures
- Always handle unknown values gracefully in production code
- The API is designed for historical analysis, not real-time monitoring
- Priority Tier costs use a different billing model and don't appear in cost endpoints
Happy analyzing! 📊
