multiprocessing ——基于过程的并行性

源代码: Lib/multiprocessing/


介绍

multiprocessing 是一个支持使用类似于 threading 模块。这个 multiprocessing 包提供本地和远程并发,有效地避免了 Global Interpreter Lock 通过使用子进程而不是线程。因此, multiprocessing 模块允许程序员充分利用给定机器上的多个处理器。它同时在Unix和Windows上运行。

这个 multiprocessing 模块还引入了API,这些API在 threading 模块。一个主要的例子是 Pool 对象,它提供了一种方便的方法,可以跨多个输入值并行执行一个函数,并跨进程分布输入数据(数据并行)。下面的示例演示了在模块中定义此类函数以便子进程能够成功导入该模块的常见做法。此数据并行的基本示例使用 Pool ,:

from multiprocessing import Pool

def f(x):
    return x*x

if __name__ == '__main__':
    with Pool(5) as p:
        print(p.map(f, [1, 2, 3]))

将打印到标准输出:

[1, 4, 9]

这个 Process

multiprocessing ,通过创建 Process 对象,然后调用其 start() 方法。 Process 遵循的API threading.Thread . 多进程程序的一个简单示例是:

from multiprocessing import Process

def f(name):
    print('hello', name)

if __name__ == '__main__':
    p = Process(target=f, args=('bob',))
    p.start()
    p.join()

要显示涉及的各个进程ID,下面是一个扩展的示例:

from multiprocessing import Process
import os

def info(title):
    print(title)
    print('module name:', __name__)
    print('parent process:', os.getppid())
    print('process id:', os.getpid())

def f(name):
    info('function f')
    print('hello', name)

if __name__ == '__main__':
    info('main line')
    p = Process(target=f, args=('bob',))
    p.start()
    p.join()

为了解释为什么 if __name__ == '__main__' 部分是必需的,请参见 程序设计指南 .

上下文和启动方法

取决于平台, multiprocessing 支持三种启动流程的方法。这些 启动方法

产卵

父进程启动一个新的Python解释器进程。子进程将仅继承运行Process对象的 run() 方法。特别是,不会继承来自父进程的不必要的文件描述符和句柄。与使用以下方法启动进程相比,使用此方法启动进程相当慢 fork分叉服务器

在Unix和Windows上可用。Windows和MacOS上的默认设置。

fork

父进程使用 os.fork() 以派生python解释器。子进程开始时,实际上与父进程相同。父进程的所有资源都由子进程继承。注意,安全地复刻多线程进程是有问题的。

仅在Unix上可用。Unix上的默认值。

福克斯服务器

当程序启动并选择 福克斯服务器 启动方法,启动服务器进程。从那时起,每当需要一个新进程时,父进程就会连接到服务器并请求它复刻一个新进程。fork服务器进程是单线程的,因此使用它是安全的 os.fork() . 不会继承不必要的资源。

在支持通过Unix管道传递文件描述符的Unix平台上可用。

在 3.8 版更改: 在MacOS上, 产卵 Start方法现在是默认方法。这个 fork Start方法应该被认为是不安全的,因为它可能导致子进程崩溃。见 bpo-33725 .

在 3.4 版更改: 产卵 在所有Unix平台上添加,以及 福克斯服务器 为某些Unix平台添加。子进程不再继承Windows上所有父进程可继承的句柄。

在Unix上使用 产卵福克斯服务器 Start方法也将启动 资源跟踪器 跟踪未链接的命名系统资源(如命名信号量或 SharedMemory 对象)由程序进程创建。当所有进程退出时,资源跟踪器将取消链接任何剩余的跟踪对象。通常应该没有,但如果一个进程被一个信号杀死,可能会有一些“泄漏”的资源。(泄漏的信号灯和共享内存段在下次重新启动之前都不会自动解除链接。这对于两个对象都是有问题的,因为系统只允许有限数量的命名信号量,并且共享内存段在主内存中占用了一些空间。)

要选择开始方法,请使用 set_start_method()if __name__ == '__main__' 主模块的子句。例如::

import multiprocessing as mp

def foo(q):
    q.put('hello')

if __name__ == '__main__':
    mp.set_start_method('spawn')
    q = mp.Queue()
    p = mp.Process(target=foo, args=(q,))
    p.start()
    print(q.get())
    p.join()

set_start_method() 不应在程序中多次使用。

或者,您可以使用 get_context() 获取上下文对象。上下文对象与多处理模块具有相同的API,并且允许在同一程序中使用多个启动方法。::

import multiprocessing as mp

def foo(q):
    q.put('hello')

if __name__ == '__main__':
    ctx = mp.get_context('spawn')
    q = ctx.Queue()
    p = ctx.Process(target=foo, args=(q,))
    p.start()
    print(q.get())
    p.join()

请注意,与一个上下文相关的对象可能与其他上下文的进程不兼容。尤其是,使用 fork 无法将上下文传递给使用 产卵福克斯服务器 启动方法。

希望使用特定start方法的库可能应该使用 get_context() 以避免干扰库用户的选择。

警告

这个 'spawn''forkserver' Start方法当前不能与“冻结的”可执行文件(即由类似包生成的二进制文件)一起使用 PyInstallercx_Freeze 在UNIX上。这个 'fork' Start方法确实有效。

在进程之间交换对象

multiprocessing 支持两种进程间的通信通道:

Queues

这个 Queue 类是的近复制 queue.Queue . 例如::

from multiprocessing import Process, Queue

def f(q):
    q.put([42, None, 'hello'])

if __name__ == '__main__':
    q = Queue()
    p = Process(target=f, args=(q,))
    p.start()
    print(q.get())    # prints "[42, None, 'hello']"
    p.join()

队列是线程和进程安全的。

Pipes

这个 Pipe() 函数返回由管道连接的一对连接对象,默认情况下,管道是双向的。例如::

from multiprocessing import Process, Pipe

def f(conn):
    conn.send([42, None, 'hello'])
    conn.close()

if __name__ == '__main__':
    parent_conn, child_conn = Pipe()
    p = Process(target=f, args=(child_conn,))
    p.start()
    print(parent_conn.recv())   # prints "[42, None, 'hello']"
    p.join()

返回的两个连接对象 Pipe() 表示管道的两端。每个连接对象都有 send()recv() 方法(等等)。请注意,如果两个进程(或线程)尝试读取或写入 same 同时结束管道。当然,同时使用不同管端的过程不会有损坏的风险。

进程之间的同步

multiprocessing 包含来自的所有同步原语的等价物 threading . 例如,可以使用锁来确保一次只有一个进程打印到标准输出:

from multiprocessing import Process, Lock

def f(l, i):
    l.acquire()
    try:
        print('hello world', i)
    finally:
        l.release()

if __name__ == '__main__':
    lock = Lock()

    for num in range(10):
        Process(target=f, args=(lock, num)).start()

如果不使用来自不同进程的锁输出,则很容易混淆所有内容。

进程间共享状态

如上所述,在进行并发编程时,最好尽量避免使用共享状态。当使用多个进程时尤其如此。

但是,如果您确实需要使用一些共享数据,那么 multiprocessing 提供了几种方法。

共享内存

数据可以存储在共享内存映射中,使用 ValueArray . 例如,以下代码:

from multiprocessing import Process, Value, Array

def f(n, a):
    n.value = 3.1415927
    for i in range(len(a)):
        a[i] = -a[i]

if __name__ == '__main__':
    num = Value('d', 0.0)
    arr = Array('i', range(10))

    p = Process(target=f, args=(num, arr))
    p.start()
    p.join()

    print(num.value)
    print(arr[:])

将打印:

3.1415927
[0, -1, -2, -3, -4, -5, -6, -7, -8, -9]

这个 'd''i' 创建时使用的参数 numarr 是否使用了 array 模块: 'd' 指示双精度浮点和 'i' 表示有符号整数。这些共享对象将是进程和线程安全的。

为了更灵活地使用共享内存,可以使用 multiprocessing.sharedctypes 支持创建从共享内存分配的任意CTypes对象的模块。

服务器进程

由返回的管理器对象 Manager() 控制保存python对象的服务器进程,并允许其他进程使用代理来操作它们。

经理回来了 Manager() 将支持类型 listdictNamespaceLockRLockSemaphoreBoundedSemaphoreConditionEventBarrierQueueValueArray . 例如:

from multiprocessing import Process, Manager

def f(d, l):
    d[1] = '1'
    d['2'] = 2
    d[0.25] = None
    l.reverse()

if __name__ == '__main__':
    with Manager() as manager:
        d = manager.dict()
        l = manager.list(range(10))

        p = Process(target=f, args=(d, l))
        p.start()
        p.join()

        print(d)
        print(l)

将打印:

{0.25: None, 1: '1', '2': 2}
[9, 8, 7, 6, 5, 4, 3, 2, 1, 0]

服务器进程管理器比使用共享内存对象更灵活,因为它们可以支持任意对象类型。此外,一个管理器可以由网络上不同计算机上的进程共享。但是,它们比使用共享内存慢。

使用一批工人

这个 Pool 类表示工作进程池。它有一些方法可以以几种不同的方式将任务卸载到工作进程中。

例如::

from multiprocessing import Pool, TimeoutError
import time
import os

def f(x):
    return x*x

if __name__ == '__main__':
    # start 4 worker processes
    with Pool(processes=4) as pool:

        # print "[0, 1, 4,..., 81]"
        print(pool.map(f, range(10)))

        # print same numbers in arbitrary order
        for i in pool.imap_unordered(f, range(10)):
            print(i)

        # evaluate "f(20)" asynchronously
        res = pool.apply_async(f, (20,))      # runs in *only* one process
        print(res.get(timeout=1))             # prints "400"

        # evaluate "os.getpid()" asynchronously
        res = pool.apply_async(os.getpid, ()) # runs in *only* one process
        print(res.get(timeout=1))             # prints the PID of that process

        # launching multiple evaluations asynchronously *may* use more processes
        multiple_results = [pool.apply_async(os.getpid, ()) for i in range(4)]
        print([res.get(timeout=1) for res in multiple_results])

        # make a single worker sleep for 10 secs
        res = pool.apply_async(time.sleep, (10,))
        try:
            print(res.get(timeout=1))
        except TimeoutError:
            print("We lacked patience and got a multiprocessing.TimeoutError")

        print("For the moment, the pool remains available for more work")

    # exiting the 'with'-block has stopped the pool
    print("Now the pool is closed and no longer available")

请注意,池的方法只能由创建它的进程使用。

注解

此包中的功能要求 __main__ 模块可由子项导入。这包括在 程序设计指南 但是这里值得指出。这意味着一些例子,例如 multiprocessing.pool.Pool 示例在交互式解释器中不起作用。例如::

>>> from multiprocessing import Pool
>>> p = Pool(5)
>>> def f(x):
...     return x*x
...
>>> with p:
...   p.map(f, [1,2,3])
Process PoolWorker-1:
Process PoolWorker-2:
Process PoolWorker-3:
Traceback (most recent call last):
AttributeError: 'module' object has no attribute 'f'
AttributeError: 'module' object has no attribute 'f'
AttributeError: 'module' object has no attribute 'f'

(如果您尝试这样做,它实际上将以半随机方式交错输出三个完整的回溯,然后您可能必须以某种方式停止父进程。)

参考文献

这个 multiprocessing 包主要复制 threading 模块。

Process 和例外

class multiprocessing.Process(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None)

流程对象表示在单独流程中运行的活动。这个 Process 类的所有方法都具有等价的 threading.Thread .

应始终使用关键字参数调用构造函数。 group 应该永远是 None ;它的存在只是为了与 threading.Thread . 目标 可调用对象是否由 run() 方法。它默认为 None ,表示没有调用任何内容。 name 是进程名(请参见 name 了解更多详细信息)。 args 是目标调用的参数元组。 关键字参数 是用于目标调用的关键字参数字典。如果提供,则仅关键字 守护进程 参数设置进程 daemon 旗到 TrueFalse . 如果 None (默认),此标志将从创建过程继承。

默认情况下,不会将任何参数传递给 目标 .

如果子类重写构造函数,它必须确保它调用基类构造函数 (Process.__init__() )在对过程进行任何其他操作之前。

在 3.3 版更改: 增加了 守护进程 参数。

run()

表示进程活动的方法。

可以在子类中重写此方法。标准 run() 方法调用作为目标参数(如果有)传递给对象构造函数的可调用对象,其中顺序参数和关键字参数取自 args关键字参数 分别是参数。

start()

启动进程的活动。

每个进程对象最多必须调用一次。它为对象的 run() 要在单独的进程中调用的方法。

join([timeout])

如果可选参数 timeoutNone (默认值),该方法将一直阻塞到 join() 方法被调用终止。如果 timeout 是正数,最多阻塞 timeout 秒。注意,该方法返回 None 如果其进程终止或方法超时。检查进程的 exitcode 以确定是否终止。

一个进程可以连接多次。

进程无法联接自身,因为这将导致死锁。在进程启动之前尝试加入它是一个错误。

name

进程的名称。名称是一个仅用于标识的字符串。它没有语义。可以为多个进程赋予相同的名称。

初始名称由构造函数设置。如果没有为构造函数提供显式名称,则为表单'process-n'的名称1 N: 2 ……: k '是构造的,其中每个nk 是其父级的第n个子级。

is_alive()

返回进程是否处于活动状态。

大致上,流程对象从 start() 方法返回,直到子进程终止。

daemon

进程的守护进程标志,一个布尔值。必须在之前设置 start() 被称为。

初始值从创建过程继承。

当一个进程退出时,它会尝试终止它的所有后台子进程。

请注意,不允许后台进程创建子进程。否则,如果在父进程退出时终止守护进程,则守护进程将使其子进程保持孤立状态。此外,这些是 not Unix守护进程或服务,它们是正常的进程,如果非守护进程退出,它们将被终止(而不是加入)。

除了 threading.Thread 应用程序编程接口, Process 对象还支持以下属性和方法:

pid

返回进程ID。在生成进程之前,将 None .

exitcode

子项的退出代码。这将是 None 如果进程尚未终止。负值 -N 指示子级已被信号终止 N .

authkey

进程的身份验证密钥(字节字符串)。

什么时候? multiprocessing 已初始化,主进程将使用 os.urandom() .

当A Process 对象已创建,它将继承其父进程的身份验证密钥,尽管可以通过设置更改此密钥 authkey 到另一个字节字符串。

身份验证密钥 .

sentinel

系统对象的一种数字句柄,当进程结束时,它将变为“就绪”。

如果要同时等待多个事件,可以使用此值 multiprocessing.connection.wait() . 否则调用 join() 更简单。

在Windows上,这是一个操作系统句柄,可用于 WaitForSingleObjectWaitForMultipleObjects API调用系列。在Unix上,这是一个文件描述符,可用于 select 模块。

3.3 新版功能.

terminate()

终止进程。在Unix上,可以使用 SIGTERM 信号;在窗口 TerminateProcess() 使用。请注意,不会执行退出处理程序和finally子句等。

请注意,进程的后代进程将 not 被终止——他们只会变成孤儿。

警告

如果在关联进程使用管道或队列时使用此方法,则该管道或队列可能会损坏,并且可能无法由其他进程使用。同样,如果进程获得了锁或信号量等,那么终止它可能会导致其他进程死锁。

kill()

等同于 terminate() 但是使用 SIGKILL UNIX上的信号。

3.7 新版功能.

close()

关闭 Process 对象,释放与其关联的所有资源。 ValueError 如果基础进程仍在运行,则引发。一次 close() 成功返回的大多数其他方法和属性 Process 对象将升高 ValueError .

3.7 新版功能.

请注意 start()join()is_alive()terminate()exitcode 方法只能由创建流程对象的流程调用。

一些方法的示例用法 Process

 >>> import multiprocessing, time, signal
 >>> p = multiprocessing.Process(target=time.sleep, args=(1000,))
 >>> print(p, p.is_alive())
 <Process ... initial> False
 >>> p.start()
 >>> print(p, p.is_alive())
 <Process ... started> True
 >>> p.terminate()
 >>> time.sleep(0.1)
 >>> print(p, p.is_alive())
 <Process ... stopped exitcode=-SIGTERM> False
 >>> p.exitcode == -signal.SIGTERM
 True
exception multiprocessing.ProcessError

所有人的基本阶级 multiprocessing 例外情况。

exception multiprocessing.BufferTooShort

引发的异常 Connection.recv_bytes_into() 当提供的缓冲区对象太小而无法读取消息时。

如果 e 是的实例 BufferTooShort 然后 e.args[0] 将以字节字符串形式给出消息。

exception multiprocessing.AuthenticationError

出现身份验证错误时引发。

exception multiprocessing.TimeoutError

由超时超时的方法引发。

管道和队列

当使用多个进程时,通常使用消息传递在进程之间进行通信,并避免使用任何同步原语(如锁)。

传递信息时,可以使用 Pipe() (用于两个进程之间的连接)或队列(允许多个生产者和消费者)。

这个 QueueSimpleQueueJoinableQueue 类型包括多生产者、多消费者 FIFO 在上建模的队列 queue.Queue 标准库中的类。他们在这方面不同 Queue 缺乏 task_done()join() python 2.5中引入的方法 queue.Queue 类。

如果你使用 JoinableQueue 那么你 must 调用 JoinableQueue.task_done() 对于从队列中删除的每个任务,或者用于计算未完成任务数的信号量最终可能溢出,从而引发异常。

注意,还可以使用管理器对象创建共享队列——请参见 经理 .

注解

multiprocessing 使用常规 queue.Emptyqueue.Full 发出超时信号的异常。它们在中不可用 multiprocessing 名称空间,因此需要从 queue .

注解

当一个对象被放入队列中时,该对象被pickle,后台线程随后将pickled数据刷新到底层管道中。这会产生一些结果,这有点令人吃惊,但不应造成任何实际困难——如果它们确实让您感到困扰,那么您可以使用用 manager .

  1. 将对象放入空队列后,队列的 empty() 方法返回 Falseget_nowait() 可以不升高而返回 queue.Empty .

  2. 如果多个进程将对象排队,则可能会导致对象在另一端接收不正常。但是,由同一进程排队的对象将始终按照彼此的预期顺序排列。

警告

如果使用 Process.terminate()os.kill() 当它试图使用 Queue ,则队列中的数据可能会损坏。这可能会导致任何其他进程在稍后尝试使用队列时获得异常。

警告

如上所述,如果子进程已将项目放入队列中(并且未使用 JoinableQueue.cancel_join_thread ,则在将所有缓冲项刷新到管道之前,该进程不会终止。

这意味着,如果尝试加入该进程,可能会出现死锁,除非您确定已放入队列的所有项都已被占用。同样,如果子进程是非守护进程,那么当父进程尝试加入其所有非守护进程子进程时,它可能会挂起退出。

请注意,使用管理器创建的队列不存在此问题。见 程序设计指南 .

有关使用队列进行进程间通信的示例,请参阅 实例 .

multiprocessing.Pipe([duplex])

返回一对 (conn1, conn2) 属于 Connection 表示管道末端的对象。

如果 双工True (默认)则管道是双向的。如果 双工False 那么管道是单向的: conn1 只能用于接收消息和 conn2 只能用于发送消息。

class multiprocessing.Queue([maxsize])

返回使用管道和一些锁/信号灯实现的进程共享队列。当一个进程第一次将一个项目放入队列时,就会启动一个feeder线程,该线程将对象从缓冲区传输到管道中。

通常 queue.Emptyqueue.Full 标准库的例外情况 queue 模块被引发到信号超时。

Queue 实现的所有方法 queue.Queue 除了 task_done()join() .

qsize()

返回队列的大致大小。由于多线程/多处理语义,这个数字不可靠。

注意,这可能会提高 NotImplementedError 在像Mac OS X这样的Unix平台上, sem_getvalue() 未实现。

empty()

返回 True 如果队列为空, False 否则。由于多线程/多处理语义,这是不可靠的。

full()

返回 True 如果队列已满, False 否则。由于多线程/多处理语义,这是不可靠的。

put(obj[, block[, timeout]])

将obj放入队列。如果可选参数 True (违约)和 timeoutNone (默认),必要时阻止,直到可用插槽。如果 timeout 是正数,最多阻塞 timeout 秒,然后提高 queue.Full 如果在此时间内没有可用插槽,则为例外。否则( False ,如果立即有可用的插槽,则将项目放入队列,否则将引发 queue.Full 例外(例外) timeout 在这种情况下被忽略)。

在 3.8 版更改: 如果队列关闭, ValueError 被引发而不是 AssertionError .

put_nowait(obj)

相当于 put(obj, False) .

get([block[, timeout]])

从队列中移除并返回项目。如果可选参数 True (违约)和 timeoutNone (默认),必要时阻止,直到项目可用。如果 timeout 是正数,最多阻塞 timeout 秒,然后提高 queue.Empty 如果在该时间内没有可用的项目,则为例外。否则(块为 False ,如果一个项目立即可用,则返回该项目,否则将引发 queue.Empty 例外(例外) timeout 在这种情况下被忽略)。

在 3.8 版更改: 如果队列关闭, ValueError 被引发而不是 OSError .

get_nowait()

相当于 get(False) .

multiprocessing.Queue 有一些其他方法在中找不到 queue.Queue . 大多数代码通常不需要这些方法:

close()

指示当前进程不会再将数据放入此队列。后台线程将所有缓冲数据刷新到管道后将退出。当队列被垃圾收集时,将自动调用此函数。

join_thread()

加入后台线程。只能在以下时间后使用 close() 已被调用。它将一直阻塞,直到后台线程退出,确保缓冲区中的所有数据都已刷新到管道中。

默认情况下,如果进程不是队列的创建者,那么在退出时,它将尝试加入队列的后台线程。进程可以调用 cancel_join_thread() 使 join_thread() 什么也不做。

cancel_join_thread()

预防 join_thread() 阻止。特别是,这可以防止后台线程在进程退出时自动加入——请参见 join_thread() .

此方法的更好名称可能是 allow_exit_without_flush() . 它很可能会导致排队的数据丢失,您几乎可以肯定不需要使用它。只有当您需要当前进程立即退出而不必等待将排队的数据刷新到底层管道中,并且您不关心丢失的数据时,才会出现这种情况。

注解

此类的功能要求在主机操作系统上实现一个正常工作的共享信号量。如果没有这个类,这个类中的功能将被禁用,并尝试实例化一个 Queue 将导致 ImportError . 见 bpo-3770 更多信息。下面列出的任何专用队列类型都是如此。

class multiprocessing.SimpleQueue

它是一个简化的 Queue 类型,非常接近锁定 Pipe .

close()

关闭队列:释放内部资源。

队列关闭后不能再使用。例如, get()put()empty() 不能再调用方法。

3.9 新版功能.

empty()

返回 True 如果队列为空, False 否则。

get()

从队列中移除并返回项目。

put(item)

item 进入队列。

class multiprocessing.JoinableQueue([maxsize])

JoinableQueue ,A Queue 子类,是另外具有 task_done()join() 方法。

task_done()

指示以前排队的任务已完成。由队列使用者使用。对于每一个 get() 用于获取任务,随后调用 task_done() 告诉队列任务的处理已完成。

如果A join() 当前正在阻止,它将在处理完所有项目后恢复(意味着 task_done() 每一件物品都接到了调用 put() 进入队列)。

提出一个 ValueError 如果调用次数超过了队列中放置的项目数。

join()

阻止,直到队列中的所有项目都被获取和处理。

每当将项目添加到队列时,未完成任务的计数就会增加。每当消费者调用时,计数就会下降。 task_done() 以指示已检索到该项,并且对其进行的所有工作都已完成。当未完成任务的计数降至零时, join() 解除阻塞。

其他

multiprocessing.active_children()

返回当前进程的所有活动子进程的列表。

调用它会产生“连接”任何已经完成的进程的副作用。

multiprocessing.cpu_count()

返回系统中的CPU数。

这个数字不等于当前进程可以使用的CPU数量。可用CPU的数量可以通过 len(os.sched_getaffinity(0))

可以提高 NotImplementedError .

multiprocessing.current_process()

返回 Process 与当前进程对应的对象。

类似物 threading.current_thread() .

multiprocessing.parent_process()

返回 Process 对象对应的父进程 current_process() . 对于主要过程, parent_processNone .

3.8 新版功能.

multiprocessing.freeze_support()

当程序使用 multiprocessing 已冻结以生成Windows可执行文件。(已用 PY2Exe公司PyInstallercx_Freeze

我们需要在 if __name__ == '__main__' 主模块的线路。例如::

from multiprocessing import Process, freeze_support

def f():
    print('hello world!')

if __name__ == '__main__':
    freeze_support()
    Process(target=f).start()

如果 freeze_support() 行被省略,然后尝试运行冻结的可执行文件将引发 RuntimeError .

调用 freeze_support() 在除Windows以外的任何操作系统上调用时不起作用。此外,如果模块在Windows上由python解释器正常运行(程序尚未冻结),则 freeze_support() 没有效果。

multiprocessing.get_all_start_methods()

返回受支持的Start方法的列表,其中第一个方法是默认方法。可能的启动方法是 'fork''spawn''forkserver' . 仅在Windows上 'spawn' 是可用的。在UNIX上 'fork''spawn' 始终支持,与 'fork' 作为默认值。

3.4 新版功能.

multiprocessing.get_context(method=None)

返回与具有相同属性的上下文对象 multiprocessing 模块。

如果 方法None 然后返回默认上下文。否则 方法 应该是 'fork''spawn''forkserver' . ValueError 如果指定的Start方法不可用,则引发。

3.4 新版功能.

multiprocessing.get_start_method(allow_none=False)

返回用于启动进程的Start方法的名称。

如果启动方法尚未修复,并且 allow_none 如果为false,则start方法将固定为默认值,并返回名称。如果启动方法尚未修复,并且 allow_none 那么是真的 None 返回。

返回值可以是 'fork''spawn''forkserver'None . 'fork' 是Unix上的默认值,而 'spawn' 是Windows上的默认设置。

3.4 新版功能.

multiprocessing.set_executable()

设置启动子进程时要使用的python解释器的路径。(默认情况下) sys.executable 使用)。嵌入程序可能需要执行以下操作:

set_executable(os.path.join(sys.exec_prefix, 'pythonw.exe'))

在创建子进程之前。

在 3.4 版更改: 现在在Unix上支持 'spawn' 使用Start方法。

multiprocessing.set_start_method(method)

设置用于启动子进程的方法。 方法 可以是 'fork''spawn''forkserver' .

请注意,最多应调用一次,并且应在 if __name__ == '__main__' 主模块的子句。

3.4 新版功能.

连接对象

连接对象允许发送和接收可选取对象或字符串。它们可以被认为是面向消息的连接套接字。

连接对象通常是使用 Pipe --也见 听众和客户 .

class multiprocessing.connection.Connection
send(obj)

将对象发送到连接的另一端,使用 recv() .

对象必须可拾取。非常大的腌菜(大约32 mib+,尽管这取决于操作系统)可能会导致 ValueError 例外。

recv()

返回从连接的另一端发送的对象,方法是 send() . 直到有东西要接收。引发 EOFError 如果没有东西可以接收,另一端就关闭了。

fileno()

返回连接使用的文件描述符或句柄。

close()

关闭连接。

当连接被垃圾收集时,将自动调用此函数。

poll([timeout])

返回是否有可读取的数据。

如果 timeout 如果未指定,则它将立即返回。如果 timeout 是一个数字,然后指定要阻止的最长时间(秒)。如果 timeoutNone 然后使用无限超时。

请注意,可以使用 multiprocessing.connection.wait() .

send_bytes(buffer[, offset[, size]])

从发送字节数据 bytes-like object 作为完整的信息。

如果 抵消 然后从该位置读取数据 缓冲区 . 如果 size 则从缓冲区中读取许多字节。非常大的缓冲区(约32 mib+,但取决于操作系统)可能会引发 ValueError 例外

recv_bytes([maxlength])

以字符串形式返回从连接另一端发送的字节数据的完整消息。直到有东西要接收。引发 EOFError 如果没有东西可以接收,另一端已经关闭。

如果 最大长度 已指定,消息长度超过 最大长度 然后 OSError 引发,连接将不再可读。

在 3.3 版更改: 此函数用于引发 IOError ,现在是的别名 OSError .

recv_bytes_into(buffer[, offset])

读入 缓冲区 从连接的另一端发送的字节数据的完整消息,并返回消息中的字节数。直到有东西要接收。引发 EOFError 如果没有东西可以接收,另一端就关闭了。

缓冲区 必须是可写的 bytes-like object . 如果 抵消 则消息将从该位置写入缓冲区。偏移量必须是小于的长度的非负整数 缓冲区 (以字节为单位)。

如果缓冲区太短,则 BufferTooShort 引发异常,完整消息可用为 e.args[0] 在哪里? e 是异常实例。

在 3.3 版更改: 连接对象本身现在可以在进程之间使用 Connection.send()Connection.recv() .

3.3 新版功能: 连接对象现在支持上下文管理协议——请参见 上下文管理器类型 . __enter__() 返回连接对象,以及 __exit__() 调用 close() .

例如:

>>> from multiprocessing import Pipe
>>> a, b = Pipe()
>>> a.send([1, 'hello', None])
>>> b.recv()
[1, 'hello', None]
>>> b.send_bytes(b'thank you')
>>> a.recv_bytes()
b'thank you'
>>> import array
>>> arr1 = array.array('i', range(5))
>>> arr2 = array.array('i', [0] * 10)
>>> a.send_bytes(arr1)
>>> count = b.recv_bytes_into(arr2)
>>> assert count == len(arr1) * arr1.itemsize
>>> arr2
array('i', [0, 1, 2, 3, 4, 0, 0, 0, 0, 0])

警告

这个 Connection.recv() 方法会自动取消拾取它接收的数据,这可能会带来安全风险,除非您可以信任发送消息的进程。

因此,除非连接对象是使用 Pipe() 你应该只使用 recv()send() 方法。见 身份验证密钥 .

警告

如果进程在试图读取或写入管道时被终止,那么管道中的数据可能会损坏,因为可能无法确定消息边界在哪里。

同步原语

通常,同步原语在多进程程序中不像在多线程程序中那样必要。参见文档 threading 模块。

注意,还可以使用管理器对象创建同步原语——请参见 经理 .

class multiprocessing.Barrier(parties[, action[, timeout]])

屏障对象:的复制 threading.Barrier .

3.3 新版功能.

class multiprocessing.BoundedSemaphore([value])

有界信号量对象:对 threading.BoundedSemaphore .

它与同类产品的唯一区别在于: acquire 方法的第一个参数命名为 ,与 Lock.acquire() .

注解

在Mac OS X上,这与 Semaphore 因为 sem_getvalue() 未在该平台上实现。

class multiprocessing.Condition([lock])

条件变量:的别名 threading.Condition .

如果 lock 是指定的,那么它应该是 LockRLock 对象从 multiprocessing .

在 3.3 版更改: 这个 wait_for() 方法已添加。

class multiprocessing.Event

一个复制 threading.Event .

class multiprocessing.Lock

非递归锁对象:对 threading.Lock . 一旦一个进程或线程获得了一个锁,随后从任何进程或线程获取它的尝试将被阻塞,直到释放它;任何进程或线程都可能释放它。的概念和行为 threading.Lock 当它应用于线程时,在此处复制 multiprocessing.Lock 因为它适用于进程或线程,除非另有说明。

注意 Lock 实际上是一个factory函数,它返回 multiprocessing.synchronize.Lock 用默认上下文初始化。

Lock 支持 context manager 协议,因此可用于 with 声明。

acquire(block=True, timeout=None)

获取一个锁、阻塞或非阻塞。

参数设置为 True (默认),方法调用将阻塞,直到锁处于未锁定状态,然后将其设置为已锁定并返回 True . 注意,第一个参数的名称与 threading.Lock.acquire() .

参数设置为 False ,方法调用不阻塞。如果锁当前处于锁定状态,则返回 False ;否则将锁设置为锁定状态并返回 True .

当使用正的浮点值调用时, timeout ,最多阻止指定的秒数 timeout 只要无法获取锁。具有负值的调用 timeout 相当于 timeout 为零。调用 timeout 价值 None (默认设置)将超时时间设置为无限。注意治疗阴性或 None 值为 timeout 不同于 threading.Lock.acquire() . 这个 timeout 如果 参数设置为 False 因此被忽略。返回 True 如果已获取锁,或 False 如果超时时间已过。

release()

释放锁。这可以从任何进程或线程调用,而不仅仅是最初获取锁的进程或线程。

行为与 threading.Lock.release() 但当在未锁定的锁上调用时, ValueError 提高了。

class multiprocessing.RLock

递归锁对象:对 threading.RLock . 获取递归锁的进程或线程必须释放它。一旦一个进程或线程获得了递归锁,同一进程或线程就可以在不阻塞的情况下再次获取它;该进程或线程必须在每次获取它时释放一次。

注意 RLock 实际上是一个factory函数,它返回 multiprocessing.synchronize.RLock 用默认上下文初始化。

RLock 支持 context manager 协议,因此可用于 with 声明。

acquire(block=True, timeout=None)

获取一个锁、阻塞或非阻塞。

当使用 参数设置为 True ,阻塞,直到锁处于未锁定状态(不属于任何进程或线程),除非该锁已属于当前进程或线程。然后,当前进程或线程取得锁的所有权(如果它还没有所有权),锁内的递归级别将增加一个,从而导致返回值为 True .注意,第一个参数的行为与 threading.RLock.acquire() ,从参数本身的名称开始。

当使用 参数设置为 False ,不要阻塞。如果锁已经被另一个进程或线程获取(因此拥有),则当前进程或线程不拥有所有权,并且锁内的递归级别也不会更改,从而导致返回值为 False . 如果锁处于未锁定状态,则当前进程或线程将取得所有权,并且递归级别将增加,从而导致返回值为 True .

使用和行为 timeout 参数与中的相同 Lock.acquire() . 注意,其中一些行为 timeout 不同于 threading.RLock.acquire() .

release()

释放一个锁,减少递归级别。如果递减后递归级别为零,则将锁重置为未锁定(不属于任何进程或线程),如果阻止任何其他进程或线程等待锁解锁,则只允许其中一个进程或线程继续。如果递减后递归级别仍然为非零,则锁将保持锁定,并由调用进程或线程拥有。

只有在调用进程或线程拥有锁时才调用此方法。安 AssertionError 如果此方法由所有者以外的进程或线程调用,或者锁处于未锁定(无主)状态,则引发。请注意,在这种情况下引发的异常类型与 threading.RLock.release() .

class multiprocessing.Semaphore([value])

信号量对象:对 threading.Semaphore .

它与同类产品的唯一区别在于: acquire 方法的第一个参数命名为 ,与 Lock.acquire() .

注解

在Mac OS X上, sem_timedwait 不支持,因此调用 acquire() 超时将使用休眠循环模拟该函数的行为。

注解

如果sigint信号由 Ctrl-C 当主线程被对的调用阻止时到达 BoundedSemaphore.acquire()Lock.acquire()RLock.acquire()Semaphore.acquire()Condition.acquire()Condition.wait() 然后调用将立即中断,并且 KeyboardInterrupt 将被引发。

这与 threading 其中sigint将在进行等效的阻塞调用时被忽略。

注解

这个包的某些功能需要在主机操作系统上实现一个正常工作的共享信号量。没有一个, multiprocessing.synchronize 模块将被禁用,并且尝试导入它将导致 ImportError . 见 bpo-3770 更多信息。

共享 ctypes 物体

可以使用可由子进程继承的共享内存创建共享对象。

multiprocessing.Value(typecode_or_type, *args, lock=True)

返回A ctypes 从共享内存分配的对象。默认情况下,返回值实际上是对象的同步封装器。对象本身可以通过 value 的属性 Value .

typecode_or_type 确定返回对象的类型:它是CTypes类型或由 array 模块。 *args 传递给类型的构造函数。

如果 lockTrue (默认)然后创建一个新的递归锁对象以同步对值的访问。如果 lock 是一个 LockRLock 对象,该对象将用于同步对值的访问。如果 lockFalse 然后,对返回对象的访问不会被锁自动保护,因此它不一定是“过程安全的”。

像这样的操作 += 涉及读写的不是原子的。因此,例如,如果您想要原子地增加一个共享值,那么仅仅这样做是不够的:

counter.value += 1

假设关联的锁是递归的(默认情况下是递归的),则可以执行以下操作:

with counter.get_lock():
    counter.value += 1

注意 lock 是只包含关键字的参数。

multiprocessing.Array(typecode_or_type, size_or_initializer, *, lock=True)

返回从共享内存分配的CTypes数组。默认情况下,返回值实际上是数组的同步封装器。

typecode_or_type 确定返回数组元素的类型:它是CTypes类型或由 array 模块。如果 size_or_initializer 是一个整数,然后它确定数组的长度,数组将最初归零。否则, size_or_initializer 用于初始化数组的序列,其长度决定数组的长度。

如果 lockTrue (默认)然后创建一个新的锁对象,以同步对值的访问。如果 lock 是一个 LockRLock 对象,该对象将用于同步对值的访问。如果 lockFalse 然后,对返回对象的访问不会被锁自动保护,因此它不一定是“过程安全的”。

注意 lock 是只包含关键字的参数。

注意,一个数组 ctypes.c_charvalueraw 允许使用它存储和检索字符串的属性。

这个 multiprocessing.sharedctypes 模块

这个 multiprocessing.sharedctypes 模块提供分配功能 ctypes 从共享内存中可以由子进程继承的对象。

注解

尽管可以将指针存储在共享内存中,但请记住,这将引用特定进程地址空间中的一个位置。但是,指针很可能在第二个进程的上下文中无效,试图从第二个进程中取消对指针的引用可能会导致崩溃。

multiprocessing.sharedctypes.RawArray(typecode_or_type, size_or_initializer)

返回从共享内存分配的CTypes数组。

typecode_or_type 确定返回数组元素的类型:它是CTypes类型或由 array 模块。如果 size_or_initializer 是一个整数,然后它确定数组的长度,数组最初将归零。否则 size_or_initializer 用于初始化数组的序列,其长度决定数组的长度。

注意,设置和获取元素可能是非原子的——使用 Array() 而是确保使用锁自动同步访问。

multiprocessing.sharedctypes.RawValue(typecode_or_type, *args)

返回从共享内存分配的CTypes对象。

typecode_or_type 确定返回对象的类型:它是CTypes类型或由 array 模块。 *args 传递给类型的构造函数。

注意,设置和获取值可能是非原子的——使用 Value() 而是确保使用锁自动同步访问。

注意,一个数组 ctypes.c_charvalueraw 允许使用它存储和检索字符串的属性——请参见文档 ctypes .

multiprocessing.sharedctypes.Array(typecode_or_type, size_or_initializer, *, lock=True)

一样 RawArray() 但这取决于 lock 可以返回进程安全的同步封装,而不是原始CTypes数组。

如果 lockTrue (默认)然后创建一个新的锁对象,以同步对值的访问。如果 lock 是一个 LockRLock 对象,该对象将用于同步对值的访问。如果 lockFalse 然后,对返回对象的访问不会被锁自动保护,因此它不一定是“过程安全的”。

注意 lock 是只包含关键字的参数。

multiprocessing.sharedctypes.Value(typecode_or_type, *args, lock=True)

一样 RawValue() 但这取决于 lock 可以返回进程安全的同步封装,而不是原始CTypes对象。

如果 lockTrue (默认)然后创建一个新的锁对象,以同步对值的访问。如果 lock 是一个 LockRLock 对象,该对象将用于同步对值的访问。如果 lockFalse 然后,对返回对象的访问不会被锁自动保护,因此它不一定是“过程安全的”。

注意 lock 是只包含关键字的参数。

multiprocessing.sharedctypes.copy(obj)

返回从共享内存分配的CTypes对象,该共享内存是CTypes对象的副本 obj .

multiprocessing.sharedctypes.synchronized(obj[, lock])

为使用 lock 同步访问。如果 lockNone (默认)然后 multiprocessing.RLock 对象是自动创建的。

除了所封装的对象的方法外,同步封装器还有两种方法: get_obj() 返回封装的对象并 get_lock() 返回用于同步的锁定对象。

请注意,通过封装器访问CTypes对象可能比访问原始CTypes对象慢得多。

在 3.5 版更改: 同步对象支持 context manager 协议。

下表比较了从共享内存创建共享CTypes对象的语法与普通CTypes语法。(表中) MyStructctypes.Structure

C型

sharedtypes使用类型

使用类型代码的SharedTypes

c_double(2.4)

RawValue(c_double, 2.4)

RawValue('d', 2.4)

MyStruct(4, 6)

RawValue(MyStruct, 4, 6)

(c_short * 7)()

RawArray(c_short, 7)

RawArray('h', 7)

(c_int * 3)(9, 2, 8)

RawArray(c_int, (9, 2, 8))

RawArray('i', (9, 2, 8))

下面是一个示例,其中许多ctypes对象由子进程修改:

from multiprocessing import Process, Lock
from multiprocessing.sharedctypes import Value, Array
from ctypes import Structure, c_double

class Point(Structure):
    _fields_ = [('x', c_double), ('y', c_double)]

def modify(n, x, s, A):
    n.value **= 2
    x.value **= 2
    s.value = s.value.upper()
    for a in A:
        a.x **= 2
        a.y **= 2

if __name__ == '__main__':
    lock = Lock()

    n = Value('i', 7)
    x = Value(c_double, 1.0/3.0, lock=False)
    s = Array('c', b'hello world', lock=lock)
    A = Array(Point, [(1.875,-6.25), (-5.75,2.0), (2.375,9.5)], lock=lock)

    p = Process(target=modify, args=(n, x, s, A))
    p.start()
    p.join()

    print(n.value)
    print(x.value)
    print(s.value)
    print([(a.x, a.y) for a in A])

打印结果为:

49
0.1111111111111111
HELLO WORLD
[(3.515625, 39.0625), (33.0625, 4.0), (5.640625, 90.25)]

经理

管理器提供了一种创建可以在不同进程之间共享的数据的方法,包括在不同计算机上运行的进程之间通过网络共享数据。管理器对象控制管理 共享对象 . 其他进程可以使用代理访问共享对象。

multiprocessing.Manager()

返回已开始的 SyncManager 对象,可用于在进程之间共享对象。返回的管理器对象与生成的子进程相对应,并且具有创建共享对象并返回相应代理的方法。

一旦管理器进程被垃圾收集或其父进程退出,它们将立即关闭。管理器类在 multiprocessing.managers 模块:

class multiprocessing.managers.BaseManager([address[, authkey]])

创建一个baseManager对象。

一旦创建,应该调用 start()get_server().serve_forever() 以确保管理器对象引用已启动的管理器进程。

地址 是管理器进程侦听新连接的地址。如果 地址None 然后选择一个任意的。

身份验证密钥 是用于检查服务器进程的传入连接的有效性的身份验证密钥。如果 身份验证密钥None 然后 current_process().authkey 使用。否则 身份验证密钥 必须是字节字符串。

start([initializer[, initargs]])

启动子流程以启动管理器。如果 初始化器 不是 None 然后子进程将调用 initializer(*initargs) 当它开始的时候。

get_server()

返回A Server 对象,表示由管理器控制的实际服务器。这个 Server 对象支持 serve_forever() 方法:

>>> from multiprocessing.managers import BaseManager
>>> manager = BaseManager(address=('', 50000), authkey=b'abc')
>>> server = manager.get_server()
>>> server.serve_forever()

Server 另外还有一个 address 属性。

connect()

将本地管理器对象连接到远程管理器进程::

>>> from multiprocessing.managers import BaseManager
>>> m = BaseManager(address=('127.0.0.1', 50000), authkey=b'abc')
>>> m.connect()
shutdown()

停止管理器使用的进程。只有在 start() 已用于启动服务器进程。

这可以多次调用。

register(typeid[, callable[, proxytype[, exposed[, method_to_typeid[, create_method]]]]])

可用于向Manager类注册类型或可调用的ClassMethod。

打字 是“类型标识符”,用于标识特定类型的共享对象。这必须是一个字符串。

可赎回的 可调用,用于为此类型标识符创建对象。如果管理器实例将使用 connect() 方法,或者如果 create_method 论证是 False 那么这个可以留作 None .

脯氨酸型 是的子类 BaseProxy 用于为此共享对象创建代理 打字 . 如果 None 然后会自动创建一个代理类。

暴露的 用于指定方法名的序列,应允许此typeid的代理使用 BaseProxy._callmethod() . (如果 暴露的None 然后 proxytype._exposed_ 如果存在,则使用。)如果未指定公开列表,则共享对象的所有“公共方法”都将可访问。(此处,“公共方法”是指具有 __call__() 方法,其名称不以开头 '_'

method_to_typeid 是一个映射,用于指定那些应返回代理的公开方法的返回类型。它将方法名映射到typeid字符串。(如果 method_to_typeidNone 然后 proxytype._method_to_typeid_ 如果方法的名称不是此映射的键,或者如果映射是 None 然后,该方法返回的对象将按值进行复制。

create_method 确定是否应使用名称创建方法 打字 它可以用来告诉服务器进程创建一个新的共享对象并返回其代理。默认情况下是 True .

BaseManager 实例还具有一个只读属性:

address

经理使用的地址。

在 3.3 版更改: 管理器对象支持上下文管理协议——请参见 上下文管理器类型 . __enter__() 启动服务器进程(如果尚未启动),然后返回Manager对象。 __exit__() 调用 shutdown() .

在以前的版本中 __enter__() 未启动管理器的服务器进程(如果尚未启动)。

class multiprocessing.managers.SyncManager

一个子类 BaseManager 可用于进程同步。此类型的对象由返回 multiprocessing.Manager() .

其方法创建和返回 代理对象 用于跨进程同步的许多常用数据类型。这尤其包括共享列表和字典。

Barrier(parties[, action[, timeout]])

创建共享 threading.Barrier 对象并返回其代理。

3.3 新版功能.

BoundedSemaphore([value])

创建共享 threading.BoundedSemaphore 对象并返回其代理。

Condition([lock])

创建共享 threading.Condition 对象并返回其代理。

如果 lock 则它应该是 threading.Lockthreading.RLock 对象。

在 3.3 版更改: 这个 wait_for() 方法已添加。

Event()

创建共享 threading.Event 对象并返回其代理。

Lock()

创建共享 threading.Lock 对象并返回其代理。

Namespace()

创建共享 Namespace 对象并返回其代理。

Queue([maxsize])

创建共享 queue.Queue 对象并返回其代理。

RLock()

创建共享 threading.RLock 对象并返回其代理。

Semaphore([value])

创建共享 threading.Semaphore 对象并返回其代理。

Array(typecode, sequence)

创建一个数组并返回它的代理。

Value(typecode, value)

使用可写的 value 属性并返回其代理。

dict()
dict(mapping)
dict(sequence)

创建共享 dict 对象并返回其代理。

list()
list(sequence)

创建共享 list 对象并返回其代理。

在 3.6 版更改: 共享对象可以嵌套。例如,共享容器对象(如共享列表)可以包含其他共享对象,这些对象都将由 SyncManager .

class multiprocessing.managers.Namespace

可以注册的类型 SyncManager .

命名空间对象没有公共方法,但具有可写属性。它的表示形式显示了其属性的值。

但是,当为命名空间对象使用代理时,以 '_' 将是代理的属性,而不是引用的属性:

>>> manager = multiprocessing.Manager()
>>> Global = manager.Namespace()
>>> Global.x = 10
>>> Global.y = 'hello'
>>> Global._z = 12.3    # this is an attribute of the proxy
>>> print(Global)
Namespace(x=10, y='hello')

定制经理

要创建自己的管理器,可以创建 BaseManager 并使用 register() 用于向Manager类注册新类型或可调用项的ClassMethod。例如::

from multiprocessing.managers import BaseManager

class MathsClass:
    def add(self, x, y):
        return x + y
    def mul(self, x, y):
        return x * y

class MyManager(BaseManager):
    pass

MyManager.register('Maths', MathsClass)

if __name__ == '__main__':
    with MyManager() as manager:
        maths = manager.Maths()
        print(maths.add(4, 3))         # prints 7
        print(maths.mul(7, 8))         # prints 56

使用远程管理器

可以在一台计算机上运行管理服务器,并让客户机从其他计算机上使用它(假定涉及的防火墙允许)。

运行以下命令将为远程客户端可以访问的单个共享队列创建服务器:

>>> from multiprocessing.managers import BaseManager
>>> from queue import Queue
>>> queue = Queue()
>>> class QueueManager(BaseManager): pass
>>> QueueManager.register('get_queue', callable=lambda:queue)
>>> m = QueueManager(address=('', 50000), authkey=b'abracadabra')
>>> s = m.get_server()
>>> s.serve_forever()

一个客户端可以访问服务器,如下所示:

>>> from multiprocessing.managers import BaseManager
>>> class QueueManager(BaseManager): pass
>>> QueueManager.register('get_queue')
>>> m = QueueManager(address=('foo.bar.org', 50000), authkey=b'abracadabra')
>>> m.connect()
>>> queue = m.get_queue()
>>> queue.put('hello')

另一个客户端也可以使用它:

>>> from multiprocessing.managers import BaseManager
>>> class QueueManager(BaseManager): pass
>>> QueueManager.register('get_queue')
>>> m = QueueManager(address=('foo.bar.org', 50000), authkey=b'abracadabra')
>>> m.connect()
>>> queue = m.get_queue()
>>> queue.get()
'hello'

本地进程也可以访问该队列,使用客户端上的上述代码远程访问该队列:

>>> from multiprocessing import Process, Queue
>>> from multiprocessing.managers import BaseManager
>>> class Worker(Process):
...     def __init__(self, q):
...         self.q = q
...         super(Worker, self).__init__()
...     def run(self):
...         self.q.put('local hello')
...
>>> queue = Queue()
>>> w = Worker(queue)
>>> w.start()
>>> class QueueManager(BaseManager): pass
...
>>> QueueManager.register('get_queue', callable=lambda: queue)
>>> m = QueueManager(address=('', 50000), authkey=b'abracadabra')
>>> s = m.get_server()
>>> s.serve_forever()

代理对象

代理是一个对象, 一个共同的对象,它(大概)生活在一个不同的过程中。共享对象被称为 参照物 代理的。多个代理对象可以具有相同的引用。

代理对象具有调用其引用的相应方法的方法(尽管并非引用的每个方法都必须通过代理可用)。这样,代理可以像其引用一样使用:

>>> from multiprocessing import Manager
>>> manager = Manager()
>>> l = manager.list([i*i for i in range(10)])
>>> print(l)
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
>>> print(repr(l))
<ListProxy object, typeid 'list' at 0x...>
>>> l[4]
16
>>> l[2:5]
[4, 9, 16]

注意,申请 str() 委托人将返回被委托人的陈述,而 repr() 将返回代理的表示。

代理对象的一个重要特性是它们是可拾取的,因此可以在进程之间传递。因此,引用可以包含 代理对象 . 这允许嵌套这些托管列表、dict和其他 代理对象

>>> a = manager.list()
>>> b = manager.list()
>>> a.append(b)         # referent of a now contains referent of b
>>> print(a, b)
[<ListProxy object, typeid 'list' at ...>] []
>>> b.append('hello')
>>> print(a[0], b)
['hello'] ['hello']

类似地,dict和list代理可以相互嵌套:

>>> l_outer = manager.list([ manager.dict() for i in range(2) ])
>>> d_first_inner = l_outer[0]
>>> d_first_inner['a'] = 1
>>> d_first_inner['b'] = 2
>>> l_outer[1]['c'] = 3
>>> l_outer[1]['z'] = 26
>>> print(l_outer[0])
{'a': 1, 'b': 2}
>>> print(l_outer[1])
{'c': 3, 'z': 26}

如果标准(非代理) listdict 对象包含在引用中,对这些可变值的修改将不会通过管理器传播,因为代理无法知道何时修改其中包含的值。但是,在容器代理中存储值(这会触发 __setitem__ 在代理对象上)不会通过管理器传播,因此为了有效地修改此类项,可以将修改后的值重新分配给容器代理:

# create a list proxy and append a mutable object (a dictionary)
lproxy = manager.list()
lproxy.append({})
# now mutate the dictionary
d = lproxy[0]
d['a'] = 1
d['b'] = 2
# at this point, the changes to d are not yet synced, but by
# updating the dictionary, the proxy is notified of the change
lproxy[0] = d

这种方法可能比使用嵌套方法不方便 代理对象 对于大多数用例,还演示了对同步的控制级别。

注解

代理输入 multiprocessing 不支持按值进行比较。例如,我们有:

>>> manager.list([1,2,3]) == [1,2,3]
False

在进行比较时,应该只使用引用的副本。

class multiprocessing.managers.BaseProxy

代理对象是 BaseProxy .

_callmethod(methodname[, args[, kwds]])

调用并返回代理引用方法的结果。

如果 proxy 是其引用为 obj 然后表达式:

proxy._callmethod(methodname, args, kwds)

将计算表达式::

getattr(obj, methodname)(*args, **kwds)

在经理的过程中。

返回的值将是调用结果的副本或新共享对象的代理——请参见 method_to_typeid 的参数 BaseManager.register() .

如果调用引发异常,则由 _callmethod() .如果在管理器进程中引发其他异常,则此异常将转换为 RemoteError 异常,由引发 _callmethod() .

请特别注意,如果 方法名称 还没有 暴露的 .

使用的示例 _callmethod()

>>> l = manager.list(range(10))
>>> l._callmethod('__len__')
10
>>> l._callmethod('__getitem__', (slice(2, 7),)) # equivalent to l[2:7]
[2, 3, 4, 5, 6]
>>> l._callmethod('__getitem__', (20,))          # equivalent to l[20]
Traceback (most recent call last):
...
IndexError: list index out of range
_getvalue()

返回引用文件的副本。

如果引用不可勾选,则会引发异常。

__repr__()

返回代理对象的表示形式。

__str__()

返回引用的表示形式。

清理

代理对象使用weakref回调,这样当它被垃圾收集时,它就会从拥有其引用的管理器中注销自己。

当不再有任何引用共享对象的代理时,共享对象将从管理器进程中删除。

进程池

您可以创建一个进程池,该池将执行提交给它的任务 Pool 类。

class multiprocessing.pool.Pool([processes[, initializer[, initargs[, maxtasksperchild[, context]]]]])

一个进程池对象,它控制可向其提交任务的工作进程池。它支持带超时和回调的异步结果,并具有并行映射实现。

过程 是要使用的工作进程数。如果 过程None 然后返回的号码 os.cpu_count() 使用。

如果 初始化器 不是 None 然后每个工作进程将调用 initializer(*initargs) 当它开始的时候。

子级最大任务数 是指工作进程在退出并替换为新的工作进程之前可以完成的任务数,以便释放未使用的资源。默认值 子级最大任务数None 这意味着工作进程将和池一样长。

context 可用于指定用于启动工作进程的上下文。通常使用函数创建池 multiprocessing.Pool()Pool() 上下文对象的方法。在两种情况下 context 设置正确。

注意,pool对象的方法只能由创建pool的进程调用。

警告

multiprocessing.pool 对象具有需要通过将池用作上下文管理器或通过调用 close()terminate() 手动操作。如果不这样做,可能会导致进程在完成时挂起。

注意它是 不正确 依赖垃圾收集器销毁池,因为CPython不能确保调用池的终结器(请参见 object.__del__() 更多信息)。

3.2 新版功能: 子级最大任务数

3.4 新版功能: context

注解

a中的工作进程 Pool 通常在池工作队列的整个持续时间内活动。在其他系统(如apache、mod wsgi等)中发现的一种常见模式是,允许池中的工作人员在退出、清理和生成新进程以替换旧进程之前仅完成一定数量的工作。这个 子级最大任务数 参数 Pool 向最终用户公开此功能。

apply(func[, args[, kwds]])

调用 func 带着参数 args 和关键字参数 kwds . 它会一直阻塞,直到结果准备就绪。考虑到这一块, apply_async() 更适合并行执行工作。此外, func 只在池中的一个工作人员中执行。

apply_async(func[, args[, kwds[, callback[, error_callback]]]])

的变体 apply() 返回 AsyncResult 对象。

如果 回调 则它应该是接受单个参数的可调用文件。当结果准备好时 回调 应用于它,即除非调用失败,在这种情况下, error_callback 而是应用。

如果 error_callback 则它应该是接受单个参数的可调用文件。如果目标函数失败,则 error_callback 使用异常实例调用。

回调应该立即完成,否则处理结果的线程将被阻塞。

map(func, iterable[, chunksize])

平行等价于 map() 内置功能(仅支持一个 可迭代的 但是,对于多个iterables,请参见 starmap() ). 它会阻塞直到结果就绪。

此方法将ITerable剪切成若干块,作为单独的任务提交给流程池。这些块的(近似)大小可以通过设置 chunksize 为正整数。

请注意,它可能会导致非常长的iterables的高内存使用率。考虑使用 imap()imap_unordered() 显式 chunksize 提高效率的选项。

map_async(func, iterable[, chunksize[, callback[, error_callback]]])

的变体 map() 返回 AsyncResult 对象。

如果 回调 则它应该是接受单个参数的可调用文件。当结果准备好时 回调 应用于它,即除非调用失败,在这种情况下, error_callback 而是应用。

如果 error_callback 则它应该是接受单个参数的可调用文件。如果目标函数失败,则 error_callback 使用异常实例调用。

回调应该立即完成,否则处理结果的线程将被阻塞。

imap(func, iterable[, chunksize])

更懒惰的版本 map() .

这个 chunksize 参数与 map() 方法。对于非常长的iterables,使用大值 chunksize 可以使任务完成 much 比使用默认值更快 1 .

如果 chunksize1 然后 next() 由返回的迭代器的方法 imap() 方法具有可选的 timeout 参数: next(timeout) 将提高 multiprocessing.TimeoutError 如果结果不能在 timeout 秒。

imap_unordered(func, iterable[, chunksize])

一样 imap() 除了返回的迭代器结果的顺序应该被认为是任意的。(只有在只有一个工作进程时,订单才保证“正确”。)

starmap(func, iterable[, chunksize])

类似于 map() 除了 可迭代的 应该是作为参数解包的iterables。

因此,一个 可迭代的 属于 [(1,2), (3, 4)] 结果在 [func(1,2), func(3,4)] .

3.3 新版功能.

starmap_async(func, iterable[, chunksize[, callback[, error_callback]]])

结合 starmap()map_async() 重复 可迭代的 调用号码和调用号码 func 打开了iTerables。返回结果对象。

3.3 新版功能.

close()

阻止将更多任务提交到池。所有任务完成后,工作进程将退出。

terminate()

立即停止工作进程,而不完成未完成的工作。当池对象被垃圾收集时 terminate() 将立即调用。

join()

等待工作进程退出。必须调用 close()terminate() 使用前 join() .

3.3 新版功能: 池对象现在支持上下文管理协议——请参见 上下文管理器类型 . __enter__() 返回pool对象,以及 __exit__() 调用 terminate() .

class multiprocessing.pool.AsyncResult

返回的结果的类 Pool.apply_async()Pool.map_async() .

get([timeout])

返回结果。如果 timeout 不是 None 结果不在 timeout 然后秒 multiprocessing.TimeoutError 提高了。如果远程调用引发异常,则该异常将由 get() .

wait([timeout])

等待结果可用或直到 timeout 秒通过。

ready()

返回调用是否已完成。

successful()

返回呼叫是否已完成,但未引发异常。将提高 ValueError 如果结果还没有准备好。

在 3.7 版更改: 如果结果还没有准备好, ValueError 被提升而不是 AssertionError .

下面的示例演示池的使用:

from multiprocessing import Pool
import time

def f(x):
    return x*x

if __name__ == '__main__':
    with Pool(processes=4) as pool:         # start 4 worker processes
        result = pool.apply_async(f, (10,)) # evaluate "f(10)" asynchronously in a single process
        print(result.get(timeout=1))        # prints "100" unless your computer is *very* slow

        print(pool.map(f, range(10)))       # prints "[0, 1, 4,..., 81]"

        it = pool.imap(f, range(10))
        print(next(it))                     # prints "0"
        print(next(it))                     # prints "1"
        print(it.next(timeout=1))           # prints "4" unless your computer is *very* slow

        result = pool.apply_async(time.sleep, (10,))
        print(result.get(timeout=1))        # raises multiprocessing.TimeoutError

听众和客户

通常,进程之间的消息传递是使用队列或 Connection 返回的对象 Pipe() .

然而, multiprocessing.connection 模块允许一些额外的灵活性。它基本上提供了一个用于处理套接字或窗口命名管道的高级面向消息的API。它还支持 摘要式身份验证 使用 hmac 模块,用于同时轮询多个连接。

multiprocessing.connection.deliver_challenge(connection, authkey)

将随机生成的消息发送到连接的另一端,然后等待答复。

如果回复与消息摘要匹配,则使用 身份验证密钥 作为密钥,欢迎消息将发送到连接的另一端。否则 AuthenticationError 提高了。

multiprocessing.connection.answer_challenge(connection, authkey)

接收消息,使用计算消息摘要 身份验证密钥 作为键,然后将摘要发回。

如果未收到欢迎信息,则 AuthenticationError 提高了。

multiprocessing.connection.Client(address[, family[, authkey]])

尝试建立到使用地址的侦听器的连接 地址 返回一个 Connection .

连接类型由 家庭 参数,但这通常可以省略,因为它通常可以从 地址 . (见 地址格式

如果 身份验证密钥 是给定的而不是无的,它应该是一个字节字符串,并将用作基于HMAC的身份验证质询的密钥。如果 身份验证密钥 一个也没有。 AuthenticationError 在身份验证失败时引发。见 身份验证密钥 .

class multiprocessing.connection.Listener([address[, family[, backlog[, authkey]]]])

“正在侦听”连接的绑定套接字或Windows命名管道的封装。

地址 侦听器对象的绑定套接字或命名管道要使用的地址。

注解

如果使用地址“0.0.0.0”,则该地址将不是Windows上可连接的终结点。如果需要可连接的端点,则应使用“127.0.0.1”。

家庭 是要使用的套接字(或命名管道)的类型。这可以是字符串之一 'AF_INET' (对于TCP套接字) 'AF_UNIX' (对于UNIX域套接字)或 'AF_PIPE' (对于名为pipe的窗口)。其中只有第一个是保证可用的。如果 家庭None 然后根据 地址 . 如果 地址 也是 None 然后选择默认值。此默认值是假定为可用速度最快的族。见 地址格式 . 注意如果 家庭'AF_UNIX' 地址是 None 然后将在使用创建的专用临时目录中创建套接字 tempfile.mkstemp() .

如果侦听器对象使用套接字,则 积压 (默认为1)传递给 listen() 套接字绑定后的方法。

如果 身份验证密钥 是给定的而不是无的,它应该是一个字节字符串,并将用作基于HMAC的身份验证质询的密钥。如果 身份验证密钥 一个也没有。 AuthenticationError 在身份验证失败时引发。见 身份验证密钥 .

accept()

接受侦听器对象的绑定套接字或命名管道上的连接,并返回 Connection 对象。如果尝试身份验证但失败,则 AuthenticationError 提高了。

close()

关闭侦听器对象的绑定套接字或命名管道。当侦听器被垃圾收集时,将自动调用此函数。但是,最好明确地调用它。

侦听器对象具有以下只读属性:

address

侦听器对象正在使用的地址。

last_accepted

上次接受的连接来自的地址。如果这个不可用,那么它是 None .

3.3 新版功能: 侦听器对象现在支持上下文管理协议——请参见 上下文管理器类型 . __enter__() 返回Listener对象,以及 __exit__() 调用 close() .

multiprocessing.connection.wait(object_list, timeout=None)

等到有东西进来 object_list 准备好了。返回这些对象的列表 object_list 准备好了。如果 timeout 是一个浮点数,然后调用最多阻塞这么多秒。如果 timeoutNone 然后它将无限期阻塞。负超时等于零超时。

对于Unix和Windows,对象可以出现在 object_list 如果是

当可以从连接或套接字对象中读取数据,或者另一端已关闭时,连接或套接字对象已就绪。

Unixwait(object_list, timeout) 几乎相等 select.select(object_list, [], [], timeout) .区别在于,如果 select.select() 被信号中断,它可以上升 OSError 错误号为 EINTRwait() 不会。

Windows :中的项目 object_list 必须是可等待的整数句柄(根据win32函数文档使用的定义) WaitForMultipleObjects() )或者它可以是一个带有 fileno() 返回套接字句柄或管道句柄的方法。(请注意,管道手柄和Socket手柄 not 可等待的手柄。)

3.3 新版功能.

Examples

以下服务器代码创建使用 'secret password' 作为身份验证密钥。然后它等待连接并向客户机发送一些数据:

from multiprocessing.connection import Listener
from array import array

address = ('localhost', 6000)     # family is deduced to be 'AF_INET'

with Listener(address, authkey=b'secret password') as listener:
    with listener.accept() as conn:
        print('connection accepted from', listener.last_accepted)

        conn.send([2.25, None, 'junk', float])

        conn.send_bytes(b'hello')

        conn.send_bytes(array('i', [42, 1729]))

以下代码连接到服务器并从服务器接收一些数据:

from multiprocessing.connection import Client
from array import array

address = ('localhost', 6000)

with Client(address, authkey=b'secret password') as conn:
    print(conn.recv())                  # => [2.25, None, 'junk', float]

    print(conn.recv_bytes())            # => 'hello'

    arr = array('i', [0, 0, 0, 0, 0])
    print(conn.recv_bytes_into(arr))    # => 8
    print(arr)                          # => array('i', [42, 1729, 0, 0, 0])

以下代码使用 wait() 要同时等待来自多个进程的消息,请执行以下操作:

import time, random
from multiprocessing import Process, Pipe, current_process
from multiprocessing.connection import wait

def foo(w):
    for i in range(10):
        w.send((i, current_process().name))
    w.close()

if __name__ == '__main__':
    readers = []

    for i in range(4):
        r, w = Pipe(duplex=False)
        readers.append(r)
        p = Process(target=foo, args=(w,))
        p.start()
        # We close the writable end of the pipe now to be sure that
        # p is the only process which owns a handle for it.  This
        # ensures that when p closes its handle for the writable end,
        # wait() will promptly report the readable end as being ready.
        w.close()

    while readers:
        for r in wait(readers):
            try:
                msg = r.recv()
            except EOFError:
                readers.remove(r)
            else:
                print(msg)

地址格式

  • 'AF_INET' 地址是表单的元组 (hostname, port) 在哪里? 主机名 是一个字符串 port 是一个整数。

  • 'AF_UNIX' 地址是表示文件系统上文件名的字符串。

  • 一个 'AF_PIPE' 地址是以下形式的字符串 r'\.\pipe{PipeName}' 。要使用 Client() 要连接到名为的远程计算机上的命名管道,请执行以下操作 ServerName 您应该使用以下形式的地址 r'\{ServerName}\pipe{PipeName}' 取而代之的是。

注意,默认情况下,以两个反斜杠开头的任何字符串都假定为 'AF_PIPE' 地址而不是 'AF_UNIX' 地址。

身份验证密钥

当一个人使用 Connection.recv ,接收到的数据将自动取消勾选。不幸的是,从不受信任的源中提取数据存在安全风险。因此 ListenerClient() 使用 hmac 提供摘要式身份验证的模块。

身份验证密钥是一个字节字符串,可以认为是一个密码:一旦建立了连接,两端都需要证明对方知道身份验证密钥。(证明两端使用相同的键 not 包括通过连接发送密钥。)

如果请求身份验证但未指定身份验证密钥,则返回值为 current_process().authkey 使用(见) Process )此值将自动由任何 Process 当前进程创建的对象。这意味着(默认情况下)多进程程序的所有进程都将共享一个身份验证密钥,该密钥可以在它们之间建立连接时使用。

也可以通过使用 os.urandom() .

登录

提供了一些日志记录支持。但是,请注意, logging 包不使用进程共享锁,因此(取决于处理程序类型)可能会混淆来自不同进程的消息。

multiprocessing.get_logger()

返回使用的记录器 multiprocessing . 如有必要,将创建一个新的。

当第一次创建时,记录器具有级别 logging.NOTSET 没有默认的处理程序。默认情况下,发送到此记录器的消息不会传播到根记录器。

注意,在Windows上,子进程将只继承父进程记录器的级别——记录器的任何其他自定义都不会被继承。

multiprocessing.log_to_stderr()

此函数执行对 get_logger() 但是除了返回由get_logger创建的记录器之外,它还添加了一个将输出发送到 sys.stderr 使用格式 '[%(levelname)s/%(processName)s] %(message)s' .

下面是打开日志记录的示例会话:

>>> import multiprocessing, logging
>>> logger = multiprocessing.log_to_stderr()
>>> logger.setLevel(logging.INFO)
>>> logger.warning('doomed')
[WARNING/MainProcess] doomed
>>> m = multiprocessing.Manager()
[INFO/SyncManager-...] child process calling self.run()
[INFO/SyncManager-...] created temp directory /.../pymp-...
[INFO/SyncManager-...] manager serving at '/.../listener-...'
>>> del m
[INFO/MainProcess] sending shutdown message to manager
[INFO/SyncManager-...] manager exiting with exitcode 0

有关日志记录级别的完整表,请参见 logging 模块。

这个 multiprocessing.dummy 模块

multiprocessing.dummy 复制的API multiprocessing 但仅仅是一个封装 threading 模块。

特别值得一提的是, Pool 由以下人员提供的功能 multiprocessing.dummy 返回 ThreadPool ,它是的子类 Pool 它支持所有相同的方法调用,但使用的是工作线程池,而不是工作进程。

class multiprocessing.pool.ThreadPool([processes[, initializer[, initargs]]])

控制可以向其提交作业的工作线程池的线程池对象。 ThreadPool 实例与完全兼容接口 Pool 实例,并且还必须正确管理它们的资源,方法是将池用作上下文管理器或调用 close()terminate() 手工操作。

流程 要使用的工作线程数。如果 流程None 则由返回的数字 os.cpu_count() 是使用的。

如果 初始化器 不是 None 然后每个工作进程将调用 initializer(*initargs) 当它开始的时候。

不像 PoolmaxtaskperChild上下文 无法提供。

注解

A ThreadPool 共享与相同的接口 Pool ,它是围绕进程池设计的,在引入 concurrent.futures 模块。因此,它继承了一些对于线程支持的池没有意义的操作,并且它有自己的类型来表示异步作业的状态, AsyncResult ,这是任何其他库都不能理解的。

用户通常应该更喜欢使用 concurrent.futures.ThreadPoolExecutor ,它有一个从一开始就围绕线程设计的更简单的界面,它返回 concurrent.futures.Future 实例,这些实例与许多其他库兼容,包括 asyncio

程序设计指南

在使用时应遵守某些准则和习惯用法。 multiprocessing .

所有启动方法

以下内容适用于所有启动方法。

避免共享状态

尽可能避免在进程之间转移大量数据。

最好还是坚持使用队列或管道在进程之间进行通信,而不是使用较低级别的同步原语。

可摘性

确保代理方法的参数是可拾取的。

代理的线程安全性

不要使用来自多个线程的代理对象,除非用锁保护它。

(使用 same 代理服务器)

加入僵尸进程

在Unix上,当进程完成但尚未加入时,它将变成僵尸。永远不应该有太多,因为每次一个新进程启动(或 active_children() 将联接所有尚未联接的已完成进程。同时调用已完成进程的 Process.is_alive 将加入进程。即使如此,显式地加入所有您开始的流程可能也是一个好的实践。

继承总比泡菜/松饼好。

当使用 产卵福克斯服务器 从许多类型开始方法 multiprocessing 需要可选择,以便子进程可以使用它们。但是,通常应该避免使用管道或队列将共享对象发送到其他进程。相反,您应该安排程序,以便需要访问在其他地方创建的共享资源的进程可以从祖先进程继承该资源。

避免终止进程

使用 Process.terminate 停止进程的方法可能会导致该进程当前使用的任何共享资源(如锁、信号量、管道和队列)中断或对其他进程不可用。

因此,最好只考虑使用 Process.terminate 在从不使用任何共享资源的进程上。

连接使用队列的进程

请记住,将项目放入队列的进程将在终止之前等待,直到所有缓冲项目由“feeder”线程馈送到底层管道。(子进程可以调用 Queue.cancel_join_thread 用于避免此行为的队列方法。)

这意味着,无论何时使用队列,都需要确保在加入进程之前,已放入队列的所有项目最终都将被删除。否则,您无法确保已将项目放入队列的进程将终止。还请记住,非后台进程将自动加入。

死锁的例子如下:

from multiprocessing import Process, Queue

def f(q):
    q.put('X' * 1000000)

if __name__ == '__main__':
    queue = Queue()
    p = Process(target=f, args=(queue,))
    p.start()
    p.join()                    # this deadlocks
    obj = queue.get()

这里的一个解决方法是交换最后两行(或者简单地删除 p.join() 线)。

显式地将资源传递给子进程

在Unix上使用 fork Start方法,子进程可以使用在父进程中使用全局资源创建的共享资源。但是,最好将对象作为参数传递给子进程的构造函数。

除了使代码(可能)与Windows和其他Start方法兼容之外,这还确保只要子进程仍然活动,就不会在父进程中对对象进行垃圾收集。如果在父进程中垃圾收集对象时释放某些资源,这可能很重要。

例如:

from multiprocessing import Process, Lock

def f():
    ... do something using "lock" ...

if __name__ == '__main__':
    lock = Lock()
    for i in range(10):
        Process(target=f).start()

应重写为:

from multiprocessing import Process, Lock

def f(l):
    ... do something using "l" ...

if __name__ == '__main__':
    lock = Lock()
    for i in range(10):
        Process(target=f, args=(lock,)).start()

小心更换 sys.stdin 带有“类文件对象”

multiprocessing 最初无条件调用:

os.close(sys.stdin.fileno())

multiprocessing.Process._bootstrap() 方法---导致过程中的过程出现问题。已将此更改为:

sys.stdin.close()
sys.stdin = open(os.open(os.devnull, os.O_RDONLY), closefd=False)

它解决了进程相互碰撞导致文件描述符错误的基本问题,但对替换 sys.stdin() 带有“类似文件的对象”和输出缓冲。如果多个进程调用 close() 在这个类似文件的对象上,它可能导致同一数据多次刷新到该对象,从而导致损坏。

如果编写一个类似对象的文件并实现自己的缓存,则可以通过在附加到缓存时存储pid和在pid更改时丢弃缓存来确保它的安全性。例如::

@property
def cache(self):
    pid = os.getpid()
    if pid != self._pid:
        self._pid = pid
        self._cache = []
    return self._cache

有关详细信息,请参阅 bpo-5155bpo-5313bpo-5331

这个 产卵福克斯服务器 启动方法

有一些额外的限制不适用于 fork 启动方法。

更挑剔

确保所有参数 Process.__init__() 是可以腌制的。此外,如果您将 Process 然后确保在 Process.start 方法被调用。

全局变量

记住,如果在子进程中运行的代码试图访问全局变量,那么它看到的值(如果有)可能与父进程中在 Process.start 被叫来。

但是,仅作为模块级常量的全局变量不会导致任何问题。

主模块安全导入

确保新的python解释器可以安全地导入主模块,而不会造成意外的副作用(例如启动新的进程)。

例如,使用 产卵福克斯服务器 运行以下模块的Start方法将失败 RuntimeError ::

from multiprocessing import Process

def foo():
    print('hello')

p = Process(target=foo)
p.start()

相反,应该使用 if __name__ == '__main__': 如下:

from multiprocessing import Process, freeze_support, set_start_method

def foo():
    print('hello')

if __name__ == '__main__':
    freeze_support()
    set_start_method('spawn')
    p = Process(target=foo)
    p.start()

(The freeze_support() 如果程序将正常运行而不是冻结,则可以省略行。)

这允许新生成的python解释器安全地导入模块,然后运行模块的 foo() 功能。

如果在主模块中创建了池或管理器,则应用类似的限制。

实例

演示如何创建和使用定制的管理器和代理:

from multiprocessing import freeze_support
from multiprocessing.managers import BaseManager, BaseProxy
import operator

##

class Foo:
    def f(self):
        print('you called Foo.f()')
    def g(self):
        print('you called Foo.g()')
    def _h(self):
        print('you called Foo._h()')

# A simple generator function
def baz():
    for i in range(10):
        yield i*i

# Proxy type for generator objects
class GeneratorProxy(BaseProxy):
    _exposed_ = ['__next__']
    def __iter__(self):
        return self
    def __next__(self):
        return self._callmethod('__next__')

# Function to return the operator module
def get_operator_module():
    return operator

##

class MyManager(BaseManager):
    pass

# register the Foo class; make `f()` and `g()` accessible via proxy
MyManager.register('Foo1', Foo)

# register the Foo class; make `g()` and `_h()` accessible via proxy
MyManager.register('Foo2', Foo, exposed=('g', '_h'))

# register the generator function baz; use `GeneratorProxy` to make proxies
MyManager.register('baz', baz, proxytype=GeneratorProxy)

# register get_operator_module(); make public functions accessible via proxy
MyManager.register('operator', get_operator_module)

##

def test():
    manager = MyManager()
    manager.start()

    print('-' * 20)

    f1 = manager.Foo1()
    f1.f()
    f1.g()
    assert not hasattr(f1, '_h')
    assert sorted(f1._exposed_) == sorted(['f', 'g'])

    print('-' * 20)

    f2 = manager.Foo2()
    f2.g()
    f2._h()
    assert not hasattr(f2, 'f')
    assert sorted(f2._exposed_) == sorted(['g', '_h'])

    print('-' * 20)

    it = manager.baz()
    for i in it:
        print('<%d>' % i, end=' ')
    print()

    print('-' * 20)

    op = manager.operator()
    print('op.add(23, 45) =', op.add(23, 45))
    print('op.pow(2, 94) =', op.pow(2, 94))
    print('op._exposed_ =', op._exposed_)

##

if __name__ == '__main__':
    freeze_support()
    test()

使用 Pool

import multiprocessing
import time
import random
import sys

#
# Functions used by test code
#

def calculate(func, args):
    result = func(*args)
    return '%s says that %s%s = %s' % (
        multiprocessing.current_process().name,
        func.__name__, args, result
        )

def calculatestar(args):
    return calculate(*args)

def mul(a, b):
    time.sleep(0.5 * random.random())
    return a * b

def plus(a, b):
    time.sleep(0.5 * random.random())
    return a + b

def f(x):
    return 1.0 / (x - 5.0)

def pow3(x):
    return x ** 3

def noop(x):
    pass

#
# Test code
#

def test():
    PROCESSES = 4
    print('Creating pool with %d processes\n' % PROCESSES)

    with multiprocessing.Pool(PROCESSES) as pool:
        #
        # Tests
        #

        TASKS = [(mul, (i, 7)) for i in range(10)] + \
                [(plus, (i, 8)) for i in range(10)]

        results = [pool.apply_async(calculate, t) for t in TASKS]
        imap_it = pool.imap(calculatestar, TASKS)
        imap_unordered_it = pool.imap_unordered(calculatestar, TASKS)

        print('Ordered results using pool.apply_async():')
        for r in results:
            print('\t', r.get())
        print()

        print('Ordered results using pool.imap():')
        for x in imap_it:
            print('\t', x)
        print()

        print('Unordered results using pool.imap_unordered():')
        for x in imap_unordered_it:
            print('\t', x)
        print()

        print('Ordered results using pool.map() --- will block till complete:')
        for x in pool.map(calculatestar, TASKS):
            print('\t', x)
        print()

        #
        # Test error handling
        #

        print('Testing error handling:')

        try:
            print(pool.apply(f, (5,)))
        except ZeroDivisionError:
            print('\tGot ZeroDivisionError as expected from pool.apply()')
        else:
            raise AssertionError('expected ZeroDivisionError')

        try:
            print(pool.map(f, list(range(10))))
        except ZeroDivisionError:
            print('\tGot ZeroDivisionError as expected from pool.map()')
        else:
            raise AssertionError('expected ZeroDivisionError')

        try:
            print(list(pool.imap(f, list(range(10)))))
        except ZeroDivisionError:
            print('\tGot ZeroDivisionError as expected from list(pool.imap())')
        else:
            raise AssertionError('expected ZeroDivisionError')

        it = pool.imap(f, list(range(10)))
        for i in range(10):
            try:
                x = next(it)
            except ZeroDivisionError:
                if i == 5:
                    pass
            except StopIteration:
                break
            else:
                if i == 5:
                    raise AssertionError('expected ZeroDivisionError')

        assert i == 9
        print('\tGot ZeroDivisionError as expected from IMapIterator.next()')
        print()

        #
        # Testing timeouts
        #

        print('Testing ApplyResult.get() with timeout:', end=' ')
        res = pool.apply_async(calculate, TASKS[0])
        while 1:
            sys.stdout.flush()
            try:
                sys.stdout.write('\n\t%s' % res.get(0.02))
                break
            except multiprocessing.TimeoutError:
                sys.stdout.write('.')
        print()
        print()

        print('Testing IMapIterator.next() with timeout:', end=' ')
        it = pool.imap(calculatestar, TASKS)
        while 1:
            sys.stdout.flush()
            try:
                sys.stdout.write('\n\t%s' % it.next(0.02))
            except StopIteration:
                break
            except multiprocessing.TimeoutError:
                sys.stdout.write('.')
        print()
        print()


if __name__ == '__main__':
    multiprocessing.freeze_support()
    test()

显示如何使用队列将任务馈送到工作进程集合并收集结果的示例:

import time
import random

from multiprocessing import Process, Queue, current_process, freeze_support

#
# Function run by worker processes
#

def worker(input, output):
    for func, args in iter(input.get, 'STOP'):
        result = calculate(func, args)
        output.put(result)

#
# Function used to calculate result
#

def calculate(func, args):
    result = func(*args)
    return '%s says that %s%s = %s' % \
        (current_process().name, func.__name__, args, result)

#
# Functions referenced by tasks
#

def mul(a, b):
    time.sleep(0.5*random.random())
    return a * b

def plus(a, b):
    time.sleep(0.5*random.random())
    return a + b

#
#
#

def test():
    NUMBER_OF_PROCESSES = 4
    TASKS1 = [(mul, (i, 7)) for i in range(20)]
    TASKS2 = [(plus, (i, 8)) for i in range(10)]

    # Create queues
    task_queue = Queue()
    done_queue = Queue()

    # Submit tasks
    for task in TASKS1:
        task_queue.put(task)

    # Start worker processes
    for i in range(NUMBER_OF_PROCESSES):
        Process(target=worker, args=(task_queue, done_queue)).start()

    # Get and print results
    print('Unordered results:')
    for i in range(len(TASKS1)):
        print('\t', done_queue.get())

    # Add more tasks using `put()`
    for task in TASKS2:
        task_queue.put(task)

    # Get and print some more results
    for i in range(len(TASKS2)):
        print('\t', done_queue.get())

    # Tell child processes to stop
    for i in range(NUMBER_OF_PROCESSES):
        task_queue.put('STOP')


if __name__ == '__main__':
    freeze_support()
    test()