uvicorn 如何调节线程池大小

uvicorn 如何调节线程池大小

gunicorn 有 threads 启动参数用来调节线程池的大小

但是 uvicorn 貌似没有类似的参数?

实测 uvicorn 的线程池的大小为 40

本来以为 limit_concurrency 参数来限制线程池的大小

实际测试也确实如此,但是发生了一个重大问题!

不加 limit_concurrency 的 pool 的 max 是 40,当一下子涌进来 400000 个请求,多出来的 399960 个请求会排排队,等待处理。

但是使用了 limit_concurrency 之后,多出来的 399960 个请求直接 503 了!

虽然 fastapi + uvicorn 是为了异步而生的,但是我只想同步用而已!


回答:

我知道了,这个原理是这样的

参考: fastapi 的 middleware 怎么改成同步模式?

fastapi 会把没有加 async 的视图函数放到线程池中去跑

而不是 「uvicorn 会把没有加 async 的视图函数放到线程池中去跑」

所以如果你需要调整 「调节线程池大小」,配置 uvicorn 是没用的


fastapi 的 fastapi/routing.py 文件,有一个 run_endpoint_function 函数

async def run_endpoint_function(

*, dependant: Dependant, values: Dict[str, Any], is_coroutine: bool

) -> Any:

# Only called by get_request_handler. Has been split into its own function to

# facilitate profiling endpoints, since inner functions are harder to profile.

assert dependant.call is not None, "dependant.call must be a function"

if is_coroutine:

return await dependant.call(**values)

else:

return await run_in_threadpool(dependant.call, **values)

但是视图函数没有加 async,那么 is_coroutine 就是 False,所以会执行 run_in_threadpool,我们的视图函数就被放到线程池里面去跑了


那么回答一开始的问题,这个时候我们需要改一下问题的标题了:「fastapi 如何调节线程池大小」

那么如何调整的?run_in_threadpool 不是 fastapi 实现的,而是 starlette 实现的

fastapi 只是引用了 starlette 而已:from starlette.concurrency import run_in_threadpool

那么 starlette 是怎么实现的?

我也不知道,我问了一下 ChatGPT,下面是我和 chatGPT 的 QA 日志:

以下的内容,仅供参考:

Q:讲解一下 starlette 的 run_in_threadpool

A:在 Starlette 中,run_in_threadpool 是一个函数,用于在线程池中运行阻塞的同步函数,以避免阻塞异步事件循环。该函数可以将同步函数转换为异步函数,并在后台的线程池中执行,以确保不会阻塞主事件循环。

run_in_threadpool 函数位于 starlette.background 模块中。以下是该函数的签名:

def run_in_threadpool(func: Callable[..., Any], *args: Any, **kwargs: Any) -> Awaitable[Any]:

...

它接受一个同步函数 func 和其参数,然后返回一个可被 await 的对象,表示异步操作的结果。在后台,run_in_threadpool 将同步函数放入线程池中执行,并返回一个 Future 对象,以便在主事件循环中等待其完成。

以下是一个示例,演示如何使用 run_in_threadpool 来运行阻塞的同步函数:

from starlette.background import run_in_threadpool

def blocking_sync_function(arg1: int, arg2: str) -> str:

# 阻塞的同步函数

...

async def async_function():

result = await run_in_threadpool(blocking_sync_function, 123, "hello")

# 使用结果进行异步操作

...

在上述示例中,我们定义了一个名为 blocking_sync_function 的阻塞的同步函数。在 async_function 中,我们使用 run_in_threadpool 来调用 blocking_sync_function,并传递参数 123"hello"run_in_threadpoolblocking_sync_function 放入线程池中执行,并返回一个可被 await 的对象。

通过使用 run_in_threadpool,我们可以在异步函数中调用阻塞的同步函数,而不会阻塞主事件循环。这对于需要处理阻塞操作的场景非常有用,如调用同步的第三方库、执行计算密集型的任务等。请注意,由于线程池中的执行是同步的,所以在执行阻塞操作时,主事件循环仍然可以处理其他异步任务。

Q:如何设置 run_in_threadpool 对应的线程池的大小

A:在 Starlette 的 run_in_threadpool 中,您可以通过配置 asyncio 的线程池来设置线程池的大小。run_in_threadpool 实际上是使用了 asyncio.get_event_loop().run_in_executor() 来在线程池中执行函数。

您可以使用 asyncio 模块中的 get_event_loop 方法来获取当前的事件循环,并调用其 set_default_executor 方法来设置线程池的大小。以下是一个示例:

import asyncio

from concurrent.futures import ThreadPoolExecutor

from starlette.background import run_in_threadpool

def my_blocking_function(arg):

# 阻塞的同步函数

...

# 设置线程池的大小

executor = ThreadPoolExecutor(max_workers=10)

asyncio.get_event_loop().set_default_executor(executor)

# 在异步函数中使用 run_in_threadpool

async def async_function():

result = await run_in_threadpool(my_blocking_function, arg)

...

在上述示例中,首先使用 ThreadPoolExecutor 创建了一个线程池,通过设置 max_workers 参数来指定线程池的大小为 10。然后,使用 asyncio.get_event_loop().set_default_executor() 方法将线程池设置为当前事件循环的默认执行器。

接下来,在异步函数 async_function 中,您可以使用 run_in_threadpool 来执行阻塞的同步函数 my_blocking_function,并在其中传递参数。run_in_threadpool 将使用您设置的线程池来执行函数,并返回一个可被 await 的对象。

请注意,设置线程池的大小是为了控制并发执行的线程数。根据您的应用程序需求,您可以根据实际情况来设置线程池的大小,以达到最佳性能。

Q:fastapi 中,如何设置 run_in_threadpool 对应的线程池的大小

A:在 FastAPI 中使用 run_in_threadpool 函数时,默认情况下,它使用的是 Python 的全局线程池。要设置线程池的大小,您可以使用 concurrent.futures 模块的 ThreadPoolExecutor 类来创建一个自定义的线程池,并将其传递给 run_in_threadpool

以下是一个示例,演示如何在 FastAPI 中设置 run_in_threadpool 对应的线程池的大小:

from concurrent.futures import ThreadPoolExecutor

from fastapi import FastAPI

from starlette.background import run_in_threadpool

app = FastAPI()

# 创建自定义线程池

executor = ThreadPoolExecutor(max_workers=10)

@app.get("/")

async def async_function():

result = await run_in_threadpool(my_blocking_function, arg)

# 处理结果

...

def my_blocking_function(arg):

# 阻塞的同步函数

...

# 设置 app 的线程池

app.state.executor = executor

在上述示例中,我们首先创建了一个自定义的线程池 executor,通过设置 max_workers 参数来指定线程池的大小为 10。

然后,在异步路由处理函数 async_function 中,我们使用 run_in_threadpool 来执行阻塞的同步函数 my_blocking_function,并在其中传递参数。run_in_threadpool 将使用自定义的线程池来执行函数。

最后,我们将自定义的线程池设置为 FastAPI 应用程序的状态 app.state.executor,以便在整个应用程序中共享该线程池。

请注意,使用自定义线程池可以让您控制并发执行的线程数,以满足应用程序的性能需求。根据实际情况和系统资源,您可以根据需要设置线程池的大小。

以上是 uvicorn 如何调节线程池大小 的全部内容, 来源链接: utcz.com/p/938412.html

回到顶部