1. 程式人生 > >java自定義模擬執行緒池

java自定義模擬執行緒池

java 執行緒池API提供了newCachedThreadPool() newFixedThreadPool(int) 等方法

1 public static ExecutorService newCachedThreadPool() {
2         return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
3                                       60L, TimeUnit.SECONDS,
4                                       new SynchronousQueue<Runnable>());
5 }
1   public static ExecutorService newFixedThreadPool(int nThreads) {
2         return new ThreadPoolExecutor(nThreads, nThreads,
3                                       0L, TimeUnit.MILLISECONDS,
4                                       new LinkedBlockingQueue<Runnable>());
5  }

包括其他幾種不同型別的執行緒池,其實都是通過 ThreadPoolExecutor這個核心類來建立的,如果我們要自定義執行緒池,那麼也是通過這個類來實現的。

該類有四個構造方法,檢視原始碼可以看到,頭三個構造方法,其實都是呼叫的第四個構造方法,所以我們就解釋一下第四個構造方法的引數含義。

複製程式碼

 public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler)

複製程式碼

corePoolSize:核心執行緒池的大小,線上程池被建立之後,其實裡面是沒有執行緒的。(當然,呼叫prestartAllCoreThreads()或者prestartCoreThread()方法會預建立執行緒,而不用等著任務的到來)。當有任務進來的時候,才會建立執行緒。當執行緒池中的執行緒數量達到corePoolSize之後,就把任務放到 快取隊列當中。(就是 workQueue)。

maximumPoolSize:最大執行緒數量是多少。它標誌著這個執行緒池的最大執行緒數量。如果沒有最大數量,當建立的執行緒數量達到了 某個極限值,到最後記憶體肯定就爆掉了。
keepAliveTime:當執行緒沒有任務時,最多保持的時間,超過這個時間就被終止了。預設情況下,只有 執行緒池中執行緒數量 大於 corePoolSize時,keepAliveTime值才會起作用。也就說說,只有在執行緒池執行緒數量超出corePoolSize了。我們才會把超時的空閒執行緒給停止掉。否則就保持執行緒池中有 corePoolSize 個執行緒就可以了。


Unit:引數keepAliveTime的時間單位,就是 TimeUnit類當中的幾個屬性。
如下圖:

workQueue:用來儲存待執行任務的佇列,不同的執行緒池它的佇列實現方式不同(因為這關係到排隊策略的問題)比如有以下幾種
        ArrayBlockingQueue:基於陣列的佇列,建立時需要指定大小。
        LinkedBlockingQueue:基於連結串列的佇列,如果沒有指定大小,則預設值是 Integer.MAX_VALUE。(newFixedThreadPool和newSingleThreadExecutor使用的就是這種佇列)。
       SynchronousQueue:這種佇列比較特殊,因為不排隊就直接建立新執行緒把任務提交了。(newCachedThreadPool使用的就是這種佇列)。

threadFactory:執行緒工廠,用來建立執行緒。

Handler:拒絕執行任務時的策略,一般來講有以下四種策略,
       (1) ThreadPoolExecutor.AbortPolicy 丟棄任務,並丟擲 RejectedExecutionException 異常。
       (2) ThreadPoolExecutor.CallerRunsPolicy:該任務被執行緒池拒絕,由呼叫 execute方法的執行緒執行該任務。
       (3) ThreadPoolExecutor.DiscardOldestPolicy : 拋棄佇列最前面的任務,然後重新嘗試執行任務。
       (4) ThreadPoolExecutor.DiscardPolicy,丟棄任務,不過也不丟擲異常。

demo

複製程式碼

 1 class CustomTask implements Runnable{
 2     private  int id;
 3     public CustomTask(int id) {
 4         this.id = id;
 5     }
 6 
 7     @Override
 8     public void run() {
 9         // TODO Auto-generated method stub
10         System.out.println("#" + id + "   threadId=" + Thread.currentThread().getName() );
11         try {
12             TimeUnit.MILLISECONDS.sleep(100);
13         }catch(InterruptedException e){
14             e.printStackTrace();
15         }
16     }
17     
18 }
19 
20 public class CustomThreadPool {
21     public static void main(String[] args) {
22         // TODO Auto-generated method stub
23             BlockingQueue<Runnable> queue = new ArrayBlockingQueue<>(10);
24             ThreadPoolExecutor pool = new ThreadPoolExecutor(3, 5, 60, TimeUnit.MICROSECONDS, queue);
25             
26             for (int i=0; i<7; i++){
27                 Runnable task = new CustomTask(i);
28                 pool.execute(task);
29             }
30             
31             pool.shutdown();
32     }
33 
34 }

複製程式碼


輸出結果: 

從這個例子可以看出,雖然我們有7個任務,但是實際上,只有三個執行緒在執行。
那麼當我們提交任務給執行緒池之後,它的處理策略是什麼呢?

(1),如果當前執行緒池執行緒數目小於 corePoolSize(核心池還沒滿呢),那麼就建立一個新執行緒去處理任務。
(2),如果核心池已經滿了,來了一個新的任務後,會嘗試將其新增到任務佇列中,如果成功,則等待空閒執行緒將其從佇列中取出並且執行,如果佇列已經滿了,則繼續下一步。
(3),此時,如果執行緒池執行緒數量 小於 maximumPoolSize,則建立一個新執行緒執行任務,否則,那就說明執行緒池到了最大飽和能力了,沒辦法再處理了,此時就按照拒絕策略來處理。(就是建構函式當中的Handler物件)。

(4),如果執行緒池的執行緒數量大於corePoolSize,則當某個執行緒的空閒時間超過了keepAliveTime,那麼這個執行緒就要被銷燬了,直到執行緒池中執行緒數量不大於corePoolSize為止。

舉個通俗易懂的例子,公司要設立一個專案組來處理某些任務,hr部門給的人員編制是10個人(corePoolSize)。同時給他們專門設定了一間有15個座位(maximumPoolSize)的辦公室。最開始的時候來了一個任務,就招聘一個人。就這樣,一個一個的招聘,招滿了十個人,不斷有新的任務安排給這個專案組,每個人也在不停的接任務幹活。不過後來任務越來越多,十個人無法處理完了。其他的任務就只能在走廊外面排隊了。後來任務越來越多,走廊的排隊隊伍也擠不下。然後只好找找一些臨時工來幫助完成任務。因為辦公室只有15個座位,所以它們最多也就只能找5個臨時工。可是任務依舊越來越多,根本處理不完,那沒辦法,這個專案組只好拒絕再接新任務。(拒絕的方式就是 Handler),最後任務漸漸的少了,大家都比較清閒了。所以就決定看大家表現,誰表現不好,誰就被清理出這個辦公室(空閒時間超過 keepAliveTime),直到 辦公室只剩下10個人(corePoolSize),維持固定的人員編制為止。


關於執行緒池,ThreadPoolExecutor還提供了一些需要注意的方法:

(1) shutdown(),平滑的關閉執行緒池。(如果還有未執行完的任務,就等待它們執行完)。
(2) shutdownNow()。簡單粗暴的關閉執行緒池。(沒有執行完的任務也直接關閉)。
(3) setCorePoolSize()。設定/更改核心池的大小。
(4) setMaximumPoolSize(),設定/更改執行緒池中最大執行緒的數量限制。