异步开发

异步编程不同于传统的“顺序”编程。

此页列出常见错误和陷阱,并解释如何避免它们。

调试模式

默认情况下,Asyncio以生产模式运行。为了便于开发,Asyncio有一个 调试模式 .

有几种启用异步调试模式的方法:

除了启用调试模式外,还应考虑:

  • 设置的日志级别 asyncio loggerlogging.DEBUG 例如,可以在应用程序启动时运行以下代码片段:

    logging.basicConfig(level=logging.DEBUG)
    
  • 配置 warnings 要显示的模块 ResourceWarning 警告。一种方法是使用 -W default 命令行选项。

启用调试模式时:

  • 异步检查 coroutines that were not awaited 并记录下来;这减轻了“被遗忘的等待”的陷阱。

  • 许多非线程安全异步API(例如 loop.call_soon()loop.call_at() 方法)如果从错误的线程调用异常,则引发异常。

  • 如果执行I/O操作花费的时间太长,则会记录I/O选择器的执行时间。

  • 记录超过100毫秒的回调。这个 loop.slow_callback_duration 属性可用于设置被视为“慢”的最小执行持续时间(秒)。

并发和多线程

事件循环在线程(通常是主线程)中运行,并执行其线程中的所有回调和任务。当任务在事件循环中运行时,其他任务不能在同一线程中运行。当任务执行 await 表达式,正在运行的任务被挂起,事件循环执行下一个任务。

安排a callback 从另一个操作系统线程 loop.call_soon_threadsafe() 应使用方法。例子::

loop.call_soon_threadsafe(callback, *args)

几乎所有的Asyncio对象都不是线程安全的,这通常不是问题,除非有代码可以在任务或回调之外使用它们。如果需要这样的代码来调用低级异步API,那么 loop.call_soon_threadsafe() 应使用方法,例如:

loop.call_soon_threadsafe(fut.cancel)

要从不同的OS线程调度协程对象,请 run_coroutine_threadsafe() 应该使用函数。它返回一个 concurrent.futures.Future 访问结果:

async def coro_func():
     return await asyncio.sleep(1, 42)

# Later in another OS thread:

future = asyncio.run_coroutine_threadsafe(coro_func(), loop)
# Wait for the result:
result = future.result()

要处理信号和执行子进程,必须在主线程中运行事件循环。

这个 loop.run_in_executor() 方法可以与 concurrent.futures.ThreadPoolExecutor 在不同的OS线程中执行阻塞代码,而不阻塞事件循环运行的OS线程。

目前还没有办法直接从另一个进程(例如从 multiprocessing )这个 Event Loop Methods 部分列出了可以在不阻塞事件循环的情况下从管道和监视文件描述符读取的API。此外,asyncio Subprocess api提供了一种启动进程并从事件循环与其通信的方法。最后,前面提到的 loop.run_in_executor() 方法也可以与 concurrent.futures.ProcessPoolExecutor 在不同的进程中执行代码。

运行阻塞代码

不应直接调用阻塞(CPU绑定)代码。例如,如果一个函数执行CPU密集型计算1秒,那么所有并发的异步任务和IO操作都将延迟1秒。

执行器可以用于在不同的线程中甚至在不同的进程中运行任务,以避免用事件循环阻塞OS线程。见 loop.run_in_executor() 方法获取更多详细信息。

登录

Asyncio使用 logging 模块和所有日志记录通过 "asyncio" 记录器。

默认日志级别为 logging.INFO ,可轻松调整:

logging.getLogger("asyncio").setLevel(logging.WARNING)

检测从不等待的协同程序

当调用协程函数但不等待时(例如 coro() 而不是 await coro() )或者协程没有安排 asyncio.create_task() ,Asyncio将发出 RuntimeWarning ::

import asyncio

async def test():
    print("never scheduled")

async def main():
    test()

asyncio.run(main())

输出:

test.py:7: RuntimeWarning: coroutine 'test' was never awaited
  test()

调试模式下的输出:

test.py:7: RuntimeWarning: coroutine 'test' was never awaited
Coroutine created at (most recent call last)
  File "../t.py", line 9, in <module>
    asyncio.run(main(), debug=True)

  < .. >

  File "../t.py", line 7, in main
    test()
  test()

通常的解决方法是等待协同程序或调用 asyncio.create_task() 功能:

async def main():
    await test()

检测从未检索到的异常

如果A Future.set_exception() 调用了,但不等待将来的对象,该异常将永远不会传播到用户代码。在这种情况下,当将来的对象被垃圾收集时,Asyncio将发出一条日志消息。

未处理异常的示例:

import asyncio

async def bug():
    raise Exception("not consumed")

async def main():
    asyncio.create_task(bug())

asyncio.run(main())

输出:

Task exception was never retrieved
future: <Task finished coro=<bug() done, defined at test.py:3>
  exception=Exception('not consumed')>

Traceback (most recent call last):
  File "test.py", line 4, in bug
    raise Exception("not consumed")
Exception: not consumed

Enable the debug mode 要获取任务创建位置的跟踪信息,请执行以下操作:

asyncio.run(main(), debug=True)

调试模式下的输出:

Task exception was never retrieved
future: <Task finished coro=<bug() done, defined at test.py:3>
    exception=Exception('not consumed') created at asyncio/tasks.py:321>

source_traceback: Object created at (most recent call last):
  File "../t.py", line 9, in <module>
    asyncio.run(main(), debug=True)

< .. >

Traceback (most recent call last):
  File "../t.py", line 4, in bug
    raise Exception("not consumed")
Exception: not consumed