1. 程式人生 > >線程池的使用及ThreadPoolExecutor的execute和addWorker源碼分析

線程池的使用及ThreadPoolExecutor的execute和addWorker源碼分析

單位 bool 集合 handler 意思 size 執行順序 targe execute

說明:本作者是文章的原創作者,轉載請註明出處:本文地址:http://www.cnblogs.com/qm-article/p/7821602.html

一、線程池的介紹

在開發中,頻繁的創建和銷毀一個線程,是很耗資源的,為此找出了一個可以循環利用已經存在的線程來達到自己的目的,線程池顧名思義,也就是線程池的集合,通過線程池執行的線程任務,可以很有效的去規劃線程的使用。
在java中大致有這幾種線程池
newScheduledThreadPool 創建一個定長線程池,支持定時及周期性任務執行。,可以作一個定時器使用。
newCachedThreadPool 創建一個可緩存線程池,如果線程池長度超過需要的線程數量,可靈活回收空閑線程,若無可回收,則新建線程。
newSingleThreadExecutor

創建一個單線程化的線程池, 它只會用唯一的工作線程來執行任務,保證所有任務按照指定順序(FIFO, LIFO, 優先級)執行,可以控制線程的執行順序
newFixedThreadPool 創建一個定長線程池,可控制線程最大並發數,超出的線程會在隊列中等待,當創建的線程池數量為1的時候。也類似於單線程化的線程池,當為1的時候,也可控制線程的執行順序

二、線程池的使用

1、newScheduledThreadPool

 1    /**
 2      * 測試newScheduledThreadPool 創建一個定長線程池,支持定時及周期性任務執行。
 3      * 一般可做定時器使用
4 */ 5 public static void test_1(){ 6 //參數是線程的數量 7 ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(2); 8 /** 9 * 第二個參數,是首次執行該線程的延遲時間,之後失效 10 * 第三個參數是,首次執行完之後,再過該段時間再次執行該線程,具有周期性 11 */ 12 scheduledExecutorService.scheduleAtFixedRate(new
Runnable() { 13 14 @Override 15 public void run() { 16 System.out.println(new Date().getSeconds()); 17 18 } 19 }, 10, 3, TimeUnit.SECONDS); 20 21 }

2、newCachedThreadPool

 1     /**
 2      * newCachedThreadPool創建一個可緩存線程池,
 3      * 如果線程池長度超過處理需要,可靈活回收空閑線程,若無可回收,則新建線程。
 4      */
 5     public static void test_2(){
 6         ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
 7         for (int i = 0; i < 10; i++) {  
 8                final int index = i;  
 9                try {  
10                 Thread.sleep(index * 1000);  
11                } catch (InterruptedException e) {  
12                 e.printStackTrace();  
13                }  
14             cachedThreadPool.execute(new Runnable() {
15                 
16                 @Override
17                 public void run() {
18                     // TODO Auto-generated method stub
19                     System.out.println(index+":"+new Date().getSeconds());
20                 }
21             });
22         }
23     }

3、newSingleThreadExecutor

 1     /**
 2      * newSingleThreadExecutor 創建一個單線程化的線程池,
 3      * 它只會用唯一的工作線程來執行任務,保證所有任務按照指定順序(FIFO, LIFO, 優先級)執行
 4      */
 5     public static void test_4(){
 6         ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();
 7         for(int i = 1; i < 11; i++){
 8             final int index = i;
 9             singleThreadExecutor.execute(new Runnable() {
10                 @Override
11                 public void run() {
12                     // TODO Auto-generated method stub
13                     //會按順序打印
14                     System.out.println(index);
15                 }
16             });
17         }
18     }

4、newFixedThreadPool

 1     /**
 2      * newFixedThreadPool 創建一個定長線程池,可控制線程最大並發數,超出的線程會在隊列中等待
 3      */
 4     public static void test_3(){
 5         //當參數為1的時候,可以控制線程的執行順序,類似join的作用
 6         ExecutorService fixedThreadPool = Executors.newFixedThreadPool(2);
 7         for(int i = 0; i < 2; i++){
 8             final int index = i;
 9             fixedThreadPool.execute(new Runnable() {
10                 
11                 @Override
12                 public void run() {
13                     // TODO Auto-generated method stub
14                     try {
15                         System.out.println(index);
16                     } catch (Exception e) {
17                         // TODO Auto-generated catch block
18                         e.printStackTrace();
19                     }
20                 }
21             });
22         }
23     }

三、線程池源碼分析

以上四種線程都是由一個線程工具類Executors來創造的

技術分享

如上圖,其中newFixedThreadPool 和newCachedThreadPool 都是由threadPoolExecutor來創建的,只是參數不一致而已,
關於threadPoolExector的構造器的參數

技術分享

corePoolSize 代表該線程中允許的核心線程數,要和工作的線程數量區分開來,兩者不
等價(工作的線程數量一定不大於corePoolSize,即當超過後,會將線程
放入隊列中),可以理解為一個ArrayList集合中,默認空間是10,但存放的
元素的數量 不一定是10, 在這裏這個10就寓指corePoolSize ,存放元
素的個數是工作線程數量
maximumPoolSize 這個參數的意思就是該線程池所允許的最大線程數量
keepAliveTime 這個參數的意思就是空余線程的存活時間,註意這個值並不會對所有線程起作用,如果線程池中的線程數少於等於核心線程數 corePoolSize,那麽這些線程不會因 為空閑太長時間而被關閉,當然,也可以通過調用allowCoreThreadTimeOut方法使核心線程數內的線程也可以被回收。

unit 時間單位
workQueue 阻塞隊列,在此作用就是用來存放線程。
threadFactory 線程工廠
defaultHandler 拒絕策略,即當加入線程失敗,采用該handler來處理

3.1、線程池的拒絕策略

AbortPolicy
為java線程池默認的阻塞策略,不執行此任務,而且直接拋出一個運行時異常
DiscardPolicy
直接拋棄,任務不執行,空方法
DiscardOldestPolicy
從隊列裏面拋棄head的一個任務,並再次execute 此task。
CallerRunsPolicy
在調用execute的線程裏面執行此command,會阻塞入口

在分析該類的execute方法前,先看這幾個常量的值和一些方法的作用

 1    /*
 2     *  ctl的默認值為-536870912,
 3     *  作用是將該值傳入workerCountOf(int c)的參數c中,
 4     *  則可以返回正在工作的線程數量
 5     *  每當有一個線程加入工作,該值會加1
 6     */
 7     private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
 8     private static final int COUNT_BITS = Integer.SIZE - 3;   //32-3=29
 9     private static final int CAPACITY   = (1 << COUNT_BITS) - 1;//536870911
10 
11     // runState is stored in the high-order bits,其中running<shutdown<stop<tidying<terminated
12     private static final int RUNNING    = -1 << COUNT_BITS;// -536870912
13     private static final int SHUTDOWN   =  0 << COUNT_BITS;//0
14     private static final int STOP       =  1 << COUNT_BITS;//536870912
15     private static final int TIDYING    =  2 << COUNT_BITS;//1073741824
16     private static final int TERMINATED =  3 << COUNT_BITS;//1610612736
17 
18     // Packing and unpacking ctl
19     private static int runStateOf(int c)     { return c & ~CAPACITY; }//當c<0時該方法返回的值為-536870912,否則為0
20     private static int workerCountOf(int c)  { return c & CAPACITY; }//獲取工作線程數
21     private static int ctlOf(int rs, int wc) { return rs | wc; }//-536870912

3.2、execute

當線程為null時,直接拋出異常

技術分享

第一步、看圖,下圖所指的將corePoolSize擴充至maxmumPoolSize是一個類比,
因為在addWorker代碼中有這麽一句wc >= (core ? corePoolSize : maximumPoolSize))成立則返回false,表明core為false時會以maximumPoolSize來當做corePoolSize比較

技術分享

 1         int c = ctl.get();
 2         if (workerCountOf(c) < corePoolSize) {
 3             if (addWorker(command, true))
 4                 return;
 5             c = ctl.get();
 6         }
 7         if (isRunning(c) && workQueue.offer(command)) {
 8             int recheck = ctl.get();
 9             if (! isRunning(recheck) && remove(command))
10                 reject(command);
11             else if (workerCountOf(recheck) == 0)
12                 addWorker(null, false);
13         }
14         else if (!addWorker(command, false))
15             reject(command);

3.3、addWorker

 1         private boolean addWorker(Runnable firstTask, boolean core) {
 2         //外部循環
 3         retry:
 4         for (;;) {
 5             int c = ctl.get();//獲取當前工作線程數量,數量為{c-(-536870912)}
 6             
 7             int rs = runStateOf(c);//若c>=0時,該值才為0,否則該值一直為-536870912
 8 
 9             
10             /*
11              *由上面的一些線程池狀態常量值可知,running<shutdown<stop<tidying<terminated
12              *若rs>=shutdown,則表明線程池處於stop、tidying、terminated三種狀態的一種
13              *若rs>=shutdown成立,則進行後面判斷,
14              *1、線程池處於shutdown狀態
15              *  1.1、firstTask不為null,則返回false,也即是線程池已經處於shutdown狀態,還要添加新的線程,被直接駁回(拒絕)
16              *  1.2、firstTask為null
17              *     1.2.1、此時意味著線程池狀態為shutdown狀態,且first為null,若阻塞隊列為空,則返回false
18              *2、線程處於大於shutdown的狀態,則直接返回false
19             */
20             if (rs >= SHUTDOWN &&
21                 ! (rs == SHUTDOWN &&
22                    firstTask == null &&
23                    ! workQueue.isEmpty()))
24                 return false;
25             /*
26              *進入內循環以下兩種情況會跳出該內循環,否則一直會循環
27              *1、當工作線程數量超過一定閾值,會直接返回false
28              *2、添加工作線程成功,即ctl的值進行了加一
29             */
30             for (;;) {
31                 int wc = workerCountOf(c);//獲取工作線程的數量
32                 //當線程數量>=536870911或者>=corePoolSize或maximumPoolSize的時候,則返回false
33                 if (wc >= CAPACITY ||
34                     wc >= (core ? corePoolSize : maximumPoolSize))
35                     return false;
36                 if (compareAndIncrementWorkerCount(c))//使用unsafe的cas操作對ctl.get()的值進行加一
37                     break retry;//跳出這個外循環
38                 c = ctl.get();  // Re-read ctl
39                 if (runStateOf(c) != rs)//當此時的線程池狀態和之前的狀態不等時
40                     continue retry;//繼續內循環   
41             }
42         }
43         //若進行到了此步操作,則表明工作線程數量加了1
44         boolean workerStarted = false;
45         boolean workerAdded = false;
46         Worker w = null;
47         try {
48             w = new Worker(firstTask);
49             final Thread t = w.thread;//該w.thread為worker內部新創建的thread
50             if (t != null) {
51                 final ReentrantLock mainLock = this.mainLock;
52                 mainLock.lock();//開啟鎖
53                 try {
54                     //獲取鎖後,再次獲取線程池的狀態
55                     int rs = runStateOf(ctl.get());
56                     /*
57                      *1、當線程池的狀態處於shutdown以上狀態,則直接釋放鎖,不啟動線程,且執行addWorkerFailed方法
58                          執行該方法的作用是使工作線程數量-1
59                     */
60                     if (rs < SHUTDOWN ||
61                         (rs == SHUTDOWN && firstTask == null)) {
62                         if (t.isAlive()) // 創建的線程處於活躍狀態,即被啟動了,拋出異常
63                             throw new IllegalThreadStateException();
64                         workers.add(w);//workers是一個set集合
65                         int s = workers.size();
66                         if (s > largestPoolSize)//largestPoolSize默認為0,作用是記錄set集合中的線程數量
67                             largestPoolSize = s;
68                         workerAdded = true;//改變該值,為了啟動線程,且返回一個addWorker執行成功的狀態
69                     }
70                 } finally {
71                     mainLock.unlock();//釋放鎖
72                 }
73                 if (workerAdded) {
74                     t.start();
75                     workerStarted = true;
76                 }
77             }
78         } finally {
79             if (! workerStarted)
80                 addWorkerFailed(w);
81         }
82         return workerStarted;
83     }

總結:2017-11-12

線程池的使用及ThreadPoolExecutor的execute和addWorker源碼分析