FastAPI StreamingResponseのストリーミング不具合
問題概要
FastAPIのStreamingResponse
を使用してOpenAI APIからのレスポンスをストリーミングしようとした際に、以下の問題が発生します:
- OpenAI APIからはデータがリアルタイムでストリーミングされている(サーバー側コンソールには逐次出力される)
- しかしクライアント側でレスポンスが一括で受信され、ストリーミングが機能しない
- テストスクリプトで
iter_lines()
を使用してもリアルタイム表示されない
python
def ask_statesman(query: str):
openai_stream = openai.ChatCompletion.create(stream=True)
for line in openai_stream:
current_response = line["choices"][0].delta.content
print(current_response) # コンソールには逐次出力される
yield current_response
time.sleep(0.25)
@app.post("/")
async def request_handler(auth_key: str, query: str):
stream_response = ask_statesman(query)
return StreamingResponse(stream_response, media_type="text/plain")
python
response = requests.post(url, params=params, stream=True)
for chunk in response.iter_lines():
if chunk:
print(chunk.decode("utf-8")) # ストリーミングされない
根本原因
ストリーミングが機能しない主な要因は以下の組み合わせです:
- メディアタイプ問題:
text/plain
でのブラウザバッファリング - ブロッキング操作:
time.sleep()
によるイベントループのブロック - クライアント処理:
iter_lines()
の行バッファリング特性 - HTTPメソッド不備: データ取得に不適切なPOSTメソッドの使用
- セキュリティ実装: クエリパラメータでの認証情報送信
解決策
1. メディアタイプとレスポンス設定の最適化
ブラウザのMIME snifing(~1445バイトのバッファリング)を回避するには以下のいずれかを使用します。
python
return StreamingResponse(stream_response, media_type="text/event-stream")
python
headers = {'X-Content-Type-Options': 'nosniff'}
return StreamingResponse(stream_response, headers=headers, media_type="text/plain")
2. 非同期対応とブロッキング操作の回避
非同期ジェネレータを使用する場合:
python
import asyncio
async def ask_statesman(query: str):
openai_stream = await openai.ChatCompletion.acreate(stream=True, ...)
async for line in openai_stream:
content = line.choices[0].delta.get("content", "")
if content:
yield content.encode("utf-8")
await asyncio.sleep(0.1) # 非同期スリープ
同期ジェネレータを維持する場合(推奨):
python
def ask_statesman(query: str):
for line in openai.ChatCompletion.create(stream=True, ...):
content = line.choices[0].delta.get("content", "")
if content:
yield content
time.sleep(0.1) # FastAPIが別スレッドで処理
3. クライアント側処理の改善
クライアントではiter_lines()
ではなくiter_content()
を使用:
python
response = requests.get(url, stream=True)
for chunk in response.iter_content(chunk_size=1): # 1バイトずつ処理
if chunk:
print(chunk.decode("utf-8"), end='', flush=True)
HTTPXライブラリの使用(非同期対応・パフォーマンス向上):
python
import httpx
with httpx.stream('GET', url) as response:
for chunk in response.iter_bytes():
print(chunk.decode("utf-8"), end='', flush=True)
4. API設計とセキュリティのベストプラクティス
- 認証情報をヘッダーで送信
- GETメソッドに変更(データ取得に適切)
python
from fastapi import Header
@app.get("/")
async def request_handler(authorization: str = Header(None)):
if authorization != "Bearer valid_token":
raise HTTPException(status_code=401)
# 処理続行
5. サーバーサイドイベント(SSE)の使用
streaming_response
の代わりにSSEを使用する方法:
python
pip install sse-starlette
python
from sse_starlette.sse import EventSourceResponse
@app.get('/stream')
async def stream_data():
def generate():
for i in range(10):
yield f"data: メッセージ {i}\n\n"
time.sleep(0.5)
return EventSourceResponse(generate())
完全な動作コード例
python
import os
import asyncio
import openai
from fastapi import FastAPI, Header, HTTPException
from fastapi.responses import StreamingResponse
app = FastAPI()
openai.api_key = os.getenv("OPENAI_API_KEY")
async def openai_streamer(query: str):
stream = await openai.ChatCompletion.acreate(
model="gpt-3.5-turbo",
messages=[{"role": "user", "content": query}],
temperature=0.0,
stream=True
)
async for chunk in stream:
content = chunk.choices[0].delta.get("content", "")
if content:
yield content.encode("utf-8")
await asyncio.sleep(0.1)
@app.get("/chat")
async def chat_endpoint(
query: str,
authorization: str = Header(..., alias="Authorization")
):
if authorization != "Bearer valid_token":
raise HTTPException(status_code=401, detail="認証エラー")
headers = {"X-Content-Type-Options": "nosniff"}
return StreamingResponse(
openai_streamer(query),
media_type="text/event-stream",
headers=headers
)
python
import httpx
url = "http://localhost:8000/chat"
params = {"query": "エッフェル塔の高さは?"}
headers = {"Authorization": "Bearer valid_token"}
with httpx.stream('GET', url, params=params, headers=headers) as r:
for chunk in r.iter_bytes():
print(chunk.decode("utf-8"), end='', flush=True)
テストとデバッグのポイント
cURLでのテスト:
bashcurl -N http://localhost:8000/chat?query=エッフェル塔の高さ -H "Authorization: Bearer valid_token"
-N
フラグでバッファリング無効化ブラウザでの直接確認:
html<script> const eventSource = new EventSource('/chat?query=エッフェル塔の高さ'); eventSource.onmessage = (event) => { console.log(event.data); }; </script>
ネットワーク条件のシミュレーション: DevToolsで低速ネットワークを設定し動作確認
パフォーマンス最適化
- チャンクサイズ最適化: デフォルトの8192バイトから小さい値を試す
- 非同期実行の徹底:python
from anyio import run_in_thread async def processor(): result = await run_in_thread(cpu_intensive_task) yield result
- Gzip圧縮無効化:python
return StreamingResponse(..., headers={"Content-Encoding": "identity"})
まとめ
問題点 | 解決策 |
---|---|
ブラウザバッファリング | text/event-stream 使用 or X-Content-Type-Options: nosniff |
イベントループブロック | 非同期処理実装 or 標準ジェネレータ維持 |
クライアント側出力 | iter_content() /iter_bytes() 使用 |
メソッド不適切 | GETメソッドへの変更 |
セキュリティ問題 | ヘッダーでの認証情報送信 |
これらの対策を実装することで、FastAPIのStreamingResponse
はOpenAI APIとの連携を含め、安定したストリーミング機能を提供します。特に大規模言語モデルとの連携や長時間処理が必要なAPIでは、正しいストリーミング実装がユーザー体験向上に不可欠です。