Python中MPI訊息傳遞介面
阿新 • • 發佈:2019-01-30
MPI(Message-Passing-Interface)訊息傳遞介面
1.MPI安裝
python mpi安裝mpi4py的python庫
pip install mpi4py
安裝後得到Bin資料夾:
將MicrosoftMPI/Bin資料夾路徑新增到環境變數:
命令列輸入mpiexec執行,出現幫助表示安裝成功
2.試驗程式碼
from mpi4py import MPI
comm = MPI.COMM_WORLD
comm_rank = comm.Get_rank()
comm_size = comm.Get_size()
print 'I'm the %d process of %d processes" % (comm_rank, comm_size)
2.點對點傳輸
點對點通訊.其實就是最簡單的程序A向程序B傳送資訊,而程序B向程序A接收資訊.這是關於兩個程序之間的通訊.
示例程式碼:
#mpip2p.py
from mpi4py import MPI
comm = MPI.COMM_WORLD
comm_rank = comm.Get_rank()
comm_size = comm.Get_size()
data = [comm_rank]*5
comm.send(data,dest=(comm_rank+1 )%comm_size)
data_recv =comm.recv(source=(comm_rank-1)%comm_size)
print "my rank is %d, and Ireceived:" %comm_rank
print data_recv
在命令列中輸入命令
mpiexec -n 5 python mpip2p.py
執行結果:
my rank is 4, and Ireceived:
[3, 3, 3, 3, 3]
my rank is 3, and Ireceived:
[2, 2, 2, 2, 2]
my rank is 2, and Ireceived:
[1 , 1, 1, 1, 1]
my rank is 0, and Ireceived:
[4, 4, 4, 4, 4]
my rank is 1, and Ireceived:
[0, 0, 0, 0, 0]
指定啟動5個mpi程序來執行後面的程式。相當於對指令碼拷貝了5份,每個程序執行一份,互不干擾。在執行的時候程式碼裡面唯一的不同,就是各自的rank也就是ID不一樣。
Get_rank()函式:獲取當前程序rank值
Get_size()函式:獲取總共的程序數
send()函式:將資料送給rank為dest的值的程序
recv()函式:接收rank為source的值的資料
訊息傳遞的同步非同步性:
recv是阻塞函式,也就是說程序要收到傳送方的資料,這個函式才返回.
而send是不確定的,也就是說它有時候是阻塞,有時候是非阻塞.當傳送的資料不多的時候,mpi會將資料存到一個系統緩衝區,然後馬上進行send方法的返回.而當資料量很大超過緩衝區的大小的時候,mpi需要等待接收方接收,然後把資料拷貝給接收方,再進行send方法的返回.
簡單來說,資料量少->非阻塞(同步),資料量大->阻塞(非同步).
除了send和recv方法,還有Send和Recv方法.,這樣區分是由於要傳遞的資料的性質差異.當我們要傳遞int,float,list,dict等python內建型別的資料的時候,我們使用小寫的方法.而當使用buffer型別的資料的時候,我們要使用大寫的方法.
send的多個版本:
事實上,除了大寫小寫的版本,send還有不同的版本,這個不同是基於不同的傳送策略的,而這些版本都有大小寫之分.
bsend:緩衝模式,資料寫入緩衝區,馬上返回,使用者必須確保緩衝區大小足夠
ssend:同步模式,等接收方接收才返回
rsend:就緒模式,傳送時必須確保接收方處於等待接收的狀態,否則產生錯誤
send:標準模式(bsend+ssend),send實際上就是bsend和ssend的結合體.
3.多點傳輸:
#mpimp.py
from mpi4py import MPI
comm = MPI.COMM_WORLD
comm_rank = comm.rank()
comm_size = comm.size()
if comm_rank == 0:
data = [1,2,3]
for i in range(comm_size - 1):
comm.send(data,dest=i+1)
else:
data = comm.recv(source = 0)
print "Process %d receive"%comm_rank,data
執行結果:
mpiexec -n 6 python mpimp.py
Process 1 receive [1,2,3]
Process 2 receive [1,2,3]
Process 3 receive [1,2,3]
Process 4 receive [1,2,3]
Process 5 receive [1,2,3]
此做法漏洞:
在單機上跑這n個程序好像沒所謂,CPU始終在工作,時間複雜度也是O(n)級別.
但假如是n臺機器分別跑這n個程序,第0臺機器始終在傳送資料,而其他機器的大部分時間都在排隊,等第0臺機器往自己傳送資料.這樣的話,這堆機器要執行完這堆程序,需要O(n)時間.等於一臺機器的工作效率,不是滿意的結果。
廣播(改進):
想到了,我們可以像p2p那樣做,有資料的機器都幫忙向沒有資料的機器傳送資料,這樣的話時間複雜度是可以降低到O(logn)的!
mpi有實現這樣操作的介面,bcast函式
改進程式碼:
#mpimp.py
from mpi4py import MPI
comm = MPI.COMM_WORLD
comm_rank = comm.rank()
comm_size = comm.size()
if comm_rank == 0:
data = [1,2,3]
comm.bcast(data, root=0)
else:
data = comm.bcast(None, root=0)
print "Process %d receive"%comm_rank,data
bcast()函式:無論是廣播者,還是被廣播者,都是呼叫bcast函式,而不像點對點那樣一個send另一個recv.bcast()函式一個根程序把資料發給其他程序。
散播:
散播的函式和廣播的引數是一樣的,只是返回值不一樣.
注意!散播的傳送方也會接收到資料(和概念圖有出入),
散播裡列表裡元素的分發不是按程序0就分得第0個元素,程序1就第1個元素這樣的.而是一種類似隨機的打亂的分發策略.
散播發送的資料,data(列表)裡元素的個數必須等於程序的個數.否則會出錯。
示例程式碼:
#mpisca.py
from mpi4py import MPI
comm = MPI.COMM_WORLD
comm_rank = comm.Get_rank()
comm_size = comm.Get_size()
if comm_rank == 0:
data = [1,2,3,4,5,6]
else:
data = None
data = comm.scatter(data, root=0)
print "Process %d receive"%comm_rank,data
執行結果:
mpiexec -n 6 python mpisca.py
Process 1 receive 2
Process 4 receive 5
Process 2 receive 3
Process 0 receive 1
Process 3 receive 4
Process 5 receive 6
收集:
散播的逆操作:
#mpigather.py
from mpi4py import MPI
comm = MPI.COMM_WORLD
comm_rank = comm.Get_rank()
comm_size = comm.Get_size()
if comm_rank == 0:
data = comm.gather(comm_rank, root=0)
print data
else:
comm.gather(comm_rank,root=0)
mpiexec -n 8 python mpigather.py
[0, 1, 2, 3, 4, 5, 6, 7]
reduce()規約函式:
它相當於在收集的過程中不斷地進行兩元運算,最終在接收方那裡只有一個值,而不是一個列表.
也就是說規約函式
示例程式碼:通過計算圓周率
#mpireduce.py
from mpi4py import MPI
comm = MPI.COMM_WORLD
comm_rank = comm.Get_rank()
comm_size = comm.Get_size()
k = (1.0 if comm_rank%2 == 0 else -1.0)/(2*comm_rank +1)
data = comm.reduce(k, root=0,op=MPI.SUM)
if comm_rank == 0:
pi = data*4
print "PI = %.6f"%pi
執行結果:
C:\Python27\Scripts\ML\MPI>mpiexec -n 12 python mpireduce.py
PI = 3.058403
注意事項:
1.平行計算的reduce,scatter,gather在執行資訊互動函式是並行,資訊互動完之後,每個程序統一從函式中出來,執行接下來的程式碼
2.上述函式root秩代表根節點:scatter傳播,gather接收,reduce最終彙總結果的程序,
3每臺機器reduce複雜度,只有O(logn),reduce函式MPI_SUMj操作:
假設九個程序
1, 2, 3, 4, 5, 6, 7, 8, 9
1+2, 3+4, 5+6, 7+8, 9
1+2+3+4, 5+6+7+8, 9
1+2+3+4+5+6+7+8, 9
1+2+3+4+5+6+7+8+9
4.單機的話不要開幾百個程序,不是開玩笑的
5.注意的是,散播和reduce中傳送接收到的返回值,不是接收方最終得到的返回值,而是一個none.
alltogether:收集後再廣播一次,allreduce:reduce+bcast
barrier是一種全域性同步,就是說全部程序進行同步.
當一個程序呼叫barrier的時候,它會被阻塞.
當所有程序都呼叫了barrier之後,barrier會同時解除所有程序的阻塞.
但執行起來發現並不是這回事.所有程序沒有像期待那樣先全部輸出begin,再全部輸出end,barrier這個函式彷彿形同虛設.
其實這裡問題不是在barrier,而是在print.
我們OS的IO是有緩衝的,一個數據要出現在螢幕上,簡單來說是經過記憶體->標準IO檔案->控制檯螢幕.
而程序間不共享IO檔案(後面會學到如何在MPI的程序裡共享檔案),共享控制檯螢幕.
因此螢幕上語句的順序依賴OS什麼時候將IO檔案裡的內容推到螢幕上.
我們強制讓記憶體->標準IO檔案和標準IO檔案->控制檯螢幕這兩步一起進行,也就是加上flush語句.
form mpi4py import MPI
import sys
comm = MPI.COMM_WORLD
comm_rank = comm.Get_rank()
comm_size = comm.Get_size()
print comm_rank,'begin'
sys.stdout.flush()
comm.barrier()
print comm_rank,'end'
sendrecv()函式
傳送send+接收recv
data = sendrecv(data,dest=1)
關於程序
from mpi4py import MPI
comm = MPI.COMM_WORLD
comm_rank = comm.Get_rank()
comm_size = comm.Get_size()
data_send = [comm_rank]*5
comm.send(data_send,dest=(comm_rank+1)%comm_size)
data_recv = comm.recv(source=(comm_rank-1)%comm_size)
print (my rank is %d, and Ireceived: %comm_rank)
print data_recv
這裡面有個需要注意的問題,如果我們要傳送的資料比較小的話,mpi會快取我們的資料,然後繼續執行後面的指令,而不會等待對方程序執行recv指令接收這個資料。
但是,如果要傳送資料量很大,[rank]*500程式就會很卡,因為所有程序都會卡在傳送這條指令,等待下一個指令發起接收指令,但是程序是執行完傳送的指令才能接收的指令,這就和死鎖差不多。
一般修改如下:
from mpi4py import MPI
comm = MPI.COMM_WORLD
comm_rank = comm.Get_rank()
comm_size = comm.Get_size()
data_send = [comm_rank]*500
if comm_rank == 0:
comm.send(data_send, dest=(comm_rank+1)%comm_size)
if comm_rank > 0:
data_recv = comm.recv(source=(comm_rank-1)%comm_size)
comm_send(data_send,dest=(comm_rank-1)%comm_size)
if comm_rank == 0:
data_recv = comm.recv(source=(comm_rank-1)%comm_size)