c++使用mpich庫編寫並行程式
1、問題描述
矩陣乘法問題描述如下:
給定矩陣A和B,其中A是m*p大小矩陣,B是p*n大小的矩陣。求C = A*B。
求解這個問題最簡單的演算法是遍歷A的行和B的列,求得C的相應元素,時間複雜度O(mnp),空間複雜度O(1)。
// 矩陣乘法的C++實現 for(int i=0; i<m; i++){ for(int j=0; j<n; j++){ float temp = 0.0; for(int k=0; k<p; k++){ temp += A[i*p + k] * B[k*n + j]; } C[i*n + j] = temp; } }
2、最簡單的並行方案
要改進上述演算法為並行演算法,需要了解到C++ MPI程式設計的特點:
a. 各個程序之間不能有依賴。這是因為各個程序可以以任意的時間順序執行。
b. 資料是分散式儲存的。也就是說,每個程序有自己獨立的資料備份。
有了這兩點認識後,一種最簡單的並行方案就出來了:(假設開啟np個程序)
(1). 首先將矩陣A和C按行分為np塊;
(2). 程序號為 id 的程序讀取A的第 id 個分塊和B;
(3). 程序號為 id 的程序求解相應的C的第 id 個分塊。
程式碼如下:
/* filename: matMultiplyWithMPI.cpp * parallel matrix multiplication with MPI * C(m,n) = A(m,p) * B(p,n) * input: three parameters - m, p, n * @copyright: fengfu-chris*/ #include<iostream> #include<mpi.h> #include<math.h> #include<stdlib.h> void initMatrixWithRV(float *A, int rows, int cols); void matMultiplyWithSingleThread(float *A, float *B, float *matResult, int m, int p, int n); int main(int argc, char** argv) { int m = atoi(argv[1]); int p = atoi(argv[2]); int n = atoi(argv[3]); float *A, *B, *C; float *bA, *bC; int myrank, numprocs; MPI_Status status; MPI_Init(&argc, &argv); // 並行開始 MPI_Comm_size(MPI_COMM_WORLD, &numprocs); MPI_Comm_rank(MPI_COMM_WORLD, &myrank); int bm = m / numprocs; bA = new float[bm * p]; B = new float[p * n]; bC = new float[bm * n]; if(myrank == 0){ A = new float[m * p]; C = new float[m * n]; initMatrixWithRV(A, m, p); initMatrixWithRV(B, p, n); } MPI_Barrier(MPI_COMM_WORLD); /* step 1: 資料分配 */ MPI_Scatter(A, bm * p, MPI_FLOAT, bA, bm *p, MPI_FLOAT, 0, MPI_COMM_WORLD); MPI_Bcast(B, p * n, MPI_FLOAT, 0, MPI_COMM_WORLD); /* step 2: 平行計算C的各個分塊 */ matMultiplyWithSingleThread(bA, B, bC, bm, p, n); MPI_Barrier(MPI_COMM_WORLD); /* step 3: 彙總結果 */ MPI_Gather(bC, bm * n, MPI_FLOAT, C, bm * n, MPI_FLOAT, 0, MPI_COMM_WORLD); /* step 3-1: 解決歷史遺留問題(多餘的分塊) */ int remainRowsStartId = bm * numprocs; if(myrank == 0 && remainRowsStartId < m){ int remainRows = m - remainRowsStartId; matMultiplyWithSingleThread(A + remainRowsStartId * p, B, C + remainRowsStartId * n, remainRows, p, n); } delete[] bA; delete[] B; delete[] bC; if(myrank == 0){ delete[] A; delete[] C; } MPI_Finalize(); // 並行結束 return 0; } void initMatrixWithRV(float *A, int rows, int cols) { srand((unsigned)time(NULL)); for(int i = 0; i < rows*cols; i++){ A[i] = (float)rand() / RAND_MAX; } } void matMultiplyWithSingleThread(float *A, float *B, float *matResult, int m, int p, int n) { for(int i=0; i<m; i++){ for(int j=0; j<n; j++){ float temp = 0; for(int k=0; k<p; k++){ temp += A[i*p+k] * B[k*n + j]; } matResult[i*n+j] = temp; } } }
編譯:
$mpigxx matMultiplyWithMPI.cpp -o matMultiplyWithMPI
執行:
$mpirun -np 8 matMultiplyWithMPI 3000 2000 4000
這裡假設m = 3000, p = 2000, n = 4000。另外,開啟的程序數為8個。 np的個數可以大於CPU的個數。
一般來講,只有當矩陣大小大於5000的量級時,開啟幾十上百個程序的威力才能凸顯出來。尤其是當矩陣量級達到萬維以上時,序列或是少數幾個程序並行的矩陣乘法將變得特別耗時。
3、改進的並行方案:記憶體考慮
上面的並行方案有個很大的缺陷,那就是 B 的備份數和開啟的程序數一致。這對於記憶體不是很充裕或矩陣很大的時候,會導致災難!例如,假設 B 是10000*10000維的,用double型別儲存大概佔700M左右的記憶體。當開啟的程序數達到128個時,單是 B 的備份佔據的記憶體開銷將達到 128 * 700 M = 90G。 這將耗掉巨大的記憶體!
有什麼改進的方案呢?
必須瞭解MPI的第三個特點:
c. 程序之間可以很方便地通訊,並且支援多種通訊方案。
這樣,就可以把 B 也同時分散式的儲存到各個程序對應的記憶體中,然後利用程序之間的通訊來輪換各個 B 的分塊,從而達到減小記憶體開銷的效果。當然,幾乎和所有的程式一樣,離不開時間與空間的trade-off。所以,這種方法雖然節省了記憶體,卻要消耗大量的時間在程序之間的通訊上。
下面給出改進的並行方案:
(1). 將A和C按行分為np塊,將B按列分為np塊(B可以按列儲存);
(2). 程序號為 id 的程序讀取 A 和 B 的第id個分塊;
(3). 迴圈np次:
<1>. 各個程序用各自的A、B分塊求解C的分塊;
<2>. 輪換B的分塊(例如:id 號程序傳送自己當前的B的分塊到 id+1號程序)
程式碼如下:
/* filename: matMultiplyWithMPI_updated.cpp * parallel matrix multiplication with MPI: updated * C(m,n) = A(m,p) * B(p,n) * input: three parameters - m, p, n * @copyright: fengfu-chris */ #include<iostream> #include<mpi.h> #include<math.h> #include<stdlib.h> void initMatrixWithRV(float *A, int rows, int cols); void copyMatrix(float *A, float *A_copy, int rows, int cols); // A: m*p, B: p*n !!! note that B is stored by column first void matMultiplyWithTransposedB(float *A, float *B, float *matResult, int m, int n, int p); int main(int argc, char** argv) { int m = atoi(argv[1]); int n = atoi(argv[2]); int p = atoi(argv[3]); float *A, *B, *C; float *bA, *bB_send, *bB_recv, *bC, *bC_send; int myrank, numprocs; MPI_Status status; MPI_Init(&argc, &argv); // 並行開始 MPI_Comm_size(MPI_COMM_WORLD, &numprocs); MPI_Comm_rank(MPI_COMM_WORLD, &myrank); int bm = m / numprocs; int bn = n / numprocs; bA = new float[bm * p]; bB_send = new float[bn * p]; bB_recv = new float[bn * p]; bC = new float[bm * bn]; bC_send = new float[bm * n]; if(myrank == 0){ A = new float[m * p]; B = new float[n * p]; C = new float[m * n]; initMatrixWithRV(A, m, p); initMatrixWithRV(B, n, p); } MPI_Barrier(MPI_COMM_WORLD); MPI_Scatter(A, bm * p, MPI_FLOAT, bA, bm * p, MPI_FLOAT, 0, MPI_COMM_WORLD); MPI_Scatter(B, bn * p, MPI_FLOAT, bB_recv, bn * p, MPI_FLOAT, 0, MPI_COMM_WORLD); int sendTo = (myrank + 1) % numprocs; int recvFrom = (myrank - 1 + numprocs) % numprocs; int circle = 0; do{ matMultiplyWithTransposedB(bA, bB_recv, bC, bm, bn, p); int blocks_col = (myrank - circle + numprocs) % numprocs; for(int i=0; i<bm; i++){ for(int j=0; j<bn; j++){ bC_send[i*n + blocks_col*bn + j] = bC[i*bn + j]; } } if(myrank % 2 == 0){ copyMatrix(bB_recv, bB_send, bn, p); MPI_Ssend(bB_send, bn*p, MPI_FLOAT, sendTo, circle, MPI_COMM_WORLD); MPI_Recv(bB_recv, bn*p, MPI_FLOAT, recvFrom, circle, MPI_COMM_WORLD, &status); }else{ MPI_Recv(bB_recv, bn*p, MPI_FLOAT, recvFrom, circle, MPI_COMM_WORLD, &status); MPI_Ssend(bB_send, bn*p, MPI_FLOAT, sendTo, circle, MPI_COMM_WORLD); copyMatrix(bB_recv, bB_send, bn, p); } circle++; }while(circle < numprocs); MPI_Barrier(MPI_COMM_WORLD); MPI_Gather(bC_send, bm * n, MPI_FLOAT, C, bm * n, MPI_FLOAT, 0, MPI_COMM_WORLD); if(myrank == 0){ int remainAStartId = bm * numprocs; int remainBStartId = bn * numprocs; for(int i=remainAStartId; i<m; i++){ for(int j=0; j<n; j++){ float temp=0; for(int k=0; k<p; k++){ temp += A[i*p + k] * B[j*p +k]; } C[i*p + j] = temp; } } for(int i=0; i<remainAStartId; i++){ for(int j=remainBStartId; j<n; j++){ float temp = 0; for(int k=0; k<p; k++){ temp += A[i*p + k] * B[j*p +k]; } C[i*p + j] = temp; } } } delete[] bA; delete[] bB_send; delete[] bB_recv; delete[] bC; delete[] bC_send; if(myrank == 0){ delete[] A; delete[] B; delete[] C; } MPI_Finalize(); // 並行結束 return 0; } void initMatrixWithRV(float *A, int rows, int cols) { srand((unsigned)time(NULL)); for(int i = 0; i < rows*cols; i++){ A[i] = (float)rand() / RAND_MAX; } } void copyMatrix(float *A, float *A_copy, int rows, int cols) { for(int i=0; i<rows*cols; i++){ A_copy[i] = A[i]; } } void matMultiplyWithTransposedB(float *A, float *B, float *matResult, int m, int p, int n) { for(int i=0; i<m; i++){ for(int j=0; j<n; j++){ float temp = 0; for(int k=0; k<p; k++){ temp += A[i*p+k] * B[j*p+k]; } matResult[i*n+j] = temp; } } }
這裡最需要注意的地方就是B的輪換。 有兩點需要注意:
(1) 防阻塞機制。這裡採用奇偶原則:偶數號程序先發送,再接收;奇數號程序則相反。這樣可以避免所有程序同時傳送造成死鎖的情況;
(2) 資料備份。傳送和接收的資訊儲存在不同的矩陣中,這樣保證原來的資訊不會被覆蓋。
這種方法的優點是顯而易見的。對於足夠牛的伺服器/計算機叢集,開啟成百上千個程序來並行完全不是問題。
並行不易,且行且珍惜!