JAVA執行緒池精華總結
一. 為什麼用執行緒池(執行緒池的作用)
1. 建立/銷燬執行緒伴隨著系統開銷,過於頻繁的建立/銷燬執行緒,會很大程度上影響處理效率
例如:
記建立執行緒消耗時間T1,執行任務消耗時間T2,銷燬執行緒消耗時間T3
如果T1+T3>T2,那麼是不是說開啟一個執行緒來執行這個任務太不划算了!
正好,執行緒池快取執行緒,可用已有的閒置執行緒來執行新任務,避免了T1+T3帶來的系統開銷
2. 執行緒併發數量過多,搶佔系統資源從而導致阻塞
我們知道執行緒能共享系統資源,如果同時執行的執行緒過多,就有可能導致系統資源不足而產生阻塞的情況
運用執行緒池能有效的控制執行緒最大併發數,避免以上的問題
3. 對執行緒進行一些簡單的管理
比如:延時執行、定時迴圈執行的策略等
運用執行緒池都能進行很好的實現
二. 執行緒池ThreadPoolExecutor
Java裡面的執行緒池介面是ExecutorService。具體實現為ThreadPoolExecutor類。
比較重要的幾個類:
Executor | 執行器介面(執行一個給定的Runnable實現) |
ExecutorService |
執行緒池介面(繼承Executor介面) |
ThreadPoolExecutor |
ExecutorService的預設實現 |
ScheduledExecutorService |
排程執行緒池介面 |
ScheduledThreadPoolExecutor |
ScheduledExecutorService的預設實現,和Timer/TimerTask類似,解決那些需要任務進行延遲或重複執行的問題 |
對執行緒池的配置,就是對ThreadPoolExecutor建構函式的引數的配置,既然這些引數這麼重要,就來看看建構函式的各個引數吧
ThreadPoolExecutor提供了四個建構函式:
//五個引數的建構函式 public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) //六個引數的建構函式-1 public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) //六個引數的建構函式-2 public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) //七個引數的建構函式 public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)
int corePoolSize => 該執行緒池中核心執行緒數最大值
核心執行緒:
執行緒池新建執行緒的時候,如果當前執行緒總數小於corePoolSize,則新建的是核心執行緒,如果超過corePoolSize,則新建的是非核心執行緒
核心執行緒預設情況下會一直存活線上程池中,即使這個核心執行緒啥也不幹(閒置狀態)。
如果指定ThreadPoolExecutor的allowCoreThreadTimeOut這個屬性為true,那麼核心執行緒如果不幹活(閒置狀態)的話,超過一定時間(時長下面引數決定),就會被銷燬掉
很好理解吧,正常情況下你不幹活我也養你,因為我總有用到你的時候,但有時候特殊情況(比如我自己都養不起了),那你不幹活我就要把你幹掉了
int maximumPoolSize
該執行緒池中執行緒總數最大值
執行緒總數 = 核心執行緒數 + 非核心執行緒數。
long keepAliveTime
該執行緒池中非核心執行緒閒置超時時長
一個非核心執行緒,如果不幹活(閒置狀態)的時長超過這個引數所設定的時長,就會被銷燬掉
如果設定allowCoreThreadTimeOut = true,則會作用於核心執行緒
TimeUnit unit
keepAliveTime的單位,TimeUnit是一個列舉型別,其包括:
- NANOSECONDS : 1微毫秒 = 1微秒 / 1000
- MICROSECONDS : 1微秒 = 1毫秒 / 1000
- MILLISECONDS : 1毫秒 = 1秒 /1000
- SECONDS : 秒
- MINUTES : 分
- HOURS : 小時
- DAYS : 天
BlockingQueue<Runnable> workQueue
執行前用於保持任務的佇列。此佇列僅保持由 execute方法提交的 Runnable 任務。
一. 排隊的三種通用策略
直接提交:工作佇列的預設選項是 SynchronousQueue,它將任務直接提交給執行緒而不保持它們。在此,如果不存在可用於立即執行任務的執行緒,則試圖把任務加入佇列將失敗,因此會構造一個新的執行緒。此策略可以避免在處理可能具有內部依賴性的請求集時出現鎖。直接提交通常要求無界 maximumPoolSizes 以避免拒絕新提交的任務。
無界佇列:使用無界佇列(例如,不具有初始容量的 LinkedBlockingQueue)將導致在所有 corePoolSize 執行緒都忙時新任務在佇列中等待。這樣,建立的執行緒就不會超過 corePoolSize。(因此,maximumPoolSize的值也就無效了。)當每個任務完全獨立於其他任務,即任務執行互不影響時,適合於使用無界佇列;例如,在 Web頁伺服器中。這種排隊可用於處理瞬態突發請求。
有界佇列:當使用有限的 maximumPoolSizes時,有界佇列(如 ArrayBlockingQueue)有助於防止資源耗盡,但是可能較難調整和控制。佇列大小和最大池大小可能需要相互折衷:使用大型佇列和小型池可以最大限度地降低 CPU 使用率、作業系統資源和上下文切換開銷,但是可能導致人工降低吞吐量。使用小型佇列和大型池,CPU使用率較高,可能遇到不可接受的排程開銷,這樣也會降低吞吐量。
二. 常用的幾種Queue
SynchronousQueue:這個佇列接收到任務的時候,會直接提交給執行緒處理,而不保留它,如果所有執行緒都在工作怎麼辦?那就新建一個執行緒來處理這個任務!所以為了保證不出現<執行緒數達到了maximumPoolSize而不能新建執行緒>的錯誤,使用這個型別佇列的時候,maximumPoolSize一般指定成Integer.MAX_VALUE,即無限大。
LinkedBlockingQueue:這個佇列接收到任務的時候,如果當前執行緒數小於核心執行緒數,則新建執行緒(核心執行緒)處理任務;如果當前執行緒數等於核心執行緒數,則進入佇列等待。由於這個佇列沒有最大值限制,即所有超過核心執行緒數的任務都將被新增到佇列中,這也就導致了maximumPoolSize的設定失效,因為匯流排程數永遠不會超過corePoolSize。
ArrayBlockingQueue:可以限定佇列的長度,接收到任務的時候,如果沒有達到corePoolSize的值,則新建執行緒(核心執行緒)執行任務,如果達到了,則入隊等候,如果佇列已滿,則新建執行緒(非核心執行緒)執行任務,又如果匯流排程數到了maximumPoolSize,則採用拒絕策略處理。
DelayQueue:佇列內元素必須實現Delayed介面,這就意味著你傳進去的任務必須先實現Delayed介面。這個佇列接收到任務時,首先先入隊,只有達到了指定的延時時間,才會執行任務。
ThreadFactory threadFactory
執行程式建立新執行緒時使用的工廠類,實現類需要實現Thread newThread(Runnable r)方法
RejectedExecutionHandler
拒絕策略介面。在ThreadPoolExecutor中已經預設包含了4中策略,因為原始碼非常簡單,這裡直接貼出來。
CallerRunsPolicy:呼叫 execute 方法的執行緒自身執行。此策略提供簡單的反饋控制機制,能夠減緩新任務的提交速度。
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
r.run();
}
}
這個策略顯然不想放棄執行任務。但是由於池中已經沒有任何資源了,那麼就直接使用呼叫該execute的執行緒本身來執行。
AbortPolicy:為JAVA執行緒池預設的阻塞策略,不執行此任務,直接丟擲一個執行時異常rejectedExecution,切記
ThreadPoolExecutor.execute需要try...catch,否則程式會直接退出。
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException();
}
這種策略直接丟擲異常,丟棄任務。
DiscardPolicy:這種策略和AbortPolicy幾乎一樣,也是丟棄任務,只不過他不丟擲異常。空方法!
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
}
DiscardOldestPolicy:如果執行緒池還未關閉,丟棄掉佇列頭部的(最老的)任務,然後重試執行程式(如果再次失敗,則重複此過程)。
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
e.getQueue().poll();
}
}
該策略就稍微複雜一些,在pool沒有關閉的前提下首先丟掉快取在佇列中的最早的任務,然後重新嘗試執行該任務。這個策略需要適當小心。
設想:如果其他執行緒都還在執行,那麼新來任務踢掉舊任務,快取在queue中,再來一個任務又會踢掉queue中最老任務。
使用者自定義拒絕策略:實現RejectedExecutionHandler,並自己定義策略模式
新建一個執行緒池的時候,一般只用5個引數的建構函式。
三. 向ThreadPoolExecutor新增任務
有幾種不同的方式來將任務委託給 ExecutorService 去執行:
- execute(Runnable)
- submit(Runnable)
- submit(Callable)
- invokeAny(...)
- invokeAll(...)
四. ThreadPoolExecutor的執行策略
上面介紹引數的時候其實已經說到了ThreadPoolExecutor執行的策略,這裡給總結一下,當一個任務被新增進執行緒池時:
- 執行緒數量未達到corePoolSize,則新建一個執行緒(核心執行緒)執行任務。
- 執行緒數量達到了corePools,則將任務移入佇列等待。
- 佇列已滿,新建執行緒(非核心執行緒)執行任務。
- 佇列已滿,匯流排程數又達到了maximumPoolSize,執行拒絕策略。
五. 常見四種執行緒池
如果你不想自己寫一個執行緒池,那麼你可以從下面看看有沒有符合你要求的(一般都夠用了),如果有,那麼很好你直接用就行了,如果沒有,那你就老老實實自己去寫一個吧
Java通過Executors提供了四種執行緒池,這四種執行緒池都是直接或間接配置ThreadPoolExecutor的引數實現的,下面我都會貼出這四種執行緒池建構函式的原始碼,各位大佬們一看便知!
來,走起:
1. CachedThreadPool()
可快取執行緒池,如果執行緒池的大小超過了處理任務所需要的執行緒,那麼就會回收部分空閒(60秒不執行任務)的執行緒,當任務數增加時,此執行緒池又可以智慧的新增新執行緒來處理任務。
- 執行緒池大小無限制
- 有空閒執行緒則複用空閒執行緒,若無空閒執行緒則新建執行緒
- 一定程式減少頻繁建立/銷燬執行緒,減少系統開銷
建立方法:
ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
原始碼:
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
2. FixedThreadPool()
定長執行緒池:
- 可控制執行緒最大併發數(同時執行的執行緒數)。
- 超出的執行緒會在佇列中等待。
- 如果某個執行緒因為執行異常而結束,那麼執行緒池會補充一個新執行緒。
建立方法:
//nThreads => 最大執行緒數即maximumPoolSize
ExecutorService fixedThreadPool = Executors.newFixedThreadPool(int nThreads);
原始碼:
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
2個引數的構造方法原始碼,不用我貼你也知道他把星期六放在了哪個位置!所以我就不貼了,省下篇幅給我扯皮
3. ScheduledThreadPool()
定長執行緒池:
- 支援定時及週期性任務執行。
- 執行緒池大小無限制
建立方法:
//nThreads => 最大執行緒數即maximumPoolSize
ExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(int corePoolSize);
原始碼:
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
//ScheduledThreadPoolExecutor():
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE,
DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
new DelayedWorkQueue());
}
4. SingleThreadExecutor()
單執行緒執行緒池
- 有且僅有一個工作執行緒執行任務。
- 所有任務按照任務的提交順序執行,即遵循佇列的入隊出隊規則。
- 如果這個唯一的執行緒因為異常結束,那麼會有一個新的執行緒來替代它。
建立方法:
ExecutorService singleThreadPool = Executors.newSingleThreadPool();
原始碼:
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
參考: