multiprocessing.shared_memory ---为跨进程的直接访问提供共享内存

源代码: Lib/multiprocessing/shared_memory.py

3.8 新版功能.


此模块提供一个类, SharedMemory 用于多核或对称多处理器(SMP)计算机上的一个或多个进程访问的共享内存的分配和管理。为了帮助共享内存的生命周期管理,特别是在不同的进程中,一个 BaseManager 子类, SharedMemoryManager ,也在 multiprocessing.managers 模块。

在这个模块中,共享内存指的是“SystemV样式”的共享内存块(尽管不一定要这样明确地实现),而不是“分布式共享内存”。这种类型的共享内存允许不同的进程潜在地读写到易失性内存的公共(或共享)区域。通常情况下,进程只能访问自己的进程内存空间,但共享内存允许进程之间共享数据,从而避免了在包含该数据的进程之间发送消息的需要。与通过磁盘或套接字或其他需要数据序列化/反序列化和复制的通信共享数据相比,直接通过内存共享数据可以提供显著的性能优势。

class multiprocessing.shared_memory.SharedMemory(name=None, create=False, size=0)

创建新的共享内存块或附加到现有的共享内存块。每个共享内存块都被分配一个唯一的名称。通过这种方式,一个进程可以创建具有特定名称的共享内存块,而另一个进程可以使用相同的名称附加到同一个共享内存块。

作为跨进程共享数据的资源,共享内存块可能比创建它们的原始进程寿命更长。当一个进程不再需要访问其他进程可能仍然需要的共享内存块时, close() 应调用方法。当任何进程不再需要共享内存块时, unlink() 应调用方法以确保正确清理。

name 是请求的共享内存的唯一名称,指定为字符串。创建新的共享内存块时,如果 None (默认)为名称提供,将生成新名称。

创造 控制是否创建新的共享内存块 (True )或者附加了现有的共享内存块 (False

size 指定创建新共享内存块时请求的字节数。由于一些平台选择根据平台的内存页大小分配内存块,因此共享内存块的确切大小可能大于或等于请求的大小。当附加到现有共享内存块时, size 参数被忽略。

close()

关闭对此实例的共享内存的访问。为了确保正确清理资源,所有实例都应调用 close() 一旦不再需要实例。注意调用 close() 不会导致共享内存块本身被销毁。

请求销毁基础共享内存块。为了确保适当的资源清理, unlink() 应该在所有需要共享内存块的进程中调用一次(并且只调用一次)。请求销毁共享内存块后,可能会立即销毁共享内存块,也可能不会立即销毁共享内存块,并且此行为在不同平台上可能有所不同。之后尝试访问共享内存块内的数据 unlink() 已调用可能导致内存访问错误。注意:最后一个放弃对共享内存块的保留的进程可能会调用 unlink()close() 按任何顺序。

buf

共享内存块内容的内存视图。

name

对共享内存块唯一名称的只读访问。

size

只读访问共享内存块的字节大小。

下面的示例演示了 SharedMemory 实例::

>>> from multiprocessing import shared_memory
>>> shm_a = shared_memory.SharedMemory(create=True, size=10)
>>> type(shm_a.buf)
<class 'memoryview'>
>>> buffer = shm_a.buf
>>> len(buffer)
10
>>> buffer[:4] = bytearray([22, 33, 44, 55])  # Modify multiple at once
>>> buffer[4] = 100                           # Modify single byte at a time
>>> # Attach to an existing shared memory block
>>> shm_b = shared_memory.SharedMemory(shm_a.name)
>>> import array
>>> array.array('b', shm_b.buf[:5])  # Copy the data into a new array.array
array('b', [22, 33, 44, 55, 100])
>>> shm_b.buf[:5] = b'howdy'  # Modify via shm_b using bytes
>>> bytes(shm_a.buf[:5])      # Access via shm_a
b'howdy'
>>> shm_b.close()   # Close each SharedMemory instance
>>> shm_a.close()
>>> shm_a.unlink()  # Call unlink only once to release the shared memory

下面的示例演示了 SharedMemory 上课用 NumPy arrays ,访问相同的 numpy.ndarray 从两个不同的python shell:

>>> # In the first Python interactive shell
>>> import numpy as np
>>> a = np.array([1, 1, 2, 3, 5, 8])  # Start with an existing NumPy array
>>> from multiprocessing import shared_memory
>>> shm = shared_memory.SharedMemory(create=True, size=a.nbytes)
>>> # Now create a NumPy array backed by shared memory
>>> b = np.ndarray(a.shape, dtype=a.dtype, buffer=shm.buf)
>>> b[:] = a[:]  # Copy the original data into shared memory
>>> b
array([1, 1, 2, 3, 5, 8])
>>> type(b)
<class 'numpy.ndarray'>
>>> type(a)
<class 'numpy.ndarray'>
>>> shm.name  # We did not specify a name so one was chosen for us
'psm_21467_46075'

>>> # In either the same shell or a new Python shell on the same machine
>>> import numpy as np
>>> from multiprocessing import shared_memory
>>> # Attach to the existing shared memory block
>>> existing_shm = shared_memory.SharedMemory(name='psm_21467_46075')
>>> # Note that a.shape is (6,) and a.dtype is np.int64 in this example
>>> c = np.ndarray((6,), dtype=np.int64, buffer=existing_shm.buf)
>>> c
array([1, 1, 2, 3, 5, 8])
>>> c[-1] = 888
>>> c
array([  1,   1,   2,   3,   5, 888])

>>> # Back in the first Python interactive shell, b reflects this change
>>> b
array([  1,   1,   2,   3,   5, 888])

>>> # Clean up from within the second Python shell
>>> del c  # Unnecessary; merely emphasizing the array is no longer used
>>> existing_shm.close()

>>> # Clean up from within the first Python shell
>>> del b  # Unnecessary; merely emphasizing the array is no longer used
>>> shm.close()
>>> shm.unlink()  # Free and release the shared memory block at the very end
class multiprocessing.managers.SharedMemoryManager([address[, authkey]])

一个子类 BaseManager 它可以用于跨进程管理共享内存块。

调用 start() 在一 SharedMemoryManager 实例导致启动新进程。这个新进程的唯一目的是管理通过它创建的所有共享内存块的生命周期。要触发由该进程管理的所有共享内存块的释放,请调用 shutdown() 在实例上。这触发了 SharedMemory.unlink() 调用所有 SharedMemory 由该进程管理的对象,然后停止该进程本身。通过创建 SharedMemory 实例通过 SharedMemoryManager 我们避免了手动跟踪和触发释放共享内存资源的需要。

此类提供了创建和返回的方法 SharedMemory 实例和用于创建类似列表的对象 (ShareableList )由共享内存支持。

参照 multiprocessing.managers.BaseManager 对于继承的 地址身份验证密钥 可选输入参数以及如何使用它们连接到现有的 SharedMemoryManager 来自其他进程的服务。

SharedMemory(size)

创建并返回新的 SharedMemory 具有指定的 size 以字节为单位。

ShareableList(sequence)

创建并返回新的 ShareableList 对象,由输入的值初始化 sequence .

下面的示例演示了 SharedMemoryManager

>>> from multiprocessing.managers import SharedMemoryManager
>>> smm = SharedMemoryManager()
>>> smm.start()  # Start the process that manages the shared memory blocks
>>> sl = smm.ShareableList(range(4))
>>> sl
ShareableList([0, 1, 2, 3], name='psm_6572_7512')
>>> raw_shm = smm.SharedMemory(size=128)
>>> another_sl = smm.ShareableList('alpha')
>>> another_sl
ShareableList(['a', 'l', 'p', 'h', 'a'], name='psm_6572_12221')
>>> smm.shutdown()  # Calls unlink() on sl, raw_shm, and another_sl

下面的示例描述了一个可能更方便使用的模式 SharedMemoryManager 对象通过 with 语句以确保不再需要所有共享内存块后释放它们:

>>> with SharedMemoryManager() as smm:
...     sl = smm.ShareableList(range(2000))
...     # Divide the work among two processes, storing partial results in sl
...     p1 = Process(target=do_work, args=(sl, 0, 1000))
...     p2 = Process(target=do_work, args=(sl, 1000, 2000))
...     p1.start()
...     p2.start()  # A multiprocessing.Pool might be more efficient
...     p1.join()
...     p2.join()   # Wait for all work to complete in both processes
...     total_result = sum(sl)  # Consolidate the partial results now in sl

当使用 SharedMemoryManager 在一个 with 语句,使用该管理器创建的共享内存块在 with 语句的代码块完成执行。

class multiprocessing.shared_memory.ShareableList(sequence=None, *, name=None)

提供一个可变的类似列表的对象,其中存储的所有值都存储在共享内存块中。这将可存储值限制为 intfloatboolstr (每个小于10兆字节) bytes (每个小于10M字节),以及 None 内置数据类型。它也明显不同于内置的 list 键入这些列表不能更改它们的总长度(即没有追加、插入等),并且不支持动态创建新的 ShareableList 通过切片的实例。

序列 用于填充新的 ShareableList 充满价值。设置为 None 改为附加到已存在的 ShareableList 以其唯一的共享内存名。

name 是请求的共享内存的唯一名称,如的定义中所述 SharedMemory . 当附加到现有 ShareableList ,在离开时指定其共享内存块的唯一名称 sequence 设置为 None .

count(value)

返回出现的次数 value .

index(value)

返回的第一个索引位置 value .引发 ValueError 如果 value 不存在。

format

只读属性,包含 struct 当前存储的所有值使用的打包格式。

shm

这个 SharedMemory 存储值的实例。

下面的示例演示了 ShareableList 实例:

>>> from multiprocessing import shared_memory
>>> a = shared_memory.ShareableList(['howdy', b'HoWdY', -273.154, 100, None, True, 42])
>>> [ type(entry) for entry in a ]
[<class 'str'>, <class 'bytes'>, <class 'float'>, <class 'int'>, <class 'NoneType'>, <class 'bool'>, <class 'int'>]
>>> a[2]
-273.154
>>> a[2] = -78.5
>>> a[2]
-78.5
>>> a[2] = 'dry ice'  # Changing data types is supported as well
>>> a[2]
'dry ice'
>>> a[2] = 'larger than previously allocated storage space'
Traceback (most recent call last):
  ...
ValueError: exceeds available storage for existing str
>>> a[2]
'dry ice'
>>> len(a)
7
>>> a.index(42)
6
>>> a.count(b'howdy')
0
>>> a.count(b'HoWdY')
1
>>> a.shm.close()
>>> a.shm.unlink()
>>> del a  # Use of a ShareableList after call to unlink() is unsupported

下面的示例描述了一个、两个或多个进程如何访问同一个进程 ShareableList 通过提供后面共享内存块的名称:

>>> b = shared_memory.ShareableList(range(5))         # In a first process
>>> c = shared_memory.ShareableList(name=b.shm.name)  # In a second process
>>> c
ShareableList([0, 1, 2, 3, 4], name='...')
>>> c[-1] = -999
>>> b[-1]
-999
>>> b.shm.close()
>>> c.shm.close()
>>> c.shm.unlink()