Python队列集
asyncio 队列被设计成与 queue
模块类似。尽管 asyncio队列不是线程安全的,但是他们是被设计专用于 async/await 代码。
注意asyncio 的队列没有 timeout 形参;请使用 asyncio.wait_for()
函数为队列添加超时操作。
参见下面的 Examples 部分。
队列¶
class
asyncio.
Queue
(maxsize=0, *, loop=None)¶先进,先出(FIFO)队列
如果 maxsize 小于等于零,则队列尺寸是无限的。如果是大于
0
的整数,则当队列达到 maxsize 时,awaitput()
将阻塞至某个元素被get()
取出。不像标准库中的并发型
queue
,队列的尺寸一直是已知的,可以通过调用qsize()
方法返回。这个类不是线程安全的(not thread safe)。
maxsize
¶队列中可存放的元素数量。
empty
()¶如果队列为空返回
True
,否则返回False
。
full
()¶如果有
maxsize
个条目在队列中,则返回True
。如果队列用
maxsize=0
(默认)初始化,则full()
永远不会返回True
。
coroutine
get
()¶从队列中删除并返回一个元素。如果队列为空,则等待,直到队列中有元素。
get_nowait
()¶立即返回一个队列中的元素,如果队列内有值,否则引发异常
QueueEmpty
。
coroutine
join
()¶阻塞至队列中所有的元素都被接收和处理完毕。
当条目添加到队列的时候,未完成任务的计数就会增加。每当消费协程调用
task_done()
表示这个条目已经被回收,该条目所有工作已经完成,未完成计数就会减少。当未完成计数降到零的时候,join()
阻塞被解除。
coroutine
put
(item)¶添加一个元素进队列。如果队列满了,在添加元素之前,会一直等待空闲插槽可用。
put_nowait
(item)¶不阻塞的放一个元素入队列。
如果没有立即可用的空闲槽,引发
QueueFull
异常。
qsize
()¶返回队列用的元素数量。
task_done
()¶表明前面排队的任务已经完成,即get出来的元素相关操作已经完成。
由队列使用者控制。每个
get()
用于获取一个任务,任务最后调用task_done()
告诉队列,这个任务已经完成。如果
join()
当前正在阻塞,在所有条目都被处理后,将解除阻塞(意味着每个put()
进队列的条目的task_done()
都被收到)。如果被调用的次数多于放入队列中的项目数量,将引发
ValueError
。
优先级队列¶
class
asyncio.
PriorityQueue
¶Queue
的变体;按优先级顺序取出条目 (最小的先取出)。条目通常是
(priority_number,data)
形式的元组。
后进先出队列¶
class
asyncio.
LifoQueue
¶Queue
的变体,先取出最近添加的条目(后进,先出)。
异常¶
exception
asyncio.
QueueEmpty
¶当队列为空的时候,调用
get_nowait()
方法而引发这个异常。
exception
asyncio.
QueueFull
¶当队列中条目数量已经达到它的 maxsize 的时候,调用
put_nowait()
方法而引发的异常。
例子¶
队列能被用于多个的并发任务的工作量分配:
importasyncioimportrandom
importtime
asyncdefworker(name,queue):
whileTrue:
# Get a "work item" out of the queue.
sleep_for=awaitqueue.get()
# Sleep for the "sleep_for" seconds.
awaitasyncio.sleep(sleep_for)
# Notify the queue that the "work item" has been processed.
queue.task_done()
print(f'{name} has slept for {sleep_for:.2f} seconds')
asyncdefmain():
# Create a queue that we will use to store our "workload".
queue=asyncio.Queue()
# Generate random timings and put them into the queue.
total_sleep_time=0
for_inrange(20):
sleep_for=random.uniform(0.05,1.0)
total_sleep_time+=sleep_for
queue.put_nowait(sleep_for)
# Create three worker tasks to process the queue concurrently.
tasks=[]
foriinrange(3):
task=asyncio.create_task(worker(f'worker-{i}',queue))
tasks.append(task)
# Wait until the queue is fully processed.
started_at=time.monotonic()
awaitqueue.join()
total_slept_for=time.monotonic()-started_at
# Cancel our worker tasks.
fortaskintasks:
task.cancel()
# Wait until all worker tasks are cancelled.
awaitasyncio.gather(*tasks,return_exceptions=True)
print('====')
print(f'3 workers slept in parallel for {total_slept_for:.2f} seconds')
print(f'total expected sleep time: {total_sleep_time:.2f} seconds')
asyncio.run(main())
以上是 Python队列集 的全部内容, 来源链接: utcz.com/z/508342.html