1. 程式人生 > >如何充分利用多核CPU,計算很大的List中所有整數的和

如何充分利用多核CPU,計算很大的List中所有整數的和

引用 前幾天在網上看到一個淘寶的面試題:有一個很大的整數list,需要求這個list中所有整數的和,寫一個可以充分利用多核CPU的程式碼,來計算結果。
一:分析題目
從題中可以看到“很大的List”以及“充分利用多核CPU”,這就已經充分告訴我們要採用多執行緒(任務)進行編寫。具體怎麼做呢?大概的思路就是分割List,每一小塊的List採用一個執行緒(任務)進行計算其和,最後等待所有的執行緒(任務)都執行完後就可得到這個“很大的List”中所有整數的和。 
二:具體分析和技術方案
既然我們已經決定採用多執行緒(任務),並且還要分割List,每一小塊的List採用一個執行緒(任務)進行計算其和,那麼我們必須要等待所有的執行緒(任務)完成之後才能得到正確的結果,那麼怎麼才能保證“等待所有的執行緒(任務)完成之後輸出結果呢”?這就要靠java.util.concurrent包中的CyclicBarrier類了。它是一個同步輔助類,它允許一組執行緒(任務)互相等待,直到到達某個公共屏障點 (common barrier point)。在涉及一組固定大小的執行緒(任務)的程式中,這些執行緒(任務)必須不時地互相等待,此時 CyclicBarrier 很有用。簡單的概括其適應場景就是:當一組執行緒(任務)併發的執行一件工作的時候,必須等待所有的執行緒(任務)都完成時才能進行下一個步驟。具體技術方案步驟如下: 

  • 分割List,根據採用的執行緒(任務)數平均分配,即list.size()/threadCounts。
  • 定義一個記錄“很大List”中所有整數和的變數sum,採用一個執行緒(任務)處理一個分割後的子List,計運算元List中所有整數和(subSum),然後把和(subSum)累加到sum上。
  • 等待所有執行緒(任務)完成後輸出總和(sum)的值。

示意圖如下: 

三:詳細編碼實現
程式碼中有很詳細的註釋,這裡就不解釋了。 
Java程式碼  收藏程式碼
  1. /** 
  2.  * 計算List中所有整數的和<br> 
  3.  * 採用多執行緒,分割List計算 
  4.  * @author 飛雪無情 
  5.  * @since 2010-7-12
     
  6.  */  
  7. public class CountListIntegerSum {  
  8.     private long sum;//存放整數的和  
  9.     private CyclicBarrier barrier;//障柵集合點(同步器)  
  10.     private List<Integer> list;//整數集合List  
  11.     private int threadCounts;//使用的執行緒數  
  12.     public CountListIntegerSum(List<Integer> list,int threadCounts) {  
  13.         this.list=list;  
  14.         this.threadCounts=threadCounts;  
  15.     }  
  16.     /** 
  17.      * 獲取List中所有整數的和 
  18.      * @return 
  19.      */  
  20.     public long getIntegerSum(){  
  21.         ExecutorService exec=Executors.newFixedThreadPool(threadCounts);  
  22.         int len=list.size()/threadCounts;//平均分割List  
  23.         //List中的數量沒有執行緒數多(很少存在)  
  24.         if(len==0){  
  25.             threadCounts=list.size();//採用一個執行緒處理List中的一個元素  
  26.             len=list.size()/threadCounts;//重新平均分割List  
  27.         }  
  28.         barrier=new CyclicBarrier(threadCounts+1);  
  29.         for(int i=0;i<threadCounts;i++){  
  30.             //建立執行緒任務  
  31.             if(i==threadCounts-1){//最後一個執行緒承擔剩下的所有元素的計算  
  32.                 exec.execute(new SubIntegerSumTask(list.subList(i*len,list.size())));  
  33.             }else{  
  34.                 exec.execute(new SubIntegerSumTask(list.subList(i*len, len*(i+1)>list.size()?list.size():len*(i+1))));  
  35.             }  
  36.         }  
  37.         try {  
  38.             barrier.await();//關鍵,使該執行緒在障柵處等待,直到所有的執行緒都到達障柵處  
  39.         } catch (InterruptedException e) {  
  40.             System.out.println(Thread.currentThread().getName()+":Interrupted");  
  41.         } catch (BrokenBarrierException e) {  
  42.             System.out.println(Thread.currentThread().getName()+":BrokenBarrier");  
  43.         }  
  44.         exec.shutdown();  
  45.         return sum;  
  46.     }  
  47.     /** 
  48.      * 分割計算List整數和的執行緒任務 
  49.      * @author lishuai 
  50.      * 
  51.      */  
  52.     public class SubIntegerSumTask implements Runnable{  
  53.         private List<Integer> subList;  
  54.         public SubIntegerSumTask(List<Integer> subList) {  
  55.             this.subList=subList;  
  56.         }  
  57.         public void run() {  
  58.             long subSum=0L;  
  59.             for (Integer i : subList) {  
  60.                 subSum += i;  
  61.             }    
  62.             synchronized(CountListIntegerSum.this){//在CountListIntegerSum物件上同步  
  63.                 sum+=subSum;  
  64.             }  
  65.             try {  
  66.                 barrier.await();//關鍵,使該執行緒在障柵處等待,直到所有的執行緒都到達障柵處  
  67.             } catch (InterruptedException e) {  
  68.                 System.out.println(Thread.currentThread().getName()+":Interrupted");  
  69.             } catch (BrokenBarrierException e) {  
  70.                 System.out.println(Thread.currentThread().getName()+":BrokenBarrier");  
  71.             }  
  72.             System.out.println("分配給執行緒:"+Thread.currentThread().getName()+"那一部分List的整數和為:\tSubSum:"+subSum);  
  73.         }  
  74.     }  
  75. }  

有人可能對barrier=new CyclicBarrier(threadCounts+1);//建立的執行緒數和主執行緒main有點不解,不是採用的執行緒(任務)數是threadCounts個嗎?怎麼為CyclicBarrier設定的給定數量的執行緒參與者比我們要採用的執行緒數多一個呢?答案就是這個多出來的一個用於控制main主執行緒的,主執行緒也要等待,它要等待其他所有的執行緒完成才能輸出sum值,這樣才能保證sum值的正確性,如果main不等待的話,那麼結果將是不可預料的。 
Java程式碼  收藏程式碼
  1. /** 
  2.  * 計算List中所有整數的和測試類 
  3.  * @author 飛雪無情 
  4.  * @since 2010-7-12 
  5.  */  
  6. public class CountListIntegerSumMain {  
  7.     /** 
  8.      * @param args 
  9.      */  
  10.     public static void main(String[] args) {  
  11.         List<Integer> list = new ArrayList<Integer>();  
  12.         int threadCounts = 10;//採用的執行緒數  
  13.         //生成的List資料  
  14.         for (int i = 1; i <= 1000000; i++) {  
  15.             list.add(i);  
  16.         }  
  17.         CountListIntegerSum countListIntegerSum=new CountListIntegerSum(list,threadCounts);  
  18.         long sum=countListIntegerSum.getIntegerSum();  
  19.         System.out.println("List中所有整數的和為:"+sum);  
  20.     }  
  21. }  

四:總結
本文主要通過一個淘寶的面試題為引子,介紹了併發的一點小知識,主要是介紹通過CyclicBarrier同步輔助器輔助多個併發任務共同完成一件工作。Java SE5的java.util.concurrent引入了大量的設計來解決併發問題,使用它們有助於我們編寫更加簡單而健壯的併發程式。 

附mathfox提到的ExecutorService.invokeAll()方法的實現
這個不用自己控制等待,invokeAll執行給定的任務,當所有任務完成時,返回保持任務狀態和結果的 Future 列表。sdh5724也說用了同步,效能不好。這個去掉了同步,根據返回結果的 Future 列表相加就得到總和了。 Java程式碼  收藏程式碼
  1. /** 
  2.  * 使用ExecutorService的invokeAll方法計算 
  3.  * @author 飛雪無情 
  4.  * 
  5.  */  
  6. public class CountSumWithCallable {  
  7.     /** 
  8.      * @param args 
  9.      * @throws InterruptedException  
  10.      * @throws ExecutionException  
  11.      */  
  12.     public static void main(String[] args) throws InterruptedException, ExecutionException {  
  13.         int threadCounts =19;//使用的執行緒數  
  14.         long sum=0;  
  15.         ExecutorService exec=Executors.newFixedThreadPool(threadCounts);  
  16.         List<Callable<Long>> callList=new ArrayList<Callable<Long>>();  
  17.         //生成很大的List  
  18.         List<Integer> list = new ArrayList<Integer>();  
  19.         for (int i = 0; i <= 1000000; i++) {  
  20.             list.add(i);  
  21.         }  
  22.         int len=list.size()/threadCounts;//平均分割List  
  23.         //List中的數量沒有執行緒數多(很少存在)  
  24.         if(len==0){  
  25.             threadCounts=list.size();//採用一個執行緒處理List中的一個元素  
  26.             len=list.size()/threadCounts;//重新平均分割List  
  27.         }  
  28.         for(int i=0;i<threadCounts;i++){  
  29.             final List<Integer> subList;  
  30.             if(i==threadCounts-1){  
  31.                 subList=list.subList(i*len,list.size());  
  32.             }else{  
  33.                 subList=list.subList(i*len, len*(i+1)>list.size()?list.size():len*(i+1));  
  34.             }  
  35.             //採用匿名內部類實現  
  36.             callList.add(new Callable<Long>(){  
  37.                 public Long call() throws Exception {  
  38.                     long subSum=0L;  
  39.                     for(Integer i:subList){  
  40.                         subSum+=i;  
  41.                     }  
  42.                     System.out.println("分配給執行緒:"+Thread.currentThread().getName()+"那一部分List的整數和為:\tSubSum:"+subSum);  
  43.                     return subSum;  
  44.                 }  
  45.             });  
  46.         }  
  47.         List<Future<Long>> futureList=exec.invokeAll(callList);  
  48.         for(Future<Long> future:futureList){  
  49.             sum+=future.get();  
  50.         }  
  51.         exec.shutdown();  
  52.         System.out.println(sum);  
  53.     }  
  54. }  

我一直相信:討論是解決問題、提高水平的最佳方式!