
Message Batches with 300K Output: Cut Bulk Document Processing Costs by 50%

Claude's Message Batches API now supports up to 300K output tokens per request. This step-by-step guide shows you how to batch-generate reports and process long documents at 50% lower cost, with complete Python code included.

Tools · Claude · API · Message Batches API · Est. read: 10 min
Published 2026.04.22

If your workflow involves generating hundreds of reports a day, processing thousands of contracts, or batch-analyzing large codebases — you’ve probably hit these two pain points:

  1. Calling the API one request at a time, constantly hitting rate limits, and watching your script crash halfway through
  2. Runaway costs — real-time API pricing adds up fast, and the bill after a single batch run makes you wince

Anthropic’s Message Batches API was built for exactly this. The latest version supports up to 300,000 output tokens per request, and with batch processing’s 50% discount, it’s one of the most cost-effective ways to handle bulk long-document tasks today.

This guide is hands-on. We’ll walk through a complete, working pipeline for batch-generating project analysis reports from scratch.


First Things First: What Is the Message Batches API?

The standard API is synchronous: send a request, wait for a response, get the result, send the next one.

The Batches API is asynchronous: submit up to 10,000 requests at once, Anthropic processes them in the background, and you retrieve the results later.

|                         | Standard Messages API                    | Message Batches API                 |
|-------------------------|------------------------------------------|-------------------------------------|
| Call pattern            | Synchronous, one at a time               | Asynchronous, bulk submit           |
| Pricing                 | Standard rate                            | Standard rate × 50%                 |
| Max output per request  | 8,192 tokens (default)                   | 300,000 tokens                      |
| Max requests per batch  | 1                                        | 10,000                              |
| Best for                | Real-time interaction, low-latency needs | Bulk generation, offline processing |
| Result delivery         | Immediate                                | Poll or wait for completion         |

Bottom line: if your task doesn’t need a real-time response, you should be using the Batches API.


Hands-On: Batch-Generating 500 Project Analysis Reports

Say you have README files from 500 open-source projects and need a structured analysis report for each — covering tech stack evaluation, use cases, potential risks, and an overall recommendation score.

Setup

pip install anthropic

Get your API key from ClaudeAPI.com.

import anthropic
import json
import time

client = anthropic.Anthropic(
    api_key="your-api-key",
    base_url="https://api.claudeapi.com"  # ClaudeAPI.com endpoint
)

Step 1: Build the Batch Requests

def build_batch_requests(projects: list[dict]) -> list[dict]:
    """
    projects: [{"id": "proj_001", "name": "...", "readme": "..."}]
    """
    requests = []
  
    for project in projects:
        request = {
            "custom_id": project["id"],  # Your own ID — used to match results later
            "params": {
                "model": "claude-opus-4-6",
                "max_tokens": 300000,  # Up to 300K output tokens per request
                "messages": [
                    {
                        "role": "user",
                        "content": f"""Perform a deep analysis of the following open-source project and output a structured report:

Project name: {project['name']}

README contents:
{project['readme']}

Please structure your report as follows:

## Project Overview
(One-sentence summary of core functionality)

## Tech Stack Assessment
(List major technologies; evaluate maturity and community activity)

## Core Use Cases
(3–5 best-fit use cases)

## Risks & Limitations
(Tech debt, maintenance risks, scope boundaries)

## Overall Recommendation Score
(1–10, with justification)
"""
                    }
                ]
            }
        }
        requests.append(request)
  
    return requests

Step 2: Submit the Batch

def submit_batch(projects: list[dict]) -> str:
    """Submit the batch and return the batch_id."""
  
    requests = build_batch_requests(projects)
  
    print(f"Submitting {len(requests)} requests...")
  
    batch = client.messages.batches.create(requests=requests)
  
    print(f"Batch submitted!")
    print(f"  Batch ID: {batch.id}")
    print(f"  Status: {batch.processing_status}")
    print(f"  Total requests: {batch.request_counts.processing}")
  
    return batch.id

Step 3: Poll Until Complete

def wait_for_batch(batch_id: str, poll_interval: int = 60) -> None:
    """Poll batch status until processing ends."""
  
    print(f"\nWaiting for batch to complete (checking every {poll_interval}s)...")
  
    while True:
        batch = client.messages.batches.retrieve(batch_id)
        counts = batch.request_counts
      
        print(f"[{time.strftime('%H:%M:%S')}] Status: {batch.processing_status} | "
              f"Processing: {counts.processing} | "
              f"Succeeded: {counts.succeeded} | "
              f"Errored: {counts.errored}")
      
        if batch.processing_status == "ended":
            print(f"\nBatch complete!")
            print(f"  Succeeded: {counts.succeeded}")
            print(f"  Errored: {counts.errored}")
            print(f"  Expired: {counts.expired}")
            break
      
        time.sleep(poll_interval)

Step 4: Collect and Save Results

def collect_results(batch_id: str) -> dict[str, str]:
    """Collect all successful results."""
  
    results = {}
    errors = []
  
    for result in client.messages.batches.results(batch_id):
        custom_id = result.custom_id
      
        if result.result.type == "succeeded":
            # Extract text content
            content = result.result.message.content[0].text
            results[custom_id] = content
          
        elif result.result.type == "errored":
            error_msg = result.result.error.error.message
            errors.append({"id": custom_id, "error": error_msg})
            print(f"Request failed [{custom_id}]: {error_msg}")
          
        else:  # "expired" or "canceled" — log these too so nothing is silently lost
            errors.append({"id": custom_id, "error": result.result.type})
            print(f"Request {result.result.type} [{custom_id}]")
  
    if errors:
        with open("batch_errors.json", "w", encoding="utf-8") as f:
            json.dump(errors, f, ensure_ascii=False, indent=2)
        print(f"\n{len(errors)} requests failed — details saved to batch_errors.json")
  
    return results


def save_reports(results: dict[str, str], output_dir: str = "reports") -> None:
    """Save each report as a separate file."""
    import os
    os.makedirs(output_dir, exist_ok=True)
  
    for custom_id, content in results.items():
        filepath = os.path.join(output_dir, f"{custom_id}.md")
        with open(filepath, "w", encoding="utf-8") as f:
            f.write(content)
  
    print(f"\n{len(results)} reports saved to {output_dir}/")

Full Pipeline

def main():
    # Sample data — in production, load from files or a database
    projects = [
        {
            "id": f"proj_{i:03d}",
            "name": f"Sample Project {i}",
            "readme": f"This is the README for project {i}..." * 100  # Simulating long text
        }
        for i in range(1, 501)  # 500 projects
    ]
  
    # 1. Submit the batch
    batch_id = submit_batch(projects)
  
    # Persist the batch_id in case the script crashes
    with open("batch_id.txt", "w") as f:
        f.write(batch_id)
    print(f"Batch ID saved to batch_id.txt")
  
    # 2. Wait for completion
    wait_for_batch(batch_id, poll_interval=60)
  
    # 3. Collect results
    results = collect_results(batch_id)
  
    # 4. Save reports
    save_reports(results)
  
    print("\nAll done!")


if __name__ == "__main__":
    main()

Crash Recovery: Picking Up Where You Left Off

Batches run server-side — if your local script crashes, processing continues uninterrupted. To resume:

# Recover from the saved batch_id
with open("batch_id.txt") as f:
    batch_id = f.read().strip()

# Check status and collect results
wait_for_batch(batch_id)
results = collect_results(batch_id)
save_reports(results)

Cost Breakdown: How Much Do You Save?

Using the 500-report example above, assuming an average of 2,000 output tokens per report:

| Approach                                 | Output price | Total output tokens | Total cost |
|------------------------------------------|--------------|---------------------|------------|
| Standard Messages API (claude-opus-4-6)  | $15 / 1M     | 1,000,000           | $15.00     |
| Message Batches API                      | $7.50 / 1M   | 1,000,000           | $7.50      |
| **Savings**                              |              |                     | **$7.50 (50%)** |

The higher your volume, the bigger the savings. At 100M tokens per month, you save $750/month.
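To sanity-check these numbers for your own volume, a quick back-of-the-envelope helper (the $15/1M output rate and 50% discount come from the table above; adjust for your model and plan):

```python
def batch_savings(output_tokens: int,
                  price_per_million: float = 15.0,
                  batch_discount: float = 0.5) -> dict:
    """Estimate standard vs. batch cost for a given output-token volume."""
    standard = output_tokens / 1_000_000 * price_per_million
    batch = standard * (1 - batch_discount)
    return {"standard": standard, "batch": batch, "saved": standard - batch}

print(batch_savings(1_000_000))    # {'standard': 15.0, 'batch': 7.5, 'saved': 7.5}
print(batch_savings(100_000_000))  # {'standard': 1500.0, 'batch': 750.0, 'saved': 750.0}
```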

Access via ClaudeAPI.com for additional discounts on top of batch pricing. Direct access from anywhere, no VPN required.


Practical Tips

1. Use meaningful custom_id values

Use business IDs instead of sequential numbers — makes it much easier to match results:

"custom_id": f"contract_{contract_id}_v{version}"
"custom_id": f"contract_{contract_id}_v{version}"

2. Don’t max out batch size blindly

The per-batch limit is 10,000 requests, but consider splitting by business dimension (e.g., by department or date). This makes failed retries and result management far easier.
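If you do need to split mechanically rather than by business dimension, a simple chunking helper keeps each submission well under the 10,000-request ceiling (the chunk size here is an arbitrary illustration):

```python
def chunk_requests(requests: list, chunk_size: int = 1000) -> list[list]:
    """Split a flat request list into smaller batches for separate submission."""
    return [requests[i:i + chunk_size]
            for i in range(0, len(requests), chunk_size)]

# 2,500 requests -> three batches of 1000, 1000, 500
batches = chunk_requests(list(range(2500)))
print([len(b) for b in batches])  # [1000, 1000, 500]
```

Each chunk can then be submitted separately and tracked under its own batch_id, so a failure in one chunk never forces you to re-run the rest.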

3. Retry only the failed requests

# Re-submit only the ones that failed
failed_ids = {e["id"] for e in errors}
failed_projects = [p for p in projects if p["id"] in failed_ids]
retry_batch_id = submit_batch(failed_projects)

4. Results are available for 29 days

You have 29 days after submission to retrieve results — no rush.
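If you record the submission time yourself (as the pipeline above does with `batch_id.txt`), the retrieval cutoff is easy to compute — the 29-day window comes from the text above, and the timestamp below is just an example:

```python
from datetime import datetime, timedelta, timezone

RESULTS_RETENTION = timedelta(days=29)

def retrieval_deadline(submitted_at: datetime) -> datetime:
    """Last moment batch results can still be fetched, per the 29-day window."""
    return submitted_at + RESULTS_RETENTION

submitted = datetime(2026, 4, 22, tzinfo=timezone.utc)
print(retrieval_deadline(submitted).date())  # 2026-05-21
```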


When to Use the Batches API

  • Daily scheduled report generation (finance, ops, sales)
  • Bulk contract / document review and summarization
  • Large-scale code review and comment generation
  • Dataset labeling and classification
  • Multilingual content translation at scale
  • SEO content production

When NOT to use it: real-time chat, APIs requiring millisecond latency, user-facing UIs waiting on a response.


Summary

The Message Batches API’s 300K output limit + 50% discount cuts bulk long-document processing costs in half. The core workflow is four steps: Submit → Wait → Collect → Save — under 100 lines of code.

If your workload involves batch processing, there’s no reason to still be calling the synchronous API one request at a time.


Get your API key at ClaudeAPI.com and start your first batch job today.
