排序演算法及並行分析
最近學了高效能運算這門課程,老師讓用OpenMP、MPI或mapReduce寫個大作業。我之前剛好在寫排序,於是我就將常用的排序寫了一遍並且用OpenMP進行並行,計算加速比等資料進行分析。在這篇文章中我主要介紹八大基本排序的實現原理及程式碼,以及對這些演算法進行改進從而讓它們可以並行,並且對他們的效能進行了比較。首先跟大家分享一下我的心得體會,所謂排序演算法,就是通過調整元素的位置達到想要的結果。我們需要明確這個排序演算法的概念,也就是排序的思想。明白這個之後,在進行演算法設計是,我是這樣思考的:這個排序演算法有多少趟排序(這裡指的是大的排序),也就是最外層的for迴圈有多少次;然後每一趟是從哪裡開始?怎麼進行元素的交換?有了這個思路之後,排序演算法就好寫多了,當然我是這樣認為的,每個人都有不同的思維方式。對於像歸併、快排這樣用遞迴實現起來較為方便的,我認為把出棧、入棧的順序理清,理解起來就容易多了。其實,只有自己動手實踐了,發現了問題並解決了或者說雖然沒有解決,但是可以問老師、同學,並且確實對演算法有了更深的理解,這無疑是很有意義的。這裡我都是先列舉排序演算法的大致思路,然後直接貼上程式碼,程式碼幾乎沒註釋,一是演算法完全可以看懂,而是希望大家在有了自己的思路基礎上再去看程式碼或動手寫程式碼,你會發現自己對演算法的思路更加明晰了。這裡我用到了openmp進行並行,如果只是想看排序演算法的,可以直接把並行部分忽略。
</pre><pre name="code" class="cpp">首先是一些標頭檔案和巨集定義,如下:
#include "stdlib.h" #include "iostream" #include "omp.h" #include "time.h" #include "vector" #include "stack" using namespace std; //#define random(x) (rand()%x) #define BOUNDARY 1000000000 //定義隨機數產生的區間 #define MAX_NUM 1000000 //隨機陣列的元素個數 const double MinProb = 1.0 / (RAND_MAX + 1); //概率 typedef int KeyInt; //定義一個記錄待排序的區間[low,high] typedef struct Region { int low; int high; }Region;
下面是我寫的一些函式:
KeyInt* randomCreate(int N); bool happened(double probability); int myrandom(int n);//產生0~n-1之間的等概率隨機數 void DisPlay(int N, KeyInt *p); KeyInt* BubbleAlgorithm(int N, KeyInt *p);//氣泡排序 KeyInt* BubbleAlgorithmParallel(int N, KeyInt *p);//奇偶排序 KeyInt* InsertSort(int N, KeyInt *p);//插入排序 KeyInt* InsertSort(int *p, int low, int high);//指定區間插入排序 vector<KeyInt> InsertSortPart(int N, KeyInt *p);//分割槽間插入排序 vector<KeyInt> InsertSortParallel(int N, KeyInt *p);//插入排序並行 vector<KeyInt> InsertVector(vector<KeyInt> &vec, int value); vector<KeyInt> InsertVectorSort(vector<KeyInt> &vec); KeyInt* ShellSort(int N, KeyInt *p);//希爾排序 KeyInt* ShellSortParallel(int N, KeyInt *p);//希爾排序並行 KeyInt* InsertSort(int N, KeyInt *p, int start, int inc);//指定起始點和步長進行插入排序 void MergeSort(KeyInt *p, KeyInt *temp, int l, int r);//歸併排序 void MergeSort(KeyInt *p, int N);//非遞迴歸併排序 void MergeSortParallel(KeyInt *p, KeyInt *temp, int l, int r);//2核歸併排序 void MergeSortParallel(KeyInt *p, KeyInt *temp, int N);//4核歸併排序 void MergeSortParallel(KeyInt *p, int N);//並行非遞迴歸併排序 void Merge(KeyInt *p, KeyInt *temp, int l, int r);//歸併 void QuickSort(KeyInt *p, int low, int high);//快排 void QuickSortAverage(KeyInt *p, int low, int high);//快排+三數取中+插入 void QuickSortSame(KeyInt *p, int low, int high);//快排+三數取中+插入+聚集相等元素 int SelectPivotMedianOfThree(int *arr, int low, int high);//三數取中 int Partition(int * a, int low, int high);//分隔 void NonRecursiveQuickSort(int *a, int len);//用棧實現快排 void QuickSortParallel(KeyInt *p, int low, int high);//2核快排 void QuickSortParallel4Core(KeyInt *p, int low, int high);//4核快排
KeyInt* BubbleAlgorithm(int N, KeyInt *p) //氣泡排序
{
int i, j;
KeyInt temp;
//#pragma omp parallel for
for (i = 0; i<N-1; i++)
for (j = 0; j<N-1-i; j++)
if (p[j]>p[j+1])
{
temp = p[j];
p[j] = p[j+1];
p[j+1] = temp;
}
return (p);
}
氣泡排序比較簡單,每趟排序從前往後依次比較相鄰的兩個元素,使小的在前,大的在後,經過n-1次排序即可完成。我們可以看出for迴圈執行了n*(n-1)/2次,所以時間複雜度是o(n^2)。另外,氣泡排序是不可以直接進行並行的,因為前面排序的結果會對後面的排序產生影響,所以我們需要對它進行改進。下面,我介紹冒泡的並行版本,奇偶排序(Odd-even Sort)。
KeyInt* BubbleAlgorithmParallel(int N, KeyInt *p) //奇偶排序Odd-even Sort
{
int i, j;
for (i = 1; i < N; i++) {
if ((i&0x1) == 1) {
#pragma omp parallel for
for (j = 0; j < N - 1; j += 2) {
if (p[j] > p[j + 1]) {
int temp = p[j];
p[j] = p[j + 1];
p[j + 1] = temp;
}
}
}
else {
#pragma omp parallel for
for (j = 2; j < N; j += 2) {
if (p[j-1] > p[j]) {
int temp = p[j-1];
p[j-1] = p[j];
p[j] = temp;
}
}
}
}
return (p);
}
奇偶排序是氣泡排序的並行化版本,其主要思想是奇數次排序比較奇數位和它後面一位的大小,偶數次排序比較奇數位和其前面一位的大小。這裡的#pragma omp parallel for 是openmp的並行語句,表示緊跟其後的for迴圈開多個執行緒並行。如果只是
看排序演算法,可以自動忽略。
上圖為odd-even sort的基本方法。
奇數步中, array中奇數項array[i]與右邊的item(array[i + 1])比較;
偶數步中, array中奇數項array[i]與左邊的item(array[i - 1]) 比較;
奇偶排序在實際中用來並行並沒有意義,因為每次迴圈都需要進行執行緒的建立和銷燬,你會發現這大大影響了演算法的效率,甚至開了並行後更慢了。氣泡排序只是針對小資料量的排序,比如元素個數小於一萬的陣列,所以奇偶排序用來並行並沒有實際意義,僅有學習價值。
KeyInt* InsertSort(int N, KeyInt *p)//插入排序
{
int temp;
for (int i = 1; i < N; i++) {
for (int j = i; (j > 0) && (p[j] < p[j - 1]); j--) {
temp = p[j];
p[j] = p[j - 1];
p[j - 1] = temp;
}
}
return p;
}
插入排序基本思想
將n個元素的數列分為已有序和無序兩個部分,如插入排序過程示例下所示:
{{a1},{a2,a3,a4,…,an}}
{{a1⑴,a2⑴},{a3⑴,a4⑴ …,an⑴}}
{{a1(n-1),a2(n-1) ,…},{an(n-1)}}
每次處理就是將無序數列的第一個元素與有序數列的元素從後往前逐個進行比較,
找出插入位置,將該元素插入到有序數列的合適位置中。
KeyInt* InsertSort(int *p, int low, int high)//指定區間插入排序,即對陣列p的指定位置進行插入排序
{
int temp;
for (int i = low+1; i <= high; i++) {
for (int j = i; (j > low) && (p[j] < p[j - 1]); j--) {
temp = p[j];
p[j] = p[j - 1];
p[j - 1] = temp;
}
}
return p;
}
vector<KeyInt> InsertVector(vector<KeyInt> &vec, int value) { //vector型別插入排序
if (vec.size() == 0) {
vec.push_back(value);
return vec;
}
vec.push_back(value);
//int temp;
for (int j = vec.size()-1; j > 0; j--) {
if (vec[j] < vec[j - 1]) {
/*temp = vec[j];
vec[j] = vec[j - 1];
vec[j - 1] = temp;*/
swap(vec[j-1], vec[j]);
}
else if (vec[j] >= vec[j - 1])
break;
}
return vec;
}
vector<KeyInt> InsertVectorSort(vector<KeyInt> &vec)
{
//int temp;
for (int i = 1; i < vec.size(); i++) {
for (int j = i; j > 0; j--) {
if (vec[j] < vec[j - 1]) {
/*temp = vec[j];
vec[j] = vec[j - 1];
vec[j - 1] = temp;*/
swap(vec[j - 1], vec[j]);
}
else if (vec[j] >= vec[j - 1])
break;
}
}
return vec;
}
vector<KeyInt> InsertSortPart(int N, KeyInt *p)//分割槽間插入排序
{
int i;
int interval = BOUNDARY / 4;
vector<int> vec[4];
for (i = 0; i < N; i++) {
if (p[i] < interval)
vec[0].push_back(p[i]);
else if (p[i] < 2 * interval)
vec[1].push_back(p[i]);
else if (p[i] < 3 * interval)
vec[2].push_back(p[i]);
else
vec[3].push_back(p[i]);
}
int* arr0 = new int[vec[0].size()];
int* arr1 = new int[vec[1].size()];
int* arr2 = new int[vec[2].size()];
int* arr3 = new int[vec[3].size()];
for (i = 0; i < vec[0].size(); i++)
arr0[i] = vec[0][i];
for (i = 0; i < vec[1].size(); i++)
arr1[i] = vec[1][i];
for (i = 0; i < vec[2].size(); i++)
arr2[i] = vec[2][i];
for (i = 0; i < vec[3].size(); i++)
arr3[i] = vec[3][i];
arr0 = InsertSort(vec[0].size(), arr0);
arr1 = InsertSort(vec[1].size(), arr1);
arr2 = InsertSort(vec[2].size(), arr2);
arr3 = InsertSort(vec[3].size(), arr3);
vector<int> vec1[4];
for (i = 0; i < vec[0].size(); i++)
vec1[0].push_back(arr0[i]);
for (i = 0; i < vec[1].size(); i++)
vec1[1].push_back(arr1[i]);
for (i = 0; i < vec[2].size(); i++)
vec1[2].push_back(arr2[i]);
for (i = 0; i < vec[3].size(); i++)
vec1[3].push_back(arr3[i]);
vec1[0].insert(vec1[0].end(), vec1[1].begin(), vec1[1].end());
vec1[0].insert(vec1[0].end(), vec1[2].begin(), vec1[2].end());
vec1[0].insert(vec1[0].end(), vec1[3].begin(), vec1[3].end());
return vec1[0];
}
vector<KeyInt> InsertSortParallel(int N, KeyInt *p)//插入排序並行
{
int i;
int interval = BOUNDARY / 4;
vector<int> vec[4];
//vec[0].reserve(MAX_NUM);
//vec[1].reserve(MAX_NUM/2);
//vec[2].reserve(MAX_NUM/2);
//vec[3].reserve(MAX_NUM/2);
//long start = clock();
for (i = 0; i < N; i++) {
if (p[i] < interval)
vec[0].push_back(p[i]);
else if (p[i] < 2 * interval)
vec[1].push_back(p[i]);
else if (p[i] < 3 * interval)
vec[2].push_back(p[i]);
else
vec[3].push_back(p[i]);
}
//long end = clock();
//printf("The time1 is:%lf\n", (double)(end - start));
//printf("%d %d %d %d\n", vec[0].size(), vec[1].size(), vec[2].size(), vec[3].size);
//cout << vec[0].size() << '\n';
//cout << vec[1].size() << '\n';
//cout << vec[2].size() << '\n';
//cout << vec[3].size() << '\n';
//long start1 = clock();
int* arr0 = new int[vec[0].size()];
int* arr1 = new int[vec[1].size()];
int* arr2 = new int[vec[2].size()];
int* arr3 = new int[vec[3].size()];
for (i = 0; i < vec[0].size(); i++)
arr0[i] = vec[0][i];
for (i = 0; i < vec[1].size(); i++)
arr1[i] = vec[1][i];
for (i = 0; i < vec[2].size(); i++)
arr2[i] = vec[2][i];
for (i = 0; i < vec[3].size(); i++)
arr3[i] = vec[3][i];
omp_set_num_threads(4);
#pragma omp parallel
{
#pragma omp sections
{
#pragma omp section
{
//InsertVectorSort(vec[0]);
arr0 = InsertSort(vec[0].size(), arr0);
//printf("%d\n", omp_get_thread_num());
}
#pragma omp section
{
//InsertVectorSort(vec[1]);
arr1 = InsertSort(vec[1].size(), arr1);
//printf("%d\n", omp_get_thread_num());
}
#pragma omp section
{
//InsertVectorSort(vec[2]);
arr2 = InsertSort(vec[2].size(), arr2);
//printf("%d\n", omp_get_thread_num());
}
#pragma omp section
{
//InsertVectorSort(vec[3]);
arr3 = InsertSort(vec[3].size(), arr3);
//printf("%d\n", omp_get_thread_num());
}
}
}
/*InsertVectorSort(vec[0]);
InsertVectorSort(vec[1]);
InsertVectorSort(vec[2]);
InsertVectorSort(vec[3]);*/
/*arr0 = InsertSort(vec[0].size(), arr0);
arr1 = InsertSort(vec[1].size(), arr1);
arr2 = InsertSort(vec[2].size(), arr2);
arr3 = InsertSort(vec[3].size(), arr3);*/
//long end1 = clock();
//printf("The time2 is:%lf\n", (double)(end1 - start1));
/*vec[0].clear();
vec[1].clear();
vec[2].clear();
vec[3].clear();*/
//long start2 = clock();
vector<int> vec1[4];
for (i = 0; i < vec[0].size(); i++)
vec1[0].push_back(arr0[i]);
for (i = 0; i < vec[1].size(); i++)
vec1[1].push_back(arr1[i]);
for (i = 0; i < vec[2].size(); i++)
vec1[2].push_back(arr2[i]);
for (i = 0; i < vec[3].size(); i++)
vec1[3].push_back(arr3[i]);
vec1[0].insert(vec1[0].end(), vec1[1].begin(), vec1[1].end());
vec1[0].insert(vec1[0].end(), vec1[2].begin(), vec1[2].end());
vec1[0].insert(vec1[0].end(), vec1[3].begin(), vec1[3].end());
//long end2 = clock();
//printf("The time3 is:%lf\n", (double)(end2 - start2));
return vec1[0];
/*vec[0].insert(vec[0].end(), vec[1].begin(), vec[1].end());
vec[0].insert(vec[0].end(), vec[2].begin(), vec[2].end());
vec[0].insert(vec[0].end(), vec[3].begin(), vec[3].end());
return vec[0];*/
}
/*
* 希爾排序:先取一個小於n的整數d1作為第一個增量,
* 把檔案的全部記錄分成(n除以d1)個組。所有距離為d1的倍數的記錄放在同一個組中。
* 先在各組內進行直接插入排序;然後,取第二個增量d2<d1重複上述的分組和排序,
* 直至所取的增量dt=1(dt<dt-l<…<d2<d1),即所有記錄放在同一組中進行直接插入排序為止。
*/
KeyInt* ShellSort(int N, KeyInt *p) //希爾排序
{
for (int i = N / 2; i > 2; i /= 2) {
for (int j = 0; j < i; j++) {
InsertSort(N, p, j, i);
}
}
InsertSort(N, p, 0, 1);
return p;
}
KeyInt* ShellSortParallel(int N, KeyInt *p)//希爾排序並行
{
for (int i = N / 2; i > 2; i /= 2) {
#pragma omp parallel for
for (int j = 0; j < i; j++) {
InsertSort(N, p, j, i);
}
}
InsertSort(N, p, 0, 1);
return p;
}
KeyInt* InsertSort(int N, KeyInt *p, int start, int inc)//指定起始點和步長進行插入排序
{
int temp;
for (int i = start + inc; i < N; i += inc) {
for (int j = i; (j >= inc) && (p[j] < p[j - inc]); j -= inc) {
int temp = p[j];
p[j] = p[j-inc];
p[j-inc] = temp;
}
}
return p;
}
/*
* 屬於插入類排序,是將整個無序列分割成若干小的子序列分別進行插入排序
* 排序過程:先取一個正整數d1<n,把所有序號相隔d1的陣列元素放一組,
* 組內進行直接插入排序;然後取d2<d1,重複上述分組和排序操作;直至di=1, 即所有記錄放進一個組中排序為止
* 初始:d=5 49 38 65 97 76 13 27 49 55 04
* 49 13 |-------------------|
* 38 27 |-------------------|
* 65 49 |-------------------|
* 97 55 |-------------------|
* 76 04 |-------------------|
* 一趟結果 13 27 49 55 04 49 38 65 97 76
* d=3 13 27 49 55 04 49 38 65 97 76
* 13 55 38 76 |------------|------------|------------|
* 27 04 65 |------------|------------|
* 49 49 97 |------------|------------|
* 二趟結果 13 04 49* 38 27 49 55 65 97 76
* d=1 13 04 49 38 27 49 55 65 97 76
* |----|----|----|----|----|----|----|----|----| 三趟結果
* 04 13 27 38 49 49 55 65 76 97
*/
void Merge(KeyInt *p, KeyInt *temp, int l, int r)//歸併
{
int mid = (l + r) / 2;
int i1 = l;
int i2 = mid + 1;
for (int cur = l; cur <= r; cur++) {
if (i1 == mid + 1)
p[cur] = temp[i2++];
else if (i2 > r)
p[cur] = temp[i1++];
else if (temp[i1] < temp[i2])
p[cur] = temp[i1++];
else
p[cur] = temp[i2++];
}
}
void MergeSort(KeyInt *p, KeyInt *temp, int l, int r) //歸併排序
{
int mid = (l + r) / 2;
if (l == r)
return;
MergeSort(p, temp, l, mid);
MergeSort(p, temp, mid + 1, r);
for (int i = l; i <= r; i++) {
temp[i] = p[i];
}
/*int i1 = l;
int i2 = mid + 1;
for (int cur = l; cur <= r; cur++) {
if (i1 == mid + 1)
p[cur] = temp[i2++];
else if (i2 > r)
p[cur] = temp[i1++];
else if (temp[i1] < temp[i2])
p[cur] = temp[i1++];
else
p[cur] = temp[i2++];
}*/
Merge(p, temp, l, r);
}
void MergeSort(KeyInt *p, int N)//非遞迴歸併排序
{
int i, left_min, left_max, right_min, right_max, next;
int *tmp = (int*)malloc(sizeof(int) * N);
for (i = 1; i < N; i *= 2) // i為步長,1,2,4,8……
{
for (left_min = 0; left_min < N - i; left_min = right_max)
{
right_min = left_max = left_min + i;
right_max = left_max + i;
if (right_max > N)
right_max = N;
next = 0;
while (left_min < left_max && right_min < right_max)
tmp[next++] = p[left_min] > p[right_min] ? p[right_min++] : p[left_min++];
while (left_min < left_max)
p[--right_min] = p[--left_max];
while (next > 0)
p[--right_min] = tmp[--next];
}
}
free(tmp);
}
/*
* 歸併操作(merge),也叫歸併演算法,指的是將兩個已經排序的序列合併成一個序列的操作。
* 如設有數列{6,202,100,301,38,8,1}
* 初始狀態: [6] [202] [100] [301] [38] [8] [1] 比較次數
* i=1 [6 202 ] [ 100 301] [ 8 38] [ 1 ] 3
* i=2 [ 6 100 202 301 ] [ 1 8 38 ] 4
* i=3 [ 1 6 8 38 100 202 301 ] 4
*/
void MergeSortParallel(KeyInt *p, int N)//並行非遞迴歸併排序
{
//int left_max, right_min, right_max, next;
int *tmp = (int*)malloc(sizeof(int) * N);
for (int i = 1; i < N; i *= 2) // i為步長,1,2,4,8……
{
#pragma omp parallel for
for (int left_min = 0; left_min < N - i; left_min += 2*i)
{
//int *tmp = (int*)malloc(sizeof(int) * 2*i);
int temp = left_min;
int right_min = temp + i;
int left_max = temp + i;
int right_max = left_max + i;
if (right_max > N)
right_max = N;
//int next = 0;
int next = left_min;
while (temp < left_max && right_min < right_max)
tmp[next++] = p[temp] > p[right_min] ? p[right_min++] : p[temp++];
while (temp < left_max)
p[--right_min] = p[--left_max];
while (next > left_min)
p[--right_min] = tmp[--next];
}
}
free(tmp);
}
void MergeSortParallel(KeyInt *p, KeyInt *temp, int l, int r)//2核歸併排序
{
int mid = (l + r) / 2;
if (l == r)
return;
#pragma omp parallel
{
#pragma omp sections
{
#pragma omp section
{
//printf("%d,", omp_get_num_threads());
//printf("%d,", omp_get_thread_num());
MergeSort(p, temp, l, mid);
}
#pragma omp section
{
//printf("%d,", omp_get_num_threads());
//printf("%d,", omp_get_thread_num());
MergeSort(p, temp, mid + 1, r);
}
}
}
//MergeSort(p, temp, l, mid);
//MergeSort(p, temp, mid + 1, r);
//printf("%d,", omp_get_num_threads());
/*for (int i = l; i <= r; i++) {
temp[i] = p[i];
}
int i1 = l;
int i2 = mid + 1;
for (int cur = l; cur <= r; cur++) {
if (i1 == mid + 1)
p[cur] = temp[i2++];
else if (i2 > r)
p[cur] = temp[i1++];
else if (temp[i1] < temp[i2])
p[cur] = temp[i1++];
else
p[cur] = temp[i2++];
}*/
Merge(p, temp, l, r);
}
void MergeSortParallel(KeyInt *p, KeyInt *temp, int N)//4核歸併排序
{
int i;
int *p1 = new int[N / 4];
int *p11 = new int[N / 4];
for (i = 0; i < N / 4; i++)
p1[i] = p[i];
int *p2 = new int[N / 4];
int *p22 = new int[N / 4];
for (i = 0; i < N / 4; i++)
p2[i] = p[i+N/4];
int *p3 = new int[N / 4];
int *p33 = new int[N / 4];
for (i = 0; i < N / 4; i++)
p3[i] = p[i+N/4+N/4];
int *p4 = new int[N - N / 4 * 3];
int *p44 = new int[N - N / 4 * 3];
for (i = 0; i < (N - N / 4 * 3); i++)
p4[i] = p[i+N/4+N/4+N/4];
#pragma omp parallel
{
#pragma omp sections
{
#pragma omp section
{
MergeSort(p1, p11, 0, N / 4-1);
}
#pragma omp section
{
MergeSort(p2, p22, 0, N / 4-1);
}
#pragma omp section
{
MergeSort(p3, p33, 0, N / 4 - 1);
}
#pragma omp section
{
MergeSort(p4, p44, 0, N - N / 4 * 3-1);
}
}
}
delete[] p11;
delete[] p22;
delete[] p33;
delete[] p44;
int* temp1 = new int[N / 4 + N / 4];
int* temp11 = new int[N / 4 + N / 4];
for (i = 0; i < N / 4; i++)
{
temp1[i] = p1[i];
temp11[i] = p1[i];
}
delete[] p1;
for (i = 0; i < N / 4; i++)
{
temp1[i + N / 4] = p2[i];
temp11[i + N / 4] = p2[i];
}
delete[] p2;
int* temp2 = new int[N-(N / 4 + N / 4)];
int* temp22 = new int[N - (N / 4 + N / 4)];
for (i = 0; i < N / 4; i++)
{
temp2[i] = p3[i];
temp22[i] = p3[i];
}
delete[] p3;
for (i = 0; i < (N - N / 4 * 3); i++)
{
temp2[i + N / 4] = p4[i];
temp22[i + N / 4] = p4[i];
}
delete[] p4;
Merge(temp1, temp11, 0, N / 4 + N / 4 - 1);
Merge(temp2, temp22, 0, N - (N / 4 + N / 4) - 1);
delete[] temp11;
delete[] temp22;
int* temp3 = new int[N];
int* temp33 = new int[N];
for (i = 0; i < N / 4 + N / 4; i++)
{
temp3[i] = temp1[i];
temp33[i] = temp1[i];
}
delete[] temp1;
for (i = 0; i < N - (N / 4 + N / 4); i++)
{
temp3[i + N / 4 + N / 4] = temp2[i];
temp33[i + N / 4 + N / 4] = temp2[i];
}
delete[] temp2;
Merge(temp3, temp33, 0, N-1);
for (i = 0; i < N; i++)
p[i] = temp3[i];
delete[] temp3;
delete[] temp33;
}
/*
* 快速排序:
* 一趟快速排序的演算法是:
* 1)設定兩個變數i、j,排序開始的時候:i=0,j=N-1;
* 2)以第一個陣列元素作為關鍵資料,賦值給key,即 key=A[0];
* 3)從j開始向前搜尋,即由後開始向前搜尋(j=j-1即j--),
* 找到第一個小於key的值A[j],A[i]與A[j]交換;
* 4)從i開始向後搜尋,即由前開始向後搜尋(i=i+1即i++),
* 找到第一個大於key的A[i],A[i]與A[j]交換;
* 5)重複第3、4、5步,直到 I=J;
* (3,4步是在程式中沒找到時候j=j-1,i=i+1,直至找到為止。
* 找到並交換的時候i, j指標位置不變。
* 另外當i=j這過程一定正好是i+或j-完成的最後令迴圈結束。)
*/
void QuickSort(KeyInt *p, int low, int high)//快排
{
if (low >= high)
{
return;
}
int first = low;
int last = high;
int key = p[first];/*用字表的第一個記錄作為樞軸*/
while (first < last)
{
while (first < last && p[last] >= key)
{
--last;
}
p[first] = p[last];/*將比第一個小的移到低端*/
while (first < last && p[first] <= key)
{
++first;
}
p[last] = p[first];
/*將比第一個大的移到高階*/
}
p[first] = key;/*樞軸記錄到位*/
QuickSort(p, low, first - 1);
QuickSort(p, first + 1, high);
}
void QuickSortAverage(KeyInt *p, int low, int high)//快排+三數取中+插入
{
if (high - low + 1 < 20)
{
InsertSort(p, low, high);
return;
}//else時,正常執行快排
int first = low;
int last = high;
//int key = p[first];/*用字表的第一個記錄作為樞軸*/
int key = SelectPivotMedianOfThree(p, low, high);
while (first < last)
{
while (first < last && p[last] >= key)
{
--last;
}
p[first] = p[last];/*將比第一個小的移到低端*/
while (first < last && p[first] <= key)
{
++first;
}
p[last] = p[first];
/*將比第一個大的移到高階*/
}
p[first] = key;/*樞軸記錄到位*/
QuickSortAverage(p, low, first - 1);
QuickSortAverage(p, first + 1, high);
}
void QuickSortSame(KeyInt *p, int low, int high)//快排+三數取中+插入+聚集相等元素
{
if (high - low + 1 < 20)
{
InsertSort(p, low, high);
return;
}
int temp;
int first = low;
int last = high;
int left = low;
int right = high;
int leftLen = 0;
int rightLen = 0;
//一次分割
int key = SelectPivotMedianOfThree(p, low, high);//使用三數取中法選擇樞軸
while (low < high)
{
while (high > low && p[high] >= key)
{
if (p[high] == key)//處理相等元素
{
//swap(p[right], p[high]);
temp = p[right];
p[right] = p[high];
p[high] = temp;
right--;
rightLen++;
}
high--;
}
p[low] = p[high];
while (high > low && p[low] <= key)
{
if (p[low] == key)
{
//swap(p[left], p[low]);
temp = p[left];
p[left] = p[low];
p[low] = temp;
left++;
leftLen++;
}
low++;
}
p[high] = p[low];
}
p[low] = key;
//一次快排結束
//把與樞軸key相同的元素移到樞軸最終位置周圍
int i = low - 1;
int j = first;
while (j < left && p[i] != key)
{
//swap(p[i], p[j]);
temp = p[i];
p[i] = p[j];
p[j] = temp;
i--;
j++;
}
i = low + 1;
j = last;
while (j > right && p[i] != key)
{
//swap(p[i], p[j]);
temp = p[i];
p[i] = p[j];
p[j] = temp;
i++;
j--;
}
QuickSortSame(p, first, low - 1 - leftLen);
QuickSortSame(p, low + 1 + rightLen, last);
}
void QuickSortParallel(KeyInt *p, int low, int high)//2核快排
{
p[0] = BOUNDARY / 2;
/*for (int i = low; i <= high; i++)
{
if (abs(p[i] - BOUNDARY / 2) < 10)
{
int temp = p[i];
p[i] = p[0];
p[0] = temp;
break;
}
}*/
int mid = Partition(p, low, high);
#pragma omp parallel
{
#pragma omp sections
{
#pragma omp section
{
QuickSortAverage(p, low, mid-1);
}
#pragma omp section
{
QuickSortAverage(p, mid+1, high);
}
}
}
}
void QuickSortParallel4Core(KeyInt *p, int low, int high)//4核快排
{
p[0] = BOUNDARY / 2;
/*for (int i = low; i <= high; i++)
{
if (abs(p[i] - BOUNDARY / 2) < 10)
{
int temp = p[i];
p[i] = p[0];
p[0] = temp;
break;
}
}*/
int mid = Partition(p, low, high);
p[low] = BOUNDARY / 4;
int quarter1 = Partition(p, low, mid - 1);
p[mid + 1] = BOUNDARY / 4 * 3;
int quarter2 = Partition(p, mid + 1, high);
#pragma omp parallel
{
#pragma omp sections
{
#pragma omp section
{
//double start1 = omp_get_wtime();
QuickSortAverage(p, low, quarter1-1);
//double end1 = omp_get_wtime();
//printf("%lf\n", end1 - start1);
}
#pragma omp section
{
//double start2 = omp_get_wtime();
QuickSortAverage(p, quarter1 + 1, mid-1);
//double end2 = omp_get_wtime();
//printf("%lf\n", end2 - start2);
}
#pragma omp section
{
//double start3 = omp_get_wtime();
QuickSortAverage(p, mid+1, quarter2-1);
//double end3 = omp_get_wtime();
//printf("%lf\n", end3 - start3);
}
#pragma omp section
{
//double start4 = omp_get_wtime();
QuickSortAverage(p, quarter2+1, high);
//double end4 = omp_get_wtime();
//printf("%lf\n", end4 - start4);
}
}
}
}
/*函式作用:取待排序序列中low、mid、high三個位置上資料,選取他們中間的那個資料作為樞軸*/
int SelectPivotMedianOfThree(int *arr, int low, int high)//三數取中
{
int temp;
int mid = low + ((high - low) >> 1);//計算陣列中間的元素的下標
//使用三數取中法選擇樞軸
if (arr[mid] > arr[high])//目標: arr[mid] <= arr[high]
{
//swap(arr[mid], arr[high]);
temp = arr[mid];
arr[mid] = arr[high];
arr[high] = temp;
}
if (arr[low] > arr[high])//目標: arr[low] <= arr[high]
{
//swap(arr[low], arr[high]);
temp = arr[low];
arr[low] = arr[high];
arr[high] = temp;
}
if (arr[mid] > arr[low]) //目標: arr[low] >= arr[mid]
{
//swap(arr[mid], arr[low]);
temp = arr[mid];
arr[mid] = arr[low];
arr[low] = temp;
}
//此時,arr[mid] <= arr[low] <= arr[high]
return arr[low];
//low的位置上儲存這三個位置中間的值
//分割時可以直接使用low位置的元素作為樞軸,而不用改變分割函數了
}
int Partition(int * a, int low, int high)//分隔
{
int pivotkey = a[low];
while (low<high)
{
while (low<high && a[high] >= pivotkey)
--high;
a[low] = a[high];
while (low<high && a[low] <= pivotkey)
++low;
a[high] = a[low];
}
//此時low==high
a[low] = pivotkey;
return low;
}
void NonRecursiveQuickSort(int *a, int len)//用棧實現快排
{
stack<Region> regions;//定義一個棧變數
Region region;
region.low = 0;
region.high = len - 1;
regions.push(region);
while (!regions.empty())
{
region = regions.top();
regions.pop();
int p = Partition(a, region.low, region.high);
if (p - 1>region.low)
{
Region regionlow;
regionlow.low = region.low;
regionlow.high = p - 1;
regions.push(regionlow);
}
if (p + 1<region.high)
{
Region regionhigh;
regionhigh.low = p + 1;
regionhigh.high = region.high;
regions.push(regionhigh);
}
}
}
KeyInt* randomCreate(int N) {
int i = 0;
KeyInt *p;
p =(KeyInt*) malloc(N * sizeof(KeyInt));
for (i = 0; i < N; i++)
p[i] = myrandom(BOUNDARY);
//p[i] = random(BOUNDARY);
return (p);
}
bool happened(double probability)//probability 0~1
{
if (probability <= 0)
{
return false;
}
if (probability<MinProb)
{
return rand() == 0 && happened(probability*(RAND_MAX + 1));
}
if (rand() <= probability*(RAND_MAX + 1))
{
return true;
}
return false;
}
int myrandom(int n)//產生0~n-1之間的等概率隨機數
{
int t = 0;
if (n <= RAND_MAX)
{
int R = RAND_MAX - (RAND_MAX + 1) % n;//尾數
t = rand();
while (t > R)
{
t = rand();
}
return t % n;
}
else
{
int r = n % (RAND_MAX + 1);//餘數
if (happened((double)r / n))//取到餘數的概率
{
return n - r + myrandom(r);
}
else
{
return rand() + myrandom(n / (RAND_MAX + 1))*(RAND_MAX + 1);
}
}
}
void DisPlay(int N, KeyInt *p)
{
for (int i = 0; i < 100; i++)
printf("%d\n", p[i]);
}
以上排序演算法都是對隨機函式rand()生成的隨機陣列進行排序的,我對各個排序的效能以及並行加速比進行了比較與分析。我們知道,rand()函式是通過線性同餘法生成的偽隨機數,範圍是0~2^15-1(32767),這個範圍對像快排這樣的排序演算法來說就顯得較小。所以這裡需要在rand()函式的基礎上進行改進,就是我上面寫的myrandom()函式,產生0~n-1範圍內的等概率隨機數。
這裡我說明一下並行設計需要注意的一些問題:
1:演算法能隨CPU核數擴充套件,即CPU核數升級後不需要修改演算法就可以取得加速比效能的線性增加。
2:演算法能有一個較好的能耗效率,演算法並不是越快越好,而是需要在速度和CPU能耗方面取得均衡,有時候為了追求效率,但是卻讓CPU能耗提高了許多。最好的做法是加速比能夠達到一定目標的情況下儘量降低CPU能耗。也就是說不需要片面去追求將程式並行化。有些時候程式序列執行比並行執行慢不了多少,但是CPU能耗卻降低了不少。
3:需要控制執行緒的粒度,否則執行緒粒度太細,頻繁建立執行緒會導致大量的額外開銷,從而使得效率大大降低。
4:在設計並行排序演算法時,還要考慮記憶體管理的開銷,由於並行演算法使用了多個執行緒,如果記憶體分配和釋放操作頻繁的話,那麼花費在這方面的開銷將是非常巨大的。
還有一些排序以及改進包括一些思想我會在後續的更新中介紹,歡迎各位指出程式中的不足,期待各位同行的討論。
相關推薦
排序演算法及並行分析
最近學了高效能運算這門課程,老師讓用OpenMP、MPI或mapReduce寫個大作業。我之前剛好在寫排序,於是我就將常用的排序寫了一遍並且用OpenMP進行並行,計算加速比等資料進行分析。在這篇文章中我主要介紹八大基本排序的實現原理及程式碼,以及對這些演算法進行改進從而讓它
八大排序演算法及時間空間複雜度分析,java版
//放在一起感覺又臭又長,所以每排序我單獨放出來了,歡迎大家平均交流指出不足import java.lang.reflect.Array;import java.util.*;public class EightKindOfSort {/*選擇排序 (不穩定演算法) *
排序演算法之效能分析及總結
一、排序演算法說明 排序的定義:對一個無序的序列進行排序的過程。 輸入:n個數:a1,a2,a3,…,an。 輸出:n個數的排列:a1,a2,a3,…,an,使得a1<=a2<=a3<=…<=an。 排序的穩定性:相同值的節點相對
八大排序——氣泡排序的優化演算法及效能分析(C語言)
穩定性:氣泡排序就是把小的元素往前調或者把大的元素往後調。比較是相鄰的兩個元素比較,交換也發生在這兩個元素之間。所以,如果兩個元素相等,是不必再去交換的;如果兩個相等的元素沒有相鄰,那麼即使通過前面的兩兩交換把兩個相鄰起來,這時候也不會交換,所以相同元素的前後順序並沒有改變,所以氣泡排序是一種穩定排序演算法。
快速排序演算法及時間複雜度分析(原地in-place分割槽版本)
快速排序演算法一般來說是採用遞迴來實現,其最關鍵的函式是partition分割函式,其功能是將陣列劃分為兩部分,一部分小於選定的pivot,另一部分大於選定的pivot。我們將重點放在該函式上面。 partition函式總體思路是自從一邊查詢,找到小於pivot的元素,則將
堆排序演算法及時間複雜度分析
堆其實通常是通過一維陣列來實現的,在陣列起始下標為0的情況下,父節點i的左右子節點分別為2*i+1和2*i+2,子節點i的父節點為(i-1)/2。對堆的操作主要有建堆、調整堆、堆排序、插入和刪除堆中元素。其中調整堆是核心。下面將重點介紹兩種調整堆的方法。 向下調整堆(shi
Java實現經典排序演算法及複雜度穩定性分析
/* 氣泡排序 */ public static int[] bubbleSort(int[] arry) { for (int i = 0; i < arry.length; i++) {
從零開始Rtklib解讀篇-簡單的程式設計理論和演算法及結構分析(四)
首先我們來說一說VS常用的除錯技巧,比較常用的內容我會寫在下面。 1、斷點。我就不細說了。 2、條件斷點,在斷點上右鍵,彈出的選單可以選擇條件設定,在找一些問題的時候會比較快一點。我有時會配合靜態變數強行搜尋到error發生前。 3、檢視指標值,監視視窗輸入,比如p,3 ,即可檢視p
從零開始Rtklib解讀篇-簡單的程式設計理論和演算法及結構分析(三)
1. argc和argv argc和argv中的arg指的是"引數",首先是一個計算提供的引數到程式,第二個是對字串陣列的指標 argc: 整數,用來統計你執行程式時送給main函式的命令列引數的個數 * argv[ ]: 字串陣列,用來存放指向你
從零開始Rtklib解讀篇-簡單的程式設計理論和演算法及結構分析(二)
從bin裡進入。主進入方式為RTKLAUNCH.exe 第一個RTKPLOT右上角的小方塊可以勾選NormalAPs,RTKPOST_MKL,RTKPOST_WIN64, Minimize等選項。通常是第一個。另外64位系統下通常也是選用NormalAPs,RTKPOST_WIN
從零開始Rtklib解讀篇-簡單的程式設計理論和演算法及結構分析(一)
Rtklib一直開源,資源比較容易找到,功能也非常強大。因為專業有點相關,但是之前不用這個平臺,一直未能好好沉下心來學習,然而學到用時方恨少。這個系列也算是自己的一個小小的總結吧,因為我對VS、對Rtklib、對演算法的理解也比較淺,很多內容未必正確,寫的時候也不一定非常有條理,不當之處,還請指出並
C++拾取——使用stl標準庫實現排序演算法及評測
今天看了一篇文章,講各種語言的優勢和劣勢。其中一個觀點:haskell非常適合寫演算法,因為使用者不用去關心具體的計算機實現,而只要關注於操作語義。這讓它在專心研究演算法的人中非常受歡迎。所以很多時候,語言的爭論沒有太多的意義,有意義的是它
PageRank 演算法及例項分析
本文一部分是針對圖的PageRank 的實現,以及具體資料集的分析過程的記錄。 另一部分是BFS的實現,並記錄每一層的節點數。 資料集下載地址 soc-Slashdot0811 、 roadNet-CA 、 soc-LiveJournal1 1. java 實現程式碼 M
插入排序演算法及C語言實現
插入排序演算法是所有排序方法中最簡單的一種演算法,其主要的實現思想是將資料按照一定的順序一個一個的插入到有序的表中,最終得到的序列就是已經排序好的資料。 直接插入排序是插入排序演算法中的一種,採用的方法是:在新增新的記錄時,使用順序查詢的方式找到其要插入的位置,然後將新記錄插入。 很多初學者所說的插入排
圖解排序演算法及實現——希爾排序 (Shell Sort)
希爾排序(ShellSort)也稱增量遞減排序演算法,即跨多步版的InsertionSort,是InsertionSort基礎上的改進版。InsertionSort可以看作ShellSort中gap=1的特例。希爾排序是非穩定排序演算法。 希爾排序通過將全部元
常見排序演算法及對應的時間複雜度和空間複雜度
轉載請註明出處: 排序演算法經過了很長時間的演變,產生了很多種不同的方法。對於初學者來說,對它們進行整理便於理解記憶顯得很重要。每種演算法都有它特定的使用場合,很難通用。因此,我們很有必要對所有常見的排序演算法進行歸納。 排序大的分類可以分為兩種:內
雙路快速排序演算法及三路快速排序演算法視覺化
雙路快速排序演算法 工具類 import java.awt.*; import java.awt.geom.Ellipse2D; import java.awt.geom.Rectangle2D; import java.lang.Interrup
七種排序演算法的簡單分析與實現
{ int nFirst = nLow; int nMid = (nLow + nHigh) /2; int nSecond = nMid +1; int* p = (int*)malloc(sizeof(int) * (nHigh - nLow +1)); int nIndex
幾種常見的排序演算法及它們之間的比較
1.穩定性比較 插入排序、氣泡排序、二叉樹排序、二路歸併排序及其他線形排序是穩定的 選擇排序、希爾排序、快速排序、堆排序是不穩定的 2.時間複雜性比較 插入排序、氣泡排序、選擇排序的時間複雜性為O(n2) 其它非線形排序的時間複雜性為O(nlog2n) 線形排序的
黑馬程式設計師____四種排序演算法的比較分析
下面將詳細介紹隨機數的生成以及四種排序演算法的設計技巧。1)隨機數生成 由於題目要求生成[0,2……32−1]之間的隨機數,而c標準庫中的隨機數函式rand()只能生成[0,32767]之間的隨機數,因此採用拼接的方法來生成32位的隨機數。 將32位的數分成三段,即2位,15位,15位三段。後面兩段可以直接用