線程池的使用及ThreadPoolExecutor的execute和addWorker源碼分析
說明:本作者是文章的原創作者,轉載請註明出處:本文地址:http://www.cnblogs.com/qm-article/p/7821602.html
一、線程池的介紹
在開發中,頻繁的創建和銷毀一個線程,是很耗資源的,為此找出了一個可以循環利用已經存在的線程來達到自己的目的,線程池顧名思義,也就是線程池的集合,通過線程池執行的線程任務,可以很有效的去規劃線程的使用。
在java中大致有這幾種線程池
newScheduledThreadPool 創建一個定長線程池,支持定時及周期性任務執行。,可以作一個定時器使用。
newCachedThreadPool 創建一個可緩存線程池,如果線程池長度超過需要的線程數量,可靈活回收空閑線程,若無可回收,則新建線程。
newSingleThreadExecutor
newFixedThreadPool 創建一個定長線程池,可控制線程最大並發數,超出的線程會在隊列中等待,當創建的線程池數量為1的時候。也類似於單線程化的線程池,當為1的時候,也可控制線程的執行順序
二、線程池的使用
1、newScheduledThreadPool
1 /** 2 * 測試newScheduledThreadPool 創建一個定長線程池,支持定時及周期性任務執行。 3 * 一般可做定時器使用4 */ 5 public static void test_1(){ 6 //參數是線程的數量 7 ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(2); 8 /** 9 * 第二個參數,是首次執行該線程的延遲時間,之後失效 10 * 第三個參數是,首次執行完之後,再過該段時間再次執行該線程,具有周期性 11 */ 12 scheduledExecutorService.scheduleAtFixedRate(newRunnable() { 13 14 @Override 15 public void run() { 16 System.out.println(new Date().getSeconds()); 17 18 } 19 }, 10, 3, TimeUnit.SECONDS); 20 21 }
2、newCachedThreadPool
1 /** 2 * newCachedThreadPool創建一個可緩存線程池, 3 * 如果線程池長度超過處理需要,可靈活回收空閑線程,若無可回收,則新建線程。 4 */ 5 public static void test_2(){ 6 ExecutorService cachedThreadPool = Executors.newCachedThreadPool(); 7 for (int i = 0; i < 10; i++) { 8 final int index = i; 9 try { 10 Thread.sleep(index * 1000); 11 } catch (InterruptedException e) { 12 e.printStackTrace(); 13 } 14 cachedThreadPool.execute(new Runnable() { 15 16 @Override 17 public void run() { 18 // TODO Auto-generated method stub 19 System.out.println(index+":"+new Date().getSeconds()); 20 } 21 }); 22 } 23 }
3、newSingleThreadExecutor
1 /** 2 * newSingleThreadExecutor 創建一個單線程化的線程池, 3 * 它只會用唯一的工作線程來執行任務,保證所有任務按照指定順序(FIFO, LIFO, 優先級)執行 4 */ 5 public static void test_4(){ 6 ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor(); 7 for(int i = 1; i < 11; i++){ 8 final int index = i; 9 singleThreadExecutor.execute(new Runnable() { 10 @Override 11 public void run() { 12 // TODO Auto-generated method stub 13 //會按順序打印 14 System.out.println(index); 15 } 16 }); 17 } 18 }
4、newFixedThreadPool
1 /** 2 * newFixedThreadPool 創建一個定長線程池,可控制線程最大並發數,超出的線程會在隊列中等待 3 */ 4 public static void test_3(){ 5 //當參數為1的時候,可以控制線程的執行順序,類似join的作用 6 ExecutorService fixedThreadPool = Executors.newFixedThreadPool(2); 7 for(int i = 0; i < 2; i++){ 8 final int index = i; 9 fixedThreadPool.execute(new Runnable() { 10 11 @Override 12 public void run() { 13 // TODO Auto-generated method stub 14 try { 15 System.out.println(index); 16 } catch (Exception e) { 17 // TODO Auto-generated catch block 18 e.printStackTrace(); 19 } 20 } 21 }); 22 } 23 }
三、線程池源碼分析
以上四種線程都是由一個線程工具類Executors來創造的
如上圖,其中newFixedThreadPool 和newCachedThreadPool 都是由threadPoolExecutor來創建的,只是參數不一致而已,
關於threadPoolExector的構造器的參數
corePoolSize 代表該線程中允許的核心線程數,要和工作的線程數量區分開來,兩者不
等價(工作的線程數量一定不大於corePoolSize,即當超過後,會將線程
放入隊列中),可以理解為一個ArrayList集合中,默認空間是10,但存放的
元素的數量 不一定是10, 在這裏這個10就寓指corePoolSize ,存放元
素的個數是工作線程數量
maximumPoolSize 這個參數的意思就是該線程池所允許的最大線程數量
keepAliveTime 這個參數的意思就是空余線程的存活時間,註意這個值並不會對所有線程起作用,如果線程池中的線程數少於等於核心線程數 corePoolSize,那麽這些線程不會因 為空閑太長時間而被關閉,當然,也可以通過調用allowCoreThreadTimeOut方法使核心線程數內的線程也可以被回收。
unit 時間單位
workQueue 阻塞隊列,在此作用就是用來存放線程。
threadFactory 線程工廠
defaultHandler 拒絕策略,即當加入線程失敗,采用該handler來處理
3.1、線程池的拒絕策略
AbortPolicy
為java線程池默認的阻塞策略,不執行此任務,而且直接拋出一個運行時異常
DiscardPolicy
直接拋棄,任務不執行,空方法
DiscardOldestPolicy
從隊列裏面拋棄head的一個任務,並再次execute 此task。
CallerRunsPolicy
在調用execute的線程裏面執行此command,會阻塞入口
在分析該類的execute方法前,先看這幾個常量的值和一些方法的作用
1 /* 2 * ctl的默認值為-536870912, 3 * 作用是將該值傳入workerCountOf(int c)的參數c中, 4 * 則可以返回正在工作的線程數量 5 * 每當有一個線程加入工作,該值會加1 6 */ 7 private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); 8 private static final int COUNT_BITS = Integer.SIZE - 3; //32-3=29 9 private static final int CAPACITY = (1 << COUNT_BITS) - 1;//536870911 10 11 // runState is stored in the high-order bits,其中running<shutdown<stop<tidying<terminated 12 private static final int RUNNING = -1 << COUNT_BITS;// -536870912 13 private static final int SHUTDOWN = 0 << COUNT_BITS;//0 14 private static final int STOP = 1 << COUNT_BITS;//536870912 15 private static final int TIDYING = 2 << COUNT_BITS;//1073741824 16 private static final int TERMINATED = 3 << COUNT_BITS;//1610612736 17 18 // Packing and unpacking ctl 19 private static int runStateOf(int c) { return c & ~CAPACITY; }//當c<0時該方法返回的值為-536870912,否則為0 20 private static int workerCountOf(int c) { return c & CAPACITY; }//獲取工作線程數 21 private static int ctlOf(int rs, int wc) { return rs | wc; }//-536870912
3.2、execute
當線程為null時,直接拋出異常
第一步、看圖,下圖所指的將corePoolSize擴充至maxmumPoolSize是一個類比,
因為在addWorker代碼中有這麽一句wc >= (core ? corePoolSize : maximumPoolSize))成立則返回false,表明core為false時會以maximumPoolSize來當做corePoolSize比較
1 int c = ctl.get(); 2 if (workerCountOf(c) < corePoolSize) { 3 if (addWorker(command, true)) 4 return; 5 c = ctl.get(); 6 } 7 if (isRunning(c) && workQueue.offer(command)) { 8 int recheck = ctl.get(); 9 if (! isRunning(recheck) && remove(command)) 10 reject(command); 11 else if (workerCountOf(recheck) == 0) 12 addWorker(null, false); 13 } 14 else if (!addWorker(command, false)) 15 reject(command);
3.3、addWorker
1 private boolean addWorker(Runnable firstTask, boolean core) { 2 //外部循環 3 retry: 4 for (;;) { 5 int c = ctl.get();//獲取當前工作線程數量,數量為{c-(-536870912)} 6 7 int rs = runStateOf(c);//若c>=0時,該值才為0,否則該值一直為-536870912 8 9 10 /* 11 *由上面的一些線程池狀態常量值可知,running<shutdown<stop<tidying<terminated 12 *若rs>=shutdown,則表明線程池處於stop、tidying、terminated三種狀態的一種 13 *若rs>=shutdown成立,則進行後面判斷, 14 *1、線程池處於shutdown狀態 15 * 1.1、firstTask不為null,則返回false,也即是線程池已經處於shutdown狀態,還要添加新的線程,被直接駁回(拒絕) 16 * 1.2、firstTask為null 17 * 1.2.1、此時意味著線程池狀態為shutdown狀態,且first為null,若阻塞隊列為空,則返回false 18 *2、線程處於大於shutdown的狀態,則直接返回false 19 */ 20 if (rs >= SHUTDOWN && 21 ! (rs == SHUTDOWN && 22 firstTask == null && 23 ! workQueue.isEmpty())) 24 return false; 25 /* 26 *進入內循環以下兩種情況會跳出該內循環,否則一直會循環 27 *1、當工作線程數量超過一定閾值,會直接返回false 28 *2、添加工作線程成功,即ctl的值進行了加一 29 */ 30 for (;;) { 31 int wc = workerCountOf(c);//獲取工作線程的數量 32 //當線程數量>=536870911或者>=corePoolSize或maximumPoolSize的時候,則返回false 33 if (wc >= CAPACITY || 34 wc >= (core ? corePoolSize : maximumPoolSize)) 35 return false; 36 if (compareAndIncrementWorkerCount(c))//使用unsafe的cas操作對ctl.get()的值進行加一 37 break retry;//跳出這個外循環 38 c = ctl.get(); // Re-read ctl 39 if (runStateOf(c) != rs)//當此時的線程池狀態和之前的狀態不等時 40 continue retry;//繼續內循環 41 } 42 } 43 //若進行到了此步操作,則表明工作線程數量加了1 44 boolean workerStarted = false; 45 boolean workerAdded = false; 46 Worker w = null; 47 try { 48 w = new Worker(firstTask); 49 final Thread t = w.thread;//該w.thread為worker內部新創建的thread 50 if (t != null) { 51 final ReentrantLock mainLock = this.mainLock; 52 mainLock.lock();//開啟鎖 53 try { 54 //獲取鎖後,再次獲取線程池的狀態 55 int rs = runStateOf(ctl.get()); 56 /* 57 *1、當線程池的狀態處於shutdown以上狀態,則直接釋放鎖,不啟動線程,且執行addWorkerFailed方法 58 執行該方法的作用是使工作線程數量-1 59 */ 60 if (rs < SHUTDOWN || 61 (rs == SHUTDOWN && firstTask == null)) { 62 if (t.isAlive()) // 創建的線程處於活躍狀態,即被啟動了,拋出異常 63 throw new IllegalThreadStateException(); 64 workers.add(w);//workers是一個set集合 65 int s = workers.size(); 66 if (s > largestPoolSize)//largestPoolSize默認為0,作用是記錄set集合中的線程數量 67 largestPoolSize = s; 68 workerAdded = true;//改變該值,為了啟動線程,且返回一個addWorker執行成功的狀態 69 } 70 } finally { 71 mainLock.unlock();//釋放鎖 72 } 73 if (workerAdded) { 74 t.start(); 75 workerStarted = true; 76 } 77 } 78 } finally { 79 if (! workerStarted) 80 addWorkerFailed(w); 81 } 82 return workerStarted; 83 }
總結:2017-11-12
線程池的使用及ThreadPoolExecutor的execute和addWorker源碼分析