速度

../_images/33175625804_e225b90f3e_k_d.jpg

CPython是Python最常用的实现,对于CPU绑定的任务来说速度很慢。 PyPy 很快。

使用稍微修改过的版本 David Beazley's CPU绑定测试代码(为多个测试添加了循环),您可以看到cpython和pypy处理之间的区别。

# PyPy
$ ./pypy -V
Python 2.7.1 (7773f8fc4223, Nov 18 2011, 18:47:10)
[PyPy 1.7.0 with GCC 4.4.3]
$ ./pypy measure2.py
0.0683999061584
0.0483210086823
0.0388588905334
0.0440690517426
0.0695300102234
# CPython
$ ./python -V
Python 2.7.1
$ ./python measure2.py
1.06774401665
1.45412397385
1.51485204697
1.54693889618
1.60109114647

语境

The GIL

The GIL (全局解释器锁)是Python允许多个线程同时操作的方式。python的内存管理并不完全是线程安全的,因此需要gil来防止多个线程同时运行同一个python代码。

David Beazley有一个伟大的 guide 关于英国国债的运作方式。他还包括 new GIL 在python 3.2中。他的结果表明,在Python应用程序中最大限度地提高性能需要对GIL、它如何影响特定的应用程序、您拥有多少核心以及应用程序瓶颈在哪里有深刻的理解。

C扩展

The GIL

Special care 在编写C扩展时必须使用它,以确保在解释器中注册线程。

C扩展

Cython

Cython 实现Python语言的超集,用它可以编写Python的C和C++模块。Cython还允许您从编译的C库调用函数。使用cython可以利用python强大的变量和操作类型。

下面是一个使用Cython的强类型示例:

def primes(int kmax):
"""Calculation of prime numbers with additional
Cython keywords"""

    cdef int n, k, i
    cdef int p[1000]
    result = []
    if kmax > 1000:
        kmax = 1000
    k = 0
    n = 2
    while k < kmax:
        i = 0
        while i < k and n % p[i] != 0:
            i = i + 1
        if i == k:
            p[k] = n
            k = k + 1
            result.append(n)
        n = n + 1
    return result

与在纯Python中实现的下一个算法相比,查找素数的算法的这种实现还有一些额外的关键字:

def primes(kmax):
"""Calculation of prime numbers in standard Python syntax"""

    p = range(1000)
    result = []
    if kmax > 1000:
        kmax = 1000
    k = 0
    n = 2
    while k < kmax:
        i = 0
        while i < k and n % p[i] != 0:
            i = i + 1
        if i == k:
            p[k] = n
            k = k + 1
            result.append(n)
        n = n + 1
    return result

请注意,在Cython版本中,您声明要编译为C类型的整数和整数数组,同时还要创建一个Python列表:

def primes(int kmax):
    """Calculation of prime numbers with additional
    Cython keywords"""

    cdef int n, k, i
    cdef int p[1000]
    result = []
def primes(kmax):
    """Calculation of prime numbers in standard Python syntax"""

    p = range(1000)
    result = []

有什么区别?在上面的Cython版本中,您可以看到变量类型和整数数组的声明,其方式与标准C中类似。 cdef int n,k,i 在第3行。这个附加的类型声明(即整数)允许cython编译器从第二个版本生成更有效的C代码。标准Python代码保存在 *.py 文件,Cython代码保存在 *.pyx 文件夹。

速度有什么不同?让我们试试看!

import time
# Activate pyx compiler
import pyximport
pyximport.install()
import primesCy  # primes implemented with Cython
import primes  # primes implemented with Python

print("Cython:")
t1 = time.time()
print(primesCy.primes(500))
t2 = time.time()
print("Cython time: %s" % (t2 - t1))
print("")
print("Python")
t1 = time.time()
print(primes.primes(500))
t2 = time.time()
print("Python time: %s" % (t2 - t1))

这两行都需要注意:

import pyximport
pyximport.install()

这个 pyximport 模块允许您导入 *.pyx 文件(例如, primesCy.pyx )用赛通编译的版本 primes 功能。这个 pyximport.install() 命令允许python解释器直接启动cython编译器以生成C代码,该代码自动编译为 *.so C库。然后,Cython能够以您的Python代码轻松高效地为您导入这个库。与 time.time() 函数可以比较这两个不同调用之间的时间,找到500个素数。在标准笔记本电脑(双核AMD E-450 1.6 GHz)上,测量值为:

Cython time: 0.0054 seconds

Python time: 0.0566 seconds

这里是嵌入式系统的输出 ARM beaglebone 机器:

Cython time: 0.0196 seconds

Python time: 0.3302 seconds

Pyrex

Shedskin?

并发性

Concurrent.futures

concurrent.futures 模块是标准库中的一个模块,它提供“异步执行可调用文件的高级接口”。它抽象出许多关于使用多个线程或进程进行并发的更复杂的细节,并允许用户集中精力完成手头的任务。

concurrent.futures 模块公开了两个主要类 ThreadPoolExecutor 以及 ProcessPoolExecutor。ThreadPoolExecutor将创建一个工作线程池,用户可以将作业提交到该池中。当下一个工作线程可用时,这些作业将在另一个线程中执行。

ProcessPoolExecutor以相同的方式工作,除了为其工作人员使用多个线程之外,它将使用多个进程。这使得可以将GIL放在一边;但是,由于事物传递给工作进程的方式,只能执行和返回可拾取的对象。

由于GIL的工作方式,一个好的经验法则是在执行的任务涉及大量阻塞(即通过网络发出请求)时使用threadpoolExecutor,在任务计算开销较大时使用processpoolExecutor Executor。

使用两个执行器并行执行事物有两种主要方式。一种方法是 map(func, iterables) 方法。它的工作原理和内置的几乎一模一样。 map() 函数,但它将并行执行所有操作。

from concurrent.futures import ThreadPoolExecutor
import requests

def get_webpage(url):
    page = requests.get(url)
    return page

pool = ThreadPoolExecutor(max_workers=5)

my_urls = ['http://google.com/']*10  # Create a list of urls

for page in pool.map(get_webpage, my_urls):
    # Do something with the result
    print(page.text)

为了获得更大的控制力, submit(func, *args, **kwargs) method will schedule a callable to be executed ( as func(*args, **kwargs)) and returns a Future 表示可调用的执行的对象。

Future对象提供了各种方法,可用于检查计划调用的进度。其中包括:

取消()

尝试取消呼叫。

已取消()

如果成功取消呼叫,则返回true。

正在运行()

如果调用当前正在执行且无法取消,则返回true。

done()

如果呼叫成功取消或完成运行,则返回true。

结果()

返回调用返回的值。请注意,此调用将被阻止,直到默认情况下计划的可调用返回。

异常()

返回调用引发的异常。如果没有引发异常,则返回“无”。请注意,这将阻止就像 result()

添加完成回调(fn)

附加将要执行的回调函数(如 fn(future) )当计划的可调用返回时。

from concurrent.futures import ProcessPoolExecutor, as_completed

def is_prime(n):
    if n % 2 == 0:
        return n, False

    sqrt_n = int(n**0.5)
    for i in range(3, sqrt_n + 1, 2):
        if n % i == 0:
            return n, False
    return n, True

PRIMES = [
    112272535095293,
    112582705942171,
    112272535095293,
    115280095190773,
    115797848077099,
    1099726899285419]

futures = []
with ProcessPoolExecutor(max_workers=4) as pool:
    # Schedule the ProcessPoolExecutor to check if a number is prime
    # and add the returned Future to our list of futures
    for p in PRIMES:
        fut = pool.submit(is_prime, p)
        futures.append(fut)

# As the jobs are completed, print out the results
for number, result in as_completed(futures):
    if result:
        print("{} is prime".format(number))
    else:
        print("{} is not prime".format(number))

concurrent.futures 模块包含两个用于处理Futures的助手函数。这个 as_completed(futures) 函数返回对期货列表的迭代器,在期货完成时生成期货。

wait(futures) 功能将简单地阻止,直到提供的期货列表中的所有期货都已完成。

有关更多信息,请参阅 concurrent.futures 模块,查阅官方文件。

线程

标准库附带一个 threading 允许用户手动处理多个线程的模块。

在另一个线程中运行函数非常简单,只需将可调用文件及其参数传递给 Thread 的构造函数,然后调用 start()

from threading import Thread
import requests

def get_webpage(url):
    page = requests.get(url)
    return page

some_thread = Thread(get_webpage, 'http://google.com/')
some_thread.start()

要等待线程终止,请调用 join()

some_thread.join()

打电话后 join() ,最好检查线程是否仍处于活动状态(因为join调用超时):

if some_thread.is_alive():
    print("join() must have timed out.")
else:
    print("Our thread has terminated.")

由于多个线程可以访问同一内存段,有时可能会出现两个或多个线程试图同时写入同一资源或输出依赖于某些事件的顺序或时间的情况。这叫A data race 或比赛条件。当发生这种情况时,输出将出现混乱,或者您可能会遇到难以调试的问题。一个很好的例子是 Stack Overflow post .

避免这种情况的方法是使用 Lock 每个线程在写入共享资源之前需要获取的。可以通过ContextManager协议获取和释放锁 (with 声明),或使用 acquire()release() 直接。这里有一个(相当人为的)例子:

from threading import Lock, Thread

file_lock = Lock()

def log(msg):
    with file_lock:
        open('website_changes.log', 'w') as f:
            f.write(changes)

def monitor_website(some_website):
    """
    Monitor a website and then if there are any changes,
    log them to disk.
    """
    while True:
        changes = check_for_changes(some_website)
        if changes:
            log(changes)

websites = ['http://google.com/', ... ]
for website in websites:
    t = Thread(monitor_website, website)
    t.start()

在这里,我们有一组线程检查站点列表中的更改,每当有任何更改时,它们都试图通过调用 log(changes) . 什么时候? log() 调用,它将等待获取 with file_lock: .这样可以确保在任何时候只有一个线程正在写入文件。

生成过程

多重处理