Mpi4py

MPI是消息传递接口的缩写,是并行编程的通用库。有一个包裹 mpi4py 它构建在MPI之上,并允许在不同的进程之间传递任意的Python对象。Sage发行版不提供这些程序包。安装 openmpi 使用您的发行版的包管理器。然后安装 mpi4py 使用

sage: !pip install mpi4py

现在,MPI的工作方式是启动一组MPI进程,所有进程运行相同的代码。每个进程都有一个等级,这是一个标识它的数字。下面的伪代码表示MPI程序的通用格式。

   ....

if my rank is n:
   do some computation ...
   send some stuff to the process of rank j
   receive some data from the process of rank k

else if my rank is n+1:
   ....

每个进程查找它应该做的事情(由它的等级指定),并且进程可以发送和接收数据。让我们举一个例子。在文件中使用以下代码创建脚本 mpi_1.py

from mpi4py import MPI
comm = MPI.COMM_WORLD
print("hello world")
print(f"my rank is: {comm.rank}")

要运行它,您可以这样做(从Sage目录中的命令行)

mpirun -np 5 ./sage -python mpi_1.py

该命令 mpirun -np 5 在MPI下启动5个程序副本。在本例中,我们有5个Sage副本在纯Python模式下运行该脚本 mpi_1.py 。结果应该是5个“你好世界”加上5个不同的排名。

最重要的两个MPI操作是发送和接收。考虑以下应该放入脚本中的示例 mpi_2.py

from mpi4py import MPI
import numpy
comm = MPI.COMM_WORLD
rank = comm.rank
size = comm.size
v = numpy.array([rank] * 5, dtype=float)
comm.send(v, dest=(rank+1) % size)
data = comm.recv(source=(rank-1) % size)
print(f"my rank is: {rank}")
print("I received this:")
print(data)

使用与上面相同的命令 mpi_1.py 替换为 mpi_2.py 将产生5个输出。每个进程将创建一个数组并将其传递给下一个进程,在下一个进程中,最后一个进程将传递给第一个进程。请注意 MPI.size 是MPI进程的总数。 MPI.COMM_WORLD 是沟通的世界。

关于MPI,有一些微妙之处需要注意。较小的发送被缓冲。这意味着,如果一个进程发送一个小对象,它将由OpenMPI存储,该进程将继续其执行,并且每当目的地执行接收时,它发送的对象都将被接收。但是,如果对象很大,进程将挂起,直到其目的地执行相应的接收。事实上,如果出现以下情况,上述代码将挂起 [rank]*5 被替换为 [rank]*500 。如果是这样做会更好

from mpi4py import MPI
import numpy
comm = MPI.COMM_WORLD
rank = comm.rank
size = comm.size
v = numpy.array([rank] * 500, dtype=float)
if comm.rank == 0:
   comm.send(v, dest=(rank+1) % size)
if comm.rank > 0:
    data = comm.recv(source=(rank-1) % size)
    comm.send(v, dest=(rank+1) % size)
if comm.rank == 0:
    data = comm.recv(source=size - 1)

print(f"my rank is: {rank}")
print("I received this:")
print(data)

现在,进程0将数据发送到进程1,然后等待从进程接收数据 MPI.size - 1 。同时,进程1将数据发送到进程2,然后从进程0接收数据。即使传输的阵列很大,这也不会锁定。

一个常见的习语是让一个流程充当领导者,通常是排名为0的流程。该进程将数据发送到其他进程,根据结果进行计算,并决定应该进行多少进一步计算。请考虑以下代码

from mpi4py import MPI
import numpy
sendbuf = []
root = 0
comm = MPI.COMM_WORLD
if comm.rank == 0:
    m = numpy.random.randn(comm.size, comm.size)
    print(m)
    sendbuf=m

v = comm.scatter(sendbuf, root)

print("I got this array:")
print(v)

这个 scatter 命令获取一个列表,并将其平均分配给所有进程。在这里,根进程创建一个矩阵(被视为行列表),然后将其分配给每个人(根的 sendbuf 在进程之间平均分配)。每个进程都打印它得到的行。请注意, scatter 命令由每个人执行,但当超级用户执行它时,它充当 send 以及一个 receive (根从自身获得一行),而对于其他所有人来说,它只是一个 receive

这是一个互补的 gather 将所有进程的结果收集到列表中的命令。下一个示例使用 scattergather 在一起。现在,根进程分散了矩阵的各行。每个进程对其接收到的行的元素进行平方处理。然后,根进程将行收集到一个新的矩阵中。

from mpi4py import MPI
import numpy
comm = MPI.COMM_WORLD
sendbuf = []
root = 0
if comm.rank == 0:
    m = numpy.array(range(comm.size * comm.size), dtype=float)
    m.shape = (comm.size, comm.size)
    print(m)
    sendbuf = m

v = comm.scatter(sendbuf, root)
print("I got this array:")
print(v)
v = v*v
recvbuf = comm.gather(v, root)
if comm.rank == 0:
    print(numpy.array(recvbuf))

也有一个 broadcast 向每个进程发送单个对象的命令。请考虑下面的小扩展。这与以前相同,但现在在结束时,根进程向每个人发送字符串“Done”,该字符串将被打印出来。

v = MPI.COMM_WORLD.scatter(sendbuf, root)
print("I got this array:")
print(v)
v = v*v
recvbuf = MPI.COMM_WORLD.gather(v, root)
if MPI.COMM_WORLD.rank == 0:
    print(numpy.array(recvbuf))

if MPI.COMM_WORLD.rank == 0:
    sendbuf = "done"
recvbuf = MPI.COMM_WORLD.bcast(sendbuf,root)
print(recvbuf)