python asyncio 并发 队列(一)

python

import asyncio, threading

import aiohttp

class Tasks():

def __init__(self, max_async, loop=None):

self.loop = loop or asyncio.get_event_loop()

self._queue = asyncio.Queue(maxsize=100, loop=self.loop)

self.lock = threading.Lock()

self.max_async = max_async

self.work_list = []

async def geturldata(self, url):

print(url)

try:

async with aiohttp.ClientSession(loop=self.loop, conn_timeout=1) as s:

resp = await s.get(url)

result = await resp.read()

print(result.decode('utf-8'))

except Exception as e:

print(e)

async def run(self):

#ensure_future将协程转换成任务,并投递到事件循环

works = [asyncio.ensure_future(self.work(), loop=self.loop) for _ in range(self.max_async)]

self.work_list.extend(works)

await self._queue.join()

print('all tasks done')

for w in works:

w.cancel() #任务并不会立刻变为取消的状态,而是要等到下次的事件循环

async def work(self):

try:

while True:

url = await self._queue.get()

await self.geturldata(url)

self._queue.task_done()

except asyncio.CancelledError: pass

def addtask(self, item):

with self.lock:

self._queue.put_nowait(item)

@property

def count(self):

return self._queue.qsize()

def printstatus(self):

for w in self.work_list:

print(w.done())

t = Tasks(max_async=10)

url = 'https://www.baidu.com'

for u in range(100):

t.addtask(url)

print(t.count)

loop = asyncio.get_event_loop()

loop.run_until_complete(t.run())

t.printstatus()

loop.close()

以上是 python asyncio 并发 队列(一) 的全部内容, 来源链接: utcz.com/z/387301.html

回到顶部