FastAPIの並列処理と同期・非同期処理の適切な使い分け
問題の概要
FastAPIアプリケーションでエンドポイントを複数のタブから同時にアクセスした際、リクエストが並列ではなく直列で処理される現象が発生します。以下のコード例では、5秒間のスリープを含むエンドポイントがあります:
from fastapi import FastAPI, Request
import time
app = FastAPI()
@app.get("/ping")
async def ping(request: Request):
print("Hello")
time.sleep(5)
print("bye")
return {"ping": "pong!"}
ブラウザの複数のタブからこのエンドポイントにアクセスすると、期待される出力:
Hello
Hello
bye
bye
実際の出力:
Hello
bye
Hello
bye
根本原因
この問題は、非同期エンドポイント内での同期ブロッキング操作が原因で発生します。FastAPIの非同期処理モデルとPythonのGIL(Global Interpreter Lock)の相互作用によるものです。
重要
async def
で定義されたエンドポイント内でtime.sleep()
などの同期ブロッキング操作を使用すると、イベントループが完全にブロックされ、他のリクエストの処理が妨げられます。
FastAPIの並行処理モデル
FastAPIは以下の2通りの方法でエンドポイントを処理します:
1. 非同期エンドポイント (async def
)
- イベントループで直接実行される
- 内部に
await
式がある場合、処理を一時停止し他のタスクに実行権を譲る await
がない場合はブロッキング操作となり、並行処理ができない
2. 同期エンドポイント (def
)
- 外部スレッドプールで実行され、完了を
await
する - ブロッキング操作を含む場合に適している
- デフォルトで40ワーカースレッドが利用可能
解決策
解決策1: 同期エンドポイントを使用する
ブロッキング操作を含む場合は、async def
ではなくdef
を使用します:
@app.get("/ping")
def ping(request: Request):
print("Hello")
time.sleep(5)
print("bye")
return {"ping": "pong!"}
解決策2: 非同期スリープを使用する
非同期対応の操作を使用する場合は、asyncio.sleep()
をawait
します:
import asyncio
@app.get("/ping")
async def ping(request: Request):
print("Hello")
await asyncio.sleep(5) # 非同期スリープ
print("bye")
return {"ping": "pong!"}
解決策3: スレッドプールでのブロッキング操作の実行
非同期エンドポイント内でブロッキング操作を実行する必要がある場合は、明示的にスレッドプールを使用します:
from fastapi.concurrency import run_in_threadpool
import asyncio
@app.get("/ping")
async def ping(request: Request):
print("Hello")
# スレッドプールでブロッキング操作を実行
await run_in_threadpool(time.sleep, 5)
print("bye")
return {"ping": "pong!"}
あるいは、asyncio
の機能を直接使用する方法もあります:
import asyncio
@app.get("/ping")
async def ping(request: Request):
print("Hello")
loop = asyncio.get_running_loop()
await loop.run_in_executor(None, time.sleep, 5)
print("bye")
return {"ping": "pong!"}
ブラウザ側の注意点
クライアント側の制限
同じブラウザセッションからの複数リクエストは、ブラウザ自体によって直列化される場合があります。これを避けるには:
- シークレットウィンドウを使用する
- 異なるブラウザを使用する
httpx
などのクライアントライブラリを使用する
高度な使用例
CPUバウンドな処理の並列化
CPU集中型の処理には、プロセスプールの使用が効果的です:
import asyncio
import concurrent.futures
def cpu_intensive_task(data):
# CPU集中型の処理
return processed_data
@app.post("/process")
async def process_data(data: dict):
loop = asyncio.get_running_loop()
with concurrent.futures.ProcessPoolExecutor() as pool:
result = await loop.run_in_executor(
pool, cpu_intensive_task, data
)
return {"result": result}
カスタムスレッドプールの設定
大量のブロッキング操作を処理する場合、スレッドプールのサイズを調整できます:
from fastapi.concurrency import run_in_threadpool
import concurrent.futures
import asyncio
# アプリケーション起動時にスレッドプールを作成
thread_pool = concurrent.futures.ThreadPoolExecutor(max_workers=100)
@app.get("/resource-intensive")
async def resource_intensive():
def blocking_operation():
# 重い同期処理
time.sleep(2)
return "done"
result = await run_in_threadpool(blocking_operation)
return {"status": result}
ベストプラクティス
- I/Oバウンド操作: 非同期エンドポイントと非同期ライブラリを使用する
- CPUバウンド操作: 同期エンドポートまたはプロセスプールを使用する
- 混在する操作: ブロッキング部分をスレッドプールで実行する
- リソース管理: ワークロードに応じてスレッド/プロセスプールを適切にサイズ設定する
パフォーマンスチューニング
Uvicornのワーカー数を増やすことで、マルチコアCPUを活用できます:
uvicorn main:app --workers 4
まとめ
FastAPIで並列処理を実現するためには、操作の種類(I/Oバウンド vs CPUバウンド)に応じて適切なアプローチを選択することが重要です。非同期エンドポイントは非同期操作に最適ですが、同期操作を含む場合はスレッドプールでの実行や同期エンドポイントの使用を検討しましょう。