队列¶
异步队列设计为类似于 queue
模块。尽管Asyncio队列不是线程安全的,但它们被设计为专门用于异步/等待代码中。
注意,异步队列的方法没有 timeout 参数;使用 asyncio.wait_for()
函数以超时方式执行队列操作。
也见 Examples 下面部分。
排队¶
- class asyncio.Queue(maxsize=0)¶
先进先出(FIFO)队列。
如果 最大尺寸 小于或等于零,队列大小为无穷大。如果是大于的整数
0
然后await put()
队列到达时阻塞 最大尺寸 直到项目被删除get()
.与标准库线程不同
queue
,队列的大小始终是已知的,可以通过调用qsize()
方法。这个类是 not thread safe .
- maxsize¶
队列中允许的项目数。
- empty()¶
返回
True
如果队列为空,False
否则。
- coroutine get()¶
从队列中移除并返回项目。如果队列为空,请等待项目可用。
- get_nowait()¶
如果一个项目立即可用,则返回该项目,否则引发
QueueEmpty
.
- coroutine join()¶
阻止,直到队列中的所有项目都已接收和处理。
每当将项目添加到队列时,未完成任务的计数就会增加。每当消费者的协同程序调用时,计数就会下降。
task_done()
以指示已检索到该项,并且对其进行的所有工作都已完成。当未完成任务的计数降至零时,join()
解除阻塞。
- coroutine put(item)¶
将项目放入队列。如果队列已满,请等到空闲插槽可用后再添加项目。
- qsize()¶
返回队列中的项目数。
- task_done()¶
指示以前排队的任务已完成。
由队列使用者使用。对于每一个
get()
用于获取任务,随后调用task_done()
告诉队列任务的处理已完成。如果A
join()
当前正在阻止,它将在处理完所有项目后恢复(意味着task_done()
每一件物品都接到了调用put()
进入队列)。引发
ValueError
如果调用次数超过了队列中放置的项目数。
优先队列¶
后进先出队列¶
例外情况¶
- exception asyncio.QueueEmpty¶
当
get_nowait()
方法是在空队列上调用的。
- exception asyncio.QueueFull¶
当
put_nowait()
方法在已到达其 最大尺寸 .
实例¶
队列可用于在多个并发任务之间分配工作负载::
import asyncio
import random
import time
async def worker(name, queue):
while True:
# Get a "work item" out of the queue.
sleep_for = await queue.get()
# Sleep for the "sleep_for" seconds.
await asyncio.sleep(sleep_for)
# Notify the queue that the "work item" has been processed.
queue.task_done()
print(f'{name} has slept for {sleep_for:.2f} seconds')
async def main():
# Create a queue that we will use to store our "workload".
queue = asyncio.Queue()
# Generate random timings and put them into the queue.
total_sleep_time = 0
for _ in range(20):
sleep_for = random.uniform(0.05, 1.0)
total_sleep_time += sleep_for
queue.put_nowait(sleep_for)
# Create three worker tasks to process the queue concurrently.
tasks = []
for i in range(3):
task = asyncio.create_task(worker(f'worker-{i}', queue))
tasks.append(task)
# Wait until the queue is fully processed.
started_at = time.monotonic()
await queue.join()
total_slept_for = time.monotonic() - started_at
# Cancel our worker tasks.
for task in tasks:
task.cancel()
# Wait until all worker tasks are cancelled.
await asyncio.gather(*tasks, return_exceptions=True)
print('====')
print(f'3 workers slept in parallel for {total_slept_for:.2f} seconds')
print(f'total expected sleep time: {total_sleep_time:.2f} seconds')
asyncio.run(main())