python asyncio 并发 队列(一)
import asyncio
import threading

import aiohttp
class Tasks():
    """Bounded work queue of URLs drained by a fixed pool of asyncio workers.

    URLs are queued with :meth:`addtask`; :meth:`run` starts ``max_async``
    worker tasks that each pull a URL from the queue and fetch it with
    :meth:`geturldata` until the queue has been fully processed.
    """

    def __init__(self, max_async, loop=None, maxsize=100):
        """Set up the queue and bookkeeping.

        Args:
            max_async: Number of worker tasks started by :meth:`run`.
            loop: Event loop to schedule workers on. When omitted, the
                current event loop is used.
            maxsize: Capacity of the internal queue (defaults to the
                previously hard-coded 100).
        """
        if loop is None:
            try:
                loop = asyncio.get_event_loop()
            except RuntimeError:
                # Newer Python versions raise here when no loop has been set
                # for this thread; create one and install it so that later
                # asyncio.get_event_loop() calls return the same loop.
                loop = asyncio.new_event_loop()
                asyncio.set_event_loop(loop)
        self.loop = loop
        try:
            # Python < 3.10 needs the loop passed explicitly so the queue is
            # bound to the same loop as the workers.
            self._queue = asyncio.Queue(maxsize=maxsize, loop=self.loop)
        except TypeError:
            # Python 3.10+ removed the ``loop`` argument; the queue binds to
            # the running loop on first use instead.
            self._queue = asyncio.Queue(maxsize=maxsize)
        self.lock = threading.Lock()
        self.max_async = max_async
        self.work_list = []

    async def geturldata(self, url):
        """Fetch ``url`` and print the response body decoded as UTF-8.

        Any ``Exception`` raised while fetching or decoding is printed
        rather than propagated, so one bad URL does not stop a worker.
        """
        print(url)
        try:
            # connect=1 replaces the deprecated ``conn_timeout=1``; total=300
            # keeps aiohttp's historical 5-minute overall default explicit.
            timeout = aiohttp.ClientTimeout(total=300, connect=1)
            async with aiohttp.ClientSession(timeout=timeout) as session:
                # ``async with`` releases the connection even if read() fails.
                async with session.get(url) as resp:
                    body = await resp.read()
            print(body.decode('utf-8'))
        except Exception as e:
            print(e)

    async def run(self):
        """Start the workers, wait until the queue is drained, then stop them.

        The worker tasks are appended to ``self.work_list`` so their state
        can be inspected afterwards with :meth:`printstatus`.
        """
        # Wrap each worker coroutine in a task scheduled on the event loop.
        works = [self.loop.create_task(self.work()) for _ in range(self.max_async)]
        self.work_list.extend(works)
        try:
            await self._queue.join()
            print('all tasks done')
        finally:
            # Always stop the workers, even if join() was interrupted.
            for w in works:
                # cancel() only requests cancellation; the task actually
                # finishes on a later iteration of the event loop.
                w.cancel()
            # Wait for the workers to really finish before returning.
            await asyncio.gather(*works, return_exceptions=True)

    async def work(self):
        """Worker loop: repeatedly take a URL from the queue and fetch it.

        Runs until the task is cancelled; cancellation ends the worker
        quietly.
        """
        try:
            while True:
                url = await self._queue.get()
                try:
                    await self.geturldata(url)
                except asyncio.CancelledError:
                    raise
                except Exception as exc:
                    # Keep the worker alive if the handler raises.
                    print(exc)
                finally:
                    # Always mark the item processed so join() cannot hang.
                    self._queue.task_done()
        except asyncio.CancelledError:
            pass

    def addtask(self, item):
        """Queue ``item`` without waiting.

        Raises:
            asyncio.QueueFull: If the queue is already at capacity.
        """
        # The lock serializes concurrent addtask() callers.
        # NOTE(review): asyncio.Queue is documented as not thread-safe, so
        # this lock alone does not make cross-thread use safe while the loop
        # is running -- confirm how callers use it.
        with self.lock:
            self._queue.put_nowait(item)

    @property
    def count(self):
        """Number of items currently waiting in the queue."""
        return self._queue.qsize()

    def printstatus(self):
        """Print ``done()`` for every worker task started so far."""
        for w in self.work_list:
            print(w.done())
# Demo driver: queue the same URL 100 times and fetch it with 10 workers.
t = Tasks(max_async=10)
url = 'https://www.baidu.com'
for _ in range(100):
    t.addtask(url)
print(t.count)
# Run on the loop the Tasks instance is bound to instead of looking the
# event loop up a second time, so both always refer to the same loop.
loop = t.loop
try:
    loop.run_until_complete(t.run())
    t.printstatus()
finally:
    # Close the loop even if the crawl raised.
    loop.close()
以上是 python asyncio 并发 队列(一) 的全部内容, 来源链接: utcz.com/z/387301.html