歸併排序多執行緒化
Multithreading the Merge Sort
0. 來源
在閱讀 Introduction to Algorithms 中多執行緒一章後,決定自己實現 ++歸併排序的多執行緒化++ 。本以為它會很簡單,而且併發後效果會比較好。然而,實現之路困難重重,並且效果很差。
故,把整個過程記錄在此。。。
1. 過程
1.1 從“序列”的歸併排序開始
比較簡單直接上程式碼。
import java.util.Arrays;

/**
 * Sequential top-down merge sort over {@code int} arrays.
 */
public class MergeSortSingle implements MergeSort {

    /**
     * Returns a sorted copy of {@code array}; the argument itself is not modified.
     *
     * @param array values to sort (may be empty)
     * @return a new array holding the same values in ascending order
     */
    @Override
    public int[] mergeSortFunction(int[] array) {
        int[] myArray = Arrays.copyOf(array, array.length);
        mergeSort(myArray, 0, myArray.length - 1);
        return myArray;
    }

    /**
     * Sorts {@code array[s..e]} (both bounds inclusive) in place.
     */
    private void mergeSort(int[] array, int s, int e) {
        // s >= e covers the single-element range and also the empty array
        // (s = 0, e = -1); testing only s != e would recurse forever on empty input.
        if (s >= e) {
            return;
        }
        // Overflow-safe midpoint.
        int mid = s + (e - s) / 2;
        mergeSort(array, s, mid);
        mergeSort(array, mid + 1, e);
        merge(array, s, mid, e);
    }

    /**
     * Merges the sorted runs {@code array[s..mid]} and {@code array[mid+1..e]}
     * back into {@code array[s..e]}.
     */
    private void merge(int[] array, int s, int mid, int e) {
        int[] left = Arrays.copyOfRange(array, s, mid + 1);
        int[] right = Arrays.copyOfRange(array, mid + 1, e + 1);
        int j = 0;
        int k = 0;
        for (int i = s; i <= e; i++) {
            // Take from the left run when the right run is exhausted, or when the
            // left head is strictly smaller; otherwise take from the right run.
            boolean takeLeft = k >= right.length || (j < left.length && left[j] < right[k]);
            if (takeLeft) {
                array[i] = left[j++];
            } else {
                array[i] = right[k++];
            }
        }
    }
}
1.2 直接多執行緒化
可以看到 歸併排序 是一種典型的
分治演算法
,它分為 分解、解決、合併 三個部分。我們想對歸併排序多執行緒化,可一下想到從++分解之處++下手,也就是對mergeSort()
的兩處呼叫可以並行執行。但是,需要注意的是:++考慮併發的同時,必須考慮同步。++ 比如,這裡的
merge()
也就是合併過程,必須要等到前面兩處分解結束後才能執行。OK,現在有了要併發的目標,實現上最困難之處有以下兩點:
- 如何實現同步,也就是等待兩處 `mergeSort()` 結束後才執行 `merge()`;
- 如何管理執行緒,使用 執行緒池
從程式碼中可以看出,這裡使用了
ExecutorService
提供的執行緒池來 管理執行緒。並且利用了ReentrantLock
&Condition
鎖實現 同步。
在使用它們的時候遇到下面兩個問題:
- 執行緒池的大小(若使用固定大小的執行緒池往往發生死鎖,除非你設定的足夠大。因為,此巢狀並行到回溯時才能一一的釋放執行緒,應該畫個圖)
- 鎖所在的位置(此鎖的作用是為了等待。如果把鎖作為類的屬性,它會把所有在該鎖下等待的執行緒都喚醒。然而,它只應該喚醒對應執行緒內的等待者。所以,把鎖作為類屬性會擴大它的範圍,導致問題)
import java.util.Arrays; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; public class MergeSortPool implements MergeSort { private ExecutorService executorService; // 鎖必須在方法內 // private ReentrantLock reentrantLock = new ReentrantLock(); // private Condition condition = reentrantLock.newCondition(); public MergeSortPool() { // this.executorService = Executors.newFixedThreadPool(threadNum); this.executorService = Executors.newCachedThreadPool(); } @Override public int[] mergeSortFunction(int[] array) { int[] myArray = Arrays.copyOf(array, array.length); mergeSort(myArray, 0, myArray.length - 1); this.executorService.shutdown(); return myArray; } // TODO 若執行緒池設定的太小,則會導致死鎖。因為,會出現把所有的執行緒 private void mergeSort(int[] array, int s, int e) { ReentrantLock reentrantLock = new ReentrantLock(); Condition condition = reentrantLock.newCondition(); AtomicBoolean needWait = new AtomicBoolean(true); if (s != e) { int mid = (s + e) / 2; this.executorService.execute(() -> { try { mergeSort(array, s, mid); } finally { try { reentrantLock.lock(); needWait.set(false); condition.signal(); } finally { reentrantLock.unlock(); } } }); mergeSort(array, mid + 1, e); try { reentrantLock.lock(); if (needWait.get()) condition.await(); } catch (InterruptedException e1) { e1.printStackTrace(); } finally { reentrantLock.unlock(); } merge(array, s, mid, e); } } private void merge(int[] array, int s, int mid, int e) { int[] a1 = Arrays.copyOfRange(array, s, mid + 1); int[] a2 = Arrays.copyOfRange(array, mid + 1, e + 1); int j = 0; int k = 0; for (int i = s; i <= e; i++) { if (j >= a1.length) { array[i] = a2[k]; ++k; continue; } if (k >= a2.length) { array[i] = a1[j]; ++j; continue; } if (a1[j] < a2[k]) { array[i] = a1[j]; ++j; } else { array[i] = a2[k]; ++k; } } } }
1.3 複雜多執行緒化
在執行了直接多執行緒化的版本後,會發現執行時間比序列的長很多。很驚訝,難道是因為它的並行度太低?於是,我開始嘗試 複雜多執行緒化。
說白了,複雜多執行緒化就是也把歸併排序中的合併過程多執行緒化。很顯然,要重新設計 merge()
方法。
序列實現
新的
merge()
方法也是基於 分治策略 ,所以它上的並行也屬於 巢狀並行。詳細思路見程式碼(++說實話,這裡走了一些彎路++)。
// This class is the sequential implementation.
import java.util.Arrays;

/**
 * Merge sort with a divide-and-conquer merge step (sequential version).
 *
 * <p>The merge places the median of the longer run directly at its final position and
 * then recurses on the elements to its left and to its right. Within one top-level
 * {@code merge()} call every destination slot is written exactly once.
 *
 * <p>Two buffers of the input's length are used for the whole sort and swap roles at
 * each recursion level, so no scratch array is allocated per recursion node.
 */
public class MergeImprove implements MergeSort {

    /**
     * Merges two sorted runs of {@code array}, a1 = {@code array[s1..e1]} and
     * a2 = {@code array[s2..e2]} (all bounds inclusive; an empty run has
     * {@code e == s - 1}), into {@code saveArray} starting at index {@code start}.
     *
     * @param s1        the start of a1
     * @param e1        the end of a1
     * @param s2        the start of a2
     * @param e2        the end of a2
     * @param array     source buffer holding both sorted runs; only read
     * @param start     first index in {@code saveArray} that receives merged output
     * @param saveArray destination buffer; must be a different array from {@code array}
     */
    private void merge(int s1, int e1, int s2, int e2, int[] array, int start, int[] saveArray) {
        // 1, make a1 the longer run, so that an empty a1 implies an empty a2
        if (e2 - s2 > e1 - s1) {
            merge(s2, e2, s1, e1, array, start, saveArray);
            return;
        }
        // 0, nothing left to merge
        if (e1 < s1) {
            return;
        }
        // 2, take the median of a1
        int medianPos = s1 + (e1 - s1) / 2;
        int median = array[medianPos];
        // 3, split a2 around the median and put the median at its final position:
        //    it is preceded by a1's elements before it plus a2's elements below it
        int pos = getPos(median, s2, e2, array);
        int newMedianPos = start + (medianPos - s1) + (pos - s2);
        saveArray[newMedianPos] = median;
        merge(s1, medianPos - 1, s2, pos - 1, array, start, saveArray);
        merge(medianPos + 1, e1, pos, e2, array, newMedianPos + 1, saveArray);
    }

    /**
     * Binary search for the first index in the sorted range {@code array[start..end]}
     * whose value is {@code >= num}.
     *
     * @return that index, or {@code end + 1} if every value in the range is smaller
     */
    private static int getPos(int num, int start, int end, int[] array) {
        while (end >= start) {
            int mid = (end - start) / 2 + start;
            if (array[mid] >= num) {
                end = mid - 1;
            } else {
                start = mid + 1;
            }
        }
        return start;
    }

    /**
     * Returns a sorted copy of {@code array}; the argument itself is not modified.
     *
     * @param array values to sort (may be empty)
     * @return a new array holding the same values in ascending order
     */
    @Override
    public int[] mergeSortFunction(int[] array) {
        // Both buffers start as copies of the input; mergeSort relies on that.
        int[] saveArray = Arrays.copyOf(array, array.length);
        int[] scratch = Arrays.copyOf(array, array.length);
        mergeSort(scratch, 0, array.length - 1, saveArray);
        return saveArray;
    }

    /**
     * Sorts the range {@code [s, e]} so that the result ends up in
     * {@code saveArray[s..e]}, using {@code array[s..e]} as working space.
     *
     * <p>Precondition: both buffers still hold the original input values in
     * {@code [s, e]}. This is true at the top level and for every sub-range, because a
     * range is only written by merges that cover it.
     */
    private void mergeSort(int[] array, int s, int e, int[] saveArray) {
        // Single element: saveArray[s] already holds it. Empty input (s = 0, e = -1):
        // nothing to do. Testing only s != e would recurse forever on empty input.
        if (s >= e) {
            return;
        }
        int mid = s + (e - s) / 2;
        // Swap the buffers' roles: sort each half into `array`, then merge the two
        // sorted halves from `array` into `saveArray`.
        mergeSort(saveArray, s, mid, array);
        mergeSort(saveArray, mid + 1, e, array);
        merge(s, mid, mid + 1, e, array, s, saveArray);
    }

    public static void main(String[] args) {
        int[] array = {2, 9, 1, 8, 2, 10, 18, 21, 12, 5, 7, 4, 3};
        MergeImprove mergeImprove = new MergeImprove();
        int[] saveArray = mergeImprove.mergeSortFunction(array);
        System.out.println(Arrays.toString(saveArray));
    }
}
並行實現
import java.util.Arrays; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; public class MergeImprovePool implements MergeSort { private ExecutorService executorService; public MergeImprovePool() { this.executorService = Executors.newCachedThreadPool(); } private void merge(int s1, int e1, int s2, int e2, int[] array, int start, int[] saveArray) { // 1, make a1 is bigger than a2 if (e2 - s2 > e1 - s1) { int temp = s1; s1 = s2; s2 = temp; temp = e1; e1 = e2; e2 = temp; } // 0, solve little problem if (e1 - s1 == -1) return; // 2, get median of the a1 int medianPos = (s1 + e1) / 2; int median = array[medianPos]; // 3, get pos and redistribute int pos = getPos(median, s2, e2, array); int newMedianPos = start + (medianPos - s1) + (pos - s2); saveArray[newMedianPos] = array[medianPos]; int finalS = s1; int finalS1 = s2; // note1 AtomicBoolean flag = new AtomicBoolean(false); this.executorService.execute(() -> { merge(finalS, medianPos - 1, finalS1, pos - 1, array, start, saveArray); // note1 flag.getAndSet(true); }); merge(medianPos + 1, e1, pos, e2, array, newMedianPos + 1, saveArray); // note1 while (!flag.get()) { try { Thread.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); } } } /** * maybe should via class libraries * * @return position */ private static int getPos(int num, int start, int end, int[] array) { while (end >= start) { int mid = (end - start) / 2 + start; if (array[mid] >= num) { end = mid - 1; } else { start = mid + 1; } } return start; } @Override public int[] mergeSortFunction(int[] array) { int[] saveArray = new int[array.length]; mergeSort(array, 0, array.length - 1, saveArray); // note2 /* while (((ThreadPoolExecutor) this.executorService).getActiveCount() > 0) { try { Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } } // */ 
this.executorService.shutdown(); return saveArray; } private void mergeSort(int[] array, int s, int e, int[] saveArray) { ReentrantLock reentrantLock = new ReentrantLock(); Condition condition = reentrantLock.newCondition(); AtomicBoolean needWait = new AtomicBoolean(true); if (s != e) { int mid = (s + e) / 2; int[] t = new int[array.length]; this.executorService.execute(() -> { try { mergeSort(array, s, mid, t); } finally { try { reentrantLock.lock(); needWait.set(false); condition.signal(); } finally { reentrantLock.unlock(); } } }); mergeSort(array, mid + 1, e, t); try { reentrantLock.lock(); if (needWait.get()) condition.await(); } catch (InterruptedException e1) { e1.printStackTrace(); } finally { reentrantLock.unlock(); } merge(s, mid, mid + 1, e, t, s, saveArray); } else saveArray[s] = array[s]; } public static void main(String[] args) { int[] array = {2, 9, 1, 8, 2, 10, 18, 21, 12, 5, 7, 4, 3}; MergeImprovePool mergeImprovePool = new MergeImprovePool(); int[] saveArray = mergeImprovePool.mergeSortFunction(array); System.out.println(Arrays.toString(saveArray)); } }
實現時,主要發現兩個問題(分別在程式碼中標為 `note1`、`note2`):
- 為什麼需要在 `merge()` 方法的最後新增同步?
- `note2` 處還需不需要?(不需要,若真正明白了上面的問題,這個問題也就解決了)
針對
note1
的問題,理解過程如下:- 通過分析
merge()
方法,我們可以知道對它的一次呼叫只會對saveArray
陣列的相應位置進行一次賦值。由此,我們可能會想到多執行緒下也是安全的;確實,若只對它進行一次呼叫,這個判斷是對的; - 所以,一開始並沒有在
merge()
方法最後新增同步sync
。也就是等待所有完成後,才退出。然而,在測試時發現結果中有好多 0,並且會在一定概率下成功~~~ - 一開始我以為是 ++執行緒池的問題++,執行緒管理出現問題,執行緒池關閉過早???經除錯,發現所有的排序過程都執行了,這肯定不是執行緒池的鍋了;
- 經過一下午的除錯,終於發現少了
sync
操作,mmp!新增後(也就是note1
處),立馬跑通!! - 為什麼呢?MergeSort() will call merge() multiple times, so if don’t sync every time, it’s going to have a thread safe problem. 加上後,回溯 過程會一一呼叫
merge()
,而不會出現多個merge()
呼叫同時存在!
- 為什麼需要在
2. 比較
方法 | 數量 | 時間 |
---|---|---|
MergeSort | 10000 | 0.025 |
MergeSortThread | 10000 | 3.127 |
MergeSortPool | 10000 | 1.018 |
MergeImprove | 10000 | 0.127 |
MergeImprovePool | 10000 | 2.207 |
MergeSort | 100000 | 0.043 |
MergeSortThread | 100000 | 230.621 |
MergeSortPool | 100000 | 2.791 |
MergeImprove | 100000 | 4.321 |
MergeImprovePool | 100000 | outofmemory |
3. 總結
結果分析,請參考。
任重而道遠!!!