tornado.queues --协程队列

4.2 新版功能.

协程的异步队列。这些类与标准库中提供的类非常相似 asyncio package .

警告

与标准库不同 queue 模块,这里定义的类是 not 线程安全。要使用来自其他线程的这些队列,请使用 IOLoop.add_callback 将控制权转移到 IOLoop 在调用任何队列方法之前线程。

Classes

Queue

class tornado.queues.Queue(maxsize: int = 0)[源代码]

协调生产商和消费者协作。

如果maxsize为0(默认值),则队列大小不受限制。

from tornado import gen
from tornado.ioloop import IOLoop
from tornado.queues import Queue

q = Queue(maxsize=2)

async def consumer():
    async for item in q:
        try:
            print('Doing work on %s' % item)
            await gen.sleep(0.01)
        finally:
            q.task_done()

async def producer():
    for item in range(5):
        await q.put(item)
        print('Put %s' % item)

async def main():
    # Start consumer without waiting (since it never finishes).
    IOLoop.current().spawn_callback(consumer)
    await producer()     # Wait for producer to put all tasks.
    await q.join()       # Wait for consumer to finish all tasks.
    print('Done')

IOLoop.current().run_sync(main)
Put 0
Put 1
Doing work on 0
Put 2
Doing work on 1
Put 3
Doing work on 2
Put 4
Doing work on 3
Doing work on 4
Done

在没有本机协程的Python版本中(在3.5之前) consumer() 可以写成:

@gen.coroutine
def consumer():
    while True:
        item = yield q.get()
        try:
            print('Doing work on %s' % item)
            yield gen.sleep(0.01)
        finally:
            q.task_done()

在 4.3 版更改: 补充 async for 支持python 3.5。

property maxsize: int

队列中允许的项目数。

qsize() int[源代码]

队列中的项目数。

put(item: tornado.queues._T, timeout: Optional[Union[float, datetime.timedelta]] = None) Future[None][源代码]

把一件物品放进队列,也许等到有空的时候。

返回一个未来,这将提高 tornado.util.TimeoutError 超时之后。

timeout 可以是表示时间的数字(与 tornado.ioloop.IOLoop.time ,正常情况下 time.timedatetime.timedelta 对象,用于相对于当前时间的截止时间。

put_nowait(item: tornado.queues._T) None[源代码]

将项目放入队列而不阻塞。

如果没有可用插槽,则升高 QueueFull .

get(timeout: Optional[Union[float, datetime.timedelta]] = None) Awaitable[tornado.queues._T][源代码]

从队列中移除并返回项目。

返回在项可用或引发后解析的可等待项 tornado.util.TimeoutError 超时之后。

timeout 可以是表示时间的数字(与 tornado.ioloop.IOLoop.time ,正常情况下 time.timedatetime.timedelta 对象,用于相对于当前时间的截止时间。

注解

这个 timeout 此方法的参数与标准库的参数不同 queue.Queue.get . 该方法将数值解释为相对超时;该方法将其解释为绝对截止日期,并要求 timedelta 用于相对超时的对象(与Tornado中的其他超时一致)。

get_nowait() tornado.queues._T[源代码]

从队列中移除并返回一个项目而不阻塞。

如果一个项目立即可用,则返回该项目,否则提升 QueueEmpty .

task_done() None[源代码]

指示以前排队的任务已完成。

由队列使用者使用。对于每一个 get 用于获取任务,随后调用 task_done 告诉队列任务的处理已完成。

如果A join 正在阻塞,当所有项都已处理完时,它将恢复;也就是说,当 put 与A匹配 task_done .

加薪 ValueError 如果被叫次数超过 put .

join(timeout: Optional[Union[float, datetime.timedelta]] = None) Awaitable[None][源代码]

阻止,直到处理队列中的所有项目。

返回一个可等待的,这将提高 tornado.util.TimeoutError 超时之后。

PriorityQueue

class tornado.queues.PriorityQueue(maxsize: int = 0)[源代码]

A Queue 它按优先级顺序检索条目,最低优先级优先。

条目通常是类似tuple的 (priority number, data) .

from tornado.queues import PriorityQueue

q = PriorityQueue()
q.put((1, 'medium-priority item'))
q.put((0, 'high-priority item'))
q.put((10, 'low-priority item'))

print(q.get_nowait())
print(q.get_nowait())
print(q.get_nowait())
(0, 'high-priority item')
(1, 'medium-priority item')
(10, 'low-priority item')

LifoQueue

class tornado.queues.LifoQueue(maxsize: int = 0)[源代码]

A Queue 它首先检索最近放置的项。

from tornado.queues import LifoQueue

q = LifoQueue()
q.put(3)
q.put(2)
q.put(1)

print(q.get_nowait())
print(q.get_nowait())
print(q.get_nowait())
1
2
3

例外情况

QueueEmpty

exception tornado.queues.QueueEmpty[源代码]

由提高 Queue.get_nowait 当队列没有项目时。

QueueFull

exception tornado.queues.QueueFull[源代码]

由提高 Queue.put_nowait 当队列达到其最大大小时。