1. 程式人生 > 程式設計 >Java執行緒池ThreadPoolExecutor原理及使用例項

Java執行緒池ThreadPoolExecutor原理及使用例項

引導

要求:執行緒資源必須通過執行緒池提供,不允許在應用自行顯式建立執行緒;

說明:使用執行緒池的好處是減少在建立和銷燬執行緒上所花的時間以及系統資源的開銷,解決資源不足的問題。如果不使用執行緒池,有可能造成系統建立大量同類執行緒而導致消耗記憶體或者“過度切換”的問題。

執行緒池介紹執行緒池概述

  •   執行緒池,顧名思義是一個放著執行緒的池子,這個池子的執行緒主要是用來執行任務的。當用戶提交任務時,執行緒池會建立執行緒去執行任務,若任務超過了核心執行緒數的時候,會在一個任務佇列裡進行排隊等待,這個詳細流程,我們會後面細講。
  •   任務,通常是一些抽象的且離散的工作單元,我們會把應用程式的工作分解到多個任務中去執行。一般我們需要使用多執行緒執行任務的時候,這些任務最好都是相互獨立的,這樣有一定的任務邊界供程式把控。
  •   多執行緒,當使用多執行緒的時候,任務處理過程就可以從主執行緒中剝離出來,任務可以並行處理,同時處理多個請求。當然了,任務處理程式碼必須是執行緒安全的。

為何要使用執行緒池?

降低開銷:在建立和銷燬執行緒的時候會產生很大的系統開銷,頻繁建立/銷燬意味著CPU資源的頻繁切換和佔用,執行緒是屬於稀缺資源,不可以頻繁的建立。假設建立執行緒的時長記為t1,執行緒執行任務的時長記為t2,銷燬執行緒的時長記為t3,如果我們執行任務t2<t1+t3,那麼這樣的開銷是不划算的,不使用執行緒池去避免建立和銷燬的開銷,將是極大的資源浪費。

易複用和管理:將執行緒都放在一個池子裡,便於統一管理(可以延時執行,可以統一命名執行緒名稱等),同時,也便於任務進行復用。

解耦:將執行緒的建立和銷燬與執行任務完全分離出來,這樣方便於我們進行維護,也讓我們更專注於業務開發。執行緒池的優勢提高資源的利用性:通過池化可以重複利用已建立的執行緒,空閒執行緒可以處理新提交的任務,從而降低了建立和銷燬執行緒的資源開銷。提高執行緒的管理性:在一個執行緒池中管理執行任務的執行緒,對執行緒可以進行統一的建立、銷燬以及監控等,對執行緒數做控制,防止執行緒的無限制建立,避免執行緒數量的急劇上升而導致CPU過度排程等問題,從而更合理的分配和使用核心資源。提高程式的響應性:提交任務後,有空閒執行緒可以直接去執行任務,無需新建。提高系統的可擴充套件性:利用執行緒池可以更好的擴充套件一些功能,比如定時執行緒池可以實現系統的定時任務。執行緒池原理執行緒池的引數型別

一共有7個:corePoolSize、maximumPoolSize、keepAliveTime、unit、workQueue、threadFactory、handler,(5+2,前5個重要)

int corePoolSize:該執行緒池中核心執行緒數最大值

這邊我們區分兩個概念:

核心執行緒:執行緒池新建執行緒的時候,當前執行緒總數< corePoolSize,新建的執行緒即為核心執行緒。非核心執行緒:執行緒池新建執行緒的時候,當前執行緒總數< corePoolSize,新建的執行緒即為核心執行緒。

核心執行緒預設情況下會一直存活線上程池中,即使這個核心執行緒不工作(空閒狀態),除非ThreadPoolExecutor 的 allowCoreThreadTimeOut這個屬性為 true,那麼核心執行緒如果空閒狀態下,超過一定時間後就被銷燬。

int maximumPoolSize:執行緒總數最大值

執行緒總數 = 核心執行緒數 + 非核心執行緒數

long keepAliveTime:非核心執行緒空閒超時時間

  keepAliveTime即為空閒執行緒允許的最大的存活時間。如果一個非核心執行緒空閒狀態的時長超過keepAliveTime了,就會被銷燬掉。注意:如果設定allowCoreThreadTimeOut = true,就變成核心執行緒超時銷燬了。

TimeUnit unit:是keepAliveTime 的單位

TimeUnit 是一個列舉型別,列舉如下:

單位

單位 說明
NANOSECONDS 1微毫秒 = 1微秒 / 1000
MICROSECONDS 1微秒 = 1毫秒 / 1000
MILLISECONDS 1毫秒 = 1秒 /1000
SECONDS
MINUTES
HOURS 小時
DAYS

BlockingQueue workQueue:存放任務的阻塞佇列

  當核心執行緒都在工作的時候,新提交的任務就會被新增到這個工作阻塞佇列中進行排隊等待;如果阻塞佇列也滿了,執行緒池就新建非核心執行緒去執行任務。workQueue維護的是等待執行的Runnable物件。
常用的 workQueue 型別:(無界佇列、有界佇列、同步移交佇列)

SynchronousQueue:同步移交佇列,適用於非常大的或者無界的執行緒池,可以避免任務排隊,SynchronousQueue佇列接收到任務後,會直接將任務從生產者移交給工作者執行緒,這種移交機制高效。它是一種不儲存元素的佇列,任務不會先放到佇列中去等執行緒來取,而是直接移交給執行的執行緒。只有當執行緒池是無界的或可以拒絕任務的時候,SynchronousQueue佇列的使用才有意義,maximumPoolSize 一般指定成 Integer.MAX_VALUE,即無限大。要將一個元素放入SynchronousQueue,就需要有另一個執行緒在等待接收這個元素。若沒有執行緒在等待,並且執行緒池的當前執行緒數小於最大值,則ThreadPoolExecutor就會新建一個執行緒;否則,根據飽和策略,拒絕任務。newCachedThreadPool預設使用的就是這種同步移交佇列。吞吐量高於LinkedBlockingQueue。

LinkedBlockingQueue:基於連結串列結構的阻塞佇列,FIFO原則排序。當任務提交過來,若當前執行緒數小於corePoolSize核心執行緒數,則執行緒池新建核心執行緒去執行任務;若當前執行緒數等於corePoolSize核心執行緒數,則進入工作佇列進行等待。LinkedBlockingQueue佇列沒有最大值限制,只要任務數超過核心執行緒數,都會被新增到佇列中,這就會導致匯流排程數永遠不會超過 corePoolSize,所以maximumPoolSize 是一個無效設定。newFixedThreadPool和newSingleThreadPool預設是使用的是無界LinkedBlockingQueue佇列。吞吐量高於ArrayBlockingQueue。

ArrayBlockingQueue:基於陣列結構的有界阻塞佇列,可以設定佇列上限值,FIFO原則排序。當任務提交時,若當前執行緒小於corePoolSize核心執行緒數,則新建核心執行緒執行任務;若當先執行緒數等於corePoolSize核心執行緒數,則進入佇列排隊等候;若佇列的任務數也排滿了,則新建非核心執行緒執行任務;若佇列滿了且匯流排程數達到了maximumPoolSize最大執行緒數,則根據飽和策略進行任務的拒絕。

DelayQueue:延遲佇列,佇列內的元素必須實現 Delayed 介面。當任務提交時,入佇列後只有達到指定的延時時間,才會執行任務

PriorityBlockingQueue:優先順序阻塞佇列,根據優先順序執行任務,優先順序是通過自然排序或者是Comparator定義實現。
注意: 只有當任務相互獨立沒有任何依賴的時候,執行緒池或工作佇列設定有界是合理的;若任務之間存在依賴性,需要使用無界的

執行緒池,如newCachedThreadPool,否則有可能會導致死鎖問題。

ThreadFactory threadFactory

  建立執行緒的方式,這是一個介面,你 new 他的時候需要實現他的 Thread newThread(Runnable r) 方法,一般用不上,

RejectedExecutionHandler handler:飽和策略
丟擲異常專用,當佇列和最大執行緒池都滿了之後的飽和策略。

執行緒池工作流程

一般流程即為:建立worker執行緒;新增任務入workQueue佇列;worker執行緒執行任務。

Java執行緒池ThreadPoolExecutor原理及使用例項

當一個任務被新增進執行緒池時:

1.當前執行緒數量未達到 corePoolSize,則新建一個執行緒(核心執行緒)執行任務

2.當前執行緒數量達到了 corePoolSize,則將任務移入阻塞佇列等待,讓空閒執行緒處理;

3.當阻塞佇列已滿,新建執行緒(非核心執行緒)執行任務

4.當阻塞佇列已滿,匯流排程數又達到了 maximumPoolSize,就會按照拒絕策略處理無法執行的任務,比如RejectedExecutionHandler丟擲異常。

這邊,為了大家能夠更好的去理解這塊的流程,我們舉一個例子。生活中我們經常會去打一些公司的諮詢電話或者是一些特定機構的投訴電話,而那個公司或者機構的客服中心就是一個執行緒池,正式員工的客服小姐姐就好比是核心執行緒,比如有6個客服小姐姐。

5. 當用戶的電話打進到公司的客服中心的時候(提交任務);

6. 客服中心會排程客服小姐姐去接聽電話(建立執行緒執行任務),如果接聽的電話超過了6個,6個客服小姐姐都在接聽的工作狀態了(核心執行緒池滿了),這時客服中心會有一個電話接聽等待通道(進入任務佇列等待),就是我們經常聽到的“您的通話在排隊,前面排隊n人。”

7. 當然,這個電話接聽等待通道也是有上限的,當超過這個上限的時候(任務佇列滿了),客服中心就會立即安排外協員工(非核心執行緒),也就是非正式員工去接聽額外的電話(任務佇列滿了,正式和非正式員工數量>總任務數,執行緒池建立非核心執行緒去執行任務)。

8. 當用戶電話數激增,客服中心控制檯發現這個時候正式員工和外協員工的總和已經滿足不了這些使用者電話接入了(匯流排程池滿),就開始根據一些公司電話接聽規則去拒絕這些電話(按照拒絕策略處理無法執行的任務)

執行緒池狀態

Java執行緒池ThreadPoolExecutor原理及使用例項

RUNNING:執行狀態,指可以接受任務並執行佇列裡的任務。

SHUTDOWN:呼叫了 shutdown() 方法,不再接受新任務,但佇列裡的任務會執行完畢。

STOP:指呼叫了 shutdownNow() 方法,不再接受新任務,所有任務都變成STOP狀態,不管是否正在執行。該操作會拋棄阻塞佇列裡的所有任務並中斷所有正在執行任務。

TIDYING:所有任務都執行完畢,程式呼叫 shutdown()/shutdownNow() 方法都會將執行緒更新為此狀態,若呼叫shutdown(),則等執行任務全部結束,佇列即為空,變成TIDYING狀態;呼叫shutdownNow()方法後,佇列任務清空且正在執行的任務中斷後,更新為TIDYING狀態。

TERMINATED:終止狀態,當執行緒執行 terminated() 後會更新為這個狀態。

執行緒池原始碼

執行緒池核心介面

ThreadPoolExecutor,在java.util.concurrent下。

  /**
   * Creates a new {@code ThreadPoolExecutor} with the given initial
   * parameters.
   *
   * @param corePoolSize the number of threads to keep in the pool,even
   *    if they are idle,unless {@code allowCoreThreadTimeOut} is set
   * @param maximumPoolSize the maximum number of threads to allow in the
   *    pool
   * @param keepAliveTime when the number of threads is greater than
   *    the core,this is the maximum time that excess idle threads
   *    will wait for new tasks before terminating.
   * @param unit the time unit for the {@code keepAliveTime} argument
   * @param workQueue the queue to use for holding tasks before they are
   *    executed. This queue will hold only the {@code Runnable}
   *    tasks submitted by the {@code execute} method.
   * @param threadFactory the factory to use when the executor
   *    creates a new thread
   * @param handler the handler to use when execution is blocked
   *    because the thread bounds and queue capacities are reached
   * @throws IllegalArgumentException if one of the following holds:<br>
   *     {@code corePoolSize < 0}<br>
   *     {@code keepAliveTime < 0}<br>
   *     {@code maximumPoolSize <= 0}<br>
   *     {@code maximumPoolSize < corePoolSize}
   * @throws NullPointerException if {@code workQueue}
   *     or {@code threadFactory} or {@code handler} is null
   */
  public ThreadPoolExecutor(int corePoolSize,//核心執行緒數
               int maximumPoolSize,//最大執行緒數
               long keepAliveTime,//空閒執行緒存活時間
               TimeUnit unit,//存活時間單位
               BlockingQueue<Runnable> workQueue,//任務的阻塞佇列
               ThreadFactory threadFactory,//新執行緒的產生方式
               RejectedExecutionHandler handler //拒絕策略) {
    if (corePoolSize < 0 ||
      maximumPoolSize <= 0 ||
      maximumPoolSize < corePoolSize ||
      keepAliveTime < 0)
      throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
      throw new NullPointerException();
    this.acc = System.getSecurityManager() == null ?
        null :
        AccessController.getContext();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
  }

ThreadPoolExecutor 繼承 AbstractExecutorService;AbstractExecutorService 實現 ExecutorService, ExecutorService 繼承 Executor

Java執行緒池ThreadPoolExecutor原理及使用例項

public class ThreadPoolExecutor extends AbstractExecutorService {}
public abstract class AbstractExecutorService implements ExecutorService {}
public interface ExecutorService extends Executor {}

執行緒池構造方法

1)5引數構造器

// 5引數構造器
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue)

2)6引數構造器-1

// 6引數構造器-1
public ThreadPoolExecutor(int corePoolSize,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory)

3)6引數構造器-2

// 6引數構造器-2
public ThreadPoolExecutor(int corePoolSize,RejectedExecutionHandler handler)

4)7引數構造器

// 7引數構造器
public ThreadPoolExecutor(int corePoolSize,ThreadFactory threadFactory,RejectedExecutionHandler handler)

四種執行緒池

常規用法

//建立固定數目執行緒的執行緒池
Executors.newFixedThreadPool(200);

//建立一個無限執行緒的執行緒池,無需等待佇列,任務提交即執行
Executors.newCachedThreadPool()

//建立有且僅有一個執行緒的執行緒池
Executors.newSingleThreadExecutor();

newCachedThreadPool():可快取執行緒池

介紹

newCachedThreadPool將建立一個可快取的執行緒,如果當前執行緒數超過處理任務時,回收空閒執行緒;當需求增加時,可以新增新執行緒去處理任務。

  • 執行緒數無限制,corePoolSize數值為0, maximumPoolSize 的數值都是為 Integer.MAX_VALUE。
  • 若執行緒未回收,任務到達時,會複用空閒執行緒;若無空閒執行緒,則新建執行緒執行任務。
  • 因為複用性,一定程式減少頻繁建立/銷燬執行緒,減少系統開銷。
  • 工作佇列可以選用SynchronousQueue。

建立方法

ExecutorService cachedThreadPool = Executors.newCachedThreadPool();

原始碼

  public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0,Integer.MAX_VALUE,60L,TimeUnit.SECONDS,new SynchronousQueue<Runnable>());
  }

newFixedThreadPool():定長執行緒池

介紹

newFixedThreadPool建立一個固定長度的執行緒池,每次提交一個任務的時候就會建立一個新的執行緒,直到達到執行緒池的最大數量限制。

  • 定長,可以控制執行緒最大併發數, corePoolSize 和 maximumPoolSize 的數值都是nThreads。
  • 超出的執行緒會在佇列中等待。
  • 工作佇列可以選用LinkedBlockingQueue。

建立方法

ExecutorService fixedThreadPool = Executors.newFixedThreadPool(int nThreads);

原始碼

  public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads,nThreads,0L,TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());
  }

newScheduledThreadPool():定時執行緒池

介紹

newScheduledThreadPool建立一個固定長度的執行緒池,並且以延遲或者定時的方式去執行任務。

建立方法:

ExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(int corePoolSize);

原始碼

  public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
    return new ScheduledThreadPoolExecutor(corePoolSize);
  }

  public ScheduledThreadPoolExecutor(int corePoolSize) {
    super(corePoolSize,NANOSECONDS,new DelayedWorkQueue());
  }

newSingleThreadExecutor():單執行緒化的執行緒池

介紹

newSingleThreadExecutor顧名思義,是一個單執行緒的Executor,只建立一個工作執行緒執行任務,若這個唯一的執行緒異常故障了,會新建另一個執行緒來替代,newSingleThreadExecutor可以保證任務依照在工作佇列的排隊順序來序列執行。

  • 有且僅有一個工作執行緒執行任務;
  • 所有任務按照工作佇列的排隊順序執行,先進先出的順序。
  • 單個執行緒的執行緒池就是執行緒池中只有一個執行緒負責任務,所以 corePoolSize 和 maximumPoolSize 的數值都是為 1;當這個執行緒出現任何異常後,執行緒池會自動建立一個執行緒,始終保持執行緒池中有且只有一個存活的執行緒。
  • 工作佇列可以選用LinkedBlockingQueue。

建立方法

ExecutorService singleThreadPool = Executors.newSingleThreadPool();

原始碼

  public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
      (new ThreadPoolExecutor(1,1,new LinkedBlockingQueue<Runnable>()));
  }

  static class FinalizableDelegatedExecutorService
    extends DelegatedExecutorService {
    FinalizableDelegatedExecutorService(ExecutorService executor) {
      super(executor);
    }
    protected void finalize() {
      super.shutdown();
    }
  }

execute()方法

介紹

ThreadPoolExecutor.execute(Runnable command)方法,即可向執行緒池內新增一個任務

execute原始碼

  /**
   * Executes the given task sometime in the future. The task
   * may execute in a new thread or in an existing pooled thread.
   *
   * If the task cannot be submitted for execution,either because this
   * executor has been shutdown or because its capacity has been reached,* the task is handled by the current {@code RejectedExecutionHandler}.
   *
   * @param command the task to execute
   * @throws RejectedExecutionException at discretion of
   *     {@code RejectedExecutionHandler},if the task
   *     cannot be accepted for execution
   * @throws NullPointerException if {@code command} is null
   */
  public void execute(Runnable command) {
    if (command == null)
      throw new NullPointerException();
    /*
     * Proceed in 3 steps:
     *
     * 1. If fewer than corePoolSize threads are running,try to
     * start a new thread with the given command as its first
     * task. The call to addWorker atomically checks runState and
     * workerCount,and so prevents false alarms that would add
     * threads when it shouldn't,by returning false.
     *
     * 2. If a task can be successfully queued,then we still need
     * to double-check whether we should have added a thread
     * (because existing ones died since last checking) or that
     * the pool shut down since entry into this method. So we
     * recheck state and if necessary roll back the enqueuing if
     * stopped,or start a new thread if there are none.
     *
     * 3. If we cannot queue task,then we try to add a new
     * thread. If it fails,we know we are shut down or saturated
     * and so reject the task.
     */
	//獲取當前執行緒池的狀態
    int c = ctl.get();
	//若當前執行緒數量小於corePoolSize,則建立一個新的執行緒
    if (workerCountOf(c) < corePoolSize) {
      if (addWorker(command,true))
        return;
      c = ctl.get();
    }
	//判斷當前執行緒是否處於執行狀態,且寫入任務阻塞佇列是否成功
    if (isRunning(c) && workQueue.offer(command)) {
      int recheck = ctl.get();
	//再次獲取執行緒狀態進行雙重檢查;如果執行緒變成非執行狀態,則從阻塞佇列移除任務;
      if (! isRunning(recheck) && remove(command))
	//執行拒絕策略
        reject(command);
	//若當前執行緒池為空,則新建一個執行緒
      else if (workerCountOf(recheck) == 0)
        addWorker(null,false);
    }
	//當前執行緒為非執行狀態並且嘗試新建執行緒,若失敗則執行拒絕策略。
    else if (!addWorker(command,false))
      reject(command);
  }

流程分析

1)若當前執行緒數小於corePoolSize,則呼叫addWorker()方法建立執行緒執行任務。

2)若當前執行緒不小於corePoolSize,則將任務新增到workQueue佇列,等待空閒執行緒來執行。

3)若佇列裡的任務數到達上限,且當前執行執行緒小於maximumPoolSize,任務入workQueue佇列失敗,新建執行緒執行任務;

4)若建立執行緒也失敗(佇列任務數到達上限,且當前執行緒數達到了maximumPoolSize),對於新加入的任務,就會呼叫reject()(內部呼叫handler)拒絕接受任務。

Q&A

兩種關閉執行緒池的區別

shutdown(): 執行後停止接受新任務,會把佇列的任務執行完畢。

shutdownNow(): 執行後停止接受新任務,但會中斷所有的任務(不管是否正在執行中),將執行緒池狀態變為 STOP狀態。

拒絕策略有哪些?
ThreadPoolExecutor的飽和策略可以通過呼叫setRejectedExecutionHandler來修改。JDK提供了幾種不同的

RejectedExecutionHandler實現,每種實現都包含有不同的飽和策略:AbortPolicy、CallerRunsPolicy、DiscardPolicy和DiscardOldestPolicy。

拒絕策略如下:

  • CallerRunsPolicy : 呼叫執行緒處理任務
  • AbortPolicy : 丟擲異常
  • DiscardPolicy : 直接丟棄
  • DiscardOldestPolicy : 丟棄佇列中最老的任務,執行新任務

RejectedExecutionHandler rejected = null;

//預設策略,阻塞佇列滿,則丟任務、丟擲異常
rejected = new ThreadPoolExecutor.AbortPolicy();

//阻塞佇列滿,則丟任務,不拋異常
rejected = new ThreadPoolExecutor.DiscardPolicy();

//刪除佇列中最舊的任務(最早進入佇列的任務),嘗試重新提交新的任務
rejected = new ThreadPoolExecutor.DiscardOldestPolicy();

//佇列滿,不丟任務,不拋異常,若新增到執行緒池失敗,那麼主執行緒會自己去執行該任務
rejected = new ThreadPoolExecutor.CallerRunsPolicy();

(1)AbortPolicy、DiscardPolicy和DiscardOldestPolicy

  AbortPolicy是預設的飽和策略,就是中止任務,該策略將丟擲RejectedExecutionException。呼叫者可以捕獲這個異常然後去編寫程式碼處理異常。

  當新提交的任務無法儲存到佇列中等待執行時,DiscardPolicy會悄悄的拋棄該任務。

  DiscardOldestPolicy則會拋棄最舊的(下一個將被執行的任務),然後嘗試重新提交新的任務。如果工作佇列是那個優先順序佇列時,搭配DiscardOldestPolicy飽和策略會導致優先順序最高的那個任務被拋棄,所以兩者不要組合使用。

(2)CallerRunsPolicy

  CallerRunsPolicy是“呼叫者執行”策略,實現了一種調節機制 。它不會拋棄任務,也不會丟擲異常。 而是將任務回退到呼叫者。它不會線上程池中執行任務,而是在一個呼叫了execute的執行緒中執行該任務。線上程滿後,新任務將交由呼叫執行緒池execute方法的主執行緒執行,而由於主執行緒在忙碌,所以不會執行accept方法,從而實現了一種平緩的效能降低。

  當工作佇列被填滿後,沒有預定義的飽和策略來阻塞execute(除了拋棄就是中止還有去讓呼叫者去執行)。然而可以通過Semaphore來限制任務的到達率。

以上就是本文的全部內容,希望對大家的學習有所幫助,也希望大家多多支援我們。