concurrent.futures
——启动并行任务¶
3.2 新版功能.
源代码: Lib/concurrent/futures/thread.py 和 Lib/concurrent/futures/process.py
这个 concurrent.futures
模块为异步执行可调用文件提供高级接口。
异步执行可以用线程执行,使用 ThreadPoolExecutor
或分离进程,使用 ProcessPoolExecutor
. 两者都实现了相同的接口,该接口由抽象的 Executor
类。
执行者对象¶
- class concurrent.futures.Executor¶
提供异步执行调用的方法的抽象类。它不应该直接使用,而是通过它的具体子类。
- submit(fn, /, *args, **kwargs)¶
调度可调用的, fn ,执行为
fn(*args **kwargs)
返回一个Future
对象,表示可调用文件的执行。::with ThreadPoolExecutor(max_workers=1) as executor: future = executor.submit(pow, 323, 1235) print(future.result())
- map(func, *iterables, timeout=None, chunksize=1)¶
类似
map(func, *iterables)
除:这个 iterables 立即收集而不是懒散收集;
func 异步执行并对 func 可以同时制作。
返回的迭代器引发
concurrent.futures.TimeoutError
如果__next__()
调用后结果不可用 timeout 从原始调用到Executor.map()
. timeout 可以是int或float。如果 timeout 未指定或None
,等待时间没有限制。如果A func 调用引发异常,然后在从迭代器中检索该异常的值时将引发该异常。
使用时
ProcessPoolExecutor
,这种方法切碎 iterables 作为单独的任务提交到池中的若干块。这些块的(近似)大小可以通过设置 chunksize 为正整数。对于非常长的iterables,使用大值 chunksize 与默认大小1相比,可以显著提高性能。用ThreadPoolExecutor
, chunksize 没有效果。在 3.5 版更改: 增加了 chunksize 参数。
- shutdown(wait=True, *, cancel_futures=False)¶
向执行者发出信号,表示在当前挂起的期货执行完毕时,它应该释放正在使用的任何资源。调用
Executor.submit()
和Executor.map()
停机后制造将升高RuntimeError
.如果 wait 是
True
然后,在所有挂起的期货执行完毕并释放与执行者相关的资源之前,此方法不会返回。如果 wait 是False
然后,该方法将立即返回,当所有待执行的期货执行完毕时,与执行者关联的资源将被释放。不管 wait ,整个python程序将不会退出,直到所有挂起的期货执行完毕。如果 cancel_futures 是
True
,此方法将取消执行器尚未开始运行的所有挂起的未来。任何已完成或正在运行的期货都不会被取消,无论 cancel_futures .如果两者 cancel_futures 和 wait 是
True
,执行器已开始运行的所有未来都将在返回此方法之前完成。剩余的期货被取消。如果使用
with
语句,它将关闭Executor
(等待)Executor.shutdown()
是用 wait 设置为True
):import shutil with ThreadPoolExecutor(max_workers=4) as e: e.submit(shutil.copy, 'src1.txt', 'dest1.txt') e.submit(shutil.copy, 'src2.txt', 'dest2.txt') e.submit(shutil.copy, 'src3.txt', 'dest3.txt') e.submit(shutil.copy, 'src4.txt', 'dest4.txt')
在 3.9 版更改: 补充 cancel_futures .
ThreadPoolExecutor¶
ThreadPoolExecutor
是一个 Executor
使用线程池异步执行调用的子类。
当与 Future
等待其他人的结果 Future
. 例如::
import time
def wait_on_b():
time.sleep(5)
print(b.result()) # b will never complete because it is waiting on a.
return 5
def wait_on_a():
time.sleep(5)
print(a.result()) # a will never complete because it is waiting on b.
return 6
executor = ThreadPoolExecutor(max_workers=2)
a = executor.submit(wait_on_b)
b = executor.submit(wait_on_a)
和:
def wait_on_future():
f = executor.submit(pow, 5, 2)
# This will never complete because there is only one worker thread and
# it is executing this function.
print(f.result())
executor = ThreadPoolExecutor(max_workers=1)
executor.submit(wait_on_future)
- class concurrent.futures.ThreadPoolExecutor(max_workers=None, thread_name_prefix='', initializer=None, initargs=())¶
安
Executor
最多使用池的子类 max_workers 异步执行调用的线程。初始化器 是在每个工作线程开始时调用的可选可调用文件; 伊尼塔格斯 是传递给初始值设定项的参数的元组。应该 初始化器 引发异常,所有当前挂起的任务都将引发
BrokenThreadPool
以及向池提交更多任务的任何尝试。在 3.5 版更改: 如果 max_workers 是
None
否则,它将默认为机器上处理器的数量乘以5
假设ThreadPoolExecutor
通常用于重叠I/O而不是CPU工作,并且工作人员的数量应高于ProcessPoolExecutor
.3.6 新版功能: 这个 thread_name_prefix 已添加参数以允许用户控制
threading.Thread
池为便于调试而创建的工作线程的名称。在 3.7 版更改: 增加了 初始化器 和 伊尼塔格斯 参数。
在 3.8 版更改: 的默认值 max_workers 已更改为
min(32, os.cpu_count() + 4)
. 此默认值为I/O绑定的任务保留至少5个工人。它最多使用32个CPU内核来执行CPU绑定的任务,从而释放gil。它避免在许多核心机器上隐式地使用非常大的资源。threadpoolExecutor现在在启动之前重用空闲的工作线程 max_workers 工作线程也是。
线程池执行器示例¶
import concurrent.futures
import urllib.request
URLS = ['http://www.foxnews.com/',
'http://www.cnn.com/',
'http://europe.wsj.com/',
'http://www.bbc.co.uk/',
'http://some-made-up-domain.com/']
# Retrieve a single page and report the URL and contents
def load_url(url, timeout):
with urllib.request.urlopen(url, timeout=timeout) as conn:
return conn.read()
# We can use a with statement to ensure threads are cleaned up promptly
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
# Start the load operations and mark each future with its URL
future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
for future in concurrent.futures.as_completed(future_to_url):
url = future_to_url[future]
try:
data = future.result()
except Exception as exc:
print('%r generated an exception: %s' % (url, exc))
else:
print('%r page is %d bytes' % (url, len(data)))
ProcessPoolExecutor¶
这个 ProcessPoolExecutor
类是 Executor
使用进程池异步执行调用的子类。 ProcessPoolExecutor
使用 multiprocessing
模块,允许它侧移 Global Interpreter Lock 但也意味着只能执行和返回可拾取的对象。
这个 __main__
模块必须可由辅助子流程导入。这意味着 ProcessPoolExecutor
无法在交互式解释程序中工作。
调用 Executor
或 Future
方法从提交给 ProcessPoolExecutor
会导致死锁。
- class concurrent.futures.ProcessPoolExecutor(max_workers=None, mp_context=None, initializer=None, initargs=())¶
一个
Executor
最多使用一个池异步执行调用的子类 max_workers 进程。如果 max_workers 是None
否则,它将默认为机器上的处理器数量。如果 max_workers 小于或等于0
,然后是一个ValueError
都会被举起。在Windows上, max_workers 必须小于或等于61
。如果不是,那么ValueError
都会被举起。如果 max_workers 是None
,则选择的缺省值最多为61
,即使有更多的处理器可用。 mp_context 可以是多处理上下文或无上下文。它将被用来启动工人。如果 mp_context 是None
或未给出,则使用默认的多处理上下文。初始化式 是一个可选的可调用对象,在每个辅助进程开始时调用; 初始值 是传递给初始值设定项的参数元组。应该 初始化式 引发异常,则所有当前挂起的作业都将引发
BrokenProcessPool
,以及向池提交更多作业的任何尝试。在 3.3 版更改: 当某个工作进程突然终止时,
BrokenProcessPool
现在出现错误。以前,行为是不明确的,但对执行人或其期货的操作往往会冻结或僵局。在 3.7 版更改: 这个 mp_context 添加了参数以允许用户控制由池创建的工作进程的start_方法。
增加了 初始化器 和 伊尼塔格斯 参数。
ProcessPoolexecutor示例¶
import concurrent.futures
import math
PRIMES = [
112272535095293,
112582705942171,
112272535095293,
115280095190773,
115797848077099,
1099726899285419]
def is_prime(n):
if n < 2:
return False
if n == 2:
return True
if n % 2 == 0:
return False
sqrt_n = int(math.floor(math.sqrt(n)))
for i in range(3, sqrt_n + 1, 2):
if n % i == 0:
return False
return True
def main():
with concurrent.futures.ProcessPoolExecutor() as executor:
for number, prime in zip(PRIMES, executor.map(is_prime, PRIMES)):
print('%d is prime: %s' % (number, prime))
if __name__ == '__main__':
main()
未来对象¶
这个 Future
类封装可调用文件的异步执行。 Future
实例由创建 Executor.submit()
.
- class concurrent.futures.Future¶
封装可调用文件的异步执行。
Future
实例由创建Executor.submit()
除了测试之外,不应该直接创建。- cancel()¶
尝试取消呼叫。如果调用当前正在执行或已完成运行,并且无法取消,则该方法将返回
False
,否则调用将被取消,方法将返回True
.
- cancelled()¶
返回
True
如果调用成功取消。
- running()¶
返回
True
如果调用当前正在执行且无法取消。
- done()¶
返回
True
如果调用成功取消或完成运行。
- result(timeout=None)¶
返回调用返回的值。如果调用尚未完成,则此方法将等待 timeout 秒。如果调用没有接通 timeout 秒,然后
concurrent.futures.TimeoutError
将被引发。 timeout 可以是int或float。如果 timeout 未指定或None
,等待时间没有限制。如果在完成之前取消了未来,那么
CancelledError
将被引发。如果调用引发,此方法将引发相同的异常。
- exception(timeout=None)¶
返回调用引发的异常。如果调用尚未完成,则此方法将等待 timeout 秒。如果调用没有接通 timeout 秒,然后
concurrent.futures.TimeoutError
将被引发。 timeout 可以是int或float。如果 timeout 未指定或None
,等待时间没有限制。如果在完成之前取消了未来,那么
CancelledError
将被引发。如果调用完成而没有引发,
None
返回。
- add_done_callback(fn)¶
附加可调用的 fn 展望未来。 fn 当将来被取消或结束运行时,将以future作为唯一参数调用。
添加的可调用文件是按照添加它们的顺序调用的,并且总是在属于添加它们的进程的线程中调用。如果可调用项引发
Exception
子类,它将被记录并忽略。如果可调用项引发BaseException
子类,行为未定义。如果未来已经完成或被取消, fn 将立即调用。
以下
Future
方法用于单元测试和Executor
实施。- set_running_or_notify_cancel()¶
此方法只能由
Executor
在执行与Future
通过单元测试。如果方法返回
False
然后Future
取消,即Future.cancel()
已被调用并返回 True . 有线程在等待Future
完成(即通过as_completed()
或wait()
)会被唤醒的。如果方法返回
True
然后Future
未被取消,并且已处于运行状态,即调用Future.running()
将返回 True .此方法只能调用一次,不能在之后调用
Future.set_result()
或Future.set_exception()
已被调用。
- set_result(result)¶
设置与
Future
到 结果 .此方法只能由
Executor
实现和单元测试。在 3.8 版更改: 此方法引发
concurrent.futures.InvalidStateError
如果Future
已经完成。
模块功能¶
- concurrent.futures.wait(fs, timeout=None, return_when=ALL_COMPLETED)¶
等待
Future
实例(可能由不同的Executor
实例)由给出 fs 完成。返回集合的命名2元组。第一组,命名为done
,包含等待完成前已完成(完成或取消的期货)。第二组,命名为not_done
,包含未完成的期货(挂起或运行期货)。timeout 可用于控制返回前等待的最大秒数。 timeout 可以是int或float。如果 timeout 未指定或
None
,等待时间没有限制。return_when 指示此函数应何时返回。它必须是以下常量之一:
常数
描述
FIRST_COMPLETED
当任何将来完成或取消时,函数将返回。
FIRST_EXCEPTION
当将来通过引发异常而结束时,函数将返回。如果没有未来引发异常,那么它等于
ALL_COMPLETED
.ALL_COMPLETED
当所有预购完成或取消时,函数将返回。
- concurrent.futures.as_completed(fs, timeout=None)¶
返回一个迭代器
Future
实例(可能由不同的Executor
实例)由给出 fs 在完成(完成或取消的期货)时产生期货。任何期货 fs 重复的将返回一次。之前完成的任何期货as_completed()
将首先生成调用。返回的迭代器引发concurrent.futures.TimeoutError
如果__next__()
调用后结果不可用 超时 从原始呼叫到as_completed()
. 超时 可以是int或float。如果 超时 未指定或None
,等待时间没有限制。
参见
- PEP 3148 --预购-异步执行计算
描述此特性以包含在Python标准库中的建议。
异常类¶
- exception concurrent.futures.CancelledError¶
在取消未来时引发。
- exception concurrent.futures.TimeoutError¶
在将来的操作超过给定超时时引发。
- exception concurrent.futures.BrokenExecutor¶
来源于
RuntimeError
,此异常类在执行器因某种原因中断时引发,不能用于提交或执行新任务。3.7 新版功能.
- exception concurrent.futures.InvalidStateError¶
在对当前状态下不允许的将来执行操作时引发。
3.8 新版功能.
- exception concurrent.futures.thread.BrokenThreadPool¶
来源于
BrokenExecutor
,当ThreadPoolExecutor
初始化失败。3.7 新版功能.
- exception concurrent.futures.process.BrokenProcessPool¶
来源于
BrokenExecutor
(以前)RuntimeError
,当ProcessPoolExecutor
以非清洁的方式终止(例如,如果它是从外部杀死的)。3.3 新版功能.