FastAPI 异步请求处理
在 FastAPI 应用中处理并发请求时,你可能会遇到请求被串行处理而非并行的情况。本文将详细解析这个问题,并提供多种解决方案。
问题描述
以下是一个简单的 FastAPI 应用示例:
python
from fastapi import FastAPI, Request
import time
app = FastAPI()
@app.get("/ping")
async def ping(request: Request):
print("Hello")
time.sleep(5)
print("bye")
return {"ping": "pong!"}
当你从浏览器的不同标签页同时访问此端点时,控制台输出可能是:
bash
Hello
bye
Hello
bye
而非期望的:
bash
Hello
Hello
bye
bye
这表明请求被串行处理,而非并行。
根本原因
异步函数与阻塞操作
FastAPI 的异步端点(使用 async def
定义)运行在事件循环中。如果其中包含阻塞操作(如 time.sleep()
),会阻塞整个事件循环,导致其他请求无法被同时处理。
重要提醒
在 async def
端点中使用阻塞调用会阻止事件循环处理其他请求,导致串行执行。
浏览器行为影响
浏览器对同一主机的并行连接数有限制,可能会顺序发送请求而非并行发送。使用 print(request.client)
可以验证请求是否来自同一客户端端口。
解决方案
方案1:使用同步端点
将端点改为同步定义(使用 def
),让 FastAPI 在外部线程池中运行:
python
@app.get("/ping")
def ping(request: Request):
print("Hello")
time.sleep(5)
print("bye")
return {"ping": "pong!"}
方案2:使用真正的异步睡眠
将 time.sleep()
替换为异步的 asyncio.sleep()
:
python
import asyncio
@app.get("/ping")
async def ping(request: Request):
print("Hello")
await asyncio.sleep(5) # 非阻塞
print("bye")
return {"ping": "pong!"}
方案3:在线程中运行阻塞操作
对于必须在异步端点中运行的阻塞操作,使用线程池:
python
from fastapi.concurrency import run_in_threadpool
import time
def blocking_task():
time.sleep(5)
return "Done"
@app.get("/ping")
async def ping(request: Request):
print("Hello")
result = await run_in_threadpool(blocking_task)
print("bye")
return {"result": result}
或者使用 asyncio
的线程池:
python
import asyncio
import time
def blocking_task():
time.sleep(5)
return "Done"
@app.get("/ping")
async def ping(request: Request):
print("Hello")
loop = asyncio.get_running_loop()
result = await loop.run_in_executor(None, blocking_task)
print("bye")
return {"result": result}
方案4:处理CPU密集型任务
对于CPU密集型任务,使用进程池:
python
import asyncio
import concurrent.futures
def cpu_intensive_task():
# 模拟CPU密集型工作
result = 0
for i in range(10**7):
result += i
return result
@app.get("/compute")
async def compute():
loop = asyncio.get_running_loop()
with concurrent.futures.ProcessPoolExecutor() as pool:
result = await loop.run_in_executor(pool, cpu_intensive_task)
return {"result": result}
方案5:增加工作进程
通过增加UVicorn工作进程数来利用多核CPU:
bash
uvicorn main:app --workers 4
注意事项
- 每个工作进程有独立的内存空间,全局变量不共享
- 考虑使用数据库或缓存来共享状态
- 确保有足够的内存容纳多个进程
测试并发请求
使用 httpx
库测试并发请求:
python
import httpx
import asyncio
async def test_concurrent_requests():
async with httpx.AsyncClient() as client:
tasks = [
client.get("http://localhost:8000/ping")
for _ in range(3)
]
responses = await asyncio.gather(*tasks)
for response in responses:
print(response.json())
asyncio.run(test_concurrent_requests())
最佳实践
- I/O密集型操作:使用
async def
和真正的异步库 - CPU密集型操作:使用同步端点或进程池
- 混合场景:在异步端点中使用
run_in_threadpool
或run_in_executor
- 资源管理:根据需要调整线程池和进程池大小
- 性能测试:对不同方案进行基准测试,选择最适合的方案
通过合理选择端点类型和并发策略,你可以充分发挥 FastAPI 的高性能特性,实现真正的并行请求处理。