uvicorn 如何调节线程池大小
gunicorn
有 threads 启动参数用来调节线程池的大小
但是 uvicorn
貌似没有类似的参数?
实测 uvicorn
的线程池的大小为 40
本来以为 limit_concurrency
参数来限制线程池的大小
实际测试也确实如此,但是发生了一个重大问题!
不加 limit_concurrency
的 pool 的 max 是 40,当一下子涌进来 400000 个请求,多出来的 399960 个请求会排排队,等待处理。
但是使用了 limit_concurrency
之后,多出来的 399960 个请求直接 503 了!
虽然fastapi
+uvicorn
是为了异步而生的,但是我只想同步用而已!
回答:
我知道了,这个原理是这样的
参考: fastapi 的 middleware 怎么改成同步模式?
fastapi 会把没有加 async 的视图函数放到线程池中去跑
而不是 「uvicorn 会把没有加 async 的视图函数放到线程池中去跑」
所以如果你需要调整 「调节线程池大小」,配置 uvicorn 是没用的
fastapi 的 fastapi/routing.py
文件,有一个 run_endpoint_function 函数
async def run_endpoint_function( *, dependant: Dependant, values: Dict[str, Any], is_coroutine: bool
) -> Any:
# Only called by get_request_handler. Has been split into its own function to
# facilitate profiling endpoints, since inner functions are harder to profile.
assert dependant.call is not None, "dependant.call must be a function"
if is_coroutine:
return await dependant.call(**values)
else:
return await run_in_threadpool(dependant.call, **values)
但是视图函数没有加 async,那么 is_coroutine 就是 False,所以会执行 run_in_threadpool
,我们的视图函数就被放到线程池里面去跑了
那么回答一开始的问题,这个时候我们需要改一下问题的标题了:「fastapi 如何调节线程池大小」
那么如何调整的?run_in_threadpool 不是 fastapi 实现的,而是 starlette 实现的
fastapi 只是引用了 starlette 而已:from starlette.concurrency import run_in_threadpool
那么 starlette 是怎么实现的?
我也不知道,我问了一下 ChatGPT,下面是我和 chatGPT 的 QA 日志:
以下的内容,仅供参考:
Q:讲解一下 starlette 的 run_in_threadpool
A:在 Starlette 中,run_in_threadpool
是一个函数,用于在线程池中运行阻塞的同步函数,以避免阻塞异步事件循环。该函数可以将同步函数转换为异步函数,并在后台的线程池中执行,以确保不会阻塞主事件循环。
run_in_threadpool
函数位于 starlette.background
模块中。以下是该函数的签名:
def run_in_threadpool(func: Callable[..., Any], *args: Any, **kwargs: Any) -> Awaitable[Any]: ...
它接受一个同步函数 func
和其参数,然后返回一个可被 await
的对象,表示异步操作的结果。在后台,run_in_threadpool
将同步函数放入线程池中执行,并返回一个 Future
对象,以便在主事件循环中等待其完成。
以下是一个示例,演示如何使用 run_in_threadpool
来运行阻塞的同步函数:
from starlette.background import run_in_threadpooldef blocking_sync_function(arg1: int, arg2: str) -> str:
# 阻塞的同步函数
...
async def async_function():
result = await run_in_threadpool(blocking_sync_function, 123, "hello")
# 使用结果进行异步操作
...
在上述示例中,我们定义了一个名为 blocking_sync_function
的阻塞的同步函数。在 async_function
中,我们使用 run_in_threadpool
来调用 blocking_sync_function
,并传递参数 123
和 "hello"
。run_in_threadpool
将 blocking_sync_function
放入线程池中执行,并返回一个可被 await
的对象。
通过使用 run_in_threadpool
,我们可以在异步函数中调用阻塞的同步函数,而不会阻塞主事件循环。这对于需要处理阻塞操作的场景非常有用,如调用同步的第三方库、执行计算密集型的任务等。请注意,由于线程池中的执行是同步的,所以在执行阻塞操作时,主事件循环仍然可以处理其他异步任务。
Q:如何设置 run_in_threadpool 对应的线程池的大小
A:在 Starlette 的 run_in_threadpool
中,您可以通过配置 asyncio
的线程池来设置线程池的大小。run_in_threadpool
实际上是使用了 asyncio.get_event_loop().run_in_executor()
来在线程池中执行函数。
您可以使用 asyncio
模块中的 get_event_loop
方法来获取当前的事件循环,并调用其 set_default_executor
方法来设置线程池的大小。以下是一个示例:
import asynciofrom concurrent.futures import ThreadPoolExecutor
from starlette.background import run_in_threadpool
def my_blocking_function(arg):
# 阻塞的同步函数
...
# 设置线程池的大小
executor = ThreadPoolExecutor(max_workers=10)
asyncio.get_event_loop().set_default_executor(executor)
# 在异步函数中使用 run_in_threadpool
async def async_function():
result = await run_in_threadpool(my_blocking_function, arg)
...
在上述示例中,首先使用 ThreadPoolExecutor
创建了一个线程池,通过设置 max_workers
参数来指定线程池的大小为 10。然后,使用 asyncio.get_event_loop().set_default_executor()
方法将线程池设置为当前事件循环的默认执行器。
接下来,在异步函数 async_function
中,您可以使用 run_in_threadpool
来执行阻塞的同步函数 my_blocking_function
,并在其中传递参数。run_in_threadpool
将使用您设置的线程池来执行函数,并返回一个可被 await
的对象。
请注意,设置线程池的大小是为了控制并发执行的线程数。根据您的应用程序需求,您可以根据实际情况来设置线程池的大小,以达到最佳性能。
Q:fastapi 中,如何设置 run_in_threadpool 对应的线程池的大小
A:在 FastAPI 中使用 run_in_threadpool
函数时,默认情况下,它使用的是 Python 的全局线程池。要设置线程池的大小,您可以使用 concurrent.futures
模块的 ThreadPoolExecutor
类来创建一个自定义的线程池,并将其传递给 run_in_threadpool
。
以下是一个示例,演示如何在 FastAPI 中设置 run_in_threadpool
对应的线程池的大小:
from concurrent.futures import ThreadPoolExecutorfrom fastapi import FastAPI
from starlette.background import run_in_threadpool
app = FastAPI()
# 创建自定义线程池
executor = ThreadPoolExecutor(max_workers=10)
@app.get("/")
async def async_function():
result = await run_in_threadpool(my_blocking_function, arg)
# 处理结果
...
def my_blocking_function(arg):
# 阻塞的同步函数
...
# 设置 app 的线程池
app.state.executor = executor
在上述示例中,我们首先创建了一个自定义的线程池 executor
,通过设置 max_workers
参数来指定线程池的大小为 10。
然后,在异步路由处理函数 async_function
中,我们使用 run_in_threadpool
来执行阻塞的同步函数 my_blocking_function
,并在其中传递参数。run_in_threadpool
将使用自定义的线程池来执行函数。
最后,我们将自定义的线程池设置为 FastAPI 应用程序的状态 app.state.executor
,以便在整个应用程序中共享该线程池。
请注意,使用自定义线程池可以让您控制并发执行的线程数,以满足应用程序的性能需求。根据实际情况和系统资源,您可以根据需要设置线程池的大小。
以上是 uvicorn 如何调节线程池大小 的全部内容, 来源链接: utcz.com/p/938412.html