Python3线程池ThreadPoolExecutor总结

编程

是什么

ThreadPoolExecutor 是 Executor 的子类,它使用线程池来异步执行调用。

如何用

常用方法一:用 with 语句创建线程池(with ThreadPoolExecutor() as t:),再通过 t.submit() 提交任务

import concurrent.futures

import urllib.request

# Pages to download concurrently. The last entry is a made-up domain, so its
# request is expected to fail and exercise the exception branch of the loop.
URLS = [
    "http://www.foxnews.com/",
    "http://www.cnn.com/",
    "http://europe.wsj.com/",
    "http://www.bbc.co.uk/",
    "http://some-made-up-domain.com/",
]

# Retrieve a single page and report the URL and contents

def load_url(url, timeout):
    """Download *url* and return the complete response body as bytes.

    Args:
        url: The address to open with ``urllib.request.urlopen``.
        timeout: Timeout in seconds passed through to ``urlopen``.

    Returns:
        The raw bytes read from the response.
    """
    response = urllib.request.urlopen(url, timeout=timeout)
    # The response object is a context manager; leaving the block closes the
    # underlying connection even if read() raises.
    with response:
        body = response.read()
    return body

# We can use a with statement to ensure threads are cleaned up promptly

with concurrent.futures.ThreadPoolExecutor(max_workers=5) as pool:
    # Submit one download per URL and remember which URL each future serves,
    # because as_completed() yields futures in completion order, not input order.
    pending = {}
    for target in URLS:
        pending[pool.submit(load_url, target, 60)] = target

    for done in concurrent.futures.as_completed(pending):
        target = pending[done]
        try:
            payload = done.result()
        except Exception as err:
            # result() re-raises whatever load_url raised in the worker thread.
            print("%r generated an exception: %s" % (target, err))
            continue
        print("%r page is %d bytes" % (target, len(payload)))

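常用方法二:在 tornado 中使用 @run_on_executor 装饰器(以下示例省略了 import:time、tornado.gen、tornado.web、tornado.concurrent.run_on_executor 以及 concurrent.futures.ThreadPoolExecutor)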

class NoBlockingHandler(tornado.web.RequestHandler):
    """Request handler that offloads a blocking sleep to a thread pool.

    ``sleep`` is decorated with ``@run_on_executor``, so calling it submits the
    blocking ``time.sleep`` to ``executor`` and returns a future instead of
    blocking the calling thread. ``get`` yields that future, so the IOLoop can
    keep serving other requests while this one waits.
    """

    # Class attribute: one pool of 4 threads shared by every instance of the
    # handler. @run_on_executor looks the pool up through ``self.executor``.
    executor = ThreadPoolExecutor(4)

    @run_on_executor
    def sleep(self, second):
        """Block a pool thread for *second* seconds, then return *second*."""
        time.sleep(second)
        return second

    @gen.coroutine
    def get(self):
        """Wait for ``sleep(5)`` without blocking the IOLoop, then respond."""
        second = yield self.sleep(5)
        self.write("noBlocking Request: {}".format(second))


# Backward-compatible alias for the original, misspelled class name so that
# existing routes referring to NoBlockingHnadler keep working.
NoBlockingHnadler = NoBlockingHandler

原理

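ThreadPoolExecutor 内部维护一个工作队列(queue.Queue)。每次调用 submit 时,先把任务封装成 _WorkItem 并 put 到队列中,再调用调整线程数的方法 _adjust_thread_count:如果当前线程数小于 max_workers,就新建并启动一个以 _worker 函数为目标的守护线程。真正从队列中取出任务并执行的,是这些工作线程中运行的 _worker。下面是较早版本 Python 3 中的源码摘录(从 Python 3.8 起,默认线程数和空闲线程复用等实现细节已有变化):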

class ThreadPoolExecutor(_base.Executor):
    """Executor that runs submitted calls on a pool of daemon worker threads.

    Excerpt of concurrent.futures.thread.ThreadPoolExecutor from an older
    Python 3 release. It relies on module-level names that are not shown here:
    _base, _WorkItem, _worker, _threads_queues, and the os, queue, threading
    and weakref modules.

    Each submitted call is wrapped in a _WorkItem and put on a shared
    queue.Queue; worker threads are started lazily (up to max_workers) and run
    the _worker function, which is handed that queue.
    """

    def __init__(self, max_workers=None):
        """Initializes a new ThreadPoolExecutor instance.

        Args:
            max_workers: The maximum number of threads that can be used to
                execute the given calls. If None, defaults to
                (os.cpu_count() or 1) * 5.

        Raises:
            ValueError: If max_workers is less than or equal to 0.
        """
        if max_workers is None:
            # Use this number because ThreadPoolExecutor is often
            # used to overlap I/O instead of CPU work.
            max_workers = (os.cpu_count() or 1) * 5
        if max_workers <= 0:
            raise ValueError("max_workers must be greater than 0")

        self._max_workers = max_workers
        # Pending _WorkItem objects, plus None sentinels that are put on the
        # queue to wake workers (see weakref_cb and shutdown below).
        self._work_queue = queue.Queue()
        # Every worker thread started so far; shutdown(wait=True) joins them.
        self._threads = set()
        self._shutdown = False
        # Held by both submit() and shutdown(), so once shutdown() has set
        # _shutdown no later submit() can enqueue a new work item.
        self._shutdown_lock = threading.Lock()

    def submit(self, fn, *args, **kwargs):
        # Wraps fn/args/kwargs together with a new Future in a _WorkItem,
        # queues it, possibly starts another worker, and returns the Future.
        # Presumably _WorkItem runs fn(*args, **kwargs) and stores the outcome
        # in the Future -- its definition is outside this excerpt.
        # (The real docstring is copied from _base.Executor.submit below, so
        # the explanation is kept in comments.)
        with self._shutdown_lock:
            if self._shutdown:
                raise RuntimeError("cannot schedule new futures after shutdown")

            f = _base.Future()
            w = _WorkItem(f, fn, args, kwargs)

            self._work_queue.put(w)
            # Starts one more worker thread if the pool is not yet full.
            self._adjust_thread_count()
            return f
    submit.__doc__ = _base.Executor.submit.__doc__

    def _adjust_thread_count(self):
        # Starts at most one new worker thread per call, and only while fewer
        # than _max_workers threads exist. Idle threads are not taken into
        # account (see the TODO), so each of the first max_workers
        # submissions starts a new thread.

        # When the executor gets lost, the weakref callback will wake up
        # the worker threads.
        def weakref_cb(_, q=self._work_queue):
            q.put(None)
        # TODO(bquinlan): Should avoid creating new threads if there are more
        # idle threads than items in the work queue.
        if len(self._threads) < self._max_workers:
            # The worker gets only a weak reference to the executor, so a
            # running thread does not by itself keep the executor alive.
            t = threading.Thread(target=_worker,
                                 args=(weakref.ref(self, weakref_cb),
                                       self._work_queue))
            # Daemon threads do not by themselves keep the interpreter alive.
            t.daemon = True
            t.start()
            self._threads.add(t)
            # Module-level thread -> queue registry; presumably consulted at
            # interpreter exit to wake and join workers (defined outside this
            # excerpt -- verify).
            _threads_queues[t] = self._work_queue

    def shutdown(self, wait=True):
        # Marks the executor as shut down (later submit() calls raise
        # RuntimeError) and queues a single None sentinel. Presumably _worker
        # (not shown) re-posts the sentinel so every worker eventually sees
        # it -- verify against _worker.
        with self._shutdown_lock:
            self._shutdown = True
            self._work_queue.put(None)
        # If requested, block until every started worker thread has exited.
        if wait:
            for t in self._threads:
                t.join()
    shutdown.__doc__ = _base.Executor.shutdown.__doc__

参考

https://docs.python.org/zh-cn/3/library/concurrent.futures.html

https://juejin.im/post/5cf913cfe51d45105d63a4d0

https://hexiangyu.me/2017/01/29/real-tornado-async-noblocking/

以上是 Python3线程池ThreadPoolExecutor总结 的全部内容, 来源链接: utcz.com/z/514927.html

回到顶部