并行拉普拉斯解算器¶
下面的代码在网格上并行求解拉普拉斯方程。它把一个正方形分成 \(n\) 平行条,其中 \(n\) 是进程的数目,并使用jacobi迭代。代码的工作方式是根进程创建一个矩阵,并将这些部分分发给其他进程。在每次迭代中,每个进程将其上行传递给上面的进程,将其下行传递给下面的进程,因为它们需要知道相邻点的值来进行迭代。然后他们反复重复。每500次迭代,使用Gather收集过程的误差估计值。您可以将这个结果与我们在f2py一节中编写的解算器进行比较。
from mpi4py import MPI
import numpy
size=MPI.size
rank=MPI.rank
num_points=500
sendbuf=[]
root=0
dx=1.0/(num_points-1)
from numpy import r_
j=numpy.complex(0,1)
rows_per_process=num_points/size
max_iter=5000
num_iter=0
total_err=1
def numpyTimeStep(u,dx,dy):
dx2, dy2 = dx**2, dy**2
dnr_inv = 0.5/(dx2 + dy2)
u_old=u.copy()
# The actual iteration
u[1:-1, 1:-1] = ((u[0:-2, 1:-1] + u[2:, 1:-1])*dy2 +
(u[1:-1,0:-2] + u[1:-1, 2:])*dx2)*dnr_inv
v = (u - u_old).flat
return u,numpy.sqrt(numpy.dot(v,v))
if MPI.rank==0:
print("num_points: %d"%num_points)
print("dx: %f"%dx)
print("row_per_procss: %d"%rows_per_process)
m=numpy.zeros((num_points,num_points),dtype=float)
pi_c=numpy.pi
x=r_[0.0:pi_c:num_points*j]
m[0,:]=numpy.sin(x)
m[num_points-1,:]=numpy.sin(x)
l=[ m[i*rows_per_process:(i+1)*rows_per_process,:] for i in range(size)]
sendbuf=l
my_grid=MPI.COMM_WORLD.Scatter(sendbuf,root)
while num_iter < max_iter and total_err > 10e-6:
if rank==0:
MPI.COMM_WORLD.Send(my_grid[-1,:],1)
if rank > 0 and rank< size-1:
row_above=MPI.COMM_WORLD.Recv(rank-1)
MPI.COMM_WORLD.Send(my_grid[-1,:],rank+1)
if rank==size-1:
row_above=MPI.COMM_WORLD.Recv(MPI.rank-1)
MPI.COMM_WORLD.Send(my_grid[0,:],rank-1)
if rank > 0 and rank< size-1:
row_below=MPI.COMM_WORLD.Recv(MPI.rank+1)
MPI.COMM_WORLD.Send(my_grid[0,:],MPI.rank-1)
if rank==0:
row_below=MPI.COMM_WORLD.Recv(1)
if rank >0 and rank < size-1:
row_below.shape=(1,num_points)
row_above.shape=(1,num_points)
u,err =numpyTimeStep(r_[row_above,my_grid,row_below],dx,dx)
my_grid=u[1:-1,:]
if rank==0:
row_below.shape=(1,num_points)
u,err=numpyTimeStep(r_[my_grid,row_below],dx,dx)
my_grid=u[0:-1,:]
if rank==size-1:
row_above.shape=(1,num_points)
u,err=numpyTimeStep(r_[row_above,my_grid],dx,dx)
my_grid=u[1:,:]
if num_iter%500==0:
err_list=MPI.COMM_WORLD.Gather(err,root)
if rank==0:
total_err = 0
for a in err_list:
total_err=total_err+numpy.math.sqrt( a**2)
total_err=numpy.math.sqrt(total_err)
print("error: %f"%total_err)
num_iter=num_iter+1
recvbuf=MPI.COMM_WORLD.Gather(my_grid,root)
if rank==0:
sol=numpy.array(recvbuf)
sol.shape=(num_points,num_points)
##Write your own code to do something with the solution
print(num_iter)
print(sol)
对于较小的网格大小,这将比直接的串行实现慢,这是因为通信有开销,而对于小网格,进程间通信比仅仅进行迭代要花费更多的时间。然而,在1000x1000的网格上,我发现使用4个处理器,并行版本只需6秒,而我们之前编写的串行版本只需20秒。
练习:用f2py重写上面的代码,使每个进程都编译一个fortran函数并使用它,你能以多快的速度得到这个函数?