queue ---同步队列类

源代码: Lib/queue.py


这个 queue 模块实现多生产者、多消费者队列。当必须在多个线程之间安全地交换信息时,它在线程编程中特别有用。这个 Queue 该模块中的类实现了所有必需的锁语义。

该模块实现三种类型的队列,这三种类型的队列仅在检索条目的顺序上有所不同。在一个 FIFO 队列中,第一个添加的任务是第一个检索到的。在一个 LIFO 队列,最近添加的条目是第一个被检索的条目(像堆栈一样操作)。对于优先级队列,条目将保持排序(使用 heapq 模块),首先检索最低值的条目。

在内部,这三种类型的队列使用锁来暂时阻塞竞争线程;但是,它们并不是为处理线程内的可重入性而设计的。

此外,该模块实现了一个“简单” FIFO 队列类型, SimpleQueue 它的具体实现提供了额外的保证,以换取较小的功能。

这个 queue 模块定义以下类和异常:

class queue.Queue(maxsize=0)

的构造函数 FIFO 排队。 最大尺寸 是一个整数,它设置可放入队列的项目数的上限。一旦达到这个大小,插入将被阻塞,直到队列项被占用。如果 最大尺寸 小于或等于零,队列大小为无穷大。

class queue.LifoQueue(maxsize=0)

的构造函数 LIFO 排队。 最大尺寸 是一个整数,它设置可放入队列的项目数的上限。一旦达到这个大小,插入将被阻塞,直到队列项被占用。如果 最大尺寸 小于或等于零,队列大小为无穷大。

class queue.PriorityQueue(maxsize=0)

优先级队列的构造函数。 最大尺寸 是一个整数,它设置可放入队列的项目数的上限。一旦达到这个大小,插入将被阻塞,直到队列项被占用。如果 最大尺寸 小于或等于零,队列大小为无穷大。

首先检索最低值的条目(最低值的条目是 sorted(list(entries))[0] )条目的典型模式是一个tuple形式: (priority_number, data) .

如果 data 元素不可比较,可以将数据封装在忽略数据项并只比较优先级编号的类中:

from dataclasses import dataclass, field
from typing import Any

@dataclass(order=True)
class PrioritizedItem:
    priority: int
    item: Any=field(compare=False)
class queue.SimpleQueue

无边界的构造函数 FIFO 排队。简单队列缺少高级功能,如任务跟踪。

3.7 新版功能.

exception queue.Empty

非阻塞时引发异常 get() (或) get_nowait() )在上调用 Queue 对象为空。

exception queue.Full

非阻塞时引发异常 put() (或) put_nowait() )在上调用 Queue 对象已满。

队列对象

队列对象 (QueueLifoQueuePriorityQueue )提供下面描述的公共方法。

Queue.qsize()

返回队列的大致大小。注意,qsize()>0不能保证随后的get()不会阻塞,qsize()<maxSize也不能保证put()不会阻塞。

Queue.empty()

返回 True 如果队列为空, False 否则。if empty()返回 True 它不能保证后续的put()调用不会阻塞。类似地,if empty()返回 False 它不能保证随后对get()的调用不会阻塞。

Queue.full()

返回 True 如果队列已满, False 否则。if full()返回 True 它不能保证随后对get()的调用不会阻塞。类似地,if full()返回 False 它不能保证后续的put()调用不会阻塞。

Queue.put(item, block=True, timeout=None)

item 进入队列。如果可选参数 是真的 timeoutNone (默认),必要时阻止,直到可用插槽。如果 timeout 是正数,最多阻塞 timeout 秒,然后提高 Full 如果在此时间内没有可用插槽,则为例外。否则( 如果可用插槽立即可用,则将项目放入队列,否则引发 Full 例外(例外) timeout 在这种情况下被忽略)。

Queue.put_nowait(item)

相当于 put(item, False) .

Queue.get(block=True, timeout=None)

从队列中移除并返回项目。如果可选参数 是真的 timeoutNone (默认),必要时阻止,直到项目可用。如果 timeout 是正数,最多阻塞 timeout 秒,然后提高 Empty 如果在该时间内没有可用的项目,则为例外。否则( 如果一项立即可用,则返回该项,否则将引发 Empty 例外(例外) timeout 在这种情况下被忽略)。

在POSIX系统上是3.0之前的版本,对于Windows上的所有版本,如果 是真的 超时None ,此操作将进入对基础锁的不间断等待。这意味着不会发生异常,尤其是sigint不会触发 KeyboardInterrupt .

Queue.get_nowait()

相当于 get(False) .

提供了两种方法来支持跟踪排队的任务是否已被守护进程使用者线程完全处理。

Queue.task_done()

指示以前排队的任务已完成。由队列使用者线程使用。对于每一个 get() 用于获取任务,随后调用 task_done() 告诉队列任务的处理已完成。

如果A join() 当前正在阻止,它将在处理完所有项目后恢复(意味着 task_done() 每一件物品都接到了调用 put() 进入队列)。

提出一个 ValueError 如果调用次数超过了队列中放置的项目数。

Queue.join()

在获取和处理队列中的所有项之前阻止。

每当将项目添加到队列时,未完成任务的计数就会增加。每当使用者线程调用时,计数就会下降 task_done() 以指示已检索到该项,并且对其进行的所有工作都已完成。当未完成任务的计数降至零时, join() 解除阻塞。

如何等待排队的任务完成的示例:

import threading, queue

q = queue.Queue()

def worker():
    while True:
        item = q.get()
        print(f'Working on {item}')
        print(f'Finished {item}')
        q.task_done()

# turn-on the worker thread
threading.Thread(target=worker, daemon=True).start()

# send thirty task requests to the worker
for item in range(30):
    q.put(item)
print('All task requests sent\n', end='')

# block until all tasks are done
q.join()
print('All work completed')

SimpleQueue对象

SimpleQueue 对象提供下面描述的公共方法。

SimpleQueue.qsize()

返回队列的大致大小。注意,qsize()>0不保证后续get()不会阻塞。

SimpleQueue.empty()

返回 True 如果队列为空, False 否则。if empty()返回 False 它不能保证随后对get()的调用不会阻塞。

SimpleQueue.put(item, block=True, timeout=None)

item 进入队列。该方法从不阻塞并且总是成功(除了潜在的低级错误,如分配内存失败)。可选参数 timeout 被忽略,只提供与 Queue.put() .

CPython implementation detail: This method has a C implementation which is reentrant. That is, a put() or get() call can be interrupted by another put() call in the same thread without deadlocking or corrupting internal state inside the queue. This makes it appropriate for use in destructors such as __del__ methods or weakref callbacks.

SimpleQueue.put_nowait(item)

相当于 put(item) ,提供与 Queue.put_nowait() .

SimpleQueue.get(block=True, timeout=None)

从队列中移除并返回项目。如果可选参数 是真的 timeoutNone (默认),必要时阻止,直到项目可用。如果 timeout 是正数,最多阻塞 timeout 秒,然后提高 Empty 如果在该时间内没有可用的项目,则为例外。否则( 如果一项立即可用,则返回该项,否则将引发 Empty 例外(例外) timeout 在这种情况下被忽略)。

SimpleQueue.get_nowait()

相当于 get(False) .

参见

等级 multiprocessing.Queue

用于多处理(而不是多线程)上下文的队列类。

collections.deque 是具有快速原子的无边界队列的替代实现 append()popleft() 不需要锁定也支持索引的操作。