Skip to content

FastAPI 异步请求处理

在 FastAPI 应用中处理并发请求时,你可能会遇到请求被串行处理而非并行的情况。本文将详细解析这个问题,并提供多种解决方案。

问题描述

以下是一个简单的 FastAPI 应用示例:

python
from fastapi import FastAPI, Request
import time

app = FastAPI()

@app.get("/ping")
async def ping(request: Request):
    print("Hello")
    time.sleep(5)
    print("bye")
    return {"ping": "pong!"}

当你从浏览器的不同标签页同时访问此端点时,控制台输出可能是:

bash
Hello
bye
Hello
bye

而非期望的:

bash
Hello
Hello
bye
bye

这表明请求被串行处理,而非并行。

根本原因

异步函数与阻塞操作

FastAPI 的异步端点(使用 async def 定义)运行在事件循环中。如果其中包含阻塞操作(如 time.sleep()),会阻塞整个事件循环,导致其他请求无法被同时处理。

重要提醒

async def 端点中使用阻塞调用会阻止事件循环处理其他请求,导致串行执行。

浏览器行为影响

浏览器对同一主机的并行连接数有限制,可能会顺序发送请求而非并行发送。使用 print(request.client) 可以验证请求是否来自同一客户端端口。

解决方案

方案1:使用同步端点

将端点改为同步定义(使用 def),让 FastAPI 在外部线程池中运行:

python
@app.get("/ping")
def ping(request: Request):
    print("Hello")
    time.sleep(5)
    print("bye")
    return {"ping": "pong!"}

方案2:使用真正的异步睡眠

time.sleep() 替换为异步的 asyncio.sleep()

python
import asyncio

@app.get("/ping")
async def ping(request: Request):
    print("Hello")
    await asyncio.sleep(5)  # 非阻塞
    print("bye")
    return {"ping": "pong!"}

方案3:在线程中运行阻塞操作

对于必须在异步端点中运行的阻塞操作,使用线程池:

python
from fastapi.concurrency import run_in_threadpool
import time

def blocking_task():
    time.sleep(5)
    return "Done"

@app.get("/ping")
async def ping(request: Request):
    print("Hello")
    result = await run_in_threadpool(blocking_task)
    print("bye")
    return {"result": result}

或者使用 asyncio 的线程池:

python
import asyncio
import time

def blocking_task():
    time.sleep(5)
    return "Done"

@app.get("/ping")
async def ping(request: Request):
    print("Hello")
    loop = asyncio.get_running_loop()
    result = await loop.run_in_executor(None, blocking_task)
    print("bye")
    return {"result": result}

方案4:处理CPU密集型任务

对于CPU密集型任务,使用进程池:

python
import asyncio
import concurrent.futures

def cpu_intensive_task():
    # 模拟CPU密集型工作
    result = 0
    for i in range(10**7):
        result += i
    return result

@app.get("/compute")
async def compute():
    loop = asyncio.get_running_loop()
    with concurrent.futures.ProcessPoolExecutor() as pool:
        result = await loop.run_in_executor(pool, cpu_intensive_task)
    return {"result": result}

方案5:增加工作进程

通过增加UVicorn工作进程数来利用多核CPU:

bash
uvicorn main:app --workers 4

注意事项

  • 每个工作进程有独立的内存空间,全局变量不共享
  • 考虑使用数据库或缓存来共享状态
  • 确保有足够的内存容纳多个进程

测试并发请求

使用 httpx 库测试并发请求:

python
import httpx
import asyncio

async def test_concurrent_requests():
    async with httpx.AsyncClient() as client:
        tasks = [
            client.get("http://localhost:8000/ping") 
            for _ in range(3)
        ]
        responses = await asyncio.gather(*tasks)
        for response in responses:
            print(response.json())

asyncio.run(test_concurrent_requests())

最佳实践

  1. I/O密集型操作:使用 async def 和真正的异步库
  2. CPU密集型操作:使用同步端点或进程池
  3. 混合场景:在异步端点中使用 run_in_threadpoolrun_in_executor
  4. 资源管理:根据需要调整线程池和进程池大小
  5. 性能测试:对不同方案进行基准测试,选择最适合的方案

通过合理选择端点类型和并发策略,你可以充分发挥 FastAPI 的高性能特性,实现真正的并行请求处理。