ストリーミング・バッチ処理
リアルタイム応答のストリーミングと、大量リクエストのバッチ処理を学びます。
ストリーミング
通常のAPI呼び出しは全文生成を待ってからレスポンスが返る。ストリーミングではトークンが生成されるたびにリアルタイムで受信できる。
基本的な使い方
import anthropic
client = anthropic.Anthropic()
with client.messages.stream(
model="claude-sonnet-4-20250514",
max_tokens=1024,
messages=[
{"role": "user", "content": "Pythonのasync/awaitを説明して"}
]
) as stream:
for text in stream.text_stream:
print(text, end="", flush=True)
イベント型でのハンドリング
より細かい制御が必要な場合。
with client.messages.stream(
model="claude-sonnet-4-20250514",
max_tokens=1024,
messages=[{"role": "user", "content": "Hello"}]
) as stream:
for event in stream:
match event.type:
case "text":
print(event.text, end="")
case "content_block_stop":
print() # ブロック終了
case "message_stop":
pass # メッセージ終了
# ストリーム終了後に完全なメッセージを取得
final_message = stream.get_final_message()
print(f"\n使用トークン: {final_message.usage}")
Tool Useとストリーミング
ツール呼び出しもストリーミングで受信可能。
with client.messages.stream(
model="claude-sonnet-4-20250514",
max_tokens=1024,
tools=tools,
messages=[{"role": "user", "content": "東京の天気を教えて"}]
) as stream:
for event in stream:
match event.type:
case "input_json":
# ツール入力のJSONが部分的に届く
print(event.partial_json, end="")
case "content_block_stop":
# ツール呼び出しの完全なデータを取得
pass
Server-Sent Events(SSE)
SDKを使わずにHTTP直接の場合、SSE形式でストリーミングデータが届く。
curl https://api.anthropic.com/v1/messages \
-H "x-api-key: $ANTHROPIC_API_KEY" \
-H "anthropic-version: 2023-06-01" \
-H "content-type: application/json" \
-d '{
"model": "claude-sonnet-4-20250514",
"max_tokens": 1024,
"stream": true,
"messages": [{"role": "user", "content": "Hello"}]
}'
SSEイベントの種類
| イベント | 説明 |
|---|---|
message_start |
メッセージ開始(モデル情報等) |
content_block_start |
コンテンツブロック開始 |
content_block_delta |
テキストの差分(部分テキスト) |
content_block_stop |
コンテンツブロック終了 |
message_delta |
stop_reason、usage情報 |
message_stop |
メッセージ完了 |
Message Batches API
大量のリクエストを非同期でまとめて処理する。個別に送るより50%割引の料金になる。
バッチの作成
import anthropic
client = anthropic.Anthropic()
# リクエストを配列で作成
requests = []
for i, text in enumerate(texts_to_summarize):
requests.append({
"custom_id": f"summary-{i}",
"params": {
"model": "claude-sonnet-4-20250514",
"max_tokens": 512,
"messages": [
{"role": "user", "content": f"以下を1文で要約:\n{text}"}
]
}
})
# バッチ送信
batch = client.messages.batches.create(requests=requests)
print(f"バッチID: {batch.id}")
print(f"ステータス: {batch.processing_status}")
結果の取得
# ステータス確認
batch = client.messages.batches.retrieve(batch.id)
print(f"ステータス: {batch.processing_status}")
# => "ended" になったら結果取得可能
# 結果を取得
for result in client.messages.batches.results(batch.id):
print(f"ID: {result.custom_id}")
if result.result.type == "succeeded":
print(f"結果: {result.result.message.content[0].text}")
else:
print(f"エラー: {result.result.error}")
バッチの特徴
| 項目 | 詳細 |
|---|---|
| 料金 | 通常の50%割引 |
| 処理時間 | 最大24時間以内 |
| 最大リクエスト数 | 100,000件/バッチ |
| 用途 | 大量翻訳、分類、データ処理 |
使い分けの指針
| ユースケース | 方式 | 理由 |
|---|---|---|
| チャットUI | ストリーミング | ユーザー体験(文字が流れる) |
| APIサーバー | 通常(非ストリーミング) | シンプルな実装 |
| 1000件のレビュー分析 | バッチ | コスト50%削減 |
| リアルタイム翻訳 | ストリーミング | 低遅延 |
| 日次データ処理 | バッチ | 大量処理+コスト最適化 |
参考リンク
- Streaming Guide — ストリーミングの公式リファレンス
- Message Batches — バッチ処理ガイド
- API Reference — Messages APIリファレンス