Java多執行緒與執行緒池技術
一、序言
Java多執行緒程式設計執行緒池被廣泛使用,甚至成為了標配。
執行緒池本質是池化技術
的應用,和連線池類似,建立連線與關閉連線屬於耗時操作,建立執行緒與銷燬執行緒也屬於重操作,為了提高效率,先提前建立好一批執行緒,當有需要使用執行緒時從執行緒池取出,用完後放回執行緒池,這樣避免了頻繁建立與銷燬執行緒。
// 任務
Runnable runnable = () -> System.out.println(Thread.currentThread().getId());
在應用中優先選用執行緒池執行非同步任務,根據不同的場景選用不同的執行緒池,提高非同步任務執行效率。
1、普通執行
new Thread(runnable).start();
2、執行緒池執行
Executors.newSingleThreadExecutor().execute(runnable)
二、執行緒池基礎
(一)核心引數
1、核心引數
執行緒池的核心引數決定了池的型別,進而決定了池的特性。
引數 | 解釋 | 行為 |
---|---|---|
corePoolSize | 核心執行緒數 | 池中長期維護的執行緒數量,不主動回收 |
maximumPoolSize | 最大執行緒數 | 最大執行緒數大於等於核心執行緒數 |
keepAliveTime | 執行緒最大空閒時間 | 非核心執行緒最大空閒時間,超時回收執行緒 |
workQueue | 工作佇列 | 工作佇列直接決定執行緒池的型別 |
2、引數與池的關係
Executors類預設建立執行緒池與引數對應關係。
執行緒池 | corePoolSize | maximumPoolSize | keepAliveTime | workQueue |
---|---|---|---|---|
newCachedThreadPool | 0 | Integer.MAX_VALUE | 60 | SynchronousQueue |
newSingleThreadExecutor | 1 | 1 | 0 | LinkedBlockingQueue |
newFixedThreadPool | N | N | 0 | LinkedBlockingQueue |
newScheduledThreadPool | N | Integer.MAX_VALUE | 0 | DelayedWorkQueue |
(二)執行緒池對比
根據使用場景選擇對應的執行緒池。
1、通用對比
執行緒池 | 特點 | 適用場景 |
---|---|---|
newCachedThreadPool | 超時未使用的執行緒回自動銷燬,有新任務時自動建立 | 適用於低頻、輕量級的任務。回收執行緒的目的是節約執行緒長時間空閒而佔有的資源。 |
newSingleThreadExecutor | 執行緒池中有且只有一個執行緒 | 順序執行任務 |
newFixedThreadPool | 執行緒池中有固定數量的執行緒,且一直存在 | 適用於高頻的任務,即執行緒在大多數時間裡都處於工作狀態。 |
newScheduledThreadPool | 定時執行緒池 | 與定時排程相關聯 |
2、拓展對比
維護僅有一個執行緒的執行緒池有如下兩種方式,正常使用的情況下,二者差異不大;複雜使用環境下,二者存在細微的差異。用newSingleThreadExecutor方式建立的執行緒池在任何時刻至多隻有一個執行緒,因此可以理解為用非同步的方式執行順序任務;後者初始化的時候也只有一個執行緒,使用過程中可能會出現最大執行緒數超過1的情況,這時要求線性執行的任務會並行執行,業務邏輯可能會出現問題,與實際場景有關。
private final static ExecutorService executor = Executors.newSingleThreadExecutor();
private final static ExecutorService executor = Executors.newFixedThreadPool(1);
(三)執行緒池原理
執行緒池主要處理流程,任務提交之後是怎麼執行的。大致如下:
- 判斷核心執行緒池是否已滿,如果不是,則建立執行緒執行任務
- 如果核心執行緒池滿了,判斷佇列是否滿了,如果佇列沒滿,將任務放在佇列中
- 如果佇列滿了,則判斷執行緒池是否已滿,如果沒滿,建立執行緒執行任務
- 如果執行緒池也滿了,則按照拒絕策略對任務進行處理
(四)提交任務的方式
往執行緒池中提交任務,主要有兩種方法:提交無返回值的任務和提交有返回值的任務。
1、無返回值任務
execute
用於提交不需要返回結果的任務。
public static void main(String[] args) {
ExecutorService executor = Executors.newFixedThreadPool(2);
executor.execute(() -> System.out.println("hello"));
}
2、有返回值任務
submit()
用於提交一個需要返回果的任務。
該方法返回一個Future
物件,通過呼叫這個物件的get()
方法,我們就能獲得返回結果。get()
方法會一直阻塞,直到返回結果返回。
我們也可以使用它的過載方法get(long timeout, TimeUnit unit)
,這個方法也會阻塞,但是在超時時間內仍然沒有返回結果時,將丟擲異常TimeoutException
。
public static void main(String[] args) throws Exception {
ExecutorService executor = Executors.newFixedThreadPool(2);
Future<Long> future = executor.submit(() -> {
System.out.println("task is executed");
return System.currentTimeMillis();
});
System.out.println("task execute time is: " + future.get());
}
在提交任務時,如果無返回值任務,優先使用
execute
。
(無)關閉執行緒池
線上程池使用完成之後,我們需要對執行緒池中的資源進行釋放操作,這就涉及到關閉功能。我們可以呼叫執行緒池物件的shutdown()
和shutdownNow()
方法來關閉執行緒池。
這兩個方法都是關閉操作,又有什麼不同呢?
-
shutdown()
會將執行緒池狀態置為SHUTDOWN
,不再接受新的任務,同時會等待執行緒池中已有的任務執行完成再結束。 -
shutdownNow()
會將執行緒池狀態置為SHUTDOWN
,對所有執行緒執行interrupt()
操作,清空佇列,並將佇列中的任務返回回來。
另外,關閉執行緒池涉及到兩個返回boolean的方法,isShutdown()
和isTerminated
,分別表示是否關閉和是否終止。
三、Executors
Executors
是一個執行緒池工廠,提供了很多的工廠方法,我們來看看它大概能建立哪些執行緒池。
// 建立單一執行緒的執行緒池
public static ExecutorService newSingleThreadExecutor();
// 建立固定數量的執行緒池
public static ExecutorService newFixedThreadPool(int nThreads);
// 建立帶快取的執行緒池
public static ExecutorService newCachedThreadPool();
// 建立定時排程的執行緒池
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize);
// 建立流式(fork-join)執行緒池
public static ExecutorService newWorkStealingPool();
1、建立單一執行緒的執行緒池
任何時候執行緒池中至多隻有一個執行緒,當執行緒執行異常終止時會自動建立一個新執行緒替換。如果既有非同步執行任務的需求又希望任務得以順序執行,那麼此型別執行緒池是首選。
若多個任務被提交到此執行緒池,那麼會被快取到佇列。當執行緒空閒的時候,按照FIFO的方式進行處理。
2、建立固定數量的執行緒池
建立核心執行緒與最大執行緒數相等的固定執行緒數的執行緒池,任何時刻至多有固定數目的執行緒,當執行緒因異常而終止時則會自動建立執行緒替換。
當有新任務加入時,如果池內執行緒均處於活躍狀態,則任務進入等待佇列中,直到有空閒執行緒,佇列中的任務才會被順序執行;如果池內有非活躍執行緒,則任務可以立刻得以執行。
- 如果執行緒的數量未達到指定數量,則建立執行緒來執行任務
- 如果執行緒池的數量達到了指定數量,並且有執行緒是空閒的,則取出空閒執行緒執行任務
- 如果沒有執行緒是空閒的,則將任務快取到佇列(佇列長度為
Integer.MAX_VALUE
)。當執行緒空閒的時候,按照FIFO的方式進行處理
3、建立可伸縮的執行緒池
這種方式建立的執行緒池,核心執行緒池的長度為0,執行緒池最大長度為Integer.MAX_VALUE
。由於本身使用SynchronousQueue
作為等待佇列的緣故,導致往佇列裡面每插入一個元素,必須等待另一個執行緒從這個佇列刪除一個元素。
- 執行緒池可維護0到Integer.MAX_VALUE個執行緒資源,空閒執行緒預設情況下超過60秒未使用則會被銷燬,長期閒置的池佔用較少的資源。
- 當有新任務加入時,如果池中有空閒且尚未銷燬的執行緒,則將任務交給此執行緒執行;如果沒有可用的執行緒,則建立一個新執行緒執行任務並新增到池中。
4、建立定時排程的執行緒池
和上面3個工廠方法返回的執行緒池型別有所不同,它返回的是ScheduledThreadPoolExecutor
型別的執行緒池。平時我們實現定時排程功能的時候,可能更多的是使用第三方類庫,比如:quartz等。但是對於更底層的功能,我們仍然需要了解。
四、手動建立執行緒池
理論上,我們可以通過Executors
來建立執行緒池,這種方式非常簡單。但正是因為簡單,所以限制了執行緒池的功能。比如:無長度限制的佇列,可能因為任務堆積導致OOM,這是非常嚴重的bug,應儘可能地避免。怎麼避免?歸根結底,還是需要我們通過更底層的方式來建立執行緒池。
拋開定時排程的執行緒池不管,我們看看ThreadPoolExecutor
。它提供了好幾個構造方法,但是最底層的構造方法卻只有一個。那麼,我們就從這個構造方法著手分析。
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler);
這個構造方法有7個引數,我們逐一來進行分析。
-
corePoolSize
,執行緒池中的核心執行緒數 -
maximumPoolSize
,執行緒池中的最大執行緒數 -
keepAliveTime
,空閒時間,當執行緒池數量超過核心執行緒數時,多餘的空閒執行緒存活的時間,即:這些執行緒多久被銷燬。 -
unit
,空閒時間的單位,可以是毫秒、秒、分鐘、小時和天,等等 -
workQueue
,等待佇列,執行緒池中的執行緒數超過核心執行緒數時,任務將放在等待佇列,它是一個BlockingQueue
型別的物件 -
threadFactory
,執行緒工廠,我們可以使用它來建立一個執行緒 -
handler
,拒絕策略,當執行緒池和等待佇列都滿了之後,需要通過該物件的回撥函式進行回撥處理
這些引數裡面,基本型別的引數都比較簡單,我們不做進一步的分析。我們更關心的是workQueue
、threadFactory
和handler
,接下來我們將進一步分析。
(一)等待佇列-workQueue
等待佇列是BlockingQueue
型別的,理論上只要是它的子類,我們都可以用來作為等待佇列。
同時,jdk內部自帶一些阻塞佇列,我們來看看大概有哪些。
-
ArrayBlockingQueue
,佇列是有界的,基於陣列實現的阻塞佇列 -
LinkedBlockingQueue
,佇列可以有界,也可以無界。基於連結串列實現的阻塞佇列 -
SynchronousQueue
,不儲存元素的阻塞佇列,每個插入操作必須等到另一個執行緒呼叫移除操作,否則插入操作將一直處於阻塞狀態。該佇列也是Executors.newCachedThreadPool()
的預設佇列 -
PriorityBlockingQueue
,帶優先順序的無界阻塞佇列
通常情況下,我們需要指定阻塞佇列的上界(比如1024)。另外,如果執行的任務很多,我們可能需要將任務進行分類,然後將不同分類的任務放到不同的執行緒池中執行。
(二)執行緒工廠-threadFactory
ThreadFactory
是一個介面,只有一個方法。既然是執行緒工廠,那麼我們就可以用它生產一個執行緒物件。來看看這個介面的定義。
public interface ThreadFactory {
/**
* Constructs a new {@code Thread}. Implementations may also initialize
* priority, name, daemon status, {@code ThreadGroup}, etc.
*
* @param r a runnable to be executed by new thread instance
* @return constructed thread, or {@code null} if the request to
* create a thread is rejected
*/
Thread newThread(Runnable r);
}
Executors
的實現使用了預設的執行緒工廠-DefaultThreadFactory
。它的實現主要用於建立一個執行緒,執行緒的名字為pool-{poolNum}-thread-{threadNum}
。
static class DefaultThreadFactory implements ThreadFactory {
private static final AtomicInteger poolNumber = new AtomicInteger(1);
private final ThreadGroup group;
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String namePrefix;
DefaultThreadFactory() {
SecurityManager s = System.getSecurityManager();
group = (s != null) ? s.getThreadGroup() :
Thread.currentThread().getThreadGroup();
namePrefix = "pool-" +
poolNumber.getAndIncrement() +
"-thread-";
}
public Thread newThread(Runnable r) {
Thread t = new Thread(group, r,
namePrefix + threadNumber.getAndIncrement(),
0);
if (t.isDaemon())
t.setDaemon(false);
if (t.getPriority() != Thread.NORM_PRIORITY)
t.setPriority(Thread.NORM_PRIORITY);
return t;
}
}
很多時候,我們需要自定義執行緒名字。我們只需要自己實現ThreadFactory
,用於建立特定場景的執行緒即可。
(三)拒絕策略-handler
所謂拒絕策略,就是當執行緒池滿了、佇列也滿了的時候,我們對任務採取的措施。或者丟棄、或者執行、或者其他...
jdk自帶4種拒絕策略,我們來看看。
-
CallerRunsPolicy
// 在呼叫者執行緒執行 -
AbortPolicy
// 直接丟擲RejectedExecutionException
異常 -
DiscardPolicy
// 任務直接丟棄,不做任何處理 -
DiscardOldestPolicy
// 丟棄佇列裡最舊的那個任務,再嘗試執行當前任務
這四種策略各有優劣,比較常用的是DiscardPolicy
,但是這種策略有一個弊端就是任務執行的軌跡不會被記錄下來。所以,我們往往需要實現自定義的拒絕策略, 通過實現RejectedExecutionHandler
介面的方式。
五、其它
配置執行緒池的引數
前面我們講到了手動建立執行緒池涉及到的幾個引數,那麼我們要如何設定這些引數才算是正確的應用呢?實際上,需要根據任務的特性來分析。
- 任務的性質:CPU密集型、IO密集型和混雜型
- 任務的優先順序:高中低
- 任務執行的時間:長中短
- 任務的依賴性:是否依賴資料庫或者其他系統資源
不同的性質的任務,我們採取的配置將有所不同。在《Java併發程式設計實踐》中有相應的計算公式。
通常來說,如果任務屬於CPU密集型,那麼我們可以將執行緒池數量設定成CPU的個數,以減少執行緒切換帶來的開銷。如果任務屬於IO密集型,我們可以將執行緒池數量設定得更多一些,比如CPU個數*2。
PS:我們可以通過
Runtime.getRuntime().availableProcessors()
來獲取CPU的個數。
執行緒池監控
如果系統中大量用到了執行緒池,那麼我們有必要對執行緒池進行監控。利用監控,我們能在問題出現前提前感知到,也可以根據監控資訊來定位可能出現的問題。
那麼我們可以監控哪些資訊?又有哪些方法可用於我們的擴充套件支援呢?
首先,ThreadPoolExecutor
自帶了一些方法。
-
long getTaskCount()
,獲取已經執行或正在執行的任務數 -
long getCompletedTaskCount()
,獲取已經執行的任務數 -
int getLargestPoolSize()
,獲取執行緒池曾經建立過的最大執行緒數,根據這個引數,我們可以知道執行緒池是否滿過 -
int getPoolSize()
,獲取執行緒池執行緒數 -
int getActiveCount()
,獲取活躍執行緒數(正在執行任務的執行緒數)
其次,ThreadPoolExecutor
留給我們自行處理的方法有3個,它在ThreadPoolExecutor
中為空實現(也就是什麼都不做)。
-
protected void beforeExecute(Thread t, Runnable r)
// 任務執行前被呼叫 -
protected void afterExecute(Runnable r, Throwable t)
// 任務執行後被呼叫 -
protected void terminated()
// 執行緒池結束後被呼叫
六、總結
- 儘量使用手動的方式建立執行緒池,避免使用
Executors
工廠類 - 根據場景,合理設定執行緒池的各個引數,包括執行緒池數量、佇列、執行緒工廠和拒絕策略