コンテンツにスキップ

ストリーミング・バッチ処理

リアルタイム応答のストリーミングと、大量リクエストのバッチ処理を学びます。


ストリーミング

通常のAPI呼び出しは全文生成を待ってからレスポンスが返る。ストリーミングではトークンが生成されるたびにリアルタイムで受信できる。

基本的な使い方

import anthropic

client = anthropic.Anthropic()

with client.messages.stream(
    model="claude-sonnet-4-20250514",
    max_tokens=1024,
    messages=[
        {"role": "user", "content": "Pythonのasync/awaitを説明して"}
    ]
) as stream:
    for text in stream.text_stream:
        print(text, end="", flush=True)

イベント型でのハンドリング

より細かい制御が必要な場合。

with client.messages.stream(
    model="claude-sonnet-4-20250514",
    max_tokens=1024,
    messages=[{"role": "user", "content": "Hello"}]
) as stream:
    for event in stream:
        match event.type:
            case "text":
                print(event.text, end="")
            case "content_block_stop":
                print()  # ブロック終了
            case "message_stop":
                pass  # メッセージ終了

# ストリーム終了後に完全なメッセージを取得
final_message = stream.get_final_message()
print(f"\n使用トークン: {final_message.usage}")

Tool Useとストリーミング

ツール呼び出しもストリーミングで受信可能。

with client.messages.stream(
    model="claude-sonnet-4-20250514",
    max_tokens=1024,
    tools=tools,
    messages=[{"role": "user", "content": "東京の天気を教えて"}]
) as stream:
    for event in stream:
        match event.type:
            case "input_json":
                # ツール入力のJSONが部分的に届く
                print(event.partial_json, end="")
            case "content_block_stop":
                # ツール呼び出しの完全なデータを取得
                pass

Server-Sent Events(SSE)

SDKを使わずにHTTP直接の場合、SSE形式でストリーミングデータが届く。

curl https://api.anthropic.com/v1/messages \
  -H "x-api-key: $ANTHROPIC_API_KEY" \
  -H "anthropic-version: 2023-06-01" \
  -H "content-type: application/json" \
  -d '{
    "model": "claude-sonnet-4-20250514",
    "max_tokens": 1024,
    "stream": true,
    "messages": [{"role": "user", "content": "Hello"}]
  }'

SSEイベントの種類

イベント 説明
message_start メッセージ開始(モデル情報等)
content_block_start コンテンツブロック開始
content_block_delta テキストの差分(部分テキスト)
content_block_stop コンテンツブロック終了
message_delta stop_reason、usage情報
message_stop メッセージ完了

Message Batches API

大量のリクエストを非同期でまとめて処理する。個別に送るより50%割引の料金になる。

バッチの作成

import anthropic

client = anthropic.Anthropic()

# リクエストを配列で作成
requests = []
for i, text in enumerate(texts_to_summarize):
    requests.append({
        "custom_id": f"summary-{i}",
        "params": {
            "model": "claude-sonnet-4-20250514",
            "max_tokens": 512,
            "messages": [
                {"role": "user", "content": f"以下を1文で要約:\n{text}"}
            ]
        }
    })

# バッチ送信
batch = client.messages.batches.create(requests=requests)
print(f"バッチID: {batch.id}")
print(f"ステータス: {batch.processing_status}")

結果の取得

# ステータス確認
batch = client.messages.batches.retrieve(batch.id)
print(f"ステータス: {batch.processing_status}")
# => "ended" になったら結果取得可能

# 結果を取得
for result in client.messages.batches.results(batch.id):
    print(f"ID: {result.custom_id}")
    if result.result.type == "succeeded":
        print(f"結果: {result.result.message.content[0].text}")
    else:
        print(f"エラー: {result.result.error}")

バッチの特徴

項目 詳細
料金 通常の50%割引
処理時間 最大24時間以内
最大リクエスト数 100,000件/バッチ
用途 大量翻訳、分類、データ処理

使い分けの指針

ユースケース 方式 理由
チャットUI ストリーミング ユーザー体験(文字が流れる)
APIサーバー 通常(非ストリーミング) シンプルな実装
1000件のレビュー分析 バッチ コスト50%削減
リアルタイム翻訳 ストリーミング 低遅延
日次データ処理 バッチ 大量処理+コスト最適化

参考リンク