Does a Python thread pool's callback run in the main thread or in a worker thread?

from concurrent.futures import ThreadPoolExecutor
import threading

def action():
    return ''

def callback_func(future):
    print(threading.current_thread().name)
    # assert 'ThreadPoolExecutor' in threading.current_thread().name
    # assert 'MainThread' in threading.current_thread().name

pool = ThreadPoolExecutor(max_workers=10)

for i in range(10):
    future = pool.submit(action)
    future.add_done_callback(callback_func)

The output is:

MainThread
ThreadPoolExecutor-0_1
ThreadPoolExecutor-0_1
MainThread
ThreadPoolExecutor-0_3
ThreadPoolExecutor-0_3
ThreadPoolExecutor-0_3
MainThread
ThreadPoolExecutor-0_1
ThreadPoolExecutor-0_1

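Judging from the print output, some callbacks run in MainThread and others run callback_func in a ThreadPoolExecutor worker thread.

What is going on here?

If I add a short delay, all callbacks run in ThreadPoolExecutor threads: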

from concurrent.futures import ThreadPoolExecutor
import threading
import time

def action():
    time.sleep(1)
    return ''

def callback_func(future):
    print(threading.current_thread().name)
    # assert 'ThreadPoolExecutor' in threading.current_thread().name
    # assert 'MainThread' in threading.current_thread().name

pool = ThreadPoolExecutor(max_workers=10)

for i in range(10):
    future = pool.submit(action)
    future.add_done_callback(callback_func)

The output is:

ThreadPoolExecutor-0_5
ThreadPoolExecutor-0_0
ThreadPoolExecutor-0_7ThreadPoolExecutor-0_3
ThreadPoolExecutor-0_6
ThreadPoolExecutor-0_1
ThreadPoolExecutor-0_9
ThreadPoolExecutor-0_8
ThreadPoolExecutor-0_2
ThreadPoolExecutor-0_4


Answer:

In Python, is the callback executed by the main (master) thread or by the worker thread?

Let's first look at how a callback is attached to a task in a Python thread pool.

from concurrent.futures import ThreadPoolExecutor

pool = ThreadPoolExecutor(max_workers=10)

def func_get():
    pass

def func_callback(future):
    # A done-callback receives the finished Future as its only argument
    pass

for i in range(100):
    future = pool.submit(func_get)
    future.add_done_callback(func_callback)

As you can see:

  • submit is called first
  • add_done_callback is called afterwards

This raises a question: if func_get runs so fast that it has already finished before add_done_callback is called, will func_callback still be executed?

The answer is yes.
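A minimal sketch to check this, using only the standard library: it waits for the task with result() so the future is definitely finished, then attaches the callback. The names task, run_demo and on_done are illustrative. run_demo reports whether the callback had already run by the time add_done_callback returned, whether it ran in the calling thread, and the task's result.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


def task():
    return 42


def run_demo():
    seen = {}

    def on_done(fut):
        # Record which thread executes the callback, plus the task's result.
        seen["thread"] = threading.get_ident()
        seen["result"] = fut.result()

    with ThreadPoolExecutor(max_workers=2) as pool:
        fut = pool.submit(task)
        fut.result()                     # block until the task has finished
        fut.add_done_callback(on_done)   # future is already FINISHED here
        # The callback ran synchronously inside add_done_callback.
        ran_already = "thread" in seen

    return ran_already, seen["thread"] == threading.get_ident(), seen["result"]


if __name__ == "__main__":
    print(run_demo())  # (True, True, 42)
```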

Why?

Let's read the thread pool's source code.

The concurrent.futures package is written in pure Python rather than C, so its source is easy to read.

Calling submit returns a Future instance:
/Library/Frameworks/Python.framework/Versions/3.10/lib/python3.10/concurrent/futures/thread.py

def submit(self, fn, /, *args, **kwargs):
    with self._shutdown_lock, _global_shutdown_lock:
        if self._broken:
            raise BrokenThreadPool(self._broken)

        if self._shutdown:
            raise RuntimeError('cannot schedule new futures after shutdown')
        if _shutdown:
            raise RuntimeError('cannot schedule new futures after '
                               'interpreter shutdown')

        f = _base.Future()
        w = _WorkItem(f, fn, args, kwargs)

        self._work_queue.put(w)
        self._adjust_thread_count()
        return f
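Note that submit never runs fn itself: it wraps fn in a _WorkItem, queues it, and returns the Future at once. A worker thread later runs the item and completes the Future. The sketch below is a simplified, hypothetical model of that worker loop (mini_worker and demo are made-up names, not CPython's code). It creates a Future directly, which the documentation reserves for testing and executor implementations.

```python
import queue
import threading
from concurrent.futures import Future


def mini_worker(work_queue):
    """Hypothetical, simplified model of a ThreadPoolExecutor worker loop.

    Each item is (future, fn). The worker runs fn and then completes the
    future itself, so callbacks already registered run in this thread.
    """
    while True:
        item = work_queue.get()
        if item is None:  # sentinel: stop the worker
            return
        future, fn = item
        # Like the real executor, skip work whose future was cancelled.
        if not future.set_running_or_notify_cancel():
            continue
        try:
            result = fn()
        except BaseException as exc:
            future.set_exception(exc)
        else:
            future.set_result(result)


def demo():
    q = queue.Queue()
    worker = threading.Thread(target=mini_worker, args=(q,), name="mini-worker")
    f = Future()
    names = []
    # Register the callback before the work is queued, so the future is
    # guaranteed to be pending at this point.
    f.add_done_callback(lambda fut: names.append(threading.current_thread().name))
    q.put((f, lambda: "done"))
    q.put(None)
    worker.start()
    worker.join()
    return f.result(), names
```

demo() returns ("done", ["mini-worker"]): the callback ran in the thread that called set_result.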

Next, add_done_callback is called on that Future instance:
/Library/Frameworks/Python.framework/Versions/3.10/lib/python3.10/concurrent/futures/_base.py

def add_done_callback(self, fn):
    """Attaches a callable that will be called when the future finishes.

    Args:
        fn: A callable that will be called with this future as its only
            argument when the future completes or is cancelled. The callable
            will always be called by a thread in the same process in which
            it was added. If the future has already completed or been
            cancelled then the callable will be called immediately. These
            callables are called in the order that they were added.
    """
    with self._condition:
        if self._state not in [CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED]:
            self._done_callbacks.append(fn)
            return
    try:
        fn(self)
    except Exception:
        LOGGER.exception('exception calling callback for %r', self)

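As you can see, the logic is simple:

  • If, when add_done_callback is called, the future's _state is already CANCELLED, CANCELLED_AND_NOTIFIED or FINISHED, the task has already finished (or been cancelled). The callback is then called immediately, inside add_done_callback, by the calling thread, which is the main thread in the examples above.
  • If the future is not yet done when add_done_callback is called, the callback is appended to the _done_callbacks list.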
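Both branches can be observed deterministically with a bare Future, again created directly only for illustration. In this sketch a helper thread named "completer" (a hypothetical name) plays the role of the worker and calls set_result; the argument attach_before_done decides whether the callback is attached before or after completion.

```python
import threading
from concurrent.futures import Future


def which_thread_runs_callback(attach_before_done):
    """Return the name of the thread that ran the callback."""
    fut = Future()  # created directly only for illustration/testing
    ran_in = []

    def callback(f):
        ran_in.append(threading.current_thread().name)

    def complete():
        fut.set_result(None)

    completer = threading.Thread(target=complete, name="completer")
    if attach_before_done:
        fut.add_done_callback(callback)  # still PENDING: stored in _done_callbacks
        completer.start()
        completer.join()                 # set_result invoked the stored callback
    else:
        completer.start()
        completer.join()                 # future is now FINISHED
        fut.add_done_callback(callback)  # runs immediately, in this thread
    return ran_in[0]
```

which_thread_runs_callback(True) returns "completer"; which_thread_runs_callback(False) returns the caller's own thread name, e.g. "MainThread" when called from the main thread.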

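Callbacks in the _done_callbacks list run after the worker thread has finished the future's actual work, so they execute in the worker thread. This explains both experiments in the question: without the delay, action returns almost instantly, so some futures are already finished when add_done_callback is called and their callbacks run in MainThread; with time.sleep(1), every callback is attached before its task finishes, so all of them run in worker threads.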
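The first experiment in the question is racy because action may finish before add_done_callback is called. A sketch that removes the race, assuming a threading.Event named gate (not in the original) that holds every task until all callbacks are attached, so every callback should run in a ThreadPoolExecutor worker thread:

```python
import threading
from concurrent.futures import ThreadPoolExecutor


def run(n=10):
    gate = threading.Event()
    names = []
    lock = threading.Lock()

    def action():
        gate.wait()  # hold every task until all callbacks are attached
        return ''

    def callback_func(future):
        with lock:
            names.append(threading.current_thread().name)

    with ThreadPoolExecutor(max_workers=n) as pool:
        for _ in range(n):
            pool.submit(action).add_done_callback(callback_func)
        gate.set()  # only now may the tasks finish
    # Leaving the with-block joins the workers, so all callbacks have run.
    return names
```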
/Library/Frameworks/Python.framework/Versions/3.10/lib/python3.10/concurrent/futures/_base.py

def _invoke_callbacks(self):
    for callback in self._done_callbacks:
        try:
            callback(self)
        except Exception:
            LOGGER.exception('exception calling callback for %r', self)

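_invoke_callbacks is called at the end of three methods, cancel, set_result and set_exception, after the lock has been released. A stored callback therefore runs in whichever thread calls one of these methods: for set_result and set_exception that is the worker thread that ran the task, and for cancel it is the thread that called cancel().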

/Library/Frameworks/Python.framework/Versions/3.10/lib/python3.10/concurrent/futures/_base.py

def cancel(self):
    """Cancel the future if possible.

    Returns True if the future was cancelled, False otherwise. A future
    cannot be cancelled if it is running or has already completed.
    """
    with self._condition:
        if self._state in [RUNNING, FINISHED]:
            return False

        if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
            return True

        self._state = CANCELLED
        self._condition.notify_all()

    self._invoke_callbacks()
    return True

def set_result(self, result):
    """Sets the return value of work associated with the future.

    Should only be used by Executor implementations and unit tests.
    """
    with self._condition:
        if self._state in {CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED}:
            raise InvalidStateError('{}: {!r}'.format(self._state, self))
        self._result = result
        self._state = FINISHED
        for waiter in self._waiters:
            waiter.add_result(self)
        self._condition.notify_all()
    self._invoke_callbacks()

def set_exception(self, exception):
    """Sets the result of the future as being the given exception.

    Should only be used by Executor implementations and unit tests.
    """
    with self._condition:
        if self._state in {CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED}:
            raise InvalidStateError('{}: {!r}'.format(self._state, self))
        self._exception = exception
        self._state = FINISHED
        for waiter in self._waiters:
            waiter.add_exception(self)
        self._condition.notify_all()
    self._invoke_callbacks()
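The cancel path is easy to miss: a stored callback then runs in whichever thread calls cancel(). In this sketch (cancel_demo, blocker and pending are illustrative names) a single-worker pool is kept busy by a task blocked on an Event, so the second future stays pending and can be cancelled from the calling thread.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


def cancel_demo():
    release = threading.Event()
    ran_in = []

    with ThreadPoolExecutor(max_workers=1) as pool:
        blocker = pool.submit(release.wait)  # occupies the only worker
        pending = pool.submit(lambda: None)  # waits in the queue behind blocker
        pending.add_done_callback(
            lambda f: ran_in.append(threading.current_thread().name))
        cancelled = pending.cancel()         # the stored callback runs here
        release.set()                        # let the blocker finish
    return cancelled, ran_in
```

cancel_demo() returns (True, [name]) where name is the calling thread's name: the worker never touches the cancelled future's callbacks.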

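Summary:

  • If the task has already finished (or been cancelled) when add_done_callback is called, the callback runs immediately in the thread that called add_done_callback, which is the main thread in the question's examples.
  • Otherwise the callback runs in the thread that completes the future: normally the worker thread that ran the task, or the thread that calls cancel().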

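Recommendation:
Avoid running anything that may block for a long time inside a callback. Depending on timing, it will either tie up a worker thread, leaving fewer threads to run queued tasks, or block the thread that called add_done_callback.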
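One way to follow this advice, sketched under the assumption that results may be processed in a single consuming thread: the callback only hands the finished future to a queue.Queue, and the slow follow-up work happens in the consuming thread. fetch and process_in_main_thread are placeholder names.

```python
import queue
import threading
from concurrent.futures import ThreadPoolExecutor


def fetch(i):
    return i * i  # stand-in for real work


def process_in_main_thread(n=5):
    done = queue.Queue()
    handled_by = set()
    results = []

    with ThreadPoolExecutor(max_workers=4) as pool:
        for i in range(n):
            # The callback only enqueues the future; it never blocks.
            pool.submit(fetch, i).add_done_callback(done.put)
        # Slow or blocking follow-up work happens here, in the consumer.
        for _ in range(n):
            fut = done.get()
            results.append(fut.result())
            handled_by.add(threading.current_thread().name)
    return sorted(results), handled_by
```

concurrent.futures.as_completed() is a standard-library alternative that yields futures as they finish, without registering callbacks at all.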

Source: utcz.com/p/938292.html
