协同程序

协同程序 是在Tornado中编写异步代码的推荐方法。协同作战使用 Python awaityield 用于挂起和恢复执行的关键字,而不是回调链(如框架中所示的协作轻量级线程 gevent 有时也被称为协程,但在Tornado中,所有协程都使用显式上下文切换,并被称为异步函数)。

协程几乎和同步代码一样简单,但不需要花费线程。他们也 make concurrency easier 通过减少上下文切换可能发生的位置的数量来进行推理。

例子::

async def fetch_coroutine(url):
    http_client = AsyncHTTPClient()
    response = await http_client.fetch(url)
    return response.body

本土与装饰过的紧身连衣裤

python 3.5引入了 asyncawait 关键字(使用这些关键字的函数也称为“本机协程”)。为了与旧版本的Python兼容,可以使用 tornado.gen.coroutine 装饰者。

只要可能,推荐使用本地协程。只有在需要与旧版本的Python兼容时才使用装饰好的协程。Tornado文档中的示例通常使用本机形式。

两种形式之间的翻译通常很简单:

# Decorated:                    # Native:

# Normal function declaration
# with decorator                # "async def" keywords
@gen.coroutine
def a():                        async def a():
    # "yield" all async funcs       # "await" all async funcs
    b = yield c()                   b = await c()
    # "return" and "yield"
    # cannot be mixed in
    # Python 2, so raise a
    # special exception.            # Return normally
    raise gen.Return(b)             return b

下面概述了这两种形式的协同作用之间的其他差异。

  • 本地协程:

    • 通常更快。

    • 可以使用 async forasync with 使某些模式更简单的语句。

    • 除非你 awaityield 他们。装饰过的协同程序一被调用就可以“在后台”运行。请注意,对于这两种协程,使用 awaityield 所以任何例外都有可能发生。

  • 装饰的连体衣:

    • concurrent.futures 包,允许结果 executor.submit 直接交出。对于本地协同训练,请使用 IOLoop.run_in_executor 相反。

    • 通过生成一个列表或dict,支持等待多个对象的一些速记。使用 tornado.gen.multi 在本地协程中执行此操作。

    • 可以支持与其他包的集成,包括通过转换函数注册表进行Twisted。要在本机协程中访问此功能,请使用 tornado.gen.convert_yielded .

    • 总是返回 Future 对象。本地协程返回 可期待的 对象不是 Future . 在龙卷风中,两者大部分可以互换。

它是如何工作的

本节介绍装饰协程的操作。本机协程在概念上类似,但由于与Python运行时的额外集成,因此稍微复杂一些。

包含 yield 是一个 生成器 . 所有生成器都是异步的;当调用它们时,它们返回一个生成器对象,而不是运行到完成。这个 @gen.coroutine 装饰器通过 yield 表达式,并通过返回 Future .

下面是coroutine decorator内部循环的简化版本:

# Simplified inner loop of tornado.gen.Runner
def run(self):
    # send(x) makes the current yield return x.
    # It returns when the next yield is reached
    future = self.gen.send(self.next)
    def callback(f):
        self.next = f.result()
        self.run()
    future.add_done_callback(callback)

装饰师收到 Future 从发电机,等待(不阻塞) Future 完成,然后“打开” Future 并将结果作为 yield 表达式。大多数异步代码从不涉及 Future 直接类,除非立即通过 Future 由异步函数返回到 yield 表达式。

如何称呼连体衣

协同程序不会以正常方式引发异常:它们引发的任何异常都将被捕获在等待的对象中,直到生成异常为止。这意味着以正确的方式调用协程是很重要的,否则您可能会发现错误:

async def divide(x, y):
    return x / y

def bad_call():
    # This should raise a ZeroDivisionError, but it won't because
    # the coroutine is called incorrectly.
    divide(1, 0)

在几乎所有情况下,调用协程的任何函数都必须是协程本身,并使用 awaityield 调用中的关键字。当您重写在超类中定义的方法时,请参考文档以查看是否允许协程(文档应该说该方法“可能是协程”或“可能返回 Future “:”:

async def good_call():
    # await will unwrap the object returned by divide() and raise
    # the exception.
    await divide(1, 0)

有时你可能想“发射并忘记”协同程序而不等待其结果。在这种情况下,建议使用 IOLoop.spawn_callback ,这使得 IOLoop 负责电话。如果失败, IOLoop 将记录堆栈跟踪:

# The IOLoop will catch the exception and print a stack trace in
# the logs. Note that this doesn't look like a normal call, since
# we pass the function object to be called by the IOLoop.
IOLoop.current().spawn_callback(divide, 1, 0)

使用 IOLoop.spawn_callback 以这种方式 推荐 对于函数,使用 @gen.coroutine ,但它是 必修的 对于函数,使用 async def (否则协程跑步者将无法启动)。

最后,在程序的顶层, 如果IOLoop还没有运行, 你可以开始 IOLoop 运行协同程序,然后停止 IOLoopIOLoop.run_sync 方法。这通常用于启动 main 面向批处理程序的功能:

# run_sync() doesn't take arguments, so we must wrap the
# call in a lambda.
IOLoop.current().run_sync(lambda: divide(1, 0))

协同作用模式

调用阻塞函数

从协程调用阻塞函数的最简单方法是使用 IOLoop.run_in_executor ,返回 Futures 与协同程序兼容的:

async def call_blocking():
    await IOLoop.current().run_in_executor(None, blocking_func, args)

平行性

这个 multi 函数接受值为 Futures 然后等待所有这些 Futures 并行地:

from tornado.gen import multi

async def parallel_fetch(url1, url2):
    resp1, resp2 = await multi([http_client.fetch(url1),
                                http_client.fetch(url2)])

async def parallel_fetch_many(urls):
    responses = await multi ([http_client.fetch(url) for url in urls])
    # responses is a list of HTTPResponses in the same order

async def parallel_fetch_dict(urls):
    responses = await multi({url: http_client.fetch(url)
                             for url in urls})
    # responses is a dict {url: HTTPResponse}

在装饰过的连体衣中,可以 yield 直接列出或听写:

@gen.coroutine
def parallel_fetch_decorated(url1, url2):
    resp1, resp2 = yield [http_client.fetch(url1),
                          http_client.fetch(url2)]

交错

有时保存 Future 而不是立即屈服,因此您可以在等待之前启动另一个操作。

from tornado.gen import convert_yielded

async def get(self):
    # convert_yielded() starts the native coroutine in the background.
    # This is equivalent to asyncio.ensure_future() (both work in Tornado).
    fetch_future = convert_yielded(self.fetch_next_chunk())
    while True:
        chunk = yield fetch_future
        if chunk is None: break
        self.write(chunk)
        fetch_future = convert_yielded(self.fetch_next_chunk())
        yield self.flush()

这对装饰过的连体衣有点简单,因为它们在调用时会立即启动:

@gen.coroutine
def get(self):
    fetch_future = self.fetch_next_chunk()
    while True:
        chunk = yield fetch_future
        if chunk is None: break
        self.write(chunk)
        fetch_future = self.fetch_next_chunk()
        yield self.flush()

循环

在本地协程中, async for 可以使用。在旧版本的python中,循环对于协程来说很棘手,因为没有办法 yield 每次迭代 forwhile 循环并捕获产量的结果。相反,您需要将循环条件从访问结果中分离出来,如本例中的 Motor ::

import motor
db = motor.MotorClient().test

@gen.coroutine
def loop_example(collection):
    cursor = db.collection.find()
    while (yield cursor.fetch_next):
        doc = cursor.next_object()

在后台运行

PeriodicCallback 通常不与协程一起使用。相反,协程可以包含 while True: 循环与使用 tornado.gen.sleep ::

async def minute_loop():
    while True:
        await do_something()
        await gen.sleep(60)

# Coroutines that loop forever are generally started with
# spawn_callback().
IOLoop.current().spawn_callback(minute_loop)

有时可能需要更复杂的循环。例如,前一个循环每隔 60+N 秒,在哪里 N 运行时间是 do_something() . 要每60秒运行一次,请使用上面的交错模式:

async def minute_loop2():
    while True:
        nxt = gen.sleep(60)   # Start the clock.
        await do_something()  # Run while the clock is ticking.
        await nxt             # Wait for the timer to run out.