1. 程式人生 > >Python中MPI訊息傳遞介面

Python中MPI訊息傳遞介面

MPI(Message-Passing-Interface)訊息傳遞介面

1.MPI安裝

python mpi安裝mpi4py的python庫

pip install mpi4py

安裝後得到Bin資料夾:

這裡寫圖片描述

將MicrosoftMPI/Bin資料夾路徑新增到環境變數:

這裡寫圖片描述

命令列輸入mpiexec執行,出現幫助表示安裝成功

這裡寫圖片描述
2.試驗程式碼

from mpi4py import MPI

comm = MPI.COMM_WORLD
comm_rank = comm.Get_rank()
comm_size = comm.Get_size()

print
'I'm the %d process of %d processes" % (comm_rank, comm_size)

這裡寫圖片描述

2.點對點傳輸

點對點通訊.其實就是最簡單的程序A向程序B傳送資訊,而程序B向程序A接收資訊.這是關於兩個程序之間的通訊.

示例程式碼:

#mpip2p.py
from mpi4py import MPI

comm = MPI.COMM_WORLD
comm_rank = comm.Get_rank()
comm_size = comm.Get_size()

data = [comm_rank]*5
comm.send(data,dest=(comm_rank+1
)%comm_size) data_recv =comm.recv(source=(comm_rank-1)%comm_size) print "my rank is %d, and Ireceived:" %comm_rank print data_recv

在命令列中輸入命令

mpiexec -n 5 python mpip2p.py

執行結果:

my rank is 4, and Ireceived:
[3, 3, 3, 3, 3]
my rank is 3, and Ireceived:
[2, 2, 2, 2, 2]
my rank is 2, and Ireceived:
[1
, 1, 1, 1, 1] my rank is 0, and Ireceived: [4, 4, 4, 4, 4] my rank is 1, and Ireceived: [0, 0, 0, 0, 0]

指定啟動5個mpi程序來執行後面的程式。相當於對指令碼拷貝了5份,每個程序執行一份,互不干擾。在執行的時候程式碼裡面唯一的不同,就是各自的rank也就是ID不一樣。

Get_rank()函式:獲取當前程序rank值

Get_size()函式:獲取總共的程序數

send()函式:將資料送給rank為dest的值的程序

recv()函式:接收rank為source的值的資料

訊息傳遞的同步非同步性:

recv是阻塞函式,也就是說程序要收到傳送方的資料,這個函式才返回.

而send是不確定的,也就是說它有時候是阻塞,有時候是非阻塞.當傳送的資料不多的時候,mpi會將資料存到一個系統緩衝區,然後馬上進行send方法的返回.而當資料量很大超過緩衝區的大小的時候,mpi需要等待接收方接收,然後把資料拷貝給接收方,再進行send方法的返回.

簡單來說,資料量少->非阻塞(同步),資料量大->阻塞(非同步).

除了send和recv方法,還有Send和Recv方法.,這樣區分是由於要傳遞的資料的性質差異.當我們要傳遞int,float,list,dict等python內建型別的資料的時候,我們使用小寫的方法.而當使用buffer型別的資料的時候,我們要使用大寫的方法.

send的多個版本:

事實上,除了大寫小寫的版本,send還有不同的版本,這個不同是基於不同的傳送策略的,而這些版本都有大小寫之分.

bsend:緩衝模式,資料寫入緩衝區,馬上返回,使用者必須確保緩衝區大小足夠

ssend:同步模式,等接收方接收才返回

rsend:就緒模式,傳送時必須確保接收方處於等待接收的狀態,否則產生錯誤

send:標準模式(bsend+ssend),send實際上就是bsend和ssend的結合體.

3.多點傳輸:

#mpimp.py
from mpi4py import MPI

comm = MPI.COMM_WORLD
comm_rank = comm.rank()
comm_size = comm.size()

if comm_rank == 0:
    data = [1,2,3]
    for i in range(comm_size - 1):
        comm.send(data,dest=i+1)
else:
    data = comm.recv(source = 0)
    print "Process %d receive"%comm_rank,data 

執行結果:

mpiexec -n 6 python mpimp.py
Process 1 receive [1,2,3]
Process 2 receive [1,2,3]
Process 3 receive [1,2,3]
Process 4 receive [1,2,3]
Process 5 receive [1,2,3]

此做法漏洞:

在單機上跑這n個程序好像沒所謂,CPU始終在工作,時間複雜度也是O(n)級別.

但假如是n臺機器分別跑這n個程序,第0臺機器始終在傳送資料,而其他機器的大部分時間都在排隊,等第0臺機器往自己傳送資料.這樣的話,這堆機器要執行完這堆程序,需要O(n)時間.等於一臺機器的工作效率,不是滿意的結果。

廣播(改進):

想到了,我們可以像p2p那樣做,有資料的機器都幫忙向沒有資料的機器傳送資料,這樣的話時間複雜度是可以降低到O(logn)的!

mpi有實現這樣操作的介面,bcast函式

改進程式碼:

#mpimp.py
from mpi4py import MPI

comm = MPI.COMM_WORLD
comm_rank = comm.rank()
comm_size = comm.size()

if comm_rank == 0:
    data = [1,2,3]
    comm.bcast(data, root=0)
else:
    data = comm.bcast(None, root=0)
    print "Process %d receive"%comm_rank,data 

bcast()函式:無論是廣播者,還是被廣播者,都是呼叫bcast函式,而不像點對點那樣一個send另一個recv.bcast()函式一個根程序把資料發給其他程序。

散播:

這裡寫圖片描述

散播的函式和廣播的引數是一樣的,只是返回值不一樣.

注意!散播的傳送方也會接收到資料(和概念圖有出入),

散播裡列表裡元素的分發不是按程序0就分得第0個元素,程序1就第1個元素這樣的.而是一種類似隨機的打亂的分發策略.

散播發送的資料,data(列表)裡元素的個數必須等於程序的個數.否則會出錯。

示例程式碼:

#mpisca.py
from mpi4py import MPI

comm = MPI.COMM_WORLD
comm_rank = comm.Get_rank()
comm_size = comm.Get_size()

if comm_rank == 0:
    data = [1,2,3,4,5,6]
else:
    data = None

data = comm.scatter(data, root=0)
    print "Process %d receive"%comm_rank,data 

執行結果:

mpiexec -n 6 python mpisca.py
Process 1 receive 2
Process 4 receive 5
Process 2 receive 3
Process 0 receive 1
Process 3 receive 4
Process 5 receive 6

收集:

散播的逆操作:

#mpigather.py
from mpi4py import MPI

comm = MPI.COMM_WORLD
comm_rank = comm.Get_rank()
comm_size = comm.Get_size()

if comm_rank == 0:
    data = comm.gather(comm_rank, root=0)
    print data
else:
    comm.gather(comm_rank,root=0)
mpiexec -n 8 python mpigather.py
[0, 1, 2, 3, 4, 5, 6, 7]

reduce()規約函式:

它相當於在收集的過程中不斷地進行兩元運算,最終在接收方那裡只有一個值,而不是一個列表.

也就是說規約函式

示例程式碼:通過113+1517+...=π4計算圓周率

#mpireduce.py
from mpi4py import MPI

comm = MPI.COMM_WORLD
comm_rank = comm.Get_rank()
comm_size = comm.Get_size()

k = (1.0 if comm_rank%2 == 0 else -1.0)/(2*comm_rank +1)
data = comm.reduce(k, root=0,op=MPI.SUM)

if comm_rank == 0:
    pi = data*4
    print "PI = %.6f"%pi

執行結果:

C:\Python27\Scripts\ML\MPI>mpiexec -n 12 python mpireduce.py
PI = 3.058403

注意事項:

1.平行計算的reduce,scatter,gather在執行資訊互動函式是並行,資訊互動完之後,每個程序統一從函式中出來,執行接下來的程式碼

2.上述函式root秩代表根節點:scatter傳播,gather接收,reduce最終彙總結果的程序,

3每臺機器reduce複雜度,只有O(logn),reduce函式MPI_SUMj操作:

假設九個程序

1, 2, 3, 4, 5, 6, 7, 8, 9
  1+2, 3+4, 5+6, 7+8, 9
   1+2+3+4, 5+6+7+8, 9
    1+2+3+4+5+6+7+8, 9
    1+2+3+4+5+6+7+8+9

4.單機的話不要開幾百個程序,不是開玩笑的

5.注意的是,散播和reduce中傳送接收到的返回值,不是接收方最終得到的返回值,而是一個none.

alltogether:收集後再廣播一次,allreduce:reduce+bcast

barrier是一種全域性同步,就是說全部程序進行同步.

當一個程序呼叫barrier的時候,它會被阻塞.

當所有程序都呼叫了barrier之後,barrier會同時解除所有程序的阻塞.

但執行起來發現並不是這回事.所有程序沒有像期待那樣先全部輸出begin,再全部輸出end,barrier這個函式彷彿形同虛設.

其實這裡問題不是在barrier,而是在print.

我們OS的IO是有緩衝的,一個數據要出現在螢幕上,簡單來說是經過記憶體->標準IO檔案->控制檯螢幕.

而程序間不共享IO檔案(後面會學到如何在MPI的程序裡共享檔案),共享控制檯螢幕.

因此螢幕上語句的順序依賴OS什麼時候將IO檔案裡的內容推到螢幕上.

我們強制讓記憶體->標準IO檔案和標準IO檔案->控制檯螢幕這兩步一起進行,也就是加上flush語句.

form mpi4py import MPI
import sys

comm = MPI.COMM_WORLD
comm_rank = comm.Get_rank()
comm_size = comm.Get_size()

print comm_rank,'begin'
sys.stdout.flush()
comm.barrier()
print comm_rank,'end'

sendrecv()函式

傳送send+接收recv

data = sendrecv(data,dest=1)

關於程序

from mpi4py import MPI

comm = MPI.COMM_WORLD
comm_rank = comm.Get_rank()
comm_size = comm.Get_size()

data_send = [comm_rank]*5
comm.send(data_send,dest=(comm_rank+1)%comm_size)
data_recv = comm.recv(source=(comm_rank-1)%comm_size)
print (my rank is %d, and Ireceived: %comm_rank)
print data_recv

這裡面有個需要注意的問題,如果我們要傳送的資料比較小的話,mpi會快取我們的資料,然後繼續執行後面的指令,而不會等待對方程序執行recv指令接收這個資料。

但是,如果要傳送資料量很大,[rank]*500程式就會很卡,因為所有程序都會卡在傳送這條指令,等待下一個指令發起接收指令,但是程序是執行完傳送的指令才能接收的指令,這就和死鎖差不多。

一般修改如下:

from mpi4py import MPI

comm = MPI.COMM_WORLD
comm_rank = comm.Get_rank()
comm_size = comm.Get_size()

data_send = [comm_rank]*500
if comm_rank == 0:
    comm.send(data_send, dest=(comm_rank+1)%comm_size)
if comm_rank > 0:
    data_recv = comm.recv(source=(comm_rank-1)%comm_size)
    comm_send(data_send,dest=(comm_rank-1)%comm_size)
if comm_rank == 0:
    data_recv = comm.recv(source=(comm_rank-1)%comm_size)

這也就是為什麼接收放在前面的原因了