1. 程式人生 > >雙調排序(Bitonic sort)學習

雙調排序(Bitonic sort)學習

之前的時候參加浙大的考核,學習了一下雙調排序,正好現在在學習CUDA程式設計,因此把之前的學習成果整理一下。

問題說明: 
*********************** 
給出分成m段 的n個浮點數,輸入資料已按段號有序,但每段內部 無序。用C/C++ 編寫一個分段雙調排序(Bitonic sort)函式,對每一段內部的浮點數進行排序,但 不要改變段間的位置。 


介面方式: 
*********************** 
void segmentedBitonicSort(float* data, int* seg_id, int* seg_start, int n, int m); 
輸入資料中,data包含需要分段排序的n個float值,seg_id給出data中n個元素各 自所在的 段編號。seg_start共有m+1個元素,前m個分別給 出0..m-1共m個段的起 始位置,seg_start[m]保證等於n。 
seg_id中的元素保證單調不下降,即對任意的i<j,seg_id[i]<=seg_id[j]。 seg_id所有元 素均在0到m-1範圍內。 
輸出結果覆蓋data,保證每一段內排序,但不改變段間元素的順序。 

注意: 
*********************** 
1、必須使用雙調排序演算法進行排序。 
2、可以直接使用從網上下載的雙調排序程式碼,但須註明出處。 


樣例輸入: 
*********************** 
float data[5]={0.8, 0.2, 0.4, 0.6, 0.5}; 
int seg_id[5]={0,   0,   1,   1,   1} 
int seg_start[3]={0,2,5}; 
int n=5; 
int m=2; 

樣例輸出: 
*********************** 
float data[5]={0.2, 0.8, 0.4, 0.5, 0.6}; 

加分挑戰(非必需): 
*********************** 
1、不遞迴:segmentedBitonicSort函式及其所呼叫的任何其他函式都不得直接或 間接地進行遞迴。 
2、不呼叫函式:segmentedBitonicSort不呼叫除標準庫函式外的任何其他函式。 
3、記憶體高效:segmentedBitonicSort及其所呼叫的任何其他函式都不得進行動態 記憶體分配,包括malloc、new和靜態定義的STL容器。 
4、可並行:segmentedBitonicSort涉及到的所有時間複雜度O(n)以上的程式碼都寫 在for循 環中,而且每個這樣的for迴圈內部的迴圈順序可 以任意改變,不影響程 序結果。注:自己測試時可以用rand()決定迴圈順序。 
5、不需記憶體:segmentedBitonicSort不呼叫任何函式(包括C/C++標準庫函式), 不使用全域性變數,所有區域性變數都是int、float或指標類 型,C++程式不使用new 關鍵字。 
6、絕對魯棒:在輸入資料中包含NaN時(例如sqrt(-1.f)),保證除NaN以外 的數 據正確排序,NaN的個數保持不變。 

演算法描述

雙調序列

在瞭解雙調排序演算法之前,我們先來看看什麼是雙調序列。 雙調序列是一個先單調遞增後單調遞減(或者先單調遞減後單調遞增)的序列。

Batcher定理

將任意一個長為2n的雙調序列A分為等長的兩半X和Y,將X中的元素與Y中的元素一一按原序比較,即a[i]與a[i+n] (i < n)比較,將較大者放入MAX序列,較小者放入MIN序列。則得到的MAX和MIN序列仍然是雙調序列,並且MAX序列中的任意一個元素不小於MIN序列中的任意一個元素[2]。

雙調排序

假設我們有一個雙調序列,則我們根據Batcher定理,將該序列劃分成2個雙調序列,然後繼續對每個雙調序列遞迴劃分,得到更短的雙調序列,直到得到的子序列長度為1為止。這時的輸出序列按單調遞增順序排列。

見下圖:升序排序,具體方法是,把一個序列(1…n)對半分,假設n=2^k,然後1和n/2+1比較,小的放上,接下來2和n/2+2比較,小的放上,以此類推;然後看成兩個(n/2)長度的序列,因為他們都是雙調序列,所以可以重複上面的過程;總共重複k輪,即最後一輪已經是長度是2的序列比較了,就可得到最終的排序結果。

任意序列生成雙調序列

前面講了一個雙調序列如何排序,那麼任意序列如何變成一個雙調序列呢?

這個過程叫Bitonic merge, 實際上也是divide and conquer的思路。 和前面sort的思路正相反, 是一個bottom up的過程——將兩個相鄰的,單調性相反的單調序列看作一個雙調序列, 每次將這兩個相鄰的,單調性相反的單調序列merge生成一個新的雙調序列, 然後排序(同3、雙調排序)。 這樣只要每次兩個相鄰長度為n的序列的單調性相反, 就可以通過連線得到一個長度為2n的雙調序列,然後對這個2n的序列進行一次雙調排序變成有序,然後在把兩個相鄰的2n序列合併(在排序的時候第一個升序,第二個降序)。 n開始為1,每次翻倍,直到等於陣列長度,最後就只需要再一遍單方向(單調性)排序了。

以16個元素的array為例,

1. 相鄰兩個元素合併形成8個單調性相反的單調序列,

2. 兩兩序列合併,形成4個雙調序列,分別按相反單調性排序

3. 4個長度為4的相反單調性單調序列,相鄰兩個合併,生成兩個長度為8的雙調序列,分別排序

4. 2個長度為8的相反單調性單調序列,相鄰兩個合併,生成1個長度為16的雙調序列,排序雙調排序示意圖

演算法實現

我首先實現了雙調排序的基本功能,遞迴無疑是最容易思考的方法,具體思想如下

其中雙調序列排序是核心部分,其具體思想又可由下圖表示:

其中MAX和MIN為兩個小的雙調序列,MAX的所有元素大於MIN的所有元素(Batcher定理),進而可以將其進一步拆分成更小的雙調序列。

由於雙調排序對於2的冪次長的序列有最為簡單的形式,因此在演算法實現中對序列進行的補長,使得補長後的序列長度為2的冪次。最後再將填充的元素從序列中剔除即可。

程式碼中當出現除以2或者乘以2的時候一律使用右移或左移,可以使得效率更高。

我重點對網上的雙調排序的程式碼進行了精簡,主要是針對Sort函式和Merge函式,增加了程式碼的重用性。

拓展部分

不遞迴

為了保證不遞迴,我們需要採用迭代的方法進行排序,程式碼以及具體思路如下:

以序列總長度為8為例:

其中利用箭頭代表每次比較後箭頭連線的兩個元素的大小關係

step表示每過step的步長以後是一個新的雙調序列

step0表示將當前雙調序列拆分成兩個子雙調序列需要比較的總次數

i表示以兩個雙調序列為一組,i為每個組的起始序號(由於兩個雙調序列經過排序以後可以合成一個雙調序列,所以用i來表示每個組的起始序號)

j表示在對雙調序列進行雙調排序時,每過多少間隔可以劃分成一個新的子序列

k表示將當前雙調序列拆分成兩個子雙調序列時正在第k次比較

所以對於需要升序的雙調排序而言,當前比較的序號為第i個雙調序列中第j個子序列的第k次比較,即i+j+k;對於需要降序的雙調排序而言還需要加上step表示與升序的雙調序列相鄰的雙調序列,即i+j+k+step。通過這種迭代的演算法,可以免去原演算法中的遞迴部分

不呼叫函式

由於沒有了遞迴,自然就免去了遞迴函式;同時也可以將交換函式swap放在segmentedBitonicSort中使得segmentedBitonicSort不呼叫任何的函式

記憶體高效

在原來的實現中要通過new來開闢一片新的資料區來存放增長後的資料,但在改進後直接在segmentedBitonicSort中宣告一個新的陣列來存放資料即可。

可並行

segmentedBitonicSort涉及到的所有時間複雜度O(n)以上的程式碼都寫在for迴圈中,而且for迴圈內部除了step與step0的迴圈外迴圈順序均可以任意改變,驗證之後發現其餘三個迴圈的順序確實不影響程式結果。

不需記憶體

原實現需要通過ERROR這個全域性變數來通知主程式輸入是否出錯,現在講輸出也放在segmentedBitonicSort內,使得出錯後可以直接return,開始讀入下一組資料

絕對魯棒

利用NaN!=NaN的特點來對其進行識別,使得只要滿足這個條件時,它總會排在較後的位置,因此當雙調排序結束後,NaN總會排在序列最後面且保證NaN的個數不變

原始碼

/***************************************************
* 檔名:Version2.cpp
* 檔案描述:給出分成m段的n個浮點數,輸入資料已按段號有序,但每段內部無序。用C/C++ 編寫一個分段雙調排序(Bitonic sort)函式,對每一段內部的浮點數進行排序,但不改變段間的位置。本版本用迭代的方法
* 作者:
* 身份:
* E-mail   . 
* Mobile   . 
* QQ/Wechat. 
***************************************************/

#include<cstring>
#include<stdio.h>
#include<vector>
using namespace std;

#define MAX 99999//MAX為將資料補長至2的冪的填充數
#define MIN 0.00000000001//浮點數無法直接比較,因此設定一個最小值,當兩者之差小於MIN時認為二者相等


void segmentedBitonicSort(float* data, int* seg_id, int* seg_start, int n, int m)
{
	//輸入出錯
	if (seg_start[m] != n || seg_id[n - 1] != (m - 1))
	{
		printf("Input Error!\n");
		return;
	}

	int seg = 0;
	for (int i = 0; i < n; i++)
	{
		if (seg < m && i == seg_start[seg])
		{
			float* num = data + i;//num為每段開頭的元素的地址
			int len = 1;
			int seg_len = seg_start[seg + 1] - seg_start[seg];

			//計算不小於資料長度的最小的2的冪
			while (len < seg_len)
			{
				len = len << 1;
			}

			float Nnum[65536] = { 0 };

			for (int i = 0; i<len; ++i)  //將資料補長,使資料的長度為2的冪
			{
				Nnum[i] = (i < seg_len) ? num[i] : MAX;

			}

			//通過迭代實現雙調排序
			for (int step = 2; step <= len; step <<= 1)
			{
				for (int i = 0; i < len; i += step << 1)
				{
					for (int step0 = step >> 1; step0 >0; step0 >>= 1)
					{
						for (int j = 0; j < step; j += step0 << 1)
						{
							for (int k = 0; k < step0; ++k)
							{
								if (Nnum[i + j + k] > Nnum[i + j + k + step0] || Nnum[i + j + k] != Nnum[i + j + k])
								{
									float temp = Nnum[i + j + k];
									Nnum[i + j + k] = Nnum[i + j + k + step0];
									Nnum[i + j + k + step0] = temp;
								}
								if (i + step < len)
								{
									if (Nnum[i + j + k + step] < Nnum[i + j + k + step + step0] || Nnum[i + j + k + step + step0] != Nnum[i + j + k + step + step0])
									{
										float temp = Nnum[i + j + k + step];
										Nnum[i + j + k + step] = Nnum[i + j + k + step + step0];
										Nnum[i + j + k + step + step0] = temp;
									}
								}
							}
						}
					}
				}
			}

			int j = 0;
			//刪除補長的填充碼
			for (int i = 0; i<len; ++i)
			{
//將排序後的除了填充元素的所有元素(包括NaN)放到原陣列中
				if (MAX - Nnum[i] > MIN || Nnum[i] != Nnum[i])
				{
					num[j++] = Nnum[i];
				}
			}
			seg++;
		}
	}

	//輸出結果
	for (int i = 1; i<m + 1; i++)
	{
		printf("seg_id %d : ", i - 1);
		for (int j = seg_start[i - 1]; j < seg_start[i]; j++)
		{
			if (data[j] == data[j])
				printf("%g ", data[j]);
			else
				printf("nan ");
		}
		printf("\n");
	}
	return;
}


int main()
{
	int n;
	int m;
	float data[1000] = { 0 };
	int seg_id[1000] = { 0 };
	int seg_start[1000] = { 0 };

	/*輸入格式:n, m, data, seg_id, seg_start
	其中n代表n個浮點數
	m代表將浮點數分為m段
	data為具體的數值
	seg_id表示data中n個元素各自所在的 段編號
	seg_start表示0..m-1共m個段的起始位置*/

	printf("Please input in following order: \nn, m, data, seg_id, seg_start\n");
	while (scanf("%d%d", &n, &m) != EOF)
	{
		if (n <= 0 || m <= 0 || m>n)
		{
			printf("Input Error!\n");
			continue;
		}
		for (int i = 0; i<n; i++)
		{
			scanf("%f", &data[i]);
		}
		for (int i = 0; i<n; i++)
		{
			scanf("%d", &seg_id[i]);
		}
		for (int i = 0; i<m + 1; i++)
		{
			scanf("%d", &seg_start[i]);
		}
		segmentedBitonicSort(data, seg_id, seg_start, n, m);
		memset(data, 0, sizeof(data));
		memset(seg_id, 0, sizeof(seg_id));
		memset(seg_start, 0, sizeof(seg_start));
	}
	return 0;
}

效能分析 

雙調排序的核心思路是先將普通的序列裝化為雙調序列,並對雙調序列進行雙調排序後合成大的雙調序列,而雙調排序又是將大的雙調序列拆分成小的雙調序列後進行比較。這麼做看起來好像是繞了一個大彎,本來一個序列直接對他進行排序就好了,為何又要把他先合起來再拆分呢?在時間複雜度的維度上來看,我們根據前面比較次數的圖片可以很容易求出雙調排序的時間複雜度(具體推導如下)

可見其時間複雜度為O(n*logn*logn),雖然比氣泡排序O(n^2)好一點,但是卻不如常用的快速排序與歸併排序的O(n*logn)。那麼我們又為什麼要採用這個排序方法呢?原因是雙調排序是排序網路方法中的一種。而排序網路指的就是序列是資料獨立的,即網路比較順序與資料無關的排序方法。它將原本只能一次性處理的資料轉換成了多次處理,所以特別適合硬體做並行化。這也與通訊中的FFT(快速傅立葉變換)思想非常相像:為了求出離散訊號的頻譜,我們往往不是用DFT來求,而是採用更快的FFT來求,具體思路也是講長序列拆分為短序列,再通過蝶形圖(如下圖)合成長序列的頻譜。

綜上所述:雙調排序經過優化後具有以下優點:

  1. 不遞迴,降低空間複雜度
  2. 不呼叫其他庫函式
  3. 保證記憶體的高效性,不適用動態記憶體分配
  4. 可以並行運算從而使當有多處理器時可以降低總的處理時間
  5. 不需要記憶體,不存在全域性變數,不使用new
  6. 絕對魯棒,在輸入資料中包含NaN時(例如sqrt(-1.f)),把NaN當作比任意數大的數進行排序,保證除NaN以外的資料正確排序,NaN的個數保持不變。

缺點有:

  1. 時間複雜度不如歸排、快排
  2. 當資料量大時,難以用陣列存放資料,但是如果進行優化可以通過setvbuf函式設定I/O快取,從而保證程式的正確執行
  3. 雖然在segmentedBitonicSort開闢的陣列是區域性變數,但是由於它需要容納不定長的資料,所以我給他開了65536的長度,雖然函式呼叫結束後會釋放,但是在函式執行的時候還是造成了很大一部分的記憶體浪費 

參考資料

CUDA(六). 從並行排序方法理解並行化思維——冒泡、歸併、雙調排序的GPU實現

三十分鐘理解:雙調排序Bitonic Sort,適合平行計算的排序演算法

分段雙調排序實現

 老師給出的回覆:

我們對你提交的程式碼進行了測試,發現有如下問題:

1. 引入了介面定義之外的依賴;

2. 棧上直接宣告靜態陣列的方法支援的資料規模有限;

3. 不需記憶體的挑戰也不能算完成;