Mpi4py¶
MPI是消息传递接口的缩写,是并行编程的通用库。有一个包裹 mpi4py
它构建在MPI之上,并允许在不同的进程之间传递任意的Python对象。Sage发行版不提供这些程序包。安装 openmpi
使用您的发行版的包管理器。然后安装 mpi4py
使用
sage: !pip install mpi4py
现在,MPI的工作方式是启动一组MPI进程,所有进程运行相同的代码。每个进程都有一个等级,这是一个标识它的数字。下面的伪代码表示MPI程序的通用格式。
....
if my rank is n:
do some computation ...
send some stuff to the process of rank j
receive some data from the process of rank k
else if my rank is n+1:
....
每个进程查找它应该做的事情(由它的等级指定),并且进程可以发送和接收数据。让我们举一个例子。在文件中使用以下代码创建脚本 mpi_1.py
from mpi4py import MPI
comm = MPI.COMM_WORLD
print("hello world")
print(f"my rank is: {comm.rank}")
要运行它,您可以这样做(从Sage目录中的命令行)
mpirun -np 5 ./sage -python mpi_1.py
该命令 mpirun -np 5
在MPI下启动5个程序副本。在本例中,我们有5个Sage副本在纯Python模式下运行该脚本 mpi_1.py
。结果应该是5个“你好世界”加上5个不同的排名。
最重要的两个MPI操作是发送和接收。考虑以下应该放入脚本中的示例 mpi_2.py
from mpi4py import MPI
import numpy
comm = MPI.COMM_WORLD
rank = comm.rank
size = comm.size
v = numpy.array([rank] * 5, dtype=float)
comm.send(v, dest=(rank+1) % size)
data = comm.recv(source=(rank-1) % size)
print(f"my rank is: {rank}")
print("I received this:")
print(data)
使用与上面相同的命令 mpi_1.py
替换为 mpi_2.py
将产生5个输出。每个进程将创建一个数组并将其传递给下一个进程,在下一个进程中,最后一个进程将传递给第一个进程。请注意 MPI.size
是MPI进程的总数。 MPI.COMM_WORLD
是沟通的世界。
关于MPI,有一些微妙之处需要注意。较小的发送被缓冲。这意味着,如果一个进程发送一个小对象,它将由OpenMPI存储,该进程将继续其执行,并且每当目的地执行接收时,它发送的对象都将被接收。但是,如果对象很大,进程将挂起,直到其目的地执行相应的接收。事实上,如果出现以下情况,上述代码将挂起 [rank]*5
被替换为 [rank]*500
。如果是这样做会更好
from mpi4py import MPI
import numpy
comm = MPI.COMM_WORLD
rank = comm.rank
size = comm.size
v = numpy.array([rank] * 500, dtype=float)
if comm.rank == 0:
comm.send(v, dest=(rank+1) % size)
if comm.rank > 0:
data = comm.recv(source=(rank-1) % size)
comm.send(v, dest=(rank+1) % size)
if comm.rank == 0:
data = comm.recv(source=size - 1)
print(f"my rank is: {rank}")
print("I received this:")
print(data)
现在,进程0将数据发送到进程1,然后等待从进程接收数据 MPI.size - 1
。同时,进程1将数据发送到进程2,然后从进程0接收数据。即使传输的阵列很大,这也不会锁定。
一个常见的习语是让一个流程充当领导者,通常是排名为0的流程。该进程将数据发送到其他进程,根据结果进行计算,并决定应该进行多少进一步计算。请考虑以下代码
from mpi4py import MPI
import numpy
sendbuf = []
root = 0
comm = MPI.COMM_WORLD
if comm.rank == 0:
m = numpy.random.randn(comm.size, comm.size)
print(m)
sendbuf=m
v = comm.scatter(sendbuf, root)
print("I got this array:")
print(v)
这个 scatter
命令获取一个列表,并将其平均分配给所有进程。在这里,根进程创建一个矩阵(被视为行列表),然后将其分配给每个人(根的 sendbuf
在进程之间平均分配)。每个进程都打印它得到的行。请注意, scatter
命令由每个人执行,但当超级用户执行它时,它充当 send
以及一个 receive
(根从自身获得一行),而对于其他所有人来说,它只是一个 receive
。
这是一个互补的 gather
将所有进程的结果收集到列表中的命令。下一个示例使用 scatter
和 gather
在一起。现在,根进程分散了矩阵的各行。每个进程对其接收到的行的元素进行平方处理。然后,根进程将行收集到一个新的矩阵中。
from mpi4py import MPI
import numpy
comm = MPI.COMM_WORLD
sendbuf = []
root = 0
if comm.rank == 0:
m = numpy.array(range(comm.size * comm.size), dtype=float)
m.shape = (comm.size, comm.size)
print(m)
sendbuf = m
v = comm.scatter(sendbuf, root)
print("I got this array:")
print(v)
v = v*v
recvbuf = comm.gather(v, root)
if comm.rank == 0:
print(numpy.array(recvbuf))
也有一个 broadcast
向每个进程发送单个对象的命令。请考虑下面的小扩展。这与以前相同,但现在在结束时,根进程向每个人发送字符串“Done”,该字符串将被打印出来。
v = MPI.COMM_WORLD.scatter(sendbuf, root)
print("I got this array:")
print(v)
v = v*v
recvbuf = MPI.COMM_WORLD.gather(v, root)
if MPI.COMM_WORLD.rank == 0:
print(numpy.array(recvbuf))
if MPI.COMM_WORLD.rank == 0:
sendbuf = "done"
recvbuf = MPI.COMM_WORLD.bcast(sendbuf,root)
print(recvbuf)