Do Python thread pool callbacks run in the main thread or in a worker thread?
from concurrent.futures import ThreadPoolExecutor
import threading

def action():
    return ''

def callback_func(future):
    # Print the name of the thread that runs the callback.
    print(threading.current_thread().name)
    # assert 'ThreadPoolExecutor' in threading.current_thread().name
    # assert 'MainThread' in threading.current_thread().name

pool = ThreadPoolExecutor(max_workers=10)
for i in range(10):
    future = pool.submit(action)
    future.add_done_callback(callback_func)
The output is as follows:
MainThread
ThreadPoolExecutor-0_1
ThreadPoolExecutor-0_1
MainThread
ThreadPoolExecutor-0_3
ThreadPoolExecutor-0_3
ThreadPoolExecutor-0_3
MainThread
ThreadPoolExecutor-0_1
ThreadPoolExecutor-0_1
Judging from the printed output, callback_func sometimes runs in MainThread and sometimes in a ThreadPoolExecutor worker thread.
What is going on here?
If a small delay is added to the task, all the callbacks run in ThreadPoolExecutor worker threads:
from concurrent.futures import ThreadPoolExecutor
import threading
import time

def action():
    time.sleep(1)  # the task is still running when the callback is attached
    return ''

def callback_func(future):
    # Print the name of the thread that runs the callback.
    print(threading.current_thread().name)
    # assert 'ThreadPoolExecutor' in threading.current_thread().name
    # assert 'MainThread' in threading.current_thread().name

pool = ThreadPoolExecutor(max_workers=10)
for i in range(10):
    future = pool.submit(action)
    future.add_done_callback(callback_func)
The output is as follows:
ThreadPoolExecutor-0_5
ThreadPoolExecutor-0_0
ThreadPoolExecutor-0_7
ThreadPoolExecutor-0_3
ThreadPoolExecutor-0_6
ThreadPoolExecutor-0_1
ThreadPoolExecutor-0_9
ThreadPoolExecutor-0_8
ThreadPoolExecutor-0_2
ThreadPoolExecutor-0_4
Answer:
In Python, does the main thread execute the callback, or does a worker thread?
Let's first look at how a callback is added to a task in Python's thread pool.
from concurrent.futures import ThreadPoolExecutor

pool = ThreadPoolExecutor(max_workers=10)

def func_get():
    pass

def func_callback(future):
    # A done-callback receives the finished future as its only argument.
    pass

for i in range(100):
    future = pool.submit(func_get)
    future.add_done_callback(func_callback)
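As you can see, there are two steps:
- first submit
- then add_done_callback
This raises a question: if func_get finishes so quickly that it is already done before add_done_callback is called, will func_callback still be executed when add_done_callback(func_callback) is called afterwards?
The answer is yes!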
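The claim can be checked with a minimal sketch. It assumes a single-worker pool, and the helper name run_callback_on_finished_future is made up for this illustration; future.result() is used only to make sure the task has finished before the callback is attached.

```python
from concurrent.futures import ThreadPoolExecutor
import threading


def run_callback_on_finished_future():
    """Attach a callback only after the task has already finished."""
    seen = []  # thread names recorded by the callback

    def callback(future):
        seen.append(threading.current_thread().name)

    with ThreadPoolExecutor(max_workers=1) as pool:
        future = pool.submit(lambda: "done")
        future.result()  # block until the task has finished
        # The future is already done, so the callback runs right here.
        future.add_done_callback(callback)
    return seen, threading.current_thread().name


seen, caller = run_callback_on_finished_future()
print(seen, caller)
```

The callback is recorded exactly once, under the name of the thread that called add_done_callback.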
Why?
Let's look at the source code of the thread pool.
The thread pool in concurrent.futures is written in pure Python, with no C, so it is easy to read.
Calling submit returns a Future instance:
/Library/Frameworks/Python.framework/Versions/3.10/lib/python3.10/concurrent/futures/thread.py
def submit(self, fn, /, *args, **kwargs):
    with self._shutdown_lock, _global_shutdown_lock:
        if self._broken:
            raise BrokenThreadPool(self._broken)

        if self._shutdown:
            raise RuntimeError('cannot schedule new futures after shutdown')
        if _shutdown:
            raise RuntimeError('cannot schedule new futures after '
                               'interpreter shutdown')

        f = _base.Future()
        w = _WorkItem(f, fn, args, kwargs)

        self._work_queue.put(w)
        self._adjust_thread_count()
        return f
Next, add_done_callback is called on that Future instance:
/Library/Frameworks/Python.framework/Versions/3.10/lib/python3.10/concurrent/futures/_base.py
def add_done_callback(self, fn):
    """Attaches a callable that will be called when the future finishes.

    Args:
        fn: A callable that will be called with this future as its only
            argument when the future completes or is cancelled. The callable
            will always be called by a thread in the same process in which
            it was added. If the future has already completed or been
            cancelled then the callable will be called immediately. These
            callables are called in the order that they were added.
    """
    with self._condition:
        if self._state not in [CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED]:
            self._done_callbacks.append(fn)
            return
    try:
        fn(self)
    except Exception:
        LOGGER.exception('exception calling callback for %r', self)
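As you can see, the logic is simple:
- If the future's _state is already one of CANCELLED, CANCELLED_AND_NOTIFIED, or FINISHED when add_done_callback is called, the thread pool has already finished (or cancelled) the task. In that case the callback is executed immediately by the thread that called add_done_callback, which in this example is the main thread.
- If the future is not yet done when add_done_callback is called, the callback is appended to the future's callback list.
The callbacks in that list are executed once the worker thread has finished the actual work for the future, so they run in the worker thread.
/Library/Frameworks/Python.framework/Versions/3.10/lib/python3.10/concurrent/futures/_base.py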
def _invoke_callbacks(self):
    for callback in self._done_callbacks:
        try:
            callback(self)
        except Exception:
            LOGGER.exception('exception calling callback for %r', self)
_invoke_callbacks is called at the end of three methods: cancel, set_result, and set_exception.
/Library/Frameworks/Python.framework/Versions/3.10/lib/python3.10/concurrent/futures/_base.py
def cancel(self):
    """Cancel the future if possible.

    Returns True if the future was cancelled, False otherwise. A future
    cannot be cancelled if it is running or has already completed.
    """
    with self._condition:
        if self._state in [RUNNING, FINISHED]:
            return False

        if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
            return True

        self._state = CANCELLED
        self._condition.notify_all()

    self._invoke_callbacks()
    return True

def set_result(self, result):
    """Sets the return value of work associated with the future.

    Should only be used by Executor implementations and unit tests.
    """
    with self._condition:
        if self._state in {CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED}:
            raise InvalidStateError('{}: {!r}'.format(self._state, self))
        self._result = result
        self._state = FINISHED
        for waiter in self._waiters:
            waiter.add_result(self)
        self._condition.notify_all()
    self._invoke_callbacks()

def set_exception(self, exception):
    """Sets the result of the future as being the given exception.

    Should only be used by Executor implementations and unit tests.
    """
    with self._condition:
        if self._state in {CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED}:
            raise InvalidStateError('{}: {!r}'.format(self._state, self))
        self._exception = exception
        self._state = FINISHED
        for waiter in self._waiters:
            waiter.add_exception(self)
        self._condition.notify_all()
    self._invoke_callbacks()
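The cancel path shown above can be illustrated the same way: because cancel calls _invoke_callbacks itself, the callbacks of a cancelled future run in whichever thread called cancel. This is a minimal sketch that assumes a single-worker pool kept busy by a blocking task, so that the second task is still pending when it is cancelled; the function name is hypothetical.

```python
from concurrent.futures import ThreadPoolExecutor
import threading


def cancel_runs_callback_in_caller():
    seen = []  # thread names recorded by the callback
    release = threading.Event()

    with ThreadPoolExecutor(max_workers=1) as pool:
        # Occupies the only worker until release is set.
        pool.submit(release.wait)
        # Stays in the work queue, so it can still be cancelled.
        queued = pool.submit(lambda: "never runs")
        queued.add_done_callback(
            lambda f: seen.append(threading.current_thread().name))
        cancelled = queued.cancel()  # runs the callback in this thread
        release.set()  # let the blocking task finish
    return cancelled, seen, threading.current_thread().name


cancelled, seen, caller = cancel_runs_callback_in_caller()
print(cancelled, seen, caller)
```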
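Summary:
- If the task is already done when add_done_callback is called, the callback runs in the thread that called add_done_callback (here, the main thread).
- Otherwise, the worker thread that ran the task executes the callback itself.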
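Both cases in the summary can be reproduced deterministically instead of relying on timing. The sketch below assumes a single worker named with thread_name_prefix="worker" and uses a threading.Event to keep the first task running until its callback has been attached; callback_threads and record are hypothetical helper names.

```python
from concurrent.futures import ThreadPoolExecutor
import threading


def callback_threads():
    names = {"caller": threading.current_thread().name}
    release = threading.Event()

    def record(key):
        # Build a callback that stores the name of the thread running it.
        def callback(future):
            names[key] = threading.current_thread().name
        return callback

    with ThreadPoolExecutor(max_workers=1, thread_name_prefix="worker") as pool:
        # Case 1: the task cannot finish until release is set,
        # so the callback is attached before the future is done.
        running = pool.submit(release.wait)
        running.add_done_callback(record("added_before_done"))
        release.set()

        # Case 2: wait for the task first, then attach the callback.
        finished = pool.submit(lambda: None)
        finished.result()
        finished.add_done_callback(record("added_after_done"))
    # Leaving the with block joins the worker, so every callback has run.
    return names


names = callback_threads()
print(names)
```

Under these assumptions, "added_before_done" is recorded with the worker thread's name and "added_after_done" with the caller's name.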
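Advice:
Be careful not to do anything in a callback that may block for a long time, because the callback ties up whichever thread happens to run it: either a pool worker or the thread that is submitting tasks.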
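One way to follow this advice is to keep the callback trivial and hand the finished future over to another part of the program. The following is a minimal sketch of that idea, not the only possible design: it assumes a queue.Queue as the hand-off point, and slow_post_processing is a hypothetical stand-in for work that might block.

```python
from concurrent.futures import ThreadPoolExecutor
import queue


def slow_post_processing(value):
    # Hypothetical stand-in for work that may block for a long time.
    return value * 2


def collect_results(inputs):
    done = queue.Queue()

    def callback(future):
        # Keep the callback cheap: only hand the finished future over.
        done.put(future)

    with ThreadPoolExecutor(max_workers=4) as pool:
        for x in inputs:
            pool.submit(pow, x, 2).add_done_callback(callback)

    # The pool has shut down here, so every callback has already run.
    results = []
    while not done.empty():
        results.append(slow_post_processing(done.get().result()))
    return sorted(results)


print(collect_results([1, 2, 3]))
```

The heavy work then runs in a thread chosen by the program, not in whichever thread happened to trigger the callback.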
Source link: utcz.com/p/938292.html