协同工作和任务

本节概述用于协同工作和任务的高级异步API。

协同程序

Coroutines 使用async/await语法声明是编写异步应用程序的首选方法。例如,以下代码片段(需要Python3.7+)将打印“hello”,等待1秒,然后打印“world”:

>>> import asyncio

>>> async def main():
...     print('hello')
...     await asyncio.sleep(1)
...     print('world')

>>> asyncio.run(main())
hello
world

注意,简单地调用协程不会安排执行它:

>>> main()
<coroutine object main at 0x1053bb7c8>

为了实际运行协程,Asyncio提供了三种主要机制:

  • 这个 asyncio.run() 运行顶级入口点“main()”函数的函数(参见上面的示例)。

  • 在协同程序上等待。下面的代码片段将在等待1秒后打印“hello”,然后在等待之后打印“world” 另一个 2秒:

    import asyncio
    import time
    
    async def say_after(delay, what):
        await asyncio.sleep(delay)
        print(what)
    
    async def main():
        print(f"started at {time.strftime('%X')}")
    
        await say_after(1, 'hello')
        await say_after(2, 'world')
    
        print(f"finished at {time.strftime('%X')}")
    
    asyncio.run(main())
    

    预期输出:

    started at 17:13:52
    hello
    world
    finished at 17:13:55
    
  • 这个 asyncio.create_task() 函数以异步方式并发运行协程 Tasks .

    让我们修改上面的示例并运行2 say_after 协同程序 同时地 ::

    async def main():
        task1 = asyncio.create_task(
            say_after(1, 'hello'))
    
        task2 = asyncio.create_task(
            say_after(2, 'world'))
    
        print(f"started at {time.strftime('%X')}")
    
        # Wait until both tasks are completed (should take
        # around 2 seconds.)
        await task1
        await task2
    
        print(f"finished at {time.strftime('%X')}")
    

    请注意,预期的输出现在显示代码段比以前快了1秒:

    started at 17:14:32
    hello
    world
    finished at 17:14:34
    

等待者

我们说物体是 可期待的 对象,如果它可以在 await 表达式。许多异步API都是为接受等待而设计的。

有三种主要类型 可期待的 物体: awaitable任务coroutines .

协同程序

Python 协同训练是 等待者 因此,我们可以从其他协作程序中等待:

import asyncio

async def nested():
    return 42

async def main():
    # Nothing happens if we just call "nested()".
    # A coroutine object is created but not awaited,
    # so it *won't run at all*.
    nested()

    # Let's do it differently now and await it:
    print(await nested())  # will print "42".

asyncio.run(main())

重要

在本文档中,术语“协程”可用于两个密切相关的概念:

  • 协同作用 一个 async def 功能;

  • 协同线对象 :通过调用 协同作用 .

Asyncio还支持传统 generator-based 协同程序。

任务

任务 用于安排协同程序 同时地 .

当协同程序裹在 Task 具有如下功能 asyncio.create_task() 协同程序将自动安排为很快运行:

import asyncio

async def nested():
    return 42

async def main():
    # Schedule nested() to run soon concurrently
    # with "main()".
    task = asyncio.create_task(nested())

    # "task" can now be used to cancel "nested()", or
    # can simply be awaited to wait until it is complete:
    await task

asyncio.run(main())

期货

A Future 是一种特殊的 low-level 表示 最终结果 异步操作的。

当未来的对象 等待 这意味着协同程序将等待未来在其他地方解决。

异步中的未来对象需要允许基于回调的代码与异步/等待一起使用。

正常地 没有必要 在应用程序级代码中创建未来对象。

未来的对象(有时由库和一些异步API公开)可以等待:

async def main():
    await function_that_returns_a_future_object()

    # this is also valid:
    await asyncio.gather(
        function_that_returns_a_future_object(),
        some_python_coroutine()
    )

返回未来对象的低级函数的一个好例子是 loop.run_in_executor() .

运行异步程序

asyncio.run(coro, *, debug=False)

Execute the coroutine coro and return the result.

此函数运行传递的协程,负责管理异步事件循环, 完成异步发电机 ,并关闭线程池。

当另一个Asyncio事件循环正在同一线程中运行时,无法调用此函数。

如果 调试True ,事件循环将在调试模式下运行。

此函数总是创建一个新的事件循环,并在结束时关闭它。它应该用作异步程序的主要入口点,理想情况下应该只调用一次。

例子::

async def main():
    await asyncio.sleep(1)
    print('hello')

asyncio.run(main())

3.7 新版功能.

在 3.9 版更改: 更新以使用 loop.shutdown_default_executor() .

注解

的源代码 asyncio.run() 可以在 Lib/asyncio/runners.py .

创建任务

asyncio.create_task(coro, *, name=None)

封装 coro coroutine 变成一个 Task 并安排执行。返回任务对象。

如果 name 不是 None ,它设置为任务的名称,使用 Task.set_name() .

任务在返回的循环中执行 get_running_loop()RuntimeError 如果当前线程中没有正在运行的循环,则引发。

这个功能已经 在python 3.7中添加 . 在python 3.7之前,底层 asyncio.ensure_future() 可以改用函数::

async def coro():
    ...

# In Python 3.7+
task = asyncio.create_task(coro())
...

# This works in all Python versions but is less readable
task = asyncio.ensure_future(coro())
...

3.7 新版功能.

在 3.8 版更改: 增加了 name 参数。

睡觉

coroutine asyncio.sleep(delay, result=None)

封锁 延迟 秒。

如果 结果 当协程完成时,它将返回给调用方。

sleep() 总是挂起当前任务,允许运行其他任务。

将延迟设置为0可提供允许其他任务运行的优化路径。这可由长时间运行的函数使用,以避免在函数调用的整个持续时间内阻塞事件循环。

协程的示例显示当前日期,每秒钟显示5秒:

import asyncio
import datetime

async def display_date():
    loop = asyncio.get_running_loop()
    end_time = loop.time() + 5.0
    while True:
        print(datetime.datetime.now())
        if (loop.time() + 1.0) >= end_time:
            break
        await asyncio.sleep(1)

asyncio.run(display_date())

同时运行任务

awaitable asyncio.gather(*aws, return_exceptions=False)

运行 awaitable objectsaws 序列 同时地 .

如果有人在等你 aws 是协同程序,它被自动安排为任务。

如果成功完成所有等待项,则结果是返回值的聚合列表。结果值的顺序与 aws .

如果 return_exceptionsFalse (默认),第一个引发的异常将立即传播到等待的任务 gather() . 其他等待中的 aws 序列 不会被取消的 并将继续运行。

如果 return_exceptionsTrue ,将异常视为成功结果,并在结果列表中进行聚合。

如果 gather()取消 ,所有提交的等待文件(尚未完成)也 取消 .

如果任何任务或未来 aws 序列是 取消 ,它被视为是升起的 CancelledError —— gather() 调用是 not 在这种情况下取消。这是为了防止取消一个提交的任务/未来导致其他任务/未来被取消。

例子::

import asyncio

async def factorial(name, number):
    f = 1
    for i in range(2, number + 1):
        print(f"Task {name}: Compute factorial({i})...")
        await asyncio.sleep(1)
        f *= i
    print(f"Task {name}: factorial({number}) = {f}")

async def main():
    # Schedule three calls *concurrently*:
    await asyncio.gather(
        factorial("A", 2),
        factorial("B", 3),
        factorial("C", 4),
    )

asyncio.run(main())

# Expected output:
#
#     Task A: Compute factorial(2)...
#     Task B: Compute factorial(2)...
#     Task C: Compute factorial(2)...
#     Task A: factorial(2) = 2
#     Task B: Compute factorial(3)...
#     Task C: Compute factorial(3)...
#     Task B: factorial(3) = 6
#     Task C: Compute factorial(4)...
#     Task C: factorial(4) = 24

注解

如果 return_exceptions 为False,在gather()标记为done后取消它不会取消任何已提交的等待项。例如,在向调用方传播异常之后,可以将聚集标记为done,因此调用 gather.cancel() 从gather捕获异常(由一个等待对象引发)后,不会取消任何其他可等待对象。

在 3.7 版更改: 如果 聚集 它本身被取消,不管 return_exceptions .

取消的屏蔽

awaitable asyncio.shield(aw)

保护一个 awaitable object 从存在 cancelled .

如果 aw 是协同程序,它被自动安排为任务。

声明:

res = await shield(something())

等于:

res = await something()

除了 如果包含它的协同程序被取消,任务将在 something() 未取消。从 something() ,取消没有发生。尽管调用方仍被取消,因此“wait”表达式仍会引发 CancelledError .

如果 something() 通过其他方式取消(即从内部取消),也会取消 shield() .

如果希望完全忽略取消(不推荐),则 shield() 函数应与try/except子句组合,如下所示:

try:
    res = await shield(something())
except CancelledError:
    res = None

超时

coroutine asyncio.wait_for(aw, timeout)

等待 aw awaitable 以超时完成。

如果 aw 是协同程序,它被自动安排为任务。

timeout 要么可以 None 或等待的浮点或int秒数。如果 timeoutNone ,阻止,直到将来完成。

如果发生超时,它将取消任务并引发 asyncio.TimeoutError .

为了避免任务 cancellation 把它包起来 shield() .

函数将一直等到将来被取消,因此总等待时间可能超过 超时 . 如果在取消过程中发生异常,则会传播该异常。

如果等待被取消,未来 aw 也被取消。

例子::

async def eternity():
    # Sleep for one hour
    await asyncio.sleep(3600)
    print('yay!')

async def main():
    # Wait for at most 1 second
    try:
        await asyncio.wait_for(eternity(), timeout=1.0)
    except asyncio.TimeoutError:
        print('timeout!')

asyncio.run(main())

# Expected output:
#
#     timeout!

在 3.7 版更改: 什么时候? aw 由于超时而取消, wait_for 等待 aw 取消。以前,它提出 asyncio.TimeoutError 立即。

等待原语

coroutine asyncio.wait(aws, *, timeout=None, return_when=ALL_COMPLETED)

awaitable objectsaws 可并发迭代和挡路,直到由指定的条件为止 return_when

这个 aws Iterable不能为空。

返回两组任务/未来: (done, pending) .

用法:

done, pending = await asyncio.wait(aws)

timeout (float或int)如果指定,则可用于控制返回前等待的最大秒数。

请注意,此功能不会引发 asyncio.TimeoutError . 当超时发生时未完成的预购或任务只在第二组中返回。

return_when 指示此函数应何时返回。它必须是以下常量之一:

常数

描述

FIRST_COMPLETED

当任何将来完成或取消时,函数将返回。

FIRST_EXCEPTION

当将来通过引发异常而结束时,函数将返回。如果没有未来引发异常,那么它等于 ALL_COMPLETED .

ALL_COMPLETED

当所有预购完成或取消时,函数将返回。

不像 wait_for()wait() 在超时时不取消预购。

3.8 版后已移除: 如果有人在等你 aws 是协同程序,它被自动安排为任务。将协程对象传递到 wait() 直接被否决,因为它导致 confusing behavior .

注解

wait() 自动将协同程序调度为任务,稍后返回在中隐式创建的任务对象 (done, pending) 集合。因此,以下代码无法按预期工作:

async def foo():
    return 42

coro = foo()
done, pending = await asyncio.wait({coro})

if coro in done:
    # This branch will never be run!

下面是如何修复上述代码段:

async def foo():
    return 42

task = asyncio.create_task(foo())
done, pending = await asyncio.wait({task})

if task in done:
    # Everything will work as expected now.

Deprecated since version 3.8, will be removed in version 3.11: 将协程对象传递给 wait() 直接弃用。

asyncio.as_completed(aws, *, timeout=None)

awaitable objectsaws 可并发迭代。返回协程的迭代器。可以等待返回的每个协程从剩余等待项的迭代中获得最早的下一个结果。

引发 asyncio.TimeoutError 如果超时发生在所有预购完成之前。

例子::

for coro in as_completed(aws):
    earliest_result = await coro
    # ...

磨合螺纹

coroutine asyncio.to_thread(func, /, *args, **kwargs)

异步运行函数 func 在一个单独的线程中。

任何args and * *为此函数提供的kwarg直接传递给 Func*.另外,当前的 contextvars.Context 被传播,从而允许在单独的线程中访问来自事件循环线程的上下文变量。

返回一个可以等待得到的最终结果 func .

此协程函数主要用于执行IO绑定函数/方法,否则,如果在主线程中运行这些函数/方法,它们将阻塞事件循环。例如::

def blocking_io():
    print(f"start blocking_io at {time.strftime('%X')}")
    # Note that time.sleep() can be replaced with any blocking
    # IO-bound operation, such as file operations.
    time.sleep(1)
    print(f"blocking_io complete at {time.strftime('%X')}")

async def main():
    print(f"started main at {time.strftime('%X')}")

    await asyncio.gather(
        asyncio.to_thread(blocking_io),
        asyncio.sleep(1))

    print(f"finished main at {time.strftime('%X')}")


asyncio.run(main())

# Expected output:
#
# started main at 19:50:53
# start blocking_io at 19:50:53
# blocking_io complete at 19:50:54
# finished main at 19:50:54

直接呼叫 blocking_io() 在任何协程中,都会阻塞事件循环的持续时间,从而导致额外的1秒运行时间。相反,通过使用 asyncio.to_thread() ,我们可以在单独的线程中运行它,而不阻塞事件循环。

注解

由于 GILasyncio.to_thread() 通常只能用于使IO绑定函数不阻塞。但是,对于发布GIL的扩展模块或没有GIL的替代Python实现, asyncio.to_thread() 也可用于CPU绑定函数。

3.9 新版功能.

从其他线程调度

asyncio.run_coroutine_threadsafe(coro, loop)

向给定的事件循环提交协程。线程安全。

返回A concurrent.futures.Future 等待另一个OS线程的结果。

这个函数是从不同的OS线程调用的,而不是从运行事件循环的线程调用的。例子::

# Create a coroutine
coro = asyncio.sleep(1, result=3)

# Submit the coroutine to a given loop
future = asyncio.run_coroutine_threadsafe(coro, loop)

# Wait for the result with an optional timeout argument
assert future.result(timeout) == 3

如果协同程序中出现异常,将通知返回的未来。它还可用于取消事件循环中的任务:

try:
    result = future.result(timeout)
except asyncio.TimeoutError:
    print('The coroutine took too long, cancelling the task...')
    future.cancel()
except Exception as exc:
    print(f'The coroutine raised an exception: {exc!r}')
else:
    print(f'The coroutine returned: {result!r}')

concurrency and multithreading 文档的节。

与其他异步函数不同,此函数需要 loop 要显式传递的参数。

3.5.1 新版功能.

反省

asyncio.current_task(loop=None)

返回当前正在运行的 Task 实例,或 None 如果没有任务正在运行。

如果 loopNone get_running_loop() 用于获取当前循环。

3.7 新版功能.

asyncio.all_tasks(loop=None)

返回一组尚未完成的 Task 循环运行的对象。

如果 loopNoneget_running_loop() 用于获取当前循环。

3.7 新版功能.

任务对象

class asyncio.Task(coro, *, loop=None, name=None)

A Future-like 运行python的对象 coroutine . 不是线程安全的。

任务用于在事件循环中运行协程。如果协同工作在等待未来,任务将暂停协同工作的执行,并等待未来的完成。当未来是 done ,封装好的协同程序继续执行。

事件循环使用协同调度:事件循环一次运行一个任务。当任务等待将来完成时,事件循环运行其他任务、回调或执行IO操作。

使用高级 asyncio.create_task() 创建任务的功能,或低级别 loop.create_task()ensure_future() 功能。不鼓励手动实例化任务。

要取消正在运行的任务,请使用 cancel() 方法。调用它将导致任务 CancelledError 封装好的协同程序例外。如果协同程序在取消过程中等待将来的对象,则将来的对象将被取消。

cancelled() 可用于检查任务是否被取消。方法返回 True 如果包裹的协同程序没有抑制 CancelledError 异常,实际已被取消。

asyncio.Task 继承自 Future 除了 Future.set_result()Future.set_exception() .

任务支持 contextvars 模块。创建任务时,它会复制当前上下文,稍后在复制的上下文中运行其协同工作。

在 3.7 版更改: 增加了对 contextvars 模块。

在 3.8 版更改: 增加了 name 参数。

Deprecated since version 3.8, removed in version 3.10: 这个 loop 参数。

cancel(msg=None)

请求取消任务。

这安排了一个 CancelledError 将在事件循环的下一个循环中被抛出到封装好的协程中的异常。

然后协同程序有机会通过使用 try ………… except CancelledErrorfinally 阻止。因此,不同于 Future.cancel()Task.cancel() 不保证任务将被取消,尽管完全禁止取消并不常见,并且主动不鼓励。

在 3.9 版更改: 增加了 msg 参数。

下面的示例说明协程如何拦截取消请求:

async def cancel_me():
    print('cancel_me(): before sleep')

    try:
        # Wait for 1 hour
        await asyncio.sleep(3600)
    except asyncio.CancelledError:
        print('cancel_me(): cancel sleep')
        raise
    finally:
        print('cancel_me(): after sleep')

async def main():
    # Create a "cancel_me" Task
    task = asyncio.create_task(cancel_me())

    # Wait for 1 second
    await asyncio.sleep(1)

    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        print("main(): cancel_me is cancelled now")

asyncio.run(main())

# Expected output:
#
#     cancel_me(): before sleep
#     cancel_me(): cancel sleep
#     cancel_me(): after sleep
#     main(): cancel_me is cancelled now
cancelled()

返回 True 如果任务是 取消 .

任务是 取消 当要求取消时 cancel() 而被包裹的冠脉则传播了 CancelledError 抛出异常。

done()

返回 True 如果任务是 done .

任务是 done 当封装的协同程序返回值、引发异常或任务被取消时。

result()

返回任务的结果。

如果任务是 done ,将返回封装的协程的结果(或者如果协程引发了异常,则会重新引发该异常)。

如果任务 取消 ,此方法引发 CancelledError 例外。

如果任务的结果尚不可用,则此方法将引发 InvalidStateError 例外。

exception()

返回任务的异常。

如果封装的协程引发异常,则返回该异常。如果封装的协程返回正常,则此方法返回 None .

如果任务 取消 ,此方法引发 CancelledError 例外。

如果任务不是 done 然而,这种方法引发了 InvalidStateError 例外。

add_done_callback(callback, *, context=None)

添加要在任务为 done .

此方法只应在基于低级回调的代码中使用。

参见以下文件: Future.add_done_callback() 了解更多详细信息。

remove_done_callback(callback)

去除 回调 从回调列表中。

此方法只应在基于低级回调的代码中使用。

参见以下文件: Future.remove_done_callback() 了解更多详细信息。

get_stack(*, limit=None)

返回此任务的堆栈帧列表。

如果未完成封装好的协程,则返回挂起它的堆栈。如果协同工作已成功完成或被取消,则返回空列表。如果协同程序因异常终止,则返回回溯帧列表。

框架总是从最旧到最新排序。

对于挂起的协程,只返回一个堆栈帧。

可选的 limit 参数设置要返回的最大帧数;默认情况下,将返回所有可用帧。返回列表的顺序因是否返回堆栈或跟踪而不同:返回堆栈的最新帧,但返回跟踪的最旧帧。(这与回溯模块的行为相匹配。)

print_stack(*, limit=None, file=None)

打印此任务的堆栈或回溯。

这将产生与跟踪模块相似的输出,用于 get_stack() .

这个 limit 参数传递给 get_stack() 直接。

这个 file 参数是将输出写入的I/O流;默认情况下,输出写入 sys.stderr .

get_coro()

返回由 Task .

3.8 新版功能.

get_name()

返回任务的名称。

如果没有为任务显式分配名称,则默认的异步任务实现在实例化期间生成默认名称。

3.8 新版功能.

set_name(value)

设置任务的名称。

这个 value 参数可以是任何对象,然后将其转换为字符串。

在默认任务实现中,名称将在 repr() 任务对象的输出。

3.8 新版功能.

基于生成器的协同程序

注解

支持基于生成器的协程是 贬低 并计划在python 3.10中删除。

基于生成器的协程早于异步/等待语法。它们是使用 yield from 等待未来和其他协同程序的表达式。

基于生成器的协程应该用 @asyncio.coroutine 尽管这不是强制执行的。

@asyncio.coroutine

用于标记基于生成器的协程的装饰器。

此装饰器使基于遗留生成器的协程与异步/等待代码兼容:

@asyncio.coroutine
def old_style_coroutine():
    yield from asyncio.sleep(1)

async def main():
    await old_style_coroutine()

这个装饰器不应该用于 async def 协同程序。

Deprecated since version 3.8, removed in version 3.10: 使用 async def 相反。

asyncio.iscoroutine(obj)

返回 True 如果 obj 是一个 coroutine object .

这种方法不同于 inspect.iscoroutine() 因为它回来了 True 用于基于生成器的协同程序。

asyncio.iscoroutinefunction(func)

返回 True 如果 func 是一个 coroutine function .

这种方法不同于 inspect.iscoroutinefunction() 因为它回来了 True 用于基于生成器的协程函数 @coroutine .