concurrent.futures ——启动并行任务

3.2 新版功能.

源代码: Lib/concurrent/futures/thread.pyLib/concurrent/futures/process.py


这个 concurrent.futures 模块为异步执行可调用文件提供高级接口。

异步执行可以用线程执行,使用 ThreadPoolExecutor 或分离进程,使用 ProcessPoolExecutor . 两者都实现了相同的接口,该接口由抽象的 Executor 类。

执行者对象

class concurrent.futures.Executor

提供异步执行调用的方法的抽象类。它不应该直接使用,而是通过它的具体子类。

submit(fn, /, *args, **kwargs)

调度可调用的, fn ,执行为 fn(*args **kwargs) 返回一个 Future 对象,表示可调用文件的执行。::

with ThreadPoolExecutor(max_workers=1) as executor:
    future = executor.submit(pow, 323, 1235)
    print(future.result())
map(func, *iterables, timeout=None, chunksize=1)

类似 map(func, *iterables) 除:

  • 这个 iterables 立即收集而不是懒散收集;

  • func 异步执行并对 func 可以同时制作。

返回的迭代器引发 concurrent.futures.TimeoutError 如果 __next__() 调用后结果不可用 timeout 从原始调用到 Executor.map() . timeout 可以是int或float。如果 timeout 未指定或 None ,等待时间没有限制。

如果A func 调用引发异常,然后在从迭代器中检索该异常的值时将引发该异常。

使用时 ProcessPoolExecutor ,这种方法切碎 iterables 作为单独的任务提交到池中的若干块。这些块的(近似)大小可以通过设置 chunksize 为正整数。对于非常长的iterables,使用大值 chunksize 与默认大小1相比,可以显著提高性能。用 ThreadPoolExecutorchunksize 没有效果。

在 3.5 版更改: 增加了 chunksize 参数。

shutdown(wait=True, *, cancel_futures=False)

向执行者发出信号,表示在当前挂起的期货执行完毕时,它应该释放正在使用的任何资源。调用 Executor.submit()Executor.map() 停机后制造将升高 RuntimeError .

如果 waitTrue 然后,在所有挂起的期货执行完毕并释放与执行者相关的资源之前,此方法不会返回。如果 waitFalse 然后,该方法将立即返回,当所有待执行的期货执行完毕时,与执行者关联的资源将被释放。不管 wait ,整个python程序将不会退出,直到所有挂起的期货执行完毕。

如果 cancel_futuresTrue ,此方法将取消执行器尚未开始运行的所有挂起的未来。任何已完成或正在运行的期货都不会被取消,无论 cancel_futures .

如果两者 cancel_futureswaitTrue ,执行器已开始运行的所有未来都将在返回此方法之前完成。剩余的期货被取消。

如果使用 with 语句,它将关闭 Executor (等待) Executor.shutdown() 是用 wait 设置为 True ):

import shutil
with ThreadPoolExecutor(max_workers=4) as e:
    e.submit(shutil.copy, 'src1.txt', 'dest1.txt')
    e.submit(shutil.copy, 'src2.txt', 'dest2.txt')
    e.submit(shutil.copy, 'src3.txt', 'dest3.txt')
    e.submit(shutil.copy, 'src4.txt', 'dest4.txt')

在 3.9 版更改: 补充 cancel_futures .

ThreadPoolExecutor

ThreadPoolExecutor 是一个 Executor 使用线程池异步执行调用的子类。

当与 Future 等待其他人的结果 Future . 例如::

import time
def wait_on_b():
    time.sleep(5)
    print(b.result())  # b will never complete because it is waiting on a.
    return 5

def wait_on_a():
    time.sleep(5)
    print(a.result())  # a will never complete because it is waiting on b.
    return 6


executor = ThreadPoolExecutor(max_workers=2)
a = executor.submit(wait_on_b)
b = executor.submit(wait_on_a)

和:

def wait_on_future():
    f = executor.submit(pow, 5, 2)
    # This will never complete because there is only one worker thread and
    # it is executing this function.
    print(f.result())

executor = ThreadPoolExecutor(max_workers=1)
executor.submit(wait_on_future)
class concurrent.futures.ThreadPoolExecutor(max_workers=None, thread_name_prefix='', initializer=None, initargs=())

Executor 最多使用池的子类 max_workers 异步执行调用的线程。

初始化器 是在每个工作线程开始时调用的可选可调用文件; 伊尼塔格斯 是传递给初始值设定项的参数的元组。应该 初始化器 引发异常,所有当前挂起的任务都将引发 BrokenThreadPool 以及向池提交更多任务的任何尝试。

在 3.5 版更改: 如果 max_workersNone 否则,它将默认为机器上处理器的数量乘以 5 假设 ThreadPoolExecutor 通常用于重叠I/O而不是CPU工作,并且工作人员的数量应高于 ProcessPoolExecutor .

3.6 新版功能: 这个 thread_name_prefix 已添加参数以允许用户控制 threading.Thread 池为便于调试而创建的工作线程的名称。

在 3.7 版更改: 增加了 初始化器伊尼塔格斯 参数。

在 3.8 版更改: 的默认值 max_workers 已更改为 min(32, os.cpu_count() + 4) . 此默认值为I/O绑定的任务保留至少5个工人。它最多使用32个CPU内核来执行CPU绑定的任务,从而释放gil。它避免在许多核心机器上隐式地使用非常大的资源。

threadpoolExecutor现在在启动之前重用空闲的工作线程 max_workers 工作线程也是。

线程池执行器示例

import concurrent.futures
import urllib.request

URLS = ['http://www.foxnews.com/',
        'http://www.cnn.com/',
        'http://europe.wsj.com/',
        'http://www.bbc.co.uk/',
        'http://some-made-up-domain.com/']

# Retrieve a single page and report the URL and contents
def load_url(url, timeout):
    with urllib.request.urlopen(url, timeout=timeout) as conn:
        return conn.read()

# We can use a with statement to ensure threads are cleaned up promptly
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    # Start the load operations and mark each future with its URL
    future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
    for future in concurrent.futures.as_completed(future_to_url):
        url = future_to_url[future]
        try:
            data = future.result()
        except Exception as exc:
            print('%r generated an exception: %s' % (url, exc))
        else:
            print('%r page is %d bytes' % (url, len(data)))

ProcessPoolExecutor

这个 ProcessPoolExecutor 类是 Executor 使用进程池异步执行调用的子类。 ProcessPoolExecutor 使用 multiprocessing 模块,允许它侧移 Global Interpreter Lock 但也意味着只能执行和返回可拾取的对象。

这个 __main__ 模块必须可由辅助子流程导入。这意味着 ProcessPoolExecutor 无法在交互式解释程序中工作。

调用 ExecutorFuture 方法从提交给 ProcessPoolExecutor 会导致死锁。

class concurrent.futures.ProcessPoolExecutor(max_workers=None, mp_context=None, initializer=None, initargs=())

一个 Executor 最多使用一个池异步执行调用的子类 max_workers 进程。如果 max_workersNone 否则,它将默认为机器上的处理器数量。如果 max_workers 小于或等于 0 ,然后是一个 ValueError 都会被举起。在Windows上, max_workers 必须小于或等于 61 。如果不是,那么 ValueError 都会被举起。如果 max_workersNone ,则选择的缺省值最多为 61 ,即使有更多的处理器可用。 mp_context 可以是多处理上下文或无上下文。它将被用来启动工人。如果 mp_contextNone 或未给出,则使用默认的多处理上下文。

初始化式 是一个可选的可调用对象,在每个辅助进程开始时调用; 初始值 是传递给初始值设定项的参数元组。应该 初始化式 引发异常,则所有当前挂起的作业都将引发 BrokenProcessPool ,以及向池提交更多作业的任何尝试。

在 3.3 版更改: 当某个工作进程突然终止时, BrokenProcessPool 现在出现错误。以前,行为是不明确的,但对执行人或其期货的操作往往会冻结或僵局。

在 3.7 版更改: 这个 mp_context 添加了参数以允许用户控制由池创建的工作进程的start_方法。

增加了 初始化器伊尼塔格斯 参数。

ProcessPoolexecutor示例

import concurrent.futures
import math

PRIMES = [
    112272535095293,
    112582705942171,
    112272535095293,
    115280095190773,
    115797848077099,
    1099726899285419]

def is_prime(n):
    if n < 2:
        return False
    if n == 2:
        return True
    if n % 2 == 0:
        return False

    sqrt_n = int(math.floor(math.sqrt(n)))
    for i in range(3, sqrt_n + 1, 2):
        if n % i == 0:
            return False
    return True

def main():
    with concurrent.futures.ProcessPoolExecutor() as executor:
        for number, prime in zip(PRIMES, executor.map(is_prime, PRIMES)):
            print('%d is prime: %s' % (number, prime))

if __name__ == '__main__':
    main()

未来对象

这个 Future 类封装可调用文件的异步执行。 Future 实例由创建 Executor.submit() .

class concurrent.futures.Future

封装可调用文件的异步执行。 Future 实例由创建 Executor.submit() 除了测试之外,不应该直接创建。

cancel()

尝试取消呼叫。如果调用当前正在执行或已完成运行,并且无法取消,则该方法将返回 False ,否则调用将被取消,方法将返回 True .

cancelled()

返回 True 如果调用成功取消。

running()

返回 True 如果调用当前正在执行且无法取消。

done()

返回 True 如果调用成功取消或完成运行。

result(timeout=None)

返回调用返回的值。如果调用尚未完成,则此方法将等待 timeout 秒。如果调用没有接通 timeout 秒,然后 concurrent.futures.TimeoutError 将被引发。 timeout 可以是int或float。如果 timeout 未指定或 None ,等待时间没有限制。

如果在完成之前取消了未来,那么 CancelledError 将被引发。

如果调用引发,此方法将引发相同的异常。

exception(timeout=None)

返回调用引发的异常。如果调用尚未完成,则此方法将等待 timeout 秒。如果调用没有接通 timeout 秒,然后 concurrent.futures.TimeoutError 将被引发。 timeout 可以是int或float。如果 timeout 未指定或 None ,等待时间没有限制。

如果在完成之前取消了未来,那么 CancelledError 将被引发。

如果调用完成而没有引发, None 返回。

add_done_callback(fn)

附加可调用的 fn 展望未来。 fn 当将来被取消或结束运行时,将以future作为唯一参数调用。

添加的可调用文件是按照添加它们的顺序调用的,并且总是在属于添加它们的进程的线程中调用。如果可调用项引发 Exception 子类,它将被记录并忽略。如果可调用项引发 BaseException 子类,行为未定义。

如果未来已经完成或被取消, fn 将立即调用。

以下 Future 方法用于单元测试和 Executor 实施。

set_running_or_notify_cancel()

此方法只能由 Executor 在执行与 Future 通过单元测试。

如果方法返回 False 然后 Future 取消,即 Future.cancel() 已被调用并返回 True . 有线程在等待 Future 完成(即通过 as_completed()wait() )会被唤醒的。

如果方法返回 True 然后 Future 未被取消,并且已处于运行状态,即调用 Future.running() 将返回 True .

此方法只能调用一次,不能在之后调用 Future.set_result()Future.set_exception() 已被调用。

set_result(result)

设置与 Future结果 .

此方法只能由 Executor 实现和单元测试。

在 3.8 版更改: 此方法引发 concurrent.futures.InvalidStateError 如果 Future 已经完成。

set_exception(exception)

设置与 FutureException exception .

此方法只能由 Executor 实现和单元测试。

在 3.8 版更改: 此方法引发 concurrent.futures.InvalidStateError 如果 Future 已经完成。

模块功能

concurrent.futures.wait(fs, timeout=None, return_when=ALL_COMPLETED)

等待 Future 实例(可能由不同的 Executor 实例)由给出 fs 完成。返回集合的命名2元组。第一组,命名为 done ,包含等待完成前已完成(完成或取消的期货)。第二组,命名为 not_done ,包含未完成的期货(挂起或运行期货)。

timeout 可用于控制返回前等待的最大秒数。 timeout 可以是int或float。如果 timeout 未指定或 None ,等待时间没有限制。

return_when 指示此函数应何时返回。它必须是以下常量之一:

常数

描述

FIRST_COMPLETED

当任何将来完成或取消时,函数将返回。

FIRST_EXCEPTION

当将来通过引发异常而结束时,函数将返回。如果没有未来引发异常,那么它等于 ALL_COMPLETED .

ALL_COMPLETED

当所有预购完成或取消时,函数将返回。

concurrent.futures.as_completed(fs, timeout=None)

返回一个迭代器 Future 实例(可能由不同的 Executor 实例)由给出 fs 在完成(完成或取消的期货)时产生期货。任何期货 fs 重复的将返回一次。之前完成的任何期货 as_completed() 将首先生成调用。返回的迭代器引发 concurrent.futures.TimeoutError 如果 __next__() 调用后结果不可用 超时 从原始呼叫到 as_completed() . 超时 可以是int或float。如果 超时 未指定或 None ,等待时间没有限制。

参见

PEP 3148 --预购-异步执行计算

描述此特性以包含在Python标准库中的建议。

异常类

exception concurrent.futures.CancelledError

在取消未来时引发。

exception concurrent.futures.TimeoutError

在将来的操作超过给定超时时引发。

exception concurrent.futures.BrokenExecutor

来源于 RuntimeError ,此异常类在执行器因某种原因中断时引发,不能用于提交或执行新任务。

3.7 新版功能.

exception concurrent.futures.InvalidStateError

在对当前状态下不允许的将来执行操作时引发。

3.8 新版功能.

exception concurrent.futures.thread.BrokenThreadPool

来源于 BrokenExecutor ,当 ThreadPoolExecutor 初始化失败。

3.7 新版功能.

exception concurrent.futures.process.BrokenProcessPool

来源于 BrokenExecutor (以前) RuntimeError ,当 ProcessPoolExecutor 以非清洁的方式终止(例如,如果它是从外部杀死的)。

3.3 新版功能.