多執行緒技術模擬平行計算之二:陣列字首和(Prefix Sum)
一、字首和(Prefix Sum)定義:
給定一個數組A[1..n],字首和陣列PrefixSum[1..n]定義為:PrefixSum[i] = A[0]+A[1]+...+A[i-1];
例如:A[5,6,7,8] --> PrefixSum[5,11,18,26]
PrefixSum[0] =A[0] ;
PrefixSum[1] =A[0] + A[1] ;
PrefixSum[2] =A[0] + A[1] + A[2] ;
PrefixSum[3] =A[0] + A[1] + A[2] + A[3] ;
二、序列程式(Sequential Program):
時間複雜度:O(n);空間複雜度:O(n)。n是陣列A的長度。
虛擬碼:
Procedure PS(A,n,S)
S[0] = A[0];
if n = 1 then exit;
else { for j = 1 to n do S[j] = S[j-1] +A[j] }</span>
序列程式碼sequence.c:
#include<stdio.h> #include<time.h> #include<stdlib.h> #include<unistd.h> #include<memory.h> #define n 1000 #define RANGE 2 int A[n]={0}; intprefix_sum[n]={0}; int num; void work() { int i ; prefix_sum[0]=A[0]; for(i=1;i<num;i++) { prefix_sum[i] = prefix_sum[i-1]+A[i]; } } void generate() { FILE *file; if( (file=fopen("/home/huawei/leaguenew/parallel_computing/A.txt","wt"))==NULL ) { perror("open fail\n"); exit(1); } int i; srand((unsigned)time(NULL)) ; for( i =0 ; i < num ; i++ ) fprintf(file,"%-4d",rand()%RANGE); //fprintf(file,"\n"); fclose(file); } void Read() { FILE *file; if( (file =fopen("/home/huawei/leaguenew/parallel_computing/A.txt","rt"))==NULL ) { perror("open fail\n"); exit(1); } int i; srand((unsigned)time(NULL)); for(i=0;i<num;i++) fscanf(file,"%d",&A[i]); fclose(file); } int main() { printf("please input the scale ofinput : "); scanf("%d",&num); generate(); Read(); int h; clock_t start = clock(); work(); clock_t finish = clock(); printf("the time cost is : %.6fseconds\n",(double)(finish-start)/CLOCKS_PER_SEC); for(h=0;h<num;h++) printf("%d ",A[h]); printf("\nThe prefix sum is :\n"); for(h=0;h<num;h++) printf("%d ",prefix_sum[h]); printf("\n"); return 0; }
程式碼中的檔案路徑需要自行修改。
遞迴實現:
#include<iostream> using namespace std; int ps[10]; int prefix_sum(inta[] ,int n) { if( n == 0 ) { ps[0] = a[0]; return ps[0]; } ps[n] = a[n]+prefix_sum(a,n-1); return ps[n]; } int main() { int a[] = {3,1,2,5,7,4,9}; int len = sizeof(a)/sizeof(a[0]); prefix_sum(a,len-1) ; for(int i=0;i<len;i++) { cout<<ps[i]<<""; } cout<<endl; system("pause"); return 0; }
三、並行演算法
3.1.Non RecursivePrefix_Sum
首先讓A[1..n]來表示n個整數的陣列。讓B[h][j]作為輔助資料在二叉樹的前向遍歷中儲存中間資訊(後面會說明方法),其中1<=h<=log2(n) and 1<=j<=n/2^h,然而陣列C被用作後向遍歷樹下存放結果。
Input:大小為n=2^h的一個數組A,h是一個非負整數
Output:矩陣C存放結果,C[0][j]就是字首和的第j項,1<=j<=n
虛擬碼:
begin
for 1<=j<=n <strong>pardo</strong>
set B[0,j] = A[j];
for h = 1 to log2(n) do
for 1<=j<=n/2^h <strong>pardo</strong>
set B[h,j] = B[h-1,2*j-1] + B[h-1,2*j];
for h = log2(n) to 0 do
for 1<=j<n/2^h <strong>pardo</strong>
{
j is even : set C[h,j] = C[h+1,j/2];
j = 1 : set C[h,1] = B[h,1];
j is odd : set C[h,j] = C[h+1,(j-1)/2] + B[h,j];
}
end
說明:虛擬碼中的pardo是parallel do的意思,即並行處理。Data Flow(Forward)
執行時間:並行的執行時間就是O(log2(n))其實也就是樹的高度,每一層並行一次時間複雜度為O(1)。
注意:這是一個前向遍歷,從底而上。對於C的後向遍歷同樣的,區別在於從上而下。
DataFlow(Backward)
陣列C的生成根據上述虛擬碼生成,最終的C[0][j](1<=j<=n)就是最後的字首和的結果。
上程式碼:
#include<stdio.h>
#include<time.h>
#include<pthread.h>
#include<stdlib.h>
#include<unistd.h>
#include<memory.h>
#include<stdlib.h>
#include<math.h>
#define RANGE 99
#define M 1000
void generate(int n);
void Read(int n);
void *func1(void *arg);
void *func2(void *arg);
void *func3(void *arg);
void calculate();
void resultToFile();
int A[M];
int B[20][1000];
int prefix_sum[20][1000]={0};
pthread_t tids[20][1000];
pthread_mutex_t mutex;
int n;
struct S
{
int i ;
int j ;
};
int main()
{
printf("please input the scale of input : ");
scanf("%d",&n);
generate(n);
Read(n);
clock_t start = clock();
calculate();
resultToFile();
clock_t finish = clock();
printf("the time cost is : %.6f seconds\n",(double)(finish-start)/CLOCKS_PER_SEC);
return 0;
}
void calculate()
{
int h,i,j,k,l,flag;
/*======set B[0][j]=A[j] in parallel==================*/
for(i=1;i<=n;i++)
{
struct S *s;
s = (struct S *)malloc(sizeof(struct S));
s->i = i;
if( pthread_create(&tids[0][i],NULL,func1,s) )
{
perror("Fail to create thread!");
exit(1);
}
}
for(j=1;j<=n;j++)
pthread_join(tids[0][j],NULL);
/***Show the B[0][j]***************************/
#if 1
printf("Show the A[i]: ");
for(j=1;j<=n;j++)
printf("%d ",A[j]);
printf("\nShow the B[0][j]:");
for(j=1;j<=n;j++)
printf("%d ",B[0][j]);
printf("\n");
#endif
/***********************************************/
/**Data Flow Forward Algorithm:build the tree***/
for(h=1;h<=log2(n);h++)
{
for(j=1;j<=n/pow(2,h);j++)
{
struct S *s;
s = (struct S*)malloc(sizeof(struct S));
s->i = h;
s->j = j;
if( pthread_create(&tids[h][j],NULL,func2,s) )
{
perror("sth is wrong");
exit(1);
}
}
/*****wait all the threads terminate to continue*****/
for(j=1;j<=n/pow(2,h);j++)
pthread_join(tids[h][j],NULL);
}
#if 1
/******Print Data Flow Forward************/
printf("Data Flow Forward:\n");
for(h=0;h<=log2(n);h++)
{
for(l=1;l<=2*h+1;l++)
printf(" ");
for(j=1;j<=n/pow(2,h);j++)
{
printf("%d",B[h][j]);
for(l=1;l<=1+h;l++)
printf(" ");
}
printf("\n");
}
#endif
/*********************************************/
/***********Data Flow Backward:Traverse*******/
for(h=log2(n);h>=0;h--)
{
for(j=1;j<=n/pow(2,h);j++)
{
struct S *s;
s = (struct S*)malloc(sizeof(struct S));
s->i = h;
s->j = j;
if( pthread_create(&tids[h][j],NULL,func3,s) )
{
perror("sth is wrong");
exit(1);
}
}
for(j=1;j<=n/pow(2,h);j++)
pthread_join(tids[h][j],NULL);
}
#if 1
/********Print Data Flow Backward************/
printf("Data Flow Backward:\n");
for(h=log2(n);h>=0;h--)
{
for(l=1;l<=2*h+1;l++)
printf(" ");
for(j=1;j<=n/pow(2,h);j++)
{
printf("%d ",prefix_sum[h][j]);
for(l=1;l<=h;l++)
printf(" ");
}
printf("\n");
}
#endif
/*********************************************/
}
void resultToFile()
{
int i;
/***********write the result to file**********/
FILE *file ;
file = fopen("/home/leaguenew/parallel_computing/prefixSumResult.txt","wt");
for(i=1;i<=n;i++)
{
fprintf(file,"%-6d",prefix_sum[0][i]);
}
/*********************************************/
}
void *func1(void *arg)//set B[0][j]=A[j]
{
int i;
struct S *p;
p = (struct S*)arg;
i = p->i;
pthread_mutex_lock(&mutex);
B[0][i]=A[i];
pthread_mutex_unlock(&mutex);
pthread_exit(NULL);
}
void *func2(void *args)//B[h][j]=B[h-1][2*j-1]+B[h-1][2*j];
{
int h , j;
struct S *p;
p = (struct S*)args;
h = p->i;
j = p->j;
pthread_mutex_lock(&mutex);
B[h][j]=B[h-1][2*j-1]+B[h-1][2*j];
pthread_mutex_unlock(&mutex);
pthread_exit(NULL);
}
void *func3(void *arg)//get the prefix sum
{
int h,j;
struct S *p;
p = (struct S*)arg;
h = p->i;
j = p->j;
pthread_mutex_lock(&mutex);
if(j==1) prefix_sum[h][1] = B[h][1];
else if (j%2==0) prefix_sum[h][j] = prefix_sum[h+1][j/2] ;
else prefix_sum[h][j] = prefix_sum[h+1][(j-1)/2] + B[h][j] ;
pthread_mutex_unlock(&mutex);
pthread_exit(NULL);
}
void generate(int n)
{
FILE *file1;
if( (file1=fopen("/home/leaguenew/parallel_computing/arrayA.txt","wt") )==NULL )
{
perror("fopen");
exit(1);
}
int i,j;
srand( (unsigned)time(NULL) );
for( i = 1 ; i <= n ;i++)
fprintf(file1,"%-8d",rand()%RANGE);
fprintf(file1,"\n");
fclose(file1);
}
void Read(int n)
{
FILE *file1;
if( (file1=fopen("/home/leaguenew/parallel_computing/arrayA.txt","rt") )==NULL )
{
perror("fopen");
exit(1);
}
int i,j;
srand( (unsigned)time(NULL) );
for( i = 1 ; i <= n ;i++)
fscanf(file1,"%d",&A[i]);
fclose(file1);
}
程式的流程是:
Main()
{
Generate();//生成隨機資料
Read();//讀取隨機資料到陣列A中
Calculate()
{
Pthread_create(func1);
Pthread_create(func2);
Pthread_create(func3);
}
ResultToFile();//把結果寫入檔案
}
Func1();//計算:B[0][j] = A[j]
Func2();//計算:B[h][j]=B[h-1][2*j-1]+B[h-1][2*j]
Func3();//計算:C[i][j]
描述:首先通過隨機數生成一組隨機數,存放到檔案中去,然後通過讀取檔案中的資料到陣列A中。然後開始計時,呼叫計算函式calculate,建立多執行緒計算:
<span style="font-family:Microsoft YaHei;font-size:14px;">B[0][j] = A[j];
B[h][j]=B[h-1][2*j-1]+B[h-1][2*j];
{
if(j==1) prefix_sum[h][1]= B[h][1];
elseif (j%2==0) prefix_sum[h][j] =prefix_sum[h+1][j/2] ;
elseprefix_sum[h][j] = prefix_sum[h+1][(j-1)/2] + B[h][j] ;
}</span>
最後輸出結果到檔案。
結果截圖:
n=2:
n=4:
n=8:
n=16:
n=500:
n=600:
n=800:
3.2.Get rid of the 2-D array
這是計算prefix sum的另外一種演算法,去除二維陣列。
虛擬碼:
in:A[1..n],out:Prefix[1..n]
for 1<=i<=n do in parallel
Prefix[i] = A[i-1] + A[i];
end in parallel
k = 2:
while(k<n) do
for k+1<=i<=n do in parallel
Prefix[i] = Prefix[i-k] + Prefix[i]
end in parallel
k = k + k;
end while
程式碼:
#include<stdio.h>
#include<time.h>
#include<pthread.h>
#include<stdlib.h>
#include<unistd.h>
#include<memory.h>
#include<stdlib.h>
#include<math.h>
#define RANGE 99
#define M 1000
void generate();
void Read();
void *func1(void *arg);
void *func2(void *arg);
void calculate();
void resultToFile();
int A[M]={0};
int prefix_sum[M]={0};
int temp[M]={0};
int n;
pthread_t tids[1000][1000];
pthread_mutex_t mutex;
struct S
{
int i ;
int j ;
};
int main()
{
printf("please input the scale of input : ");
scanf("%d",&n);
generate();
Read();
clock_t start = clock();
calculate();
resultToFile();
clock_t finish = clock();
printf("The time cost is : %.6f second\n",(double)(finish-start)/CLOCKS_PER_SEC);
return 0 ;
}
void resultToFile()
{
int i;
/***********write the result to file**************/
FILE *file;
file = fopen("/home/leaguenew/parallel_computing/GetRidOf2DArray.txt","wt");
for(i=1;i<=n;i++)
{
fprintf(file,"%-6d",prefix_sum[i]);
}
/*************************************************/
}
void calculate()
{
int h,i,j,k,l ;
/************************************************/
for(i=1;i<=n;i++)
{
struct S* s;
s = (struct S*)malloc(sizeof(struct S));
s->i = i;
if( pthread_create(&tids[0][i],NULL,func1,s))
{
perror("Fail to create thread!");
exit(1);
}
}
for(j=1;j<=n;j++)
pthread_join(tids[0][j],NULL);
/************************************************/
k=2;
while(k<n)
{
for(i=1;i<=n;i++)
temp[i] = prefix_sum[i];
for(i=k+1;i<=n;i++)
{
struct S* s;
s = (struct S*)malloc(sizeof(struct S));
s->i = i;
s->j = k;
if( pthread_create(&tids[k][i],NULL,func2,s))
{
perror("Fail to create thread!");
exit(1);
}
}
for(j=k+1;j<=n;j++)
pthread_join(tids[k][j],NULL);
for(i=1;i<=n;i++)
prefix_sum[i] = temp[i];
k = k + k;
}
#if 0
/************show the result***************/
printf("A: \n");
for(i=1;i<=n;i++)
printf("%d ",A[i]);
printf("\nprefix_sum:\n");
for(i=1;i<=n;i++)
printf("%d ",prefix_sum[i]);
printf("\n");
/******************************************/
#endif
}
void *func1(void *arg)
{
int i ;
struct S *p;
p = (struct S*)arg;
i = p->i;
pthread_mutex_lock(&mutex);
if(i==1) prefix_sum[i] = A[i];
else prefix_sum[i] = A[i-1] + A[i];
pthread_mutex_unlock(&mutex);
pthread_exit(NULL);
}
void *func2(void *arg)
{
int i ,k ;
struct S *p;
p = (struct S*)arg;
i = p->i;
k = p->j;
pthread_mutex_lock(&mutex);
temp[i] = prefix_sum[i-k] + prefix_sum[i];
pthread_mutex_unlock(&mutex);
pthread_exit(NULL);
}
void generate()
{
FILE *file;
if( (file = fopen("/home/leaguenew/parallel_computing/arrayB.txt","wt"))==NULL )
{
perror("fopen");
exit(1);
}
int i , j ;
srand((unsigned)time(NULL) );
for( i=1;i<=n;i++ )
fprintf(file,"%-6d",rand()%RANGE);
fprintf(file , "\n");
fclose(file);
}
void Read()
{
FILE *file;
if( (file = fopen("/home/leaguenew/parallel_computing/arrayB.txt","rt"))==NULL )
{
perror("fopen");
exit(1);
}
int i , j ;
srand((unsigned)time(NULL) );
for( i=1;i<=n;i++ )
fscanf(file,"%d",&A[i]);
fclose(file);
}
程式的流程是:
Main()
{
Generate();//生成隨機資料寫入檔案
Read();//讀取檔案中的資料到陣列中
Calculate()
{
Pthread_create(func1);
Pthread_create(func2);
}
ResultToFile();//把結果寫入檔案
}
Func1();//計算:prefix_sum[i] = A[i-1] + A[i];
Func2();//計算:prefix[i] = prefix_sum[i-k] + prefix_sum[i];<strong>
結果截圖:
n=4:
n=6:
n=16:
四、總結
a.pthread的連結庫問題:編譯的時候要新增–lpthread。
b.Scanf引數沒有新增&造成段錯誤。
c.通過列印除錯除錯程式,及列印相關的輸出語句來判斷哪個位置發生錯誤。
d.sleep()不佔計算clock()時間。
f.在UNIX和LINUX系統中在要命令中加入-lm這個功能是把數學庫與源程式連結在一起。
g.Pthread_join的理解,等待所有建立的執行緒結束後繼續進行。
h.通過動態建立結構體來給pthread_create()傳遞多個引數。