tornado.locks --同步原语

4.2 新版功能.

使用类似于标准库提供给线程的同步原语协调协同程序。这些类与标准库中提供的类非常相似 asyncio package .

警告

请注意,这些原语实际上不是线程安全的,不能代替标准库中的原语。 threading 模块——它们是为了在单线程应用程序中协调Tornado协程,而不是为了保护多线程应用程序中的共享对象。

条件

class tornado.locks.Condition[源代码]

一个条件允许一个或多个协程等待通知。

像一个标准 threading.Condition ,但不需要获取和释放的基础锁。

用一个 Condition ,协程可以等待其他协程通知:

from tornado import gen
from tornado.ioloop import IOLoop
from tornado.locks import Condition

condition = Condition()

async def waiter():
    print("I'll wait right here")
    await condition.wait()
    print("I'm done waiting")

async def notifier():
    print("About to notify")
    condition.notify()
    print("Done notifying")

async def runner():
    # Wait for waiter() and notifier() in parallel
    await gen.multi([waiter(), notifier()])

IOLoop.current().run_sync(runner)
I'll wait right here
About to notify
Done notifying
I'm done waiting

wait 选择一个选项 timeout 参数,它是绝对时间戳::

io_loop = IOLoop.current()

# Wait up to 1 second for a notification.
await condition.wait(timeout=io_loop.time() + 1)

或… datetime.timedelta 对于相对于当前时间的超时:

# Wait up to 1 second.
await condition.wait(timeout=datetime.timedelta(seconds=1))

如果在截止日期之前没有通知,则该方法返回false。

在 5.0 版更改: 以前,可以从内部同步通知服务员 notify . 现在,通知将始终在 IOLoop .

wait(timeout: Optional[Union[float, datetime.timedelta]] = None) Awaitable[bool][源代码]

等待 notify .

返回A Future 解决了 True 如果情况得到通知,或 False 超时之后。

notify(n: int = 1) None[源代码]

尾流 n 服务员。

notify_all() None[源代码]

叫醒所有服务员。

事件

class tornado.locks.Event[源代码]

事件将阻止协同程序,直到其内部标志设置为true。

类似 threading.Event .

协同程序可以等待设置事件。设置好后,调用 yield event.wait() 除非事件被清除,否则不会阻止:

from tornado import gen
from tornado.ioloop import IOLoop
from tornado.locks import Event

event = Event()

async def waiter():
    print("Waiting for event")
    await event.wait()
    print("Not waiting this time")
    await event.wait()
    print("Done")

async def setter():
    print("About to set the event")
    event.set()

async def runner():
    await gen.multi([waiter(), setter()])

IOLoop.current().run_sync(runner)
Waiting for event
About to set the event
Not waiting this time
Done
is_set() bool[源代码]

返回 True 如果内部标志为真。

set() None[源代码]

将内部标志设置为 True . 所有的服务员都醒了。

调用 wait 一旦设置了标志,就不会阻塞。

clear() None[源代码]

将内部标志重置为 False .

呼叫 wait 将阻止到 set 被称为。

wait(timeout: Optional[Union[float, datetime.timedelta]] = None) Awaitable[None][源代码]

阻止,直到内部标志为真。

返回一个可等待的,这将提高 tornado.util.TimeoutError 超时之后。

Semaphore

class tornado.locks.Semaphore(value: int = 1)[源代码]

一种锁,可以在阻塞前获得固定次数。

信号量管理表示 release 呼叫数减去 acquire 调用,加上初始值。这个 acquire 如果需要,方法将阻塞,直到它可以返回而不使计数器为负。

信号量限制对共享资源的访问。允许两名工人同时进入:

from tornado import gen
from tornado.ioloop import IOLoop
from tornado.locks import Semaphore

sem = Semaphore(2)

async def worker(worker_id):
    await sem.acquire()
    try:
        print("Worker %d is working" % worker_id)
        await use_some_resource()
    finally:
        print("Worker %d is done" % worker_id)
        sem.release()

async def runner():
    # Join all workers.
    await gen.multi([worker(i) for i in range(3)])

IOLoop.current().run_sync(runner)
Worker 0 is working
Worker 1 is working
Worker 0 is done
Worker 2 is working
Worker 1 is done
Worker 2 is done

允许工作进程0和1同时运行,但工作进程2等待直到工作进程0释放一次信号量。

信号量可以用作异步上下文管理器::

async def worker(worker_id):
    async with sem:
        print("Worker %d is working" % worker_id)
        await use_some_resource()

    # Now the semaphore has been released.
    print("Worker %d is done" % worker_id)

为了与旧版本的python兼容, acquire 是一个上下文管理器,所以 worker 也可以写为:

@gen.coroutine
def worker(worker_id):
    with (yield sem.acquire()):
        print("Worker %d is working" % worker_id)
        yield use_some_resource()

    # Now the semaphore has been released.
    print("Worker %d is done" % worker_id)

在 4.3 版更改: 补充 async with 支持python 3.5。

release() None[源代码]

增加柜台,叫醒一个服务员。

acquire(timeout: Optional[Union[float, datetime.timedelta]] = None) Awaitable[tornado.locks._ReleasingContextManager][源代码]

减小计数器。返回一个可等待的。

如果计数器为零,则阻塞并等待 release . 等待的加薪 TimeoutError 在最后期限之后。

BoundedSemaphore

class tornado.locks.BoundedSemaphore(value: int = 1)[源代码]

阻止多次调用release()的信号量。

如果 release 将使信号量的值超过初始值,它将 ValueError . 信号量主要用于保护容量有限的资源,因此多次释放的信号量是错误的迹象。

release() None[源代码]

增加柜台,叫醒一个服务员。

acquire(timeout: Optional[Union[float, datetime.timedelta]] = None) Awaitable[tornado.locks._ReleasingContextManager]

减小计数器。返回一个可等待的。

如果计数器为零,则阻塞并等待 release . 等待的加薪 TimeoutError 在最后期限之后。

class tornado.locks.Lock[源代码]

连体衣锁。

锁开始解锁,并且 acquire 立即锁定。当它被锁定时,一个可以产生 acquire 等到另一个协程调用 release .

释放未锁定的锁升起 RuntimeError .

锁可以用作异步上下文管理器, async with 声明:

>>> from tornado import locks
>>> lock = locks.Lock()
>>>
>>> async def f():
...    async with lock:
...        # Do something holding the lock.
...        pass
...
...    # Now the lock is released.

为了与旧版本的python兼容, acquire 方法异步返回常规上下文管理器:

>>> async def f2():
...    with (yield lock.acquire()):
...        # Do something holding the lock.
...        pass
...
...    # Now the lock is released.

在 4.3 版更改: 补充 async with 支持python 3.5。

acquire(timeout: Optional[Union[float, datetime.timedelta]] = None) Awaitable[tornado.locks._ReleasingContextManager][源代码]

尝试锁定。返回一个可等待的。

返回一个可等待的,这将提高 tornado.util.TimeoutError 超时之后。

release() None[源代码]

解锁。

排队等候的第一个连体衣 acquire 获取锁。

如果没有锁定,升起 RuntimeError .