你真的懂ThreadPoolExecutor線程池技術嗎?看了源碼你會有全新的認識
有以下場景,有個電話撥打系統,有一堆需要撥打的任務要執行,首先肯定是考慮多線程異步去執行。假如我每執行一個撥打任務都new一個Thread去執行,當同時有1萬個任務需要執行的時候,那麽就會新建1萬個線程,加上線程各種初始銷毀等操作,這個消耗是巨大的。而其實往往實現這些功能的時候,並不是完全需要實時馬上完成,只是希望在可控範圍內盡量提高執行的並發性能。
因此線程池技術應用而生,Java中最常用的線程池技術就是ThreadPoolExecutor。接下來就整體看看ThreadPoolExecutor的實現。<!-- more -->
基本使用
// 核心線程 int corePoolSize = 5; // 最大線程 int maximumPoolSize = 10; // 線程空閑回收時間 int keepAliveTime = 30; // 線程空閑回調時間單位 TimeUnit unit = TimeUnit.SECONDS; // 隊列大小 int queueSize = 20; // 隊列 BlockingQueue workQueue = new ArrayBlockingQueue<Runnable>(queueSize); ThreadPoolExecutor executor = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue); executor.execute(() -> { // do something 1 }); executor.execute(() -> { // do something 2 });
定義好一些必要的參數,構建一個ThreadPoolExecutor對象。然後調用對象的execute()方法即可。
參數說明:
- corePoolSize,線程池保留的最小線程數。如果線程池中的線程少於此數目,則在執行execut()時創建。
- maximumPoolSize,線程池中允許擁有的最大線程數。
- keepAliveTime、unit,當線程閑置時,保持線程存活的時間。
- workQueue,工作隊列,存放提交的等待任務,其中有隊列大小的限制。
線程管理機制
非常多人誤解了corePoolSize、maximumPoolSize、workQueue的相互關系。不少人認為無論隊列選擇什麽,corePoolSize和maximumPoolSize一定是有用,定義一定是生效的,其實並不然啊!
看下線程基本規則註解說明
- 默認情況下,線程池在初始的時候,線程數為0。當接收到一個任務時,如果線程池中存活的線程數小於corePoolSize核心線程,則新建一個線程。
- 如果所有運行的核心線程都都在忙,超出核心線程處理的任務,執行器更多地選擇把任務放進隊列,而不是新建一個線程。
- 如果一個任務提交不了到隊列,在不超出最大線程數量情況下,會新建線程。超出了就會報錯。
另外,如果想在線程初始化時候就有核心線程,可以調用prestartCoreThread()或prestartAllCoreThread(),前者是初始一個,後者是初始全部。
再看看排隊策略
- 直接提交,用SynchronousQueue。特點是不保存,直接提交給線程,如果沒沒線程,則新建一個。
- 無限提交,用類似LinkedBlockingQueue×××隊列。特點是保存所以核心線程處理不了的任務,隊列無上限,最大線程也沒用。
- 有限提交,用類似ArrayBlockingQueue有界隊列。特點是可以保存超過核心線程的任務,並且隊列也是有上限的。超過上限,新建線程(滿了拋錯)。更好地保護資源,防止崩潰,也是最常用的排隊策略。
從以上規則可以看出來,核心線程數和最大線程數,還有隊列結構是相互影響的,如何排隊,隊列多大,最大線程是多少都是不一定的。
再看看保持存活機制
當超過核心線程數的線程,線程池會讓該線程保持存活keepAliveTime時間,超過該時間則會銷毀該線程。
另外默認對非核心線程有效,若想核心線程也適用於這個機制,可以調用allowCoreThreadTimeOut()方法。這樣的話就沒有核心線程這一說了。
綜合以上,線程池在多次執行任務後,會一直維持部分線程存活,即使它是閑置的。這樣的目的是為了減少線程銷毀創建的開銷,下次有個任務需要執行,直接從池子裏拿線程就能用了。但核心線程不能維護太多,因為也需要一定開銷。最大的線程數保護了整個系統的穩定性,避免並發量大的時候,把線程擠滿。工作隊列則是保證了任務順序和暫存,系統的可靠性。線程存活規則的目的和維護核心線程的目的類似,但降低了它的存活的時間。
另外還有拒絕機制,它提供了一些異常情況下的解決方案。
ctl線程狀態控制
這個ctl變量是整個線程池的核心控制狀態。
這個ctl代表了兩個變量
- workerCount,生效的線程數。基本可以理解為存活的線程,但某個時候有暫時性的差異。
- runState,線程池的運行狀態。
其中,ctl(int32位)的低29位代表workerCount,所以最大線程數為(2^29)-1。另外3位表示runState。
runState有以下幾種狀態:
- RUNNING:接收新任務,處理隊列任務。
- SHUTDOWN:不接收新任務,但處理隊列任務。
- STOP:不接收新任務,也不處理隊列任務,並且中斷所有處理中的任務。
- TIDYING:所有任務都被終結,有效線程為0。會觸發terminated()方法。
- TERMINATED:當terminated()方法執行結束。
當調用了shutdown(),狀態會從RUNNING變成SHUTDOWN,不再接收新任務,此時會處理完隊列裏面的任務。
如果調用的是shutdownNow(),狀態會直接變成STOP。
當線程或者隊列都是空的時候,狀態就會變成TIDYING。
當terminated()執行完的時候,就會變成TERMINATED。
execute()
帶著對上面的規則與機制的認識,現在從就這這個入口開始看看源碼,到底整個流程是怎麽實現的。
- 如果少於核心線程在跑,用這個任務嘗試創建一個新線程。
- 如果一個任務成功入隊,再次檢查下線程池狀態看是否需要入隊,因為可能在入隊過程中,狀態發送了變化。如果確認入隊且沒有存活線程,則新建一個空線程。
- 如果進不了隊,則嘗試新建一個線程,如果都失敗了。拒絕這個task
對於第二點最後為什麽新建一個線程?很容易猜想到,會有一個輪詢的機制讓下個task出隊,直接利用這個空閑線程。
註釋基本解釋了所有代碼,代碼也沒什麽特別的。其中最主要的還是addWoker()這個方法,下面來看看。
addWoker()
先了解下這個方法的整體思路
從描述可知,addwoker失敗,會在線程池狀態不對、線程滿了或者線程工廠創建線程池失敗時候發生。
這個方法比較長,分兩段看。先看第一段。retry:
這種寫法,如果比較少看源碼的,應該是前所未見的了。這是個循環的位置標記,是java的語法之一。看回代碼,這裏面for循環還嵌套裏一個for循環,而retry:
是標記第一個for循環的,後面break
和continue
語句都指向到了retry
。說明break
和continue
是都是操作外層的for循環。retry可以是任何變量命名合法的字符。
然後看看外出for循環的if語句
這個if判斷想要執行到return false;
,隊列為空是一個必要條件。因為addWork()不單只接收新任務會調用到,處理隊列中的任務也會調用到。而前面提到SHUTDOWN狀態下還會處理隊列中的任務的,所以隊列不為空是會讓它繼續執行下去的。
對於內層的for循環
會先判斷worker的數據是否符合corePoolSize和maximumPoolSize的定義,不滿足則返回失敗。
然後嘗試CAS讓workerCount自增,如果CAS失敗還是繼續自旋去自增,直到成功。除非線程池狀態發生了變化,發退回到外層for循環重新執行,判斷線程池的狀態。
第一段的代碼,就是讓workerCount在符合條件下自增
第二段代碼
這段比較好理解,先創建一個Worker對象,這個Worker裏面包含一個由線程工廠創建的線程,和一個需要執行的任務(可以為空)。如果線程創建成功了,那麽就加一個重入鎖去把這個新建的Worker對象放到workers成員變量中,在加入之前需要重新判斷下線程池的狀態和新建線程的狀態。如果worker添加到workers成員變量中,就啟動這個新建的線程。最後如果添加失敗,則執行addWorkFailed(w)
。
如果失敗了,加鎖操作回滾下wokers、workerCount,然後判斷下狀態看看是否需要終結線程池。
addWorker()
大概的流程就這樣。
總結
對於其他方法,沒有什麽特別的,在此不再過多的敘述,有興趣的可以翻翻源碼閱讀下。
回顧總結下上面的核心要點
- 當核心線程滿且忙碌時,線程池傾向於把提交的任務放進隊列,而不是新建線程。
- 根據選擇隊列的不同,maximumPoolSize不一定有用的。具體有三種不同的策略。
- ctl是線程池的核心控制狀態,包含的runState線程池運行狀態和workCount有效線程數。
retry:
是一種標記循環的語法,retry可以是任何變量命名合法字符。
更多技術文章、精彩幹貨,請關註
博客:zackku.com
微信公眾號:Zack說碼
你真的懂ThreadPoolExecutor線程池技術嗎?看了源碼你會有全新的認識