队列

源代码: Lib/asyncio/queues.py


异步队列设计为类似于 queue 模块。尽管Asyncio队列不是线程安全的,但它们被设计为专门用于异步/等待代码中。

注意,异步队列的方法没有 timeout 参数;使用 asyncio.wait_for() 函数以超时方式执行队列操作。

也见 Examples 下面部分。

排队

class asyncio.Queue(maxsize=0)

先进先出(FIFO)队列。

如果 最大尺寸 小于或等于零,队列大小为无穷大。如果是大于的整数 0 然后 await put() 队列到达时阻塞 最大尺寸 直到项目被删除 get() .

与标准库线程不同 queue ,队列的大小始终是已知的,可以通过调用 qsize() 方法。

这个类是 not thread safe .

maxsize

队列中允许的项目数。

empty()

返回 True 如果队列为空, False 否则。

full()

返回 True 如果有 maxsize 队列中的项目。

如果队列是用初始化的 maxsize=0 (默认),然后 full() 永不回来 True .

coroutine get()

从队列中移除并返回项目。如果队列为空,请等待项目可用。

get_nowait()

如果一个项目立即可用,则返回该项目,否则引发 QueueEmpty .

coroutine join()

阻止,直到队列中的所有项目都已接收和处理。

每当将项目添加到队列时,未完成任务的计数就会增加。每当消费者的协同程序调用时,计数就会下降。 task_done() 以指示已检索到该项,并且对其进行的所有工作都已完成。当未完成任务的计数降至零时, join() 解除阻塞。

coroutine put(item)

将项目放入队列。如果队列已满,请等到空闲插槽可用后再添加项目。

put_nowait(item)

将项目放入队列而不阻塞。

如果没有可用插槽,则升高 QueueFull .

qsize()

返回队列中的项目数。

task_done()

指示以前排队的任务已完成。

由队列使用者使用。对于每一个 get() 用于获取任务,随后调用 task_done() 告诉队列任务的处理已完成。

如果A join() 当前正在阻止,它将在处理完所有项目后恢复(意味着 task_done() 每一件物品都接到了调用 put() 进入队列)。

引发 ValueError 如果调用次数超过了队列中放置的项目数。

优先队列

class asyncio.PriorityQueue

一种变体 Queue ;按优先级检索条目(最低优先级优先)。

条目通常是窗体的元组 (priority_number, data) .

后进先出队列

class asyncio.LifoQueue

一种变体 Queue 首先检索最近添加的项(后进先出)。

例外情况

exception asyncio.QueueEmpty

get_nowait() 方法是在空队列上调用的。

exception asyncio.QueueFull

put_nowait() 方法在已到达其 最大尺寸 .

实例

队列可用于在多个并发任务之间分配工作负载::

import asyncio
import random
import time


async def worker(name, queue):
    while True:
        # Get a "work item" out of the queue.
        sleep_for = await queue.get()

        # Sleep for the "sleep_for" seconds.
        await asyncio.sleep(sleep_for)

        # Notify the queue that the "work item" has been processed.
        queue.task_done()

        print(f'{name} has slept for {sleep_for:.2f} seconds')


async def main():
    # Create a queue that we will use to store our "workload".
    queue = asyncio.Queue()

    # Generate random timings and put them into the queue.
    total_sleep_time = 0
    for _ in range(20):
        sleep_for = random.uniform(0.05, 1.0)
        total_sleep_time += sleep_for
        queue.put_nowait(sleep_for)

    # Create three worker tasks to process the queue concurrently.
    tasks = []
    for i in range(3):
        task = asyncio.create_task(worker(f'worker-{i}', queue))
        tasks.append(task)

    # Wait until the queue is fully processed.
    started_at = time.monotonic()
    await queue.join()
    total_slept_for = time.monotonic() - started_at

    # Cancel our worker tasks.
    for task in tasks:
        task.cancel()
    # Wait until all worker tasks are cancelled.
    await asyncio.gather(*tasks, return_exceptions=True)

    print('====')
    print(f'3 workers slept in parallel for {total_slept_for:.2f} seconds')
    print(f'total expected sleep time: {total_sleep_time:.2f} seconds')


asyncio.run(main())