Skip to content

FastAPI StreamingResponseのストリーミング不具合

問題概要

FastAPIのStreamingResponseを使用してOpenAI APIからのレスポンスをストリーミングしようとした際に、以下の問題が発生します:

  • OpenAI APIからはデータがリアルタイムでストリーミングされている(サーバー側コンソールには逐次出力される)
  • しかしクライアント側でレスポンスが一括で受信され、ストリーミングが機能しない
  • テストスクリプトでiter_lines()を使用してもリアルタイム表示されない
python
def ask_statesman(query: str):
    openai_stream = openai.ChatCompletion.create(stream=True)
    for line in openai_stream:
        current_response = line["choices"][0].delta.content
        print(current_response)  # コンソールには逐次出力される
        yield current_response
        time.sleep(0.25)

@app.post("/")
async def request_handler(auth_key: str, query: str):
    stream_response = ask_statesman(query)
    return StreamingResponse(stream_response, media_type="text/plain")
python
response = requests.post(url, params=params, stream=True)
for chunk in response.iter_lines():
    if chunk:
        print(chunk.decode("utf-8"))  # ストリーミングされない

根本原因

ストリーミングが機能しない主な要因は以下の組み合わせです:

  1. メディアタイプ問題: text/plainでのブラウザバッファリング
  2. ブロッキング操作: time.sleep()によるイベントループのブロック
  3. クライアント処理: iter_lines()の行バッファリング特性
  4. HTTPメソッド不備: データ取得に不適切なPOSTメソッドの使用
  5. セキュリティ実装: クエリパラメータでの認証情報送信

解決策

1. メディアタイプとレスポンス設定の最適化

ブラウザのMIME snifing(~1445バイトのバッファリング)を回避するには以下のいずれかを使用します。

python
return StreamingResponse(stream_response, media_type="text/event-stream")
python
headers = {'X-Content-Type-Options': 'nosniff'}
return StreamingResponse(stream_response, headers=headers, media_type="text/plain")

2. 非同期対応とブロッキング操作の回避

非同期ジェネレータを使用する場合:

python
import asyncio

async def ask_statesman(query: str):
    openai_stream = await openai.ChatCompletion.acreate(stream=True, ...)
    async for line in openai_stream:
        content = line.choices[0].delta.get("content", "")
        if content:
            yield content.encode("utf-8")
            await asyncio.sleep(0.1)  # 非同期スリープ

同期ジェネレータを維持する場合(推奨):

python
def ask_statesman(query: str):
    for line in openai.ChatCompletion.create(stream=True, ...):
        content = line.choices[0].delta.get("content", "")
        if content:
            yield content
            time.sleep(0.1)  # FastAPIが別スレッドで処理

3. クライアント側処理の改善

クライアントではiter_lines()ではなくiter_content()を使用:

python
response = requests.get(url, stream=True)
for chunk in response.iter_content(chunk_size=1):  # 1バイトずつ処理
    if chunk:
        print(chunk.decode("utf-8"), end='', flush=True)

HTTPXライブラリの使用(非同期対応・パフォーマンス向上):

python
import httpx

with httpx.stream('GET', url) as response:
    for chunk in response.iter_bytes():
        print(chunk.decode("utf-8"), end='', flush=True)

4. API設計とセキュリティのベストプラクティス

  • 認証情報をヘッダーで送信
  • GETメソッドに変更(データ取得に適切)
python
from fastapi import Header

@app.get("/")
async def request_handler(authorization: str = Header(None)):
    if authorization != "Bearer valid_token":
        raise HTTPException(status_code=401)
    # 処理続行

5. サーバーサイドイベント(SSE)の使用

streaming_responseの代わりにSSEを使用する方法:

python
pip install sse-starlette
python
from sse_starlette.sse import EventSourceResponse

@app.get('/stream')
async def stream_data():
    def generate():
        for i in range(10):
            yield f"data: メッセージ {i}\n\n"
            time.sleep(0.5)
    return EventSourceResponse(generate())

完全な動作コード例

python
import os
import asyncio
import openai
from fastapi import FastAPI, Header, HTTPException
from fastapi.responses import StreamingResponse

app = FastAPI()
openai.api_key = os.getenv("OPENAI_API_KEY")

async def openai_streamer(query: str):
    stream = await openai.ChatCompletion.acreate(
        model="gpt-3.5-turbo",
        messages=[{"role": "user", "content": query}],
        temperature=0.0,
        stream=True
    )
    async for chunk in stream:
        content = chunk.choices[0].delta.get("content", "")
        if content:
            yield content.encode("utf-8")
            await asyncio.sleep(0.1)

@app.get("/chat")
async def chat_endpoint(
    query: str,
    authorization: str = Header(..., alias="Authorization")
):
    if authorization != "Bearer valid_token":
        raise HTTPException(status_code=401, detail="認証エラー")
    
    headers = {"X-Content-Type-Options": "nosniff"}
    return StreamingResponse(
        openai_streamer(query),
        media_type="text/event-stream",
        headers=headers
    )
python
import httpx

url = "http://localhost:8000/chat"
params = {"query": "エッフェル塔の高さは?"}
headers = {"Authorization": "Bearer valid_token"}

with httpx.stream('GET', url, params=params, headers=headers) as r:
    for chunk in r.iter_bytes():
        print(chunk.decode("utf-8"), end='', flush=True)

テストとデバッグのポイント

  1. cURLでのテスト:

    bash
    curl -N http://localhost:8000/chat?query=エッフェル塔の高さ -H "Authorization: Bearer valid_token"

    -Nフラグでバッファリング無効化

  2. ブラウザでの直接確認:

    html
    <script>
    const eventSource = new EventSource('/chat?query=エッフェル塔の高さ');
    eventSource.onmessage = (event) => {
      console.log(event.data);
    };
    </script>
  3. ネットワーク条件のシミュレーション: DevToolsで低速ネットワークを設定し動作確認

パフォーマンス最適化

  1. チャンクサイズ最適化: デフォルトの8192バイトから小さい値を試す
  2. 非同期実行の徹底:
    python
    from anyio import run_in_thread
    
    async def processor():
        result = await run_in_thread(cpu_intensive_task)
        yield result
  3. Gzip圧縮無効化:
    python
    return StreamingResponse(..., headers={"Content-Encoding": "identity"})

まとめ

問題点解決策
ブラウザバッファリングtext/event-stream使用 or X-Content-Type-Options: nosniff
イベントループブロック非同期処理実装 or 標準ジェネレータ維持
クライアント側出力iter_content()/iter_bytes()使用
メソッド不適切GETメソッドへの変更
セキュリティ問題ヘッダーでの認証情報送信

これらの対策を実装することで、FastAPIのStreamingResponseはOpenAI APIとの連携を含め、安定したストリーミング機能を提供します。特に大規模言語モデルとの連携や長時間処理が必要なAPIでは、正しいストリーミング実装がユーザー体験向上に不可欠です。