
Multithreading Merge Sort

0. Background

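After reading the multithreading chapter of Introduction to Algorithms, I decided to implement ++a multithreaded merge sort++ myself. I expected it to be simple and the concurrent version to perform well. Instead, the implementation ran into one obstacle after another, and the results were poor.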

So I am recording the whole process here.

1. Process

1.1 Starting from the sequential merge sort

  • This part is simple, so here is the code.

    import java.util.Arrays;
    
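    // MergeSort is the author's common interface, which the post does not show; judging by
    // the @Override below, it declares: int[] mergeSortFunction(int[] array)
    public class MergeSortSingle implements MergeSort {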
    
        @Override
        public int[] mergeSortFunction(int[] array) {
            int[] myArray = Arrays.copyOf(array, array.length);
            mergeSort(myArray, 0, myArray.length - 1);
            return myArray;
        }
    
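        // Sorts array[s..e] (inclusive bounds) in place.
        private void mergeSort(int[] array, int s, int e) {
            if (s < e) { // "s < e" rather than "s != e": an empty array (e = -1) would otherwise recurse until StackOverflowError
                int mid = s + (e - s) / 2; // same value as (s + e) / 2 here, but cannot overflow int
                mergeSort(array, s, mid);
                mergeSort(array, mid + 1, e);
                merge(array, s, mid, e);
            }
        }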
    
        // Merges the sorted runs array[s..mid] and array[mid+1..e] through two temporary copies.
        private void merge(int[] array, int s, int mid, int e) {
            int[] a1 = Arrays.copyOfRange(array, s, mid + 1);
            int[] a2 = Arrays.copyOfRange(array, mid + 1, e + 1);
            int j = 0;
            int k = 0;
            for (int i = s; i <= e; i++) {
                if (j >= a1.length) {
                    array[i] = a2[k];
                    ++k;
                    continue;
                }
                if (k >= a2.length) {
                    array[i] = a1[j];
                    ++j;
                    continue;
                }
                if (a1[j] < a2[k]) {
                    array[i] = a1[j];
                    ++j;
                } else {
                    array[i] = a2[k];
                    ++k;
                }
            }
        }
    
    }
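Every class in this post implements a MergeSort interface that the post never shows. Judging by the @Override annotations, it declares at least int[] mergeSortFunction(int[] array). Below is a minimal reconstruction under that assumption. It also includes a hypothetical randomized checker (MergeSortCheck.verify, my own name) that compares any implementation against Arrays.sort, covering the empty and single-element edge cases.

```java
import java.util.Arrays;
import java.util.Random;

// Hypothetical reconstruction: the post never shows MergeSort, but every class
// overrides int[] mergeSortFunction(int[] array), so it needs at least this method.
interface MergeSort {
    int[] mergeSortFunction(int[] array);
}

// Hypothetical helper: checks any MergeSort against Arrays.sort on random inputs.
final class MergeSortCheck {
    static boolean verify(MergeSort impl, int trials, long seed) {
        Random random = new Random(seed);
        for (int trial = 0; trial < trials; trial++) {
            // The first two trials cover the edge cases n = 0 and n = 1.
            int n = trial < 2 ? trial : 2 + random.nextInt(999);
            int[] input = random.ints(n, -100, 100).toArray();
            int[] expected = input.clone();
            Arrays.sort(expected);
            int[] actual = impl.mergeSortFunction(input);
            if (!Arrays.equals(expected, actual)) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        // MergeSort has a single abstract method, so a lambda can implement it.
        MergeSort reference = a -> {
            int[] copy = a.clone();
            Arrays.sort(copy);
            return copy;
        };
        System.out.println(verify(reference, 50, 42L)); // prints true
        System.out.println(verify(a -> a.clone(), 50, 42L)); // an identity "sort" should fail
    }
}
```

For example, MergeSortCheck.verify(new MergeSortSingle(), 100, 1L) would exercise the class above. The pool-based classes below shut their executor down after one call, so they would need a fresh instance per trial, e.g. a -> new MergeSortPool().mergeSortFunction(a).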
    

1.2 Direct multithreading

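  • Merge sort is a classic divide-and-conquer algorithm with three steps: divide, conquer, and combine. To multithread it, the obvious place to start is ++the divide step++: the two recursive calls to mergeSort() can run in parallel.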

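  • However, ++whenever you add concurrency, you must also think about synchronization.++ Here, for example, merge() (the combine step) may run only after both recursive calls have finished.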

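  • With that target in mind, the two hardest parts of the implementation are:

    • Synchronization: making merge() wait until both mergeSort() calls have finished
    • Thread management: using a thread pool, or not managing threads at all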

    As the code shows, threads are managed by a thread pool from ExecutorService, and synchronization uses a ReentrantLock with a Condition.

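  • Using them raised two problems:

    • Thread-pool size. A fixed-size pool tends to deadlock unless it is large enough. In this nested parallelism, a thread is released only when the recursion unwinds past it. Each call blocks its thread while waiting for its child task. Once every pool thread is blocked this way, the queued child tasks that would wake them can never run. (I should draw a diagram for this.)
    • Where the lock lives. The lock exists only for waiting. If the lock and its Condition are fields of the class, all waiting calls share them. A signal meant for one call can then wake the thread waiting in another call, while the right one keeps waiting. The lock belongs to a single call; making it a field widens its scope and causes exactly this bug.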

      import java.util.Arrays;
      import java.util.concurrent.ExecutorService;
      import java.util.concurrent.Executors;
      import java.util.concurrent.atomic.AtomicBoolean;
      import java.util.concurrent.locks.Condition;
      import java.util.concurrent.locks.ReentrantLock;
      
      public class MergeSortPool implements MergeSort {
      
          private ExecutorService executorService;
      
          // The lock must be created inside the method (one per call), not kept as a field:
      //    private ReentrantLock reentrantLock = new ReentrantLock();
      //    private Condition condition = reentrantLock.newCondition();
      
          public MergeSortPool() {
      //        this.executorService = Executors.newFixedThreadPool(threadNum); // deadlocks when threadNum is too small
              this.executorService = Executors.newCachedThreadPool(); // unbounded: adds threads as others block
          }
      
          @Override
          public int[] mergeSortFunction(int[] array) {
              int[] myArray = Arrays.copyOf(array, array.length);
              mergeSort(myArray, 0, myArray.length - 1);
              this.executorService.shutdown(); // the pool cannot be reused, so each instance sorts only once
              return myArray;
          }
      
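          // TODO If the pool is too small, this deadlocks: every pool thread can end up blocked in
          // await(), waiting for a child task that is still queued with no free thread to run it.
          private void mergeSort(int[] array, int s, int e) {
              if (s < e) { // "s < e" rather than "s != e", so an empty array (e = -1) does not recurse forever
                  // One lock, condition and flag per call, so a signal can only wake this call's waiter
                  ReentrantLock reentrantLock = new ReentrantLock();
                  Condition condition = reentrantLock.newCondition();
                  AtomicBoolean needWait = new AtomicBoolean(true);
                  int mid = s + (e - s) / 2;
                  // Sort the left half in a pool thread; signal even if it fails, so the waiter cannot hang
                  this.executorService.execute(() -> {
                      try {
                          mergeSort(array, s, mid);
                      } finally {
                          reentrantLock.lock();
                          try {
                              needWait.set(false);
                              condition.signal();
                          } finally {
                              reentrantLock.unlock();
                          }
                      }
                  });
                  // Sort the right half in the current thread
                  mergeSort(array, mid + 1, e);
                  // Wait for the left half; the while loop guards against spurious wakeups
                  reentrantLock.lock();
                  try {
                      while (needWait.get())
                          condition.await();
                  } catch (InterruptedException e1) {
                      Thread.currentThread().interrupt();
                      throw new IllegalStateException("interrupted while waiting for the left half", e1);
                  } finally {
                      reentrantLock.unlock();
                  }
                  merge(array, s, mid, e);
              }
          }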
      
          private void merge(int[] array, int s, int mid, int e) {
              int[] a1 = Arrays.copyOfRange(array, s, mid + 1);
              int[] a2 = Arrays.copyOfRange(array, mid + 1, e + 1);
              int j = 0;
              int k = 0;
              for (int i = s; i <= e; i++) {
                  if (j >= a1.length) {
                      array[i] = a2[k];
                      ++k;
                      continue;
                  }
                  if (k >= a2.length) {
                      array[i] = a1[j];
                      ++j;
                      continue;
                  }
                  if (a1[j] < a2[k]) {
                      array[i] = a1[j];
                      ++j;
                  } else {
                      array[i] = a2[k];
                      ++k;
                  }
              }
          }
      }
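Both problems above, pool size and where the lock lives, are what Java's Fork/Join framework (ForkJoinPool with RecursiveAction) is designed for. Inside invokeAll, a worker that joins a subtask still sitting in its own queue typically runs that subtask itself instead of parking the thread. Nested fork/join therefore does not need one thread per waiting call. Each join is also tied to exactly one subtask, so there is no shared lock to mix up. Below is a minimal sketch of the same idea as MergeSortPool (parallel recursive calls, serial merge), moved onto Fork/Join. The class name ForkJoinMergeSort, the CUTOFF value, and the serial cutoff itself are my own assumptions, not part of the original.

```java
import java.util.Arrays;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;

// Sketch: section 1.2's idea (parallel recursive calls, serial merge) on Fork/Join.
// ForkJoinMergeSort and CUTOFF are hypothetical names, not from the original post.
final class ForkJoinMergeSort {
    // Hypothetical threshold: smaller ranges are sorted serially, because a task
    // per tiny range costs far more than the work it does.
    static final int CUTOFF = 8192;

    static int[] sort(int[] input) {
        int[] array = Arrays.copyOf(input, input.length);
        int[] buffer = new int[array.length]; // one scratch array; tasks use disjoint slices
        if (array.length > 1) {
            ForkJoinPool.commonPool().invoke(new SortTask(array, buffer, 0, array.length - 1));
        }
        return array;
    }

    // Sorts array[s..e] (inclusive, as in the post) in place.
    private static final class SortTask extends RecursiveAction {
        private final int[] array, buffer;
        private final int s, e;

        SortTask(int[] array, int[] buffer, int s, int e) {
            this.array = array;
            this.buffer = buffer;
            this.s = s;
            this.e = e;
        }

        @Override
        protected void compute() {
            if (e - s + 1 <= CUTOFF) {
                sortSerial(array, buffer, s, e);
                return;
            }
            int mid = s + (e - s) / 2;
            // Forks one half, runs the other, and returns only when both are done:
            // this replaces the per-call ReentrantLock + Condition + flag.
            invokeAll(new SortTask(array, buffer, s, mid), new SortTask(array, buffer, mid + 1, e));
            merge(array, buffer, s, mid, e);
        }
    }

    private static void sortSerial(int[] array, int[] buffer, int s, int e) {
        if (s >= e) return;
        int mid = s + (e - s) / 2;
        sortSerial(array, buffer, s, mid);
        sortSerial(array, buffer, mid + 1, e);
        merge(array, buffer, s, mid, e);
    }

    // Serial merge of array[s..mid] and array[mid+1..e], using buffer[s..e] as scratch.
    private static void merge(int[] array, int[] buffer, int s, int mid, int e) {
        System.arraycopy(array, s, buffer, s, e - s + 1);
        int i = s, j = mid + 1, k = s;
        while (i <= mid && j <= e) {
            array[k++] = buffer[i] <= buffer[j] ? buffer[i++] : buffer[j++];
        }
        while (i <= mid) array[k++] = buffer[i++];
        while (j <= e) array[k++] = buffer[j++];
    }

    public static void main(String[] args) {
        int[] array = {2, 9, 1, 8, 2, 10, 18, 21, 12, 5, 7, 4, 3};
        System.out.println(Arrays.toString(sort(array)));
        // prints [1, 2, 2, 3, 4, 5, 7, 8, 9, 10, 12, 18, 21]
    }
}
```

The serial cutoff matters as much as the framework. MergeSortPool creates a task, plus a lock and condition, for every subarray down to single elements. That per-task overhead may well dominate the actual sorting work at these input sizes.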
      

1.3 Deeper multithreading

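After running the directly multithreaded version, I found that it took much longer than the sequential one. I was surprised: was its degree of parallelism too low? So I started trying a deeper multithreading.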

Put simply, the deeper version also multithreads the merge step of merge sort, which clearly requires redesigning merge().
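The book's own analysis (Section 27.3 of Introduction to Algorithms, 3rd edition) explains the low parallelism. Write T1 for the work and T∞ for the span. Parallelizing only the two recursive calls leaves the Θ(n) serial merge on the critical path, so the parallelism T1/T∞ grows only logarithmically. A parallel merge such as the book's P-MERGE, which the merge() below closely follows, cuts the span of each merge to Θ(lg² n):

```latex
\begin{aligned}
&\text{serial merge:}   && T_1(n) = \Theta(n \lg n), && T_\infty(n) = T_\infty(n/2) + \Theta(n) = \Theta(n),
  && \frac{T_1(n)}{T_\infty(n)} = \Theta(\lg n) \\
&\text{parallel merge:} && T_1(n) = \Theta(n \lg n), && T_\infty(n) = T_\infty(n/2) + \Theta(\lg^2 n) = \Theta(\lg^3 n),
  && \frac{T_1(n)}{T_\infty(n)} = \Theta\!\left(\frac{n}{\lg^2 n}\right)
\end{aligned}
```

These bounds ignore the cost of creating and scheduling threads. A version that spawns a task for every subarray down to single elements can still lose to the sequential code, however much parallelism it exposes in theory.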

  • Sequential implementation

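    The new merge() is also based on divide and conquer, so parallelizing it is again nested parallelism. See the code for details (++to be honest, I took a few detours here++).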

    // This class is the "sequential" implementation
    import java.util.Arrays;
    
    public class MergeImprove implements MergeSort {
    
        private static int count = 0;
    
        // NOTE: partly my own code and partly adapted from others', because my thinking was unclear at first.
        // IDEA: each merge() call puts one element into its final position, until the recursion ends.
    
        // NOTE: a single call to merge() assigns each relevant position of saveArray exactly once.
        // However, mergeSort() calls merge() many times, so a parallel version that does not
        // synchronize after every call has a thread-safety problem (see note1 in MergeImprovePool).
    
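        /**
         * Merges the sorted runs a1 = array[s1..e1] and a2 = array[s2..e2]
         * into saveArray, starting at index start.
         *
         * @param s1        the start of a1
         * @param e1        the end of a1 (inclusive)
         * @param s2        the start of a2
         * @param e2        the end of a2 (inclusive)
         * @param array     the array holding both sorted runs
         * @param start     the index in saveArray where the merged output begins
         * @param saveArray the output array
         */
        private void merge(int s1, int e1, int s2, int e2, int[] array, int start, int[] saveArray) {
            // 1. Make a1 the longer run (swap the two ranges if needed)
            if (e2 - s2 > e1 - s1) {
                int temp = s1;
                s1 = s2;
                s2 = temp;
                temp = e1;
                e1 = e2;
                e2 = temp;
            }
            // 0. Base case: a1 is the longer run, so if it is empty, both runs are empty
            if (e1 - s1 == -1)
                return;
            // 2. Take the median of a1
            int medianPos = s1 + (e1 - s1) / 2;
            int median = array[medianPos];
            // 3. Binary-search its position in a2. Everything before medianPos in a1 and before
            //    pos in a2 comes before the median, which gives its final index directly.
            int pos = getPos(median, s2, e2, array);
    
            int newMedianPos = start + (medianPos - s1) + (pos - s2);
            // System.out.println(count + " pos :: " + newMedianPos); // debug output, disabled
            saveArray[newMedianPos] = array[medianPos];
            // Recurse on the elements that go before the median and on those that go after it
            merge(s1, medianPos - 1, s2, pos - 1, array, start, saveArray);
            merge(medianPos + 1, e1, pos, e2, array, newMedianPos + 1, saveArray);
        }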
    
        private static int getPos(int num, int start, int end, int[] array) {
            while (end >= start) {
                int mid = (end - start) / 2 + start;
                if (array[mid] >= num) {
                    end = mid - 1;
                } else {
                    start = mid + 1;
                }
            }
            return start;
        }
    
        @Override
        public int[] mergeSortFunction(int[] array) {
            int[] saveArray = new int[array.length];
            mergeSort(array, 0, array.length - 1, saveArray);
            return saveArray;
        }
    
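        private void mergeSort(int[] array, int s, int e, int[] saveArray) {
            if (s < e) {
                int mid = (s + e) / 2;
                // A new full-length temp array on every call costs O(n) per call, O(n^2) in total;
                // this is likely a large part of why this version is slow at n = 100000 (section 2).
                int[] t = new int[array.length];
                mergeSort(array, s, mid, t);
                mergeSort(array, mid + 1, e, t);
                ++count;
                merge(s, mid, mid + 1, e, t, s, saveArray);
            } else if (s == e) // s > e happens only for an empty input: nothing to do
                saveArray[s] = array[s];
        }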
    
        public static void main(String[] args) {
            int[] array = {2, 9, 1, 8, 2, 10, 18, 21, 12, 5, 7, 4, 3};
            MergeImprove mergeImprove = new MergeImprove();
            int[] saveArray = mergeImprove.mergeSortFunction(array);
            System.out.println(Arrays.toString(saveArray));
        }
    }
    
  • Parallel implementation

    import java.util.Arrays;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.atomic.AtomicBoolean;
    import java.util.concurrent.locks.Condition;
    import java.util.concurrent.locks.ReentrantLock;
    
    public class MergeImprovePool implements MergeSort {
    
        private ExecutorService executorService;
    
        public MergeImprovePool() {
            this.executorService = Executors.newCachedThreadPool();
        }
    
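        private void merge(int s1, int e1, int s2, int e2, int[] array, int start, int[] saveArray) {
            // 1. Make a1 the longer run (swap the two ranges if needed)
            if (e2 - s2 > e1 - s1) {
                int temp = s1;
                s1 = s2;
                s2 = temp;
                temp = e1;
                e1 = e2;
                e2 = temp;
            }
            // 0. Base case: both runs are empty
            if (e1 - s1 == -1)
                return;
            // 2. Take the median of a1
            int medianPos = (s1 + e1) / 2;
            int median = array[medianPos];
            // 3. Find its position in a2 and write it to its final index in saveArray
            int pos = getPos(median, s2, e2, array);
            int newMedianPos = start + (medianPos - s1) + (pos - s2);
            saveArray[newMedianPos] = array[medianPos];
            // Effectively final copies of s1 and s2 (reassigned above) for the lambda
            int finalS = s1;
            int finalS1 = s2;
            // note1: merge the left parts in a pool thread and the right parts in this thread
            AtomicBoolean flag = new AtomicBoolean(false);
            this.executorService.execute(() -> {
                try {
                    merge(finalS, medianPos - 1, finalS1, pos - 1, array, start, saveArray);
                } finally {
                    // note1: set in finally, so the loop below cannot spin forever if merge() throws
                    flag.set(true);
                }
            });
            merge(medianPos + 1, e1, pos, e2, array, newMedianPos + 1, saveArray);
            // note1: do not return before the left part is merged as well (polls every 10 ms)
            while (!flag.get()) {
                try {
                    Thread.sleep(10);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException("interrupted while waiting for a merge task", e);
                }
            }
        }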
    
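        /**
         * Binary search: the first index in array[start..end] whose value is >= num,
         * or end + 1 if there is none. Perhaps a library method should be used here, but
         * Arrays.binarySearch does not specify which of several equal elements it finds.
         *
         * @return position
         */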
        private static int getPos(int num, int start, int end, int[] array) {
            while (end >= start) {
                int mid = (end - start) / 2 + start;
                if (array[mid] >= num) {
                    end = mid - 1;
                } else {
                    start = mid + 1;
                }
            }
            return start;
        }
    
        @Override
        public int[] mergeSortFunction(int[] array) {
            int[] saveArray = new int[array.length];
            mergeSort(array, 0, array.length - 1, saveArray);
            // note2: waiting for the pool to become idle is unnecessary, so this stays commented out
            /*
            while (((ThreadPoolExecutor) this.executorService).getActiveCount() > 0) {
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            // */
            this.executorService.shutdown();
    
            return saveArray;
        }
    
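        private void mergeSort(int[] array, int s, int e, int[] saveArray) {
            if (s < e) {
                // One lock, condition and flag per call, as in MergeSortPool
                ReentrantLock reentrantLock = new ReentrantLock();
                Condition condition = reentrantLock.newCondition();
                AtomicBoolean needWait = new AtomicBoolean(true);
                int mid = (s + e) / 2;
                // A full-length temp array per call, as in MergeImprove; here many of them are alive
                // at the same time, which plausibly explains the out-of-memory result in section 2
                int[] t = new int[array.length];
                this.executorService.execute(() -> {
                    try {
                        mergeSort(array, s, mid, t);
                    } finally {
                        reentrantLock.lock();
                        try {
                            needWait.set(false);
                            condition.signal();
                        } finally {
                            reentrantLock.unlock();
                        }
                    }
                });
                mergeSort(array, mid + 1, e, t);
                reentrantLock.lock();
                try {
                    while (needWait.get()) // the loop guards against spurious wakeups
                        condition.await();
                } catch (InterruptedException e1) {
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException("interrupted while waiting for the left half", e1);
                } finally {
                    reentrantLock.unlock();
                }
                merge(s, mid, mid + 1, e, t, s, saveArray);
            } else if (s == e) // s > e happens only for an empty input: nothing to do
                saveArray[s] = array[s];
        }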
    
        public static void main(String[] args) {
            int[] array = {2, 9, 1, 8, 2, 10, 18, 21, 12, 5, 7, 4, 3};
            MergeImprovePool mergeImprovePool = new MergeImprovePool();
            int[] saveArray = mergeImprovePool.mergeSortFunction(array);
            System.out.println(Arrays.toString(saveArray));
        }
    }
    

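    While implementing this, I ran into two main questions (marked note1 and note2 in the code):

    • Why does merge() need synchronization at its end (note1)?
    • Is the wait at note2 still needed? (No. Once you truly understand the first question, this one answers itself: every mergeSort() and merge() call waits for the task it spawned before returning, so by the time the top-level call returns, all the work is done.)

    For note1, my understanding developed as follows: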

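    1. Analyzing merge() shows that a single call writes each relevant position of saveArray exactly once. From this one might conclude that it is also safe under multithreading, and for a single call that is indeed true.
    2. So at first I did not add synchronization at the end of merge(), i.e., waiting until all of its work is finished before returning. In testing, however, the result contained many 0s, and it came out correct only some of the time.
    3. At first I suspected ++the thread pool++: bad thread management, or the pool shutting down too early? Debugging showed that every sorting step did execute, so the pool was not to blame.
    4. After an afternoon of debugging I finally found the missing synchronization (ugh!). Once it was added (at note1), everything worked right away.
    5. Why? mergeSort() calls merge() many times. Without the wait, merge() can return while its pool task is still writing saveArray, so the enclosing mergeSort() call also returns early and releases its parent. The parent's merge() then reads that array (its t) before it has been fully written, and the unwritten slots are still 0. With the wait, the merge() calls complete level by level during backtracking, so no merge() starts reading data that another merge() is still producing.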
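In Fork/Join terms, the note1 fix is a join. A merge() may return only once the merge task it spawned has finished, because its caller is about to read the array that task is writing. Below is a minimal sketch of the same median-split merge as MergeImprovePool: take the median of the longer run, binary-search it in the shorter run, place it, and recurse on both sides. It makes three changes, which are my own assumptions rather than the author's method:

1. invokeAll (fork, run, join) replaces the sleep-polling loop.
2. Runs below a hypothetical CUTOFF are merged serially.
3. Two buffers, allocated once, are reused in ping-pong fashion instead of a new full-length array t in every mergeSort() call.

PMergeSort and CUTOFF are hypothetical names.

```java
import java.util.Arrays;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;

// Sketch: merge sort with a parallel median-split merge, on Fork/Join.
// PMergeSort and CUTOFF are hypothetical names, not from the original post.
final class PMergeSort {
    static final int CUTOFF = 4096; // hypothetical: ranges this small are handled serially

    static int[] sort(int[] input) {
        int[] dst = new int[input.length];
        int[] tmp = new int[input.length]; // the only scratch array, allocated once
        if (input.length > 0) {
            ForkJoinPool.commonPool().invoke(new SortTask(input, 0, input.length - 1, dst, tmp));
        }
        return dst;
    }

    // Sorts src[s..e] into dst[s..e], using tmp[s..e] as scratch; src is never written.
    private static final class SortTask extends RecursiveAction {
        private final int[] src, dst, tmp;
        private final int s, e;
        SortTask(int[] src, int s, int e, int[] dst, int[] tmp) {
            this.src = src; this.s = s; this.e = e; this.dst = dst; this.tmp = tmp;
        }

        @Override
        protected void compute() {
            if (s == e) {
                dst[s] = src[s];
                return;
            }
            int mid = s + (e - s) / 2;
            // Ping-pong: the halves are sorted into tmp, using dst as their scratch space.
            SortTask left = new SortTask(src, s, mid, tmp, dst);
            SortTask right = new SortTask(src, mid + 1, e, tmp, dst);
            if (e - s + 1 <= CUTOFF) {
                left.compute();
                right.compute();
            } else {
                invokeAll(left, right); // returns only after both halves are done
            }
            new MergeTask(tmp, s, mid, mid + 1, e, dst, s).compute();
        }
    }

    // Merges the sorted runs t[s1..e1] and t[s2..e2] into a[s3..].
    private static final class MergeTask extends RecursiveAction {
        private final int[] t, a;
        private final int s1, e1, s2, e2, s3;
        MergeTask(int[] t, int s1, int e1, int s2, int e2, int[] a, int s3) {
            this.t = t; this.s1 = s1; this.e1 = e1; this.s2 = s2; this.e2 = e2;
            this.a = a; this.s3 = s3;
        }

        @Override
        protected void compute() {
            int n1 = e1 - s1 + 1, n2 = e2 - s2 + 1;
            if (n1 + n2 <= CUTOFF) {
                serialMerge(t, s1, e1, s2, e2, a, s3);
            } else if (n1 < n2) {
                // Make the first run the longer one, as the original merge() does.
                new MergeTask(t, s2, e2, s1, e1, a, s3).compute();
            } else {
                int q1 = s1 + (e1 - s1) / 2;           // median of the longer run
                int q2 = lowerBound(t, s2, e2, t[q1]); // same role as getPos()
                int q3 = s3 + (q1 - s1) + (q2 - s2);   // the median's final index
                a[q3] = t[q1];
                // Fork one side, run the other, wait for both: the note1 synchronization.
                invokeAll(new MergeTask(t, s1, q1 - 1, s2, q2 - 1, a, s3),
                          new MergeTask(t, q1 + 1, e1, q2, e2, a, q3 + 1));
            }
        }
    }

    // First index in t[lo..hi] holding a value >= key, or hi + 1 if there is none.
    private static int lowerBound(int[] t, int lo, int hi, int key) {
        while (lo <= hi) {
            int mid = lo + (hi - lo) / 2;
            if (t[mid] >= key) hi = mid - 1; else lo = mid + 1;
        }
        return lo;
    }

    private static void serialMerge(int[] t, int i, int e1, int j, int e2, int[] a, int k) {
        while (i <= e1 && j <= e2) a[k++] = t[i] <= t[j] ? t[i++] : t[j++];
        while (i <= e1) a[k++] = t[i++];
        while (j <= e2) a[k++] = t[j++];
    }

    public static void main(String[] args) {
        int[] array = {2, 9, 1, 8, 2, 10, 18, 21, 12, 5, 7, 4, 3};
        System.out.println(Arrays.toString(sort(array)));
        // prints [1, 2, 2, 3, 4, 5, 7, 8, 9, 10, 12, 18, 21]
    }
}
```

Each task reads only src and writes only inside its own index range [s, e] of dst and tmp. Tasks running in parallel therefore never touch the same slot, and the whole sort needs Θ(n) extra memory instead of one length-n array per call.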

2. Comparison

Method              Time (n = 10000)    Time (n = 100000)
MergeSort           0.025               0.043
MergeSortThread     3.127               230.621
MergeSortPool       1.018               2.791
MergeImprove        0.127               4.321
MergeImprovePool    2.207               out of memory
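The post does not say how these times were measured or in what unit. Below is a sketch of one way to take comparable measurements. It assumes the MergeSort interface reconstructed from the @Override signatures, times several runs after a warm-up, reports the median, and checks every result against Arrays.sort. The class name SortTiming, the warm-up count, and the random input are my own choices. For publication-quality numbers, a dedicated harness such as JMH is the safer tool.

```java
import java.util.Arrays;
import java.util.Random;
import java.util.function.Supplier;

// Hypothetical reconstruction of the interface implemented by every class in the post.
interface MergeSort {
    int[] mergeSortFunction(int[] array);
}

// Hypothetical harness: SortTiming and its parameters are illustrative choices.
final class SortTiming {
    // Median wall-clock time in seconds over `runs` timed runs, after `warmups` untimed
    // runs. A fresh instance is created per run (outside the timed region), because
    // MergeSortPool and MergeImprovePool shut their executor down after one call.
    static double medianSeconds(Supplier<MergeSort> factory, int n, int warmups, int runs, long seed) {
        int[] input = new Random(seed).ints(n).toArray();
        int[] expected = input.clone();
        Arrays.sort(expected);
        for (int i = 0; i < warmups; i++) {
            factory.get().mergeSortFunction(input.clone()); // gives the JIT a chance to compile hot code
        }
        double[] seconds = new double[runs];
        for (int i = 0; i < runs; i++) {
            MergeSort sorter = factory.get();
            int[] copy = input.clone();
            long start = System.nanoTime();
            int[] result = sorter.mergeSortFunction(copy);
            seconds[i] = (System.nanoTime() - start) / 1e9;
            if (!Arrays.equals(expected, result)) {
                throw new IllegalStateException("incorrect result in run " + i);
            }
        }
        Arrays.sort(seconds);
        return seconds[runs / 2];
    }

    public static void main(String[] args) {
        // Stand-in implementation; e.g. MergeSortSingle::new would go here instead.
        Supplier<MergeSort> jdk = () -> a -> {
            int[] c = a.clone();
            Arrays.sort(c);
            return c;
        };
        for (int n : new int[] {10_000, 100_000}) {
            System.out.printf("n = %d: %.4f s%n", n, medianSeconds(jdk, n, 5, 11, 1L));
        }
    }
}
```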

3. Summary

For an analysis of the results, please refer to

There is still a long way to go!