>>> from env_helper import info; info()
页面更新时间: 2024-03-29 16:20:49
运行环境:
    Linux发行版本: Debian GNU/Linux 12 (bookworm)
    操作系统内核: Linux-6.1.0-18-amd64-x86_64-with-glibc2.36
    Python版本: 3.11.2

1.17. 使用期物处理并发

抨击线程的往往是系统程序员,他们考虑的使用场景对一般的应用程序员来说,也许一生都不会遇到……应用程序员遇到的使用场景,99% 的情况下只需知道如何派生一堆独立的线程,然后用队列收集结果。
Michele Simionato, 深度思考 Python 的人

本章主要讨论 concurrent.futures 模块,并介绍“期物”(future)的概念。

我们在进行 IO 密集型并发编程(如批量下载)时,经常会考虑使用多线程场景来替代依序下载的方案,以提高下载效率。
在 IO 密集型任务中,如果代码写的正确,那么不管使用哪种并发策略(使用线程或 asyncio 包),吞吐量都要比依序执行的代码高很多。

1.17.1. 期物

期物(Future)表示“将要执行并返回结果的任务”,这个概念与 JavaScript 的 Promise 对象较为相似。

Python 3.4 起,标准库中有两个 Future 类:concurrent.futures.Futureasyncio.Future。这两个类的作用相同:Future 类的实例表示可能已经完成或尚未完成的延迟计算。
通常情况下自己不应该创建期物或改变期物的状态,而只能由并发框架实例化。
我们将某个任务交给并发框架后,这个任务将会由框架来进行调度,我们无法改变它的状态,也不能控制计算任务何时结束。
>>> # 简单的期物用法
>>> import time
>>> from concurrent import futures
>>>
>>>
>>> def fake_download(url):
>>>     time.sleep(1)      # 这里用的是多线程,所以可以直接考虑 sleep
>>>     return url
>>>
>>>
>>> def download_many(url_list):
>>>     with futures.ThreadPoolExecutor(max_workers=2) as executor:
>>>         to_do = []
>>>         for url in url_list:
>>>             future = executor.submit(fake_download, url)
>>>             to_do.append(future)
>>>             print(f"Scheduled for {url}: {future}")       # 因为 worker 数量有限,所以会有一个 future 处于 pending 状态
>>>         results = []
>>>         for future in futures.as_completed(to_do):
>>>             result = future.result()
>>>             print(f'{future} result: {result}')
>>>             results.append(result)
>>>         return results
>>>
>>> download_many(["https://www.baidu.com/", "https://www.google.com/",
>>>                "https://twitter.com/"])
Scheduled for https://www.baidu.com/: <Future at 0x10d4c04a8 state=running>
Scheduled for https://www.google.com/: <Future at 0x10d4a4f98 state=running>
Scheduled for https://twitter.com/: <Future at 0x10d4c0198 state=pending>
<Future at 0x10d4c04a8 state=finished returned str> result: https://www.baidu.com/
<Future at 0x10d4a4f98 state=finished returned str> result: https://www.google.com/
<Future at 0x10d4c0198 state=finished returned str> result: https://twitter.com/
['https://www.baidu.com/', 'https://www.google.com/', 'https://twitter.com/']
ThreadExecutor 使用多线程处理并发。在程序被 IO 阻塞时,Python 标准库会释放 GIL,以允许其它线程运行。
所以,GIL 的存在并不会对 IO 密集型多线程并发造成太大影响。
concurrent 包中提供了 ThreadPoolExecutorProcessPoolExecutor 类,分别对应多线程和多进程模型。
关于两种模型的使用及推荐并发数,我们有一个经验: * CPU 密集型任务,推荐使用多进程模型,以利用 CPU 的多个核心,max_workers 应设置为 CPU 核数; * IO 密集型任务,多核 CPU 不会提高性能,所以推荐使用多线程模型,可以省下多进程带来的资源开销,max_workers 可以尽可能设置多一些。
>>> # Executor.map
>>> # 并发运行多个可调用的对象时,可以使用 map 方法
>>>
>>> import time
>>> from concurrent import futures
>>>
>>>
>>> def fake_download(url):
>>>     time.sleep(1)      # 这里用的是多线程,所以可以直接考虑 sleep
>>>     print(f'[{time.strftime("%H:%M:%S")}] Done with {url}\n', end='')
>>>     return url
>>>
>>>
>>> def download_many(url_list):
>>>     with futures.ThreadPoolExecutor(max_workers=3) as executor:
>>>         results = executor.map(fake_download, url_list)
>>>         return results
>>>
>>> results = download_many(list(range(5)))
>>> print('Results:', list(results))
[15:36:08] Done with 0
[15:36:08] Done with 1
[15:36:08] Done with 2
[15:36:09] Done with 3
[15:36:09] Done with 4
Results: [0, 1, 2, 3, 4]

map 的使用可能更方便一点,但 futures.as_completed 则更灵活:支持不同的运算方法及参数,甚至支持来自不同 Executorfuture.

1.17.2. 总结

15 年的时候看过一篇文章叫《一行 Python 实现并行化》,里面讲述了如何利用 multiprocessing.Pool.map(或者 multiprocessing.dummy.Pool.map)快速实现多进程 / 多线程模型的并发任务处理。

`concurrent.furures <https://docs.python.org/3/library/concurrent.futures.html>`__ 模块于 Python 3.2 版本引入,它把线程、进程和队列是做服务的基础设施,无须手动进行管理即可轻松实现并发任务。同时,这个包引入了“期物”的概念,可以对并发任务更加规范地进行注册及管理。