執行緒池基本結構原理
阿新 • • 發佈:2018-12-15
-
ThreadPoolExecutor執行緒池頂級父類Executor
interface Executor extends interface ExecutorService implements abstract AbstractExecutorService extends ThreadPoolExecutor
-
ThreadPoolExecutor引數
corePoolSize: 核心執行緒數量 maximumPoolSize: 執行緒池維護執行緒的最大數量 keepAliveTime: 執行緒池維護執行緒所允許的空閒時間 TimeUnit: 時間單位 workQueue: 執行緒池所使用的緩衝佇列 handler: 執行緒池對拒絕任務的處理策略
-
執行緒池執行基本原理
新來執行緒 1 執行緒池執行緒數量沒有達到核心執行緒數量,則新建執行緒 2 執行緒池執行緒數量達到核心執行緒數量,有空閒執行緒,則直接執行 3 執行緒池執行緒數量達到核心執行緒數量,沒有空閒執行緒,則放入佇列 4 執行緒池執行緒數量達到核心執行緒數量,沒有空閒執行緒,佇列已滿,則執行緒數量增加至Max執行緒數量 5 執行緒池執行緒數量達到核心執行緒數量,沒有空閒執行緒,佇列已滿,執行緒數量已增加至Max執行緒數量,則呼叫丟棄策略 1 丟棄任務有4種方式 實現RejectedExecutionHandler介面(是ThreadPoolExecutor內部類) 提供了四種方式來處理任務拒絕策略 1、直接丟棄(DiscardPolicy) 2、丟棄佇列中最老的任務(DiscardOldestPolicy)。 3、拋異常(AbortPolicy) 4、將任務分給呼叫執行緒來執行(CallerRunsPolicy)。
-
ThreadPoolExecutor#execute方法
1 基本判斷步驟 //ThreadPoolExecutor#execute public void execute(Runnable command) { if (command == null) throw new NullPointerException(); int c = ctl.get(); //由它可以獲取到當前有效的執行緒數和執行緒池的狀態 /*1.獲取當前正在執行執行緒數是否小於核心執行緒池,是則新建立一個執行緒執行任務,否則將任務放到任務佇列中*/ if (workerCountOf(c) < corePoolSize){ if (addWorker(command, tre)) //在addWorker中建立工作執行緒執行任務 return ; c = ctl.get(); } /*2.當前核心執行緒池中全部執行緒都在執行workerCountOf(c) >= corePoolSize,所以此時將執行緒放到任務佇列中*/ if (isRunning(c) && workQueue.offer(command)) { //執行緒池是否處於執行狀態,且是否任務插入任務佇列成功 int recheck = ctl.get(); if (!isRunning(recheck) && remove(command)) //執行緒池是否處於執行狀態,如果不是則使剛剛的任務出隊 reject(command); //丟擲RejectedExceptionException異常 else if (workerCountOf(recheck) == 0) addWorker(null, false); } /*3.插入佇列不成功,且當前執行緒數數量小於最大執行緒池數量,此時則建立新執行緒執行任務,建立失敗丟擲異常*/ else if (!addWorker(command, false)){ reject(command); //丟擲RejectedExceptionException異常 } } 2 四個步驟 1 執行緒池執行緒數量沒有達到核心執行緒數量,則新建執行緒 2 執行緒池執行緒數量達到核心執行緒數量,沒有空閒執行緒,則放入佇列 3 執行緒池執行緒數量達到核心執行緒數量,沒有空閒執行緒,佇列已滿,則執行緒數量增加至Max執行緒數量 4 執行緒池執行緒數量達到核心執行緒數量,沒有空閒執行緒,佇列已滿,執行緒數量已增加至Max執行緒數量,則呼叫丟棄策略 建立失敗丟擲異常
-
BlockingQueue佇列
1 常用的佇列主要有以下兩種:(當然通過不同的實現方式,還可以延伸出很多不同型別的佇列,DelayQueue就是其中的一種) 先進先出(FIFO):先插入的佇列的元素也最先出佇列,類似於排隊的功能。從某種程度上來說這種佇列也體現了一種公平性。 後進先出(LIFO):後插入佇列的元素最先出佇列,這種佇列優先處理最近發生的事件。 2 BlockingQueue的核心方法: 1 多執行緒環境下為什麼需要BlockingQueue的原因? 作為BlockingQueue的使用者,我們再也不需要關心什麼時候需要阻塞執行緒,什麼時候需要喚醒執行緒, 因為這一切BlockingQueue都給你一手包辦了。既然BlockingQueue如此神通廣大,讓我們一起來見識下它的常用方法: 2 放入資料 (1)offer(anObject):表示如果可能的話,將anObject加到BlockingQueue裡,即如果BlockingQueue可以容納,則返回true,否則返回false.(本方法不阻塞當前執行方法 的執行緒); (2)offer(E o, long timeout, TimeUnit unit):可以設定等待的時間,如果在指定的時間內,還不能往佇列中加入BlockingQueue,則返回失敗。 (3)put(anObject):把anObject加到BlockingQueue裡,如果BlockQueue沒有空間,則呼叫此方法的執行緒被阻斷直到BlockingQueue裡面有空間再繼續. 3. 獲取資料 (1)poll(time):取走BlockingQueue裡排在首位的物件,若不能立即取出,則可以等time引數規定的時間,取不到時返回null; (2)poll(long timeout, TimeUnit unit):從BlockingQueue取出一個隊首的物件,如果在指定時間內,佇列一旦有資料可取,則立即返回佇列中的資料。否則知道時間 超時還沒有資料可取,返回失敗。 (3)take():取走BlockingQueue裡排在首位的物件,若BlockingQueue為空,阻斷進入等待狀態直到BlockingQueue有新的資料被加入; (4)drainTo():一次性從BlockingQueue獲取所有可用的資料物件(還可以指定獲取資料的個數),通過該方法,可以提升獲取資料效率;不需要多次分批加鎖或釋放鎖。 4 1. ArrayBlockingQueue 基於陣列的阻塞佇列實現,在ArrayBlockingQueue內部,維護了一個定長陣列,以便快取佇列中的資料物件,這是一個常用的阻塞佇列,除了一個定長陣列外, ArrayBlockingQueue內部還儲存著兩個整形變數,分別標識著佇列的頭部和尾部在陣列中的位置。 ArrayBlockingQueue在生產者放入資料和消費者獲取資料,都是共用同一個鎖物件,由此也意味著兩者無法真正並行執行,這點尤其不同於LinkedBlockingQueue; 按照實現原理來分析,ArrayBlockingQueue完全可以採用分離鎖,從而實現生產者和消費者操作的完全並行執行。Doug Lea之所以沒這樣去做,也許是因為ArrayBlockingQueue的資料寫入和獲取操作已經足夠輕巧, 以至於引入獨立的鎖機制,除了給程式碼帶來額外的複雜性外,其在效能上完全佔不到任何便宜。 ArrayBlockingQueue和LinkedBlockingQueue間還有一個明顯的不同之處在於, 前者在插入或刪除元素時不會產生或銷燬任何額外的物件例項,而後者則會生成一個額外的Node物件。這在長時間內需要高效併發地處理大批量資料的系統中, 其對於GC的影響還是存在一定的區別。而在建立ArrayBlockingQueue時,我們還可以控制物件的內部鎖是否採用公平鎖,預設採用非公平鎖。 2.LinkedBlockingQueue 基於連結串列的阻塞佇列,同ArrayListBlockingQueue類似,其內部也維持著一個數據緩衝佇列(該佇列由一個連結串列構成),當生產者往佇列中放入一個數據時, 佇列會從生產者手中獲取資料,並快取在佇列內部,而生產者立即返回;只有當佇列緩衝區達到最大值快取容量時(LinkedBlockingQueue可以通過建構函式指定該值),才會阻塞生產者佇列, 直到消費者從佇列中消費掉一份資料,生產者執行緒會被喚醒,反之對於消費者這端的處理也基於同樣的原理。而LinkedBlockingQueue之所以能夠高效的處理併發資料, 還因為其對於生產者端和消費者端分別採用了獨立的鎖來控制資料同步,這也意味著在高併發的情況下生產者和消費者可以並行地操作佇列中的資料,以此來提高整個佇列的併發效能。 作為開發者,我們需要注意的是,如果構造一個LinkedBlockingQueue物件,而沒有指定其容量大小,LinkedBlockingQueue會預設一個類似無限大小的容量(Integer.MAX_VALUE), 這樣的話,如果生產者的速度一旦大於消費者的速度,也許還沒有等到佇列滿阻塞產生,系統記憶體就有可能已被消耗殆盡了。 ArrayBlockingQueue和LinkedBlockingQueue是兩個最普通也是最常用的阻塞佇列,一般情況下,在處理多執行緒間的生產者消費者問題,使用這兩個類足以。 3. DelayQueue DelayQueue中的元素只有當其指定的延遲時間到了,才能夠從佇列中獲取到該元素。DelayQueue是一個沒有大小限制的佇列,因此往佇列中插入資料的操作(生產者)永遠不會被阻塞, 而只有獲取資料的操作(消費者)才會被阻塞。 使用場景: DelayQueue使用場景較少,但都相當巧妙,常見的例子比如使用一個DelayQueue來管理一個超時未響應的連線佇列。 4. PriorityBlockingQueue 基於優先順序的阻塞佇列(優先順序的判斷通過建構函式傳入的Compator物件來決定),但需要注意的是PriorityBlockingQueue並不會阻塞資料生產者,而只會在沒有可消費的資料時, 阻塞資料的消費者。因此使用的時候要特別注意,生產者生產資料的速度絕對不能快於消費者消費資料的速度,否則時間一長,會最終耗盡所有的可用堆記憶體空間。在實現PriorityBlockingQueue時 ,內部控制執行緒同步的鎖採用的是公平鎖。 5. SynchronousQueue 一種無緩衝的等待佇列,類似於無中介的直接交易,有點像原始社會中的生產者和消費者,生產者拿著產品去集市銷售給產品的最終消費者,而消費者必須親自去集市找到所要商品的直接生產者, 如果一方沒有找到合適的目標,那麼對不起,大家都在集市等待。相對於有緩衝的BlockingQueue來說,少了一箇中間經銷商的環節(緩衝區),如果有經銷商,生產者直接把產品批發給經銷商, 而無需在意經銷商最終會將這些產品賣給那些消費者,由於經銷商可以庫存一部分商品,因此相對於直接交易模式,總體來說採用中間經銷商的模式會吞吐量高一些(可以批量買賣);但另一方面, 又因為經銷商的引入,使得產品從生產者到消費者中間增加了額外的交易環節,單個產品的及時響應效能可能會降低。 宣告一個SynchronousQueue有兩種不同的方式,它們之間有著不太一樣的行為。公平模式和非公平模式的區別: 如果採用公平模式:SynchronousQueue會採用公平鎖,並配合一個FIFO佇列來阻塞多餘的生產者和消費者,從而體系整體的公平策略; 但如果是非公平模式(SynchronousQueue預設):SynchronousQueue採用非公平鎖,同時配合一個LIFO佇列來管理多餘的生產者和消費者,而後一種模式,如果生產者和消費者的處理速度有差距, 則很容易出現飢渴的情況,即可能有某些生產者或者是消費者的資料永遠都得不到處理。