1. 程式人生 > 其它 >Java多執行緒與執行緒池技術

Java多執行緒與執行緒池技術

一、序言

Java多執行緒程式設計執行緒池被廣泛使用,甚至成為了標配。

執行緒池本質是池化技術的應用,和連線池類似,建立連線與關閉連線屬於耗時操作,建立執行緒與銷燬執行緒也屬於重操作,為了提高效率,先提前建立好一批執行緒,當有需要使用執行緒時從執行緒池取出,用完後放回執行緒池,這樣避免了頻繁建立與銷燬執行緒。

// 任務
Runnable runnable = () -> System.out.println(Thread.currentThread().getId());

在應用中優先選用執行緒池執行非同步任務,根據不同的場景選用不同的執行緒池,提高非同步任務執行效率。

1、普通執行
new Thread(runnable).start();
2、執行緒池執行
Executors.newSingleThreadExecutor().execute(runnable)

二、執行緒池基礎

(一)核心引數

1、核心引數

執行緒池的核心引數決定了池的型別,進而決定了池的特性。

引數 解釋 行為
corePoolSize 核心執行緒數 池中長期維護的執行緒數量,不主動回收
maximumPoolSize 最大執行緒數 最大執行緒數大於等於核心執行緒數
keepAliveTime 執行緒最大空閒時間 非核心執行緒最大空閒時間,超時回收執行緒
workQueue 工作佇列 工作佇列直接決定執行緒池的型別
2、引數與池的關係

Executors類預設建立執行緒池與引數對應關係。

執行緒池 corePoolSize maximumPoolSize keepAliveTime workQueue
newCachedThreadPool 0 Integer.MAX_VALUE 60 SynchronousQueue
newSingleThreadExecutor 1 1 0 LinkedBlockingQueue
newFixedThreadPool N N 0 LinkedBlockingQueue
newScheduledThreadPool N Integer.MAX_VALUE 0 DelayedWorkQueue

(二)執行緒池對比

根據使用場景選擇對應的執行緒池。

1、通用對比
執行緒池 特點 適用場景
newCachedThreadPool 超時未使用的執行緒回自動銷燬,有新任務時自動建立 適用於低頻、輕量級的任務。回收執行緒的目的是節約執行緒長時間空閒而佔有的資源。
newSingleThreadExecutor 執行緒池中有且只有一個執行緒 順序執行任務
newFixedThreadPool 執行緒池中有固定數量的執行緒,且一直存在 適用於高頻的任務,即執行緒在大多數時間裡都處於工作狀態。
newScheduledThreadPool 定時執行緒池 與定時排程相關聯
2、拓展對比

維護僅有一個執行緒的執行緒池有如下兩種方式,正常使用的情況下,二者差異不大;複雜使用環境下,二者存在細微的差異。用newSingleThreadExecutor方式建立的執行緒池在任何時刻至多隻有一個執行緒,因此可以理解為用非同步的方式執行順序任務;後者初始化的時候也只有一個執行緒,使用過程中可能會出現最大執行緒數超過1的情況,這時要求線性執行的任務會並行執行,業務邏輯可能會出現問題,與實際場景有關。

private final static ExecutorService executor = Executors.newSingleThreadExecutor();
private final static ExecutorService executor = Executors.newFixedThreadPool(1);

(三)執行緒池原理

執行緒池主要處理流程,任務提交之後是怎麼執行的。大致如下:

  1. 判斷核心執行緒池是否已滿,如果不是,則建立執行緒執行任務
  2. 如果核心執行緒池滿了,判斷佇列是否滿了,如果佇列沒滿,將任務放在佇列中
  3. 如果佇列滿了,則判斷執行緒池是否已滿,如果沒滿,建立執行緒執行任務
  4. 如果執行緒池也滿了,則按照拒絕策略對任務進行處理

(四)提交任務的方式

往執行緒池中提交任務,主要有兩種方法:提交無返回值的任務和提交有返回值的任務。

1、無返回值任務

execute用於提交不需要返回結果的任務。

public static void main(String[] args) {
    ExecutorService executor = Executors.newFixedThreadPool(2);
    executor.execute(() -> System.out.println("hello"));
}
2、有返回值任務

submit()用於提交一個需要返回果的任務。

該方法返回一個Future物件,通過呼叫這個物件的get()方法,我們就能獲得返回結果。get()方法會一直阻塞,直到返回結果返回。

我們也可以使用它的過載方法get(long timeout, TimeUnit unit),這個方法也會阻塞,但是在超時時間內仍然沒有返回結果時,將丟擲異常TimeoutException

public static void main(String[] args) throws Exception {
    ExecutorService executor = Executors.newFixedThreadPool(2);
    Future<Long> future = executor.submit(() -> {
        System.out.println("task is executed");
        return System.currentTimeMillis();
    });
    System.out.println("task execute time is: " + future.get());
}

在提交任務時,如果無返回值任務,優先使用execute

(無)關閉執行緒池

線上程池使用完成之後,我們需要對執行緒池中的資源進行釋放操作,這就涉及到關閉功能。我們可以呼叫執行緒池物件的shutdown()shutdownNow()方法來關閉執行緒池。

這兩個方法都是關閉操作,又有什麼不同呢?

  1. shutdown()會將執行緒池狀態置為SHUTDOWN,不再接受新的任務,同時會等待執行緒池中已有的任務執行完成再結束。
  2. shutdownNow()會將執行緒池狀態置為SHUTDOWN,對所有執行緒執行interrupt()操作,清空佇列,並將佇列中的任務返回回來。

另外,關閉執行緒池涉及到兩個返回boolean的方法,isShutdown()isTerminated,分別表示是否關閉和是否終止。

三、Executors

Executors是一個執行緒池工廠,提供了很多的工廠方法,我們來看看它大概能建立哪些執行緒池。

// 建立單一執行緒的執行緒池
public static ExecutorService newSingleThreadExecutor();
// 建立固定數量的執行緒池
public static ExecutorService newFixedThreadPool(int nThreads);
// 建立帶快取的執行緒池
public static ExecutorService newCachedThreadPool();
// 建立定時排程的執行緒池
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize);
// 建立流式(fork-join)執行緒池
public static ExecutorService newWorkStealingPool();
1、建立單一執行緒的執行緒池

任何時候執行緒池中至多隻有一個執行緒,當執行緒執行異常終止時會自動建立一個新執行緒替換。如果既有非同步執行任務的需求又希望任務得以順序執行,那麼此型別執行緒池是首選。

若多個任務被提交到此執行緒池,那麼會被快取到佇列。當執行緒空閒的時候,按照FIFO的方式進行處理。

2、建立固定數量的執行緒池

建立核心執行緒與最大執行緒數相等的固定執行緒數的執行緒池,任何時刻至多有固定數目的執行緒,當執行緒因異常而終止時則會自動建立執行緒替換。

當有新任務加入時,如果池內執行緒均處於活躍狀態,則任務進入等待佇列中,直到有空閒執行緒,佇列中的任務才會被順序執行;如果池內有非活躍執行緒,則任務可以立刻得以執行。

  • 如果執行緒的數量未達到指定數量,則建立執行緒來執行任務
  • 如果執行緒池的數量達到了指定數量,並且有執行緒是空閒的,則取出空閒執行緒執行任務
  • 如果沒有執行緒是空閒的,則將任務快取到佇列(佇列長度為Integer.MAX_VALUE)。當執行緒空閒的時候,按照FIFO的方式進行處理
3、建立可伸縮的執行緒池

這種方式建立的執行緒池,核心執行緒池的長度為0,執行緒池最大長度為Integer.MAX_VALUE。由於本身使用SynchronousQueue作為等待佇列的緣故,導致往佇列裡面每插入一個元素,必須等待另一個執行緒從這個佇列刪除一個元素。

  • 執行緒池可維護0到Integer.MAX_VALUE個執行緒資源,空閒執行緒預設情況下超過60秒未使用則會被銷燬,長期閒置的池佔用較少的資源。
  • 當有新任務加入時,如果池中有空閒且尚未銷燬的執行緒,則將任務交給此執行緒執行;如果沒有可用的執行緒,則建立一個新執行緒執行任務並新增到池中。
4、建立定時排程的執行緒池

和上面3個工廠方法返回的執行緒池型別有所不同,它返回的是ScheduledThreadPoolExecutor型別的執行緒池。平時我們實現定時排程功能的時候,可能更多的是使用第三方類庫,比如:quartz等。但是對於更底層的功能,我們仍然需要了解。

四、手動建立執行緒池

理論上,我們可以通過Executors來建立執行緒池,這種方式非常簡單。但正是因為簡單,所以限制了執行緒池的功能。比如:無長度限制的佇列,可能因為任務堆積導致OOM,這是非常嚴重的bug,應儘可能地避免。怎麼避免?歸根結底,還是需要我們通過更底層的方式來建立執行緒池。

拋開定時排程的執行緒池不管,我們看看ThreadPoolExecutor。它提供了好幾個構造方法,但是最底層的構造方法卻只有一個。那麼,我們就從這個構造方法著手分析。

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler);

這個構造方法有7個引數,我們逐一來進行分析。

  1. corePoolSize,執行緒池中的核心執行緒數
  2. maximumPoolSize,執行緒池中的最大執行緒數
  3. keepAliveTime,空閒時間,當執行緒池數量超過核心執行緒數時,多餘的空閒執行緒存活的時間,即:這些執行緒多久被銷燬。
  4. unit,空閒時間的單位,可以是毫秒、秒、分鐘、小時和天,等等
  5. workQueue,等待佇列,執行緒池中的執行緒數超過核心執行緒數時,任務將放在等待佇列,它是一個BlockingQueue型別的物件
  6. threadFactory,執行緒工廠,我們可以使用它來建立一個執行緒
  7. handler,拒絕策略,當執行緒池和等待佇列都滿了之後,需要通過該物件的回撥函式進行回撥處理

這些引數裡面,基本型別的引數都比較簡單,我們不做進一步的分析。我們更關心的是workQueuethreadFactoryhandler,接下來我們將進一步分析。

(一)等待佇列-workQueue

等待佇列是BlockingQueue型別的,理論上只要是它的子類,我們都可以用來作為等待佇列。

同時,jdk內部自帶一些阻塞佇列,我們來看看大概有哪些。

  1. ArrayBlockingQueue,佇列是有界的,基於陣列實現的阻塞佇列
  2. LinkedBlockingQueue,佇列可以有界,也可以無界。基於連結串列實現的阻塞佇列
  3. SynchronousQueue,不儲存元素的阻塞佇列,每個插入操作必須等到另一個執行緒呼叫移除操作,否則插入操作將一直處於阻塞狀態。該佇列也是Executors.newCachedThreadPool()的預設佇列
  4. PriorityBlockingQueue,帶優先順序的無界阻塞佇列

通常情況下,我們需要指定阻塞佇列的上界(比如1024)。另外,如果執行的任務很多,我們可能需要將任務進行分類,然後將不同分類的任務放到不同的執行緒池中執行。

(二)執行緒工廠-threadFactory

ThreadFactory是一個介面,只有一個方法。既然是執行緒工廠,那麼我們就可以用它生產一個執行緒物件。來看看這個介面的定義。

public interface ThreadFactory {

    /**
     * Constructs a new {@code Thread}.  Implementations may also initialize
     * priority, name, daemon status, {@code ThreadGroup}, etc.
     *
     * @param r a runnable to be executed by new thread instance
     * @return constructed thread, or {@code null} if the request to
     *         create a thread is rejected
     */
    Thread newThread(Runnable r);
}

Executors的實現使用了預設的執行緒工廠-DefaultThreadFactory。它的實現主要用於建立一個執行緒,執行緒的名字為pool-{poolNum}-thread-{threadNum}

static class DefaultThreadFactory implements ThreadFactory {
    private static final AtomicInteger poolNumber = new AtomicInteger(1);
    private final ThreadGroup group;
    private final AtomicInteger threadNumber = new AtomicInteger(1);
    private final String namePrefix;

    DefaultThreadFactory() {
        SecurityManager s = System.getSecurityManager();
        group = (s != null) ? s.getThreadGroup() :
                              Thread.currentThread().getThreadGroup();
        namePrefix = "pool-" +
                      poolNumber.getAndIncrement() +
                     "-thread-";
    }

    public Thread newThread(Runnable r) {
        Thread t = new Thread(group, r,
                              namePrefix + threadNumber.getAndIncrement(),
                              0);
        if (t.isDaemon())
            t.setDaemon(false);
        if (t.getPriority() != Thread.NORM_PRIORITY)
            t.setPriority(Thread.NORM_PRIORITY);
        return t;
    }
}

很多時候,我們需要自定義執行緒名字。我們只需要自己實現ThreadFactory,用於建立特定場景的執行緒即可。

(三)拒絕策略-handler

所謂拒絕策略,就是當執行緒池滿了、佇列也滿了的時候,我們對任務採取的措施。或者丟棄、或者執行、或者其他...

jdk自帶4種拒絕策略,我們來看看。

  1. CallerRunsPolicy // 在呼叫者執行緒執行
  2. AbortPolicy // 直接丟擲RejectedExecutionException異常
  3. DiscardPolicy // 任務直接丟棄,不做任何處理
  4. DiscardOldestPolicy // 丟棄佇列裡最舊的那個任務,再嘗試執行當前任務

這四種策略各有優劣,比較常用的是DiscardPolicy,但是這種策略有一個弊端就是任務執行的軌跡不會被記錄下來。所以,我們往往需要實現自定義的拒絕策略, 通過實現RejectedExecutionHandler介面的方式。

五、其它

配置執行緒池的引數

前面我們講到了手動建立執行緒池涉及到的幾個引數,那麼我們要如何設定這些引數才算是正確的應用呢?實際上,需要根據任務的特性來分析。

  1. 任務的性質:CPU密集型、IO密集型和混雜型
  2. 任務的優先順序:高中低
  3. 任務執行的時間:長中短
  4. 任務的依賴性:是否依賴資料庫或者其他系統資源

不同的性質的任務,我們採取的配置將有所不同。在《Java併發程式設計實踐》中有相應的計算公式。

通常來說,如果任務屬於CPU密集型,那麼我們可以將執行緒池數量設定成CPU的個數,以減少執行緒切換帶來的開銷。如果任務屬於IO密集型,我們可以將執行緒池數量設定得更多一些,比如CPU個數*2。

PS:我們可以通過Runtime.getRuntime().availableProcessors()來獲取CPU的個數。

執行緒池監控

如果系統中大量用到了執行緒池,那麼我們有必要對執行緒池進行監控。利用監控,我們能在問題出現前提前感知到,也可以根據監控資訊來定位可能出現的問題。

那麼我們可以監控哪些資訊?又有哪些方法可用於我們的擴充套件支援呢?

首先,ThreadPoolExecutor自帶了一些方法。

  1. long getTaskCount(),獲取已經執行或正在執行的任務數
  2. long getCompletedTaskCount(),獲取已經執行的任務數
  3. int getLargestPoolSize(),獲取執行緒池曾經建立過的最大執行緒數,根據這個引數,我們可以知道執行緒池是否滿過
  4. int getPoolSize(),獲取執行緒池執行緒數
  5. int getActiveCount(),獲取活躍執行緒數(正在執行任務的執行緒數)

其次,ThreadPoolExecutor留給我們自行處理的方法有3個,它在ThreadPoolExecutor中為空實現(也就是什麼都不做)。

  1. protected void beforeExecute(Thread t, Runnable r) // 任務執行前被呼叫
  2. protected void afterExecute(Runnable r, Throwable t) // 任務執行後被呼叫
  3. protected void terminated() // 執行緒池結束後被呼叫

六、總結

  1. 儘量使用手動的方式建立執行緒池,避免使用Executors工廠類
  2. 根據場景,合理設定執行緒池的各個引數,包括執行緒池數量、佇列、執行緒工廠和拒絕策略