Python3线程池ThreadPoolExecutor总结
是什么
ThreadPoolExecutor 是 Executor 的子类,它使用线程池来异步执行调用。
如何用
常用方法一:with ThreadPoolExecutor as t: t.submit
import concurrent.futures
import urllib.request

# Sample pages that the example below downloads concurrently.
URLS = [
    "http://www.foxnews.com/",
    "http://www.cnn.com/",
    "http://europe.wsj.com/",
    "http://www.bbc.co.uk/",
    "http://some-made-up-domain.com/",
]
# Retrieve a single page and return its contents
def load_url(url, timeout):
    """Fetch ``url`` and return the full response body.

    Args:
        url: Address to open; passed straight to ``urllib.request.urlopen``.
        timeout: Timeout forwarded to ``urlopen`` as its ``timeout`` argument.

    Returns:
        Whatever ``read()`` on the opened response yields (the whole body).

    Any exception raised by ``urlopen`` or ``read`` propagates to the caller.
    """
    # The with-statement closes the connection even if read() fails.
    with urllib.request.urlopen(url, timeout=timeout) as conn:
        return conn.read()
# We can use a with statement to ensure threads are cleaned up promptly
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    # Start the load operations and mark each future with its URL
    future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
    # as_completed yields each future as it finishes, not in submission order.
    for future in concurrent.futures.as_completed(future_to_url):
        url = future_to_url[future]
        try:
            # result() re-raises any exception raised inside load_url.
            data = future.result()
        except Exception as exc:
            print("%r generated an exception: %s" % (url, exc))
        else:
            print("%r page is %d bytes" % (url, len(data)))
常用方法二:tornado中使用@run_on_executor装饰器
class NoBlockingHnadler(tornado.web.RequestHandler):
    """Request handler that runs a blocking call on a thread pool.

    NOTE(review): the class name is misspelled ("Hnadler"); it is kept as-is
    so that existing references to it keep working.
    """

    # Class-level pool, so every instance of this handler shares the same
    # four worker threads. Presumably this is the attribute that
    # @run_on_executor looks up -- confirm against the Tornado docs.
    executor = ThreadPoolExecutor(4)

    @run_on_executor
    def sleep(self, second):
        """Block for ``second`` seconds, then return ``second``."""
        time.sleep(second)
        return second

    @gen.coroutine
    def get(self):
        """Handle GET: wait on ``sleep(5)`` and write its result to the response."""
        # Yielding the value returned by the decorated sleep() suspends this
        # coroutine until the result is available.
        second = yield self.sleep(5)
        self.write("noBlocking Request: {}".format(second))
原理
ThreadPoolExecutor内部维护了一个任务队列:每次submit时,将提交的任务封装成_WorkItem并put到队列中,然后调用调整线程数的方法_adjust_thread_count;如果当前线程数小于max_workers,就新建一个线程运行_worker,由工作线程从队列中取出_WorkItem并执行
class ThreadPoolExecutor(_base.Executor):
    """Executor that queues submitted calls and runs them on worker threads.

    Each submitted call is wrapped in a ``_WorkItem`` and put on an internal
    queue; a new daemon thread running ``_worker`` is started on submission
    while fewer than ``max_workers`` threads exist.
    """

    def __init__(self, max_workers=None):
        """Initializes a new ThreadPoolExecutor instance.

        Args:
            max_workers: The maximum number of threads that can be used to
                execute the given calls. Defaults to five times the CPU
                count (or 5 when the CPU count is unavailable).

        Raises:
            ValueError: If max_workers is less than or equal to 0.
        """
        if max_workers is None:
            # Use this number because ThreadPoolExecutor is often
            # used to overlap I/O instead of CPU work.
            max_workers = (os.cpu_count() or 1) * 5
        if max_workers <= 0:
            raise ValueError("max_workers must be greater than 0")

        self._max_workers = max_workers
        self._work_queue = queue.Queue()  # pending _WorkItem objects
        self._threads = set()             # worker threads started so far
        self._shutdown = False
        self._shutdown_lock = threading.Lock()  # guards _shutdown

    # No inline docstring here: the docstring is copied from the base class
    # right after the definition.
    def submit(self, fn, *args, **kwargs):
        # Hold the lock so a concurrent shutdown() cannot slip in between
        # the _shutdown check and the enqueue.
        with self._shutdown_lock:
            if self._shutdown:
                raise RuntimeError("cannot schedule new futures after shutdown")

            f = _base.Future()
            w = _WorkItem(f, fn, args, kwargs)

            self._work_queue.put(w)
            self._adjust_thread_count()
            return f
    submit.__doc__ = _base.Executor.submit.__doc__

    def _adjust_thread_count(self):
        """Start one more worker thread if the pool is below its limit."""
        # When the executor gets lost, the weakref callback will wake up
        # the worker threads.
        def weakref_cb(_, q=self._work_queue):
            q.put(None)
        # TODO(bquinlan): Should avoid creating new threads if there are more
        # idle threads than items in the work queue.
        if len(self._threads) < self._max_workers:
            # The worker gets only a weak reference to the executor, so the
            # thread itself does not keep the executor alive.
            t = threading.Thread(target=_worker,
                                 args=(weakref.ref(self, weakref_cb),
                                       self._work_queue))
            t.daemon = True
            t.start()
            self._threads.add(t)
            # Module-level thread -> queue registry defined elsewhere;
            # presumably used for interpreter-exit cleanup -- confirm.
            _threads_queues[t] = self._work_queue

    # No inline docstring here: the docstring is copied from the base class
    # right after the definition.
    def shutdown(self, wait=True):
        with self._shutdown_lock:
            self._shutdown = True
            # None is put on the queue after the flag is set; presumably a
            # wake-up signal for _worker (defined elsewhere) -- confirm.
            self._work_queue.put(None)
        if wait:
            # Join outside the lock so waiting does not hold _shutdown_lock.
            for t in self._threads:
                t.join()
    shutdown.__doc__ = _base.Executor.shutdown.__doc__
参考
https://docs.python.org/zh-cn/3/library/concurrent.futures.html
https://juejin.im/post/5cf913cfe51d45105d63a4d0
https://hexiangyu.me/2017/01/29/real-tornado-async-noblocking/
以上是 Python3线程池ThreadPoolExecutor总结 的全部内容, 来源链接: utcz.com/z/514927.html