1. 程式人生 > >淺析Java執行緒池

淺析Java執行緒池

在Java裡,執行緒池的運用場景很多,幾乎所有需要非同步或者併發執行任務的程式都可以使用執行緒池。

目錄

執行緒池的好處

執行緒池的工作流程

執行緒池的組成元素

向執行緒池提交任務

關閉執行緒池

Java自帶的四種執行緒池


執行緒池的好處

  • 降低資源消耗。通過重複利用已建立的執行緒降低了建立執行緒和銷燬執行緒的資源消耗

  • 提高了響應速度。當有任務時,任務可以不用等執行緒的建立就可以執行

  • 提高了執行緒的可管理性。

執行緒池的工作流程

當你向執行緒池提交了一個任務後,執行緒池判斷核心執行緒池裡的執行緒是否都在執行任務,如果不是,則建立一個新的執行緒去執行這個任務。如果核心執行緒都在執行任務再判斷工作佇列是否已滿,如果沒滿,就把任務儲存在工作佇列裡。如果工作佇列已滿,就判斷執行緒池的執行緒是否已滿,如果滿了則執行飽和拒絕策略,如果沒滿則建立新的執行緒去完成任務。

執行緒池的組成元素

ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime, TimeUnit unit,BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)

當你需要建立一個執行緒池的時候需要上面的幾個引數

corePoolSize:核心執行緒池的大小也可以理解為執行緒池的基本大小,當你提交一個任務給執行緒池的時候,執行緒池就會建立一個執行緒來執行任務,即使其他空閒的核心執行緒能夠執行任務也會去建立新的核心執行緒去執行,等到完成預熱即需要執行的任務數大於執行緒池基本大小的時候就不會再建立了。

maximumPoolSize:執行緒池所允許建立的最大執行緒數,如果工作佇列滿了,並且建立的執行緒數小於最大執行緒數,執行緒池就會再建立新的執行緒執行任務,如果工作佇列是無界(LinkedBlockingQueue),那麼這個引數就沒有什麼實際效果了。

keepAliveTime:執行緒池的工作執行緒空閒後,保持存活的時間,當任務比較多且每個任務的執行時間比較短的時候,可以將時間調大,以此提高執行緒的利用率。

TimeUnit :執行緒存活時間的單位。

workQueue:工作佇列,用來儲存等待執行的任務的阻塞佇列。

  • ArrayBlockingQueue
    :這是一個基於陣列結構的有界阻塞佇列,按FIFO方式對元素進行排序,
  • LinkedBlockingQueue:這是一個基於連結串列結構的阻塞佇列,無界。
  • SynchronousQueue:這是一個不儲存元素的阻塞佇列,相當於每洗一個碗就要將其烘乾,不會洗完後放在碗架上再去烘乾
  • PriorityBlockingQueue:這是一個帶有優先順序的無限阻塞佇列。

ThreadFactory :設定建立執行緒的工廠,可以通過執行緒工廠給每個創建出來的執行緒設定名字

RejectedExecutionHandler:飽和拒絕策略,當佇列與執行緒池都滿了,就說明執行緒池處於一種飽和狀態,就需要採用一種策略來處理提交的新任務。一般預設是AbortPolicy。

  • AbortPolicy:直接丟擲異常。
  • CallerRunsPolicy:只用呼叫者所在的執行緒執行任務。
  • DiscardOldestPolicy:丟棄佇列中最老的一個任務,然後執行當前任務。
  • DiscardPolicy:不處理,直接丟掉。 

向執行緒池提交任務

向執行緒池提交任務有兩個方法,execute()與submit();

execute()方法用於提交無返回值的任務,所以你無法去判斷這個任務是否被執行緒池執行成功

submit()方法用於提交有返回值的任務,執行緒池會返回一個future型別的物件。可以用這個物件去判斷任務是否執行成功。可以通過future的get()方法來獲取返回的值。


關閉執行緒池

關閉執行緒池有兩種方式,shutdown()與shutdownNow()去關閉執行緒池。它們都是通過遍歷執行緒池中的工作執行緒,然後呼叫執行緒的interrupt()方法來中斷執行緒,所以無法響應中斷的執行緒將會無法終止。shutdownNow是先將執行緒池的狀態設定為STOP,然後再去嘗試停止每一個正在執行或者暫停的執行緒,並且返回等待任務的列表。而shutdown是將執行緒池的狀態設定為SHUTDOWN狀態,然後中斷所有沒有正在執行的的執行緒。


Java自帶的四種執行緒池

Java裡推薦裡4種執行緒池,分別是:FixedThreadPool、SingleThreadExecutor、CachedThreadPool、ScheduledThreadPoolExecutor。

  • FixedThreadPool被稱為可重用固定執行緒數的執行緒池,它的corePoolSize與maximumPoolSize均被設定為建立執行緒池時指定的引數nThreads.當執行緒池的執行緒數大於corePoolSize時,keepAliveTime被設定為0,這意味著有多餘的空閒執行緒將會被禁止。

執行過程:如果當前的執行執行緒數<corePoolSize,就建立新執行緒來執行任務,當前執行的執行緒數=corePoolSize時,就將任務加入LinkedBlockingQueue,核心執行緒完成正在執行的任務後,會在迴圈中,不斷地在LinkedBlockingQueue裡去獲取任務來執行。

public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}

當執行緒池中的執行緒數達到corePoolSize時,新的任務就會在無界佇列中等待,因此執行緒池中的執行緒數不會超過corePoolSize。因為,此處的工作佇列是無界佇列,所以maximumPoolSize與keepAliveTime將會被無效化,而且執行中的FixedThreadPool在不呼叫shutdown()與shutdownNow()時,將不會拒絕任務。


  • SingleThreadExecutor是使用單個工作執行緒的Executor,它的corePoolSize與maximumPoolSize均被設定為1,它也是使用LinkedBlockingQueue作為工作佇列。

執行過程:如果當前執行的執行緒的數量<corePoolSize,就建立新執行緒來執行任務,前執行的執行緒數=corePoolSize時,就將任務加入LinkedBlockingQueue,核心執行緒完成正在執行的任務後,會在迴圈中,不斷地在LinkedBlockingQueue裡去獲取任務來執行。

 

public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));
}

  •  CachedThreadPool是一個會根據需要來建立新執行緒的執行緒池,它的corePoolSize被設定為0,maximumPoolSize設定為Integer.MAX_VALUE,可以理解為maximumPool是無界的,它的工作佇列是沒有容量的SynchronousQueue,這意味著主執行緒提交任務的速度高於maximumPool中執行緒的處理速度時,它會不斷地去建立新的執行緒,在極端情況下,它會因為建立過多的執行緒而耗盡CPU的資源

執行過程:首先執行SynchronousQueue.offer(Runnable task),如果當前的maximumPool中有空閒的執行緒正在執行SynchronousQueue.poll方法,那麼主執行緒執行的offer操作與空閒執行緒的poll操作配對成功,主執行緒把任務交給空閒執行緒去執行,如果maximumPool為空,或者當前沒有空閒執行緒的時候,CachedThreadPool會建立一個新的執行緒執行任務,當這個新執行緒建立完畢後,會執行SynchronousQueue.poll方法,這個poll操作會讓空閒執行緒去等待主執行緒的offer操作,如果60s之內,主執行緒給了offer操作,就會配對成功,如果沒給的話,這個空閒執行緒會被終止。

public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}

  • ScheduledThreadPoolExecutor主要用來在給定的延遲之後執行任務,或者定期的執行任務。它與前面的三個執行緒池不太一樣,前面三個執行緒池是通過ThreadPoolExecutor工廠類來建立的,而ScheduledThreadPoolExecutor是通過Executor工廠類來建立的,它適用於需要多個後臺執行緒來執行週期性任務,它裡面的佇列是DelayQueue,DelayQueue是由優先順序佇列實現的無界阻塞佇列,支援延遲獲取元素,因為工作佇列是無序的,所以它的maximumPoolSize值為無效的。

執行過程:從DelayQueue中獲取已經到期的ScheduledFutureTask,再執行這個任務,執行完畢後將這個任務的time改為下一次要執行的時間,再放回DelayQueue