Python ThreadPoolExecutor(concurrent.futures 线程池)源码解析

python

Future: 未来对象,即 task 执行结果的容器——任务完成后,其返回值(或抛出的异常)会被写入该对象。

1. 当submit后:

    def submit(self, fn, *args, **kwargs):
        """Queue ``fn(*args, **kwargs)`` for execution and return its future.

        The callable and its arguments are wrapped in a ``_WorkItem``
        together with a new future, and the item is put on the executor's
        work queue.

        Args:
            fn: The callable to run.
            *args: Positional arguments forwarded to ``fn``.
            **kwargs: Keyword arguments forwarded to ``fn``.

        Returns:
            The ``_base.Future`` created for this call.

        Raises:
            RuntimeError: If ``self._shutdown`` is set.
        """
        # The shutdown flag is checked and the item is queued under the same
        # lock, so both happen as one step with respect to other holders of
        # ``_shutdown_lock``.
        with self._shutdown_lock:
            if self._shutdown:
                raise RuntimeError('cannot schedule new futures after shutdown')

            f = _base.Future()  # container for the task's result
            w = _WorkItem(f, fn, args, kwargs)  # unit of work run by the pool

            self._work_queue.put(w)
            # Starts another worker thread if the pool is below its limit.
            self._adjust_thread_count()
            return f

2. _adjust_thread_count:

    def _adjust_thread_count(self):
        """Start one more worker thread if the pool is below ``_max_workers``.

        At most one thread is created per call. The new thread runs
        ``_worker`` with a weak reference to this executor and the shared
        work queue.
        """
        # When the executor gets lost, the weakref callback will wake up
        # the worker threads.
        def weakref_cb(_, q=self._work_queue):
            # ``None`` is the wake-up sentinel consumed by ``_worker``.
            q.put(None)

        # TODO(bquinlan): Should avoid creating new threads if there are more
        # idle threads than items in the work queue.
        num_threads = len(self._threads)
        if num_threads < self._max_workers:
            # Falls back to the executor object itself (formatted with %s)
            # when no name prefix is configured.
            thread_name = '%s_%d' % (self._thread_name_prefix or self,
                                     num_threads)
            # The worker only gets a weak reference, so the thread does not
            # keep the executor alive.
            t = threading.Thread(name=thread_name, target=_worker,
                                 args=(weakref.ref(self, weakref_cb),
                                       self._work_queue))
            t.daemon = True
            t.start()
            self._threads.add(t)
            # NOTE(review): ``_threads_queues`` is a module-level mapping
            # defined outside this excerpt; presumably used to wake and join
            # workers at interpreter exit. Confirm against the module.
            _threads_queues[t] = self._work_queue

3. _worker:

def _worker(executor_reference, work_queue):
    """Run work items from ``work_queue`` until told to exit.

    Args:
        executor_reference: Zero-argument callable returning the owning
            executor, or ``None`` once it has been collected (a
            ``weakref.ref`` in ``_adjust_thread_count``).
        work_queue: Queue yielding objects with a ``run()`` method, or
            ``None`` as a wake-up sentinel.

    Any exception escaping the loop is logged at CRITICAL level rather than
    propagated out of the thread.
    """
    try:
        while True:
            work_item = work_queue.get(block=True)
            if work_item is not None:
                work_item.run()
                # Delete references to object. See issue16284
                del work_item
                continue

            # A ``None`` sentinel was received: decide whether to exit.
            executor = executor_reference()
            # Exit if:
            #   - The interpreter is shutting down OR
            #   - The executor that owns the worker has been collected OR
            #   - The executor that owns the worker has been shutdown.
            if _shutdown or executor is None or executor._shutdown:
                # Notice other workers: re-queue the sentinel so the next
                # blocked worker wakes up and runs the same check.
                work_queue.put(None)
                return
            # Drop the strong reference so this thread does not keep the
            # executor alive while blocked on the queue.
            del executor
    except BaseException:
        _base.LOGGER.critical('Exception in worker', exc_info=True)

4. WorkItem

class _WorkItem(object):
    """A callable, its arguments and the future that receives its outcome."""

    def __init__(self, future, fn, args, kwargs):
        """Store the pieces of one scheduled call.

        Args:
            future: Object receiving the result or exception of the call.
            fn: The callable to invoke.
            args: Tuple of positional arguments for ``fn``.
            kwargs: Dict of keyword arguments for ``fn``.
        """
        self.future = future
        self.fn = fn
        self.args = args
        self.kwargs = kwargs

    def run(self):
        """Invoke the callable and record its outcome on the future.

        Does nothing if ``set_running_or_notify_cancel()`` returns false.
        Otherwise the return value is stored with ``set_result``, or any
        raised exception (including ``BaseException`` subclasses) is stored
        with ``set_exception`` instead of propagating.
        """
        if not self.future.set_running_or_notify_cancel():
            return

        try:
            result = self.fn(*self.args, **self.kwargs)
        except BaseException as exc:
            self.future.set_exception(exc)
            # Break a reference cycle with the exception 'exc'
            self = None
        else:
            self.future.set_result(result)

以上是 python threading ThreadPoolExecutor源码解析 的全部内容, 来源链接: utcz.com/z/389047.html

回到顶部