1. 程式人生 > >多處理器程式設計的藝術(二)-並行程式設計

多處理器程式設計的藝術(二)-並行程式設計

當處理器的效能的發展受到各方面因素的限制的時候,計算機產業開始用多處理器結構實現平行計算來提高計算的效率。我們使用多處理器共享儲存器的方式實現了多處理器程式設計,也就是多核程式設計。當然在這樣的系統結構下我們面臨著各種各樣的挑戰,例如如何協調各個處理器之間的資料排程以及現代計算機系統固有的非同步特徵等等。

在接下來的一系列文章中,我將會介紹一些基礎的原理以及並行程式的設計和併發程式的設計及實現,寫這篇文章是對近期學習課程的總結,方便自己溫故時習,感謝USTC付明老師的《多核平行計算》課程,瞭解更多推薦《The Art of Multiprocessor Programming, Revised Reprint》。

例項:大陣列元素的求和

思想:給出4個執行緒同時對陣列的1/4求和。

  • 注意:這是一個低階的演算法
  • 建立4個執行緒,每個執行緒負責部分的工作
  • 呼叫start(),啟動每個執行緒並行的執行
  • 使用join()方法等待每個執行緒執行結束
  • 將4個結果相加在一起
class SumThread extends java.lang.Thread {
  int lo, int hi, int[] arr; // arguments
  int ans = 0; // result
  SumThread(int[] a, int l, int h) { … }
  public void run(){ … } // override
}
int sum(int[] arr){// can be a static method
  int len = arr.length;
  int ans = 0;
  SumThread[] ts = new SumThread[4];
  for(int i=0; i < 4; i++){// do parallel computations
    ts[i] = new SumThread(arr,i*len/4,(i+1)*len/4);
    ts[i].start(); //必須使用start(),而不是run()方法
  }
  for(int i=0; i < 4; i++) { // combine results
ts[i].join(); // wait for helper to finish! ans += ts[i].ans; } return ans; }

join()方法使得呼叫者阻塞,直到receiver 完成了執行;
執行緒的啟動需要用start(),而不是run()

Fork-join framework

Fork/Join框架是Java7提供了的一個用於並行執行任務的框架, 是一個把大任務分割成若干個小任務,最終彙總每個小任務結果後得到大任務結果的框架。

我們再通過Fork和Join這兩個單詞來理解下Fork/Join框架,Fork就是把一個大任務切分為若干子任務並行的執行,Join就是合併這些子任務的執行結果,最後得到這個大任務的結果。比如計算1+2+…+10000,可以分割成10個子任務,每個子任務分別對1000個數進行求和,最終彙總這10個子任務的結果。

這裡寫圖片描述

  • Fork-join 程式不需要關注執行緒間的共享記憶體

EX:求一個數組的最大值。

多執行緒方法:

public class MyThread implements Runnable {

    //下面兩個靜態成員變數需要通過用關鍵字synchronized修飾 的方法來訪問
    private volatile static int max = Integer.MIN_VALUE;            //初始最大值

    //下面為成員變數,每個執行緒物件私有
    private int[] nums;            //待查詢陣列
    private int low;                  //當前執行緒物件需要處理的陣列開始下標
    private int high;                 //當前執行緒物件需要處理的陣列結束下標

    //構造方法,傳入待查詢陣列、開始下標和結束下標
    public MyThread(int[] nums, int low, int high){
        this.nums = nums;
        this.low = low;
        this.high = high;
    }

    @Override
    public synchronized void run() {

        for(int i=low;i<=high;i++)
        {
            if(nums[i] > max)
            {
                MyThread.setMax(nums[i]);
            }
        }

    }

    public static synchronized int getMax() {
        return max;
    }

    public static synchronized void setMax(int max) {
        MyThread.max = max;
    }

}

Fork-join框架:

public class ForkThread extends RecursiveTask<Integer> {

    private static final int SEQUENTIAL_THRESHOLD = 5;//子陣列到分割最終大小

    private final int[] data;
    private final int start;
    private final int end;

public ForkThread(int[] data, int start, int end) {
  this.data = data;
  this.start = start;
  this.end = end;
}

public ForkThread(int[] data) {
  this(data, 0, data.length);
}

    @Override
    protected Integer compute() {
        final int length = end - start;
        if (length < SEQUENTIAL_THRESHOLD) {
            return computeDirectly();
        }
        final int split = length / 2;
        final ForkThread left = new ForkThread(data, start, start + split);
        left.fork();
        final ForkThread right = new ForkThread(data, start + split, end);
        return Math.max(right.compute(), left.join());
    }

private Integer computeDirectly() {
  /*System.out.println(Thread.currentThread() + "computing: " + start
                     + " to " + end);*/
  int max = Integer.MIN_VALUE;
  for (int i = start; i < end; i++) {
    if (data[i] > max) {
      max = data[i];
    }
  }
  return max;
}

    public static void main(String[] args) {
        // create a random data set
        final int[] data = new int[10000];
        final Random random = new Random();
        for (int i = 0; i < data.length; i++) {
            data[i] = (int)Math.floor((random.nextDouble()*100000.0)); 
            System.out.println(data[i]);
        }
        /*int data[] = new int[10000];
        for(int i=0;i<10000;i++)
            data[i] =(int) Math.random()*10000;*/

        // submit the task to the pool
        final ForkJoinPool pool = new ForkJoinPool(4);
        final ForkThread finder = new ForkThread(data);
//      System.out.println(pool.invoke(finder));
    }
}

Main方法執行緒:

public class SearchMax {

    /**
     * @param args
     */
    //初始化陣列
    public static int[] InitialArr(){
        final int[] data = new int[10000];
        final Random random = new Random();
        for (int i = 0; i < data.length; i++) {
            data[i] = (int)Math.floor((random.nextDouble()*100000.0)); 
        }
        return data;
    }
    //分治法
    public static long DivideMax(int arr[]){

        int size = arr.length;
        long startTime=System.nanoTime();
        Thread t1 = new Thread(new MyThread(arr,0,size/2));
        Thread t2 = new Thread(new MyThread(arr,size/2+1,size-1));

        t1.start();
        t2.start();

        try {
            t1.join();
            t2.join();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        long endTime=System.nanoTime();
        System.out.println( "分治法: "+MyThread.getMax());

        return (endTime-startTime);
    }


    //順序查詢
    public static long OrderMax(int arr[]){

        int size = arr.length;
        int ret = 0;
        long startTime=System.nanoTime();
        for(int i=0;i<size;i++){
            if(arr[i]>ret)
                ret = arr[i];
        }
        long endTime=System.nanoTime();
        System.out.println( "順序查詢: "+ret);
        return (endTime-startTime);
    }

    //Fork-Join框架
    public static long ForkMax(int arr[]){

        int ret = 0;
        long startTime=System.nanoTime();
        // submit the task to the pool
        final ForkJoinPool pool = new ForkJoinPool(4);
        final ForkThread finder = new ForkThread(arr);

        ret = pool.invoke(finder);
        long endTime=System.nanoTime();
        System.out.println( "Fork_join: "+ret);
        return (endTime-startTime);
    }

    public static void main(String[] args) {

        int data[] = InitialArr();
        System.out.println("分治法花費時間 "+DivideMax(data)+" ns");
        System.out.println("順序查詢花費時間 "+OrderMax(data)+" ns");
        System.out.println("Fork_Join花費時間 "+ForkMax(data)+" ns");

    }

}

執行結果:

這裡寫圖片描述

設計一個Fork/Join框架的步驟:

第一步:分割任務。首先我們需要有一個fork類來把大任務分割成子任務,有可能子任務還是很大,所以還需要不停的分割,直到分割的子任務足夠小;

第二步:執行任務併合並結果,分割的子任務分別放在雙端佇列裡,然後幾個啟動執行緒分別從雙端佇列裡分別獲取任務執行,子任務執行完的結果都統一放在一個佇列裡,啟動一個執行緒從佇列裡拿資料,然後合併這些資料。

Fork/Join框架使用兩個類來完成以上兩件事:

(1)ForkJoinTask:我們要使用Fork Join框架,必須首先建立一個Fork Join任務,它提供在任務中執行fork() 和join()操作的機制,通常情況下我們不需要繼承ForkJoinTask類,而只需要繼承它的子類,Fork/Join框架提供以下兩個子類:

  • RecursiveAction:用於沒有返回結果的任務。

  • RecursiveTask:用於有返回結果的任務。

(2)ForkJoinPool:ForkJoinTask需要通過ForkJoinPool來執行,任務分割出的子任務會新增到當前工作執行緒所維護的雙端佇列中,進入佇列的頭部。當一個工作執行緒的佇列暫時沒有任務時,它會隨機從其他工作執行緒的佇列的尾部獲取一個任務。

Fork/Join框架的異常處理:

Fork JoinTask在執行的時候可能會丟擲異常,但是我們沒辦法在主執行緒裡直接捕獲異常,所以Fork JoinTask提供了isCompletedAbnormally()方法來檢查任務是否已經丟擲異常或已經被取消了,並且可以通過Fork JoinTask的getException方法獲取異常,使用如下程式碼:

if(task.isCompletedAbnormally())
{
System.out.println(task.getException());
}
getException方法返回Throwable物件,如果任務被取消了則返回CancellationException,如果任務沒有完成或者沒有丟擲異常則返回null。

EX:大陣列求和

class SumArray extends RecursiveTask<Integer> {
  int lo; int hi; int[] arr; // arguments
  SumArray(int[] a, int l, int h) { … }
  protected Integer compute(){// return answer
    if(hi – lo < SEQUENTIAL_CUTOFF) {
      int ans = 0;
      for(int i=lo; i < hi; i++)
        ans += arr[i];
      return ans;
    } else {
      SumArray left = new SumArray(arr,lo,(hi+lo)/2);
      SumArray right= new SumArray(arr,(hi+lo)/2,hi);
      left.fork();
      int rightAns = right.compute();
      int leftAns  = left.join(); 
      return leftAns + rightAns;
    }
  }
}
static final ForkJoinPool fjPool = new ForkJoinPool();
int sum(int[] arr){
  return fjPool.invoke(new SumArray(arr,0,arr.length));
}

演算法設計