1. 程式人生 > >【多執行緒】執行緒池

【多執行緒】執行緒池

1、好處

第一:降低資源消耗。通過重複利用已建立的執行緒降低執行緒建立和銷燬造成的消耗。

第二:提高響應速度。當任務到達時,任務可以不需要等到執行緒建立就能立即執行。

第三:提高執行緒的可管理性。執行緒是稀缺資源,如果無限制地建立,不僅會消耗系統資源, 還會降低系統的穩定性,使用執行緒池可以進行統一分配、調優和監控。

2、實現原理

當提交一個新任務到執行緒池時,執行緒池的處理流程:

1)執行緒池判斷核心執行緒池裡的執行緒是否都在執行任務。如果不是,則建立一個新的工作 執行緒來執行任務。如果核心執行緒池裡的執行緒都在執行任務,則進入下個流程。

2)執行緒池判斷工作佇列是否已經滿。如果工作佇列沒有滿,則將新提交的任務儲存在這 個工作佇列裡。如果工作佇列滿了,則進入下個流程。

3)執行緒池判斷執行緒池的執行緒是否都處於工作狀態。如果沒有,則建立一個新的工作執行緒 來執行任務。如果已經滿了,則交給飽和策略來處理這個任務。

ThreadPoolExecutor執行execute方法分下面4種情況。

1)如果當前執行的執行緒少於corePoolSize,則建立新執行緒來執行任務(注意,執行這一步驟 需要獲取全域性鎖)。

2)如果執行的執行緒等於或多於corePoolSize,則將任務加入BlockingQueue。

3)如果無法將任務加入BlockingQueue(佇列已滿),則建立新的執行緒來處理任務(注意,執 行這一步驟需要獲取全域性鎖)。

4)如果建立新執行緒將使當前執行的執行緒超出maximumPoolSize,任務將被拒絕,並呼叫 RejectedExecutionHandler.rejectedExecution()方法。

原始碼:

  public void execute(Runnable command) {
          if (command == null)
              throw new NullPointerException();
  // 如果執行緒數小於基本執行緒數,則建立執行緒並執行當前任務
  if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {
      
  // 如執行緒數大於等於基本執行緒數或執行緒建立失敗,則將當前任務放到工作佇列中。
  if (runState == RUNNING && workQueue.offer(command)) {
          if (runState != RUNNING || poolSize == 0)
              ensureQueuedTaskHandled(command);
  }
  // 如果執行緒池不處於執行中或任務無法放入佇列,並且當前執行緒數量小於最大允許的執行緒數量,
  // 則建立一個執行緒執行任務。
  else if (!addIfUnderMaximumPoolSize(command))
  // 丟擲RejectedExecutionException異常
  reject(command); // is shutdown or saturated
              }
      }

3、ThreadPoolExecutor的構造方法

      public ThreadPoolExecutor(int corePoolSize,
                                int maximumPoolSize,
                                long keepAliveTime,
                                TimeUnit unit,
                                BlockingQueue<Runnable> workQueue,
                                ThreadFactory threadFactory,
                                RejectedExecutionHandler handler) {
          if (corePoolSize < 0 ||
              maximumPoolSize <= 0 ||
              maximumPoolSize < corePoolSize ||
              keepAliveTime < 0)
              throw new IllegalArgumentException();
          if (workQueue == null || threadFactory == null || handler == null)
              throw new NullPointerException();
          this.acc = System.getSecurityManager() == null ?
                  null :
                  AccessController.getContext();
          this.corePoolSize = corePoolSize;
          this.maximumPoolSize = maximumPoolSize;
          this.workQueue = workQueue;
          this.keepAliveTime = unit.toNanos(keepAliveTime);
          this.threadFactory = threadFactory;
          this.handler = handler;
      }

引數含義如下:

1、corePoolSize 核心執行緒池大小;2、maximumPoolSize 執行緒池最大容量大小;3、keepAliveTime 執行緒池空閒時,執行緒存活的時間;4、TimeUnit 時間單位;5、ThreadFactory 執行緒工廠;6、BlockingQueue 任務佇列;7、RejectedExecutionHandler 執行緒拒絕策略;

4、任務佇列

當我們提交一個新的任務到執行緒池,執行緒池會根據當前池中正在執行的執行緒數量來決定該任務的處理方式。處理方式有三種: 1、直接切換(SynchronusQueue)

2、無界佇列(LinkedBlockingQueue)能夠建立的最大執行緒數為corePoolSize,這時maximumPoolSize就不會起作用了。當執行緒池中所有的核心執行緒都是執行狀態的時候,新的任務提交就會放入等待佇列中。

3、有界佇列(ArrayBlockingQueue)最大maximumPoolSize,能夠降低資源消耗,但是這種方式使得執行緒池對執行緒排程變的更困難。因為執行緒池與佇列容量都是有限的。所以想讓執行緒池的吞吐率和處理任務達到一個合理的範圍,又想使我們的執行緒排程相對簡單,並且還儘可能降低資源的消耗,我們就需要合理的限制這兩個數量

分配技巧: 如果想降低資源的消耗包括降低cpu使用率、作業系統資源的消耗、上下文切換的開銷等等,可以設定一個較大的佇列容量和較小的執行緒池容量,這樣會降低執行緒池的吞吐量。如果我們提交的任務經常發生阻塞,我們可以調整maximumPoolSize。如果我們的佇列容量較小,我們需要把執行緒池大小設定的大一些,這樣cpu的使用率相對來說會高一些。但是如果執行緒池的容量設定的過大,提高任務的數量過多的時候,併發量會增加,那麼執行緒之間的排程就是一個需要考慮的問題。這樣反而可能會降低處理任務的吞吐量。

5、RejectedExecutionHandler 執行緒拒絕策略:

  • AbortPolicy:直接丟擲異常。該策略是執行緒池的預設策略。使用該策略時,如果執行緒池佇列滿了丟掉這個任務並且丟擲RejectedExecutionException異常。

  • DiscardPolicy:不處理,丟棄掉。 這個策略和AbortPolicy的slient版本,如果執行緒池佇列滿了,會直接丟掉這個任務並且不會有任何異常。

  • DiscardOldestPolicy:丟棄佇列裡最近的一個任務, 並執行任務。 丟棄最老的。也就是說如果佇列滿了,會將最早進入佇列的任務刪掉騰出空間,再嘗試加入佇列。因為佇列是隊尾進,隊頭出,所以隊頭元素是最老的,因此每次都是移除對頭元素後再嘗試入隊。

  • CallerRunsPolicy:只用呼叫者所線上程來執行任務。 如果新增到執行緒池失敗,那麼主執行緒會自己去執行該任務,不會等待執行緒池中的執行緒去執行。

  • 自定義:實現RejectedExecutionHandler介面,並且實現rejectedExecution方法就可以了

6、種類

newCachedThreadPool:

      public static ExecutorService newCachedThreadPool() {
          return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                        60L, TimeUnit.SECONDS,
                                        new SynchronousQueue<Runnable>());
      }
  • 底層:返回ThreadPoolExecutor例項,corePoolSize為0;maximumPoolSize為Integer.MAX_VALUE;keepAliveTime為60L;unit為TimeUnit.SECONDS;workQueue為SynchronousQueue(同步佇列)

  • 通俗:當有新任務到來,則插入到SynchronousQueue中,由於SynchronousQueue是同步佇列,因此會在池中尋找可用執行緒來執行,若有可以執行緒則執行,若沒有可用執行緒則建立一個執行緒來執行該任務;若池中執行緒空閒時間超過指定大小,則該執行緒會被銷燬。

  • 適用:執行很多短期非同步的小程式或者負載較輕的伺服器

newFixedThreadPool:

      public static ExecutorService newFixedThreadPool(int nThreads) {
          return new ThreadPoolExecutor(nThreads, nThreads,
                                        0L, TimeUnit.MILLISECONDS,
                                        new LinkedBlockingQueue<Runnable>());
      }
  • 底層:返回ThreadPoolExecutor例項,接收引數為所設定執行緒數量nThread,corePoolSize為nThread,maximumPoolSize為nThread;keepAliveTime為0L(不限時);unit為:TimeUnit.MILLISECONDS;WorkQueue為:new LinkedBlockingQueue<Runnable>() 無解阻塞佇列

  • 通俗:建立可容納固定數量執行緒的池子,每隔執行緒的存活時間是無限的,當池子滿了就不在新增執行緒了;如果池中的所有執行緒均在繁忙狀態,對於新任務會進入阻塞佇列中(無界的阻塞佇列)

  • 適用:執行長期的任務,效能好很多

newSingleThreadExecutor:

      public static ExecutorService newSingleThreadExecutor() {
          return new FinalizableDelegatedExecutorService
              (new ThreadPoolExecutor(1, 1,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>()));
      }
  • 底層:FinalizableDelegatedExecutorService包裝的ThreadPoolExecutor例項,corePoolSize為1;maximumPoolSize為1;keepAliveTime為0L;unit為:TimeUnit.MILLISECONDS;workQueue為:new LinkedBlockingQueue<Runnable>() 無解阻塞佇列

  • 通俗:建立只有一個執行緒的執行緒池,且執行緒的存活時間是無限的;當該執行緒正繁忙時,對於新任務會進入阻塞佇列中(無界的阻塞佇列)

  • 適用:一個任務一個任務執行的場景

NewScheduledThreadPool:

  //原始碼:
  public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
      return new ScheduledThreadPoolExecutor(corePoolSize);
  }
  ​
  public ScheduledThreadPoolExecutor(int corePoolSize) {
      super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,//此處super指的是ThreadPoolExecutor
            new DelayedWorkQueue());
  }
  • 底層:建立ScheduledThreadPoolExecutor例項,corePoolSize為傳遞來的引數,maximumPoolSize為Integer.MAX_VALUE;keepAliveTime為0;unit為:TimeUnit.NANOSECONDS;workQueue為:new DelayedWorkQueue() 一個按超時時間升序排序的佇列

  • 通俗:建立一個固定大小的執行緒池,執行緒池內執行緒存活時間無限制,執行緒池可以支援定時及週期性任務執行,如果所有執行緒均處於繁忙狀態,對於新任務會進入DelayedWorkQueue佇列中,這是一種按照超時時間排序的佇列結構

  • 適用:週期性執行任務的場景

7、實現專案應用

基礎資料Excel匯入多表操作;

場景說明:

Excel匯入基礎資料,得到的匯入資料list需要插入到A表、B表。由於基礎資料動輒上萬條資料,如果只是使用單執行緒進行匯入,時間週期長,系統性能會大大降低。解決方法:使用使用自定義執行緒池,設定了多執行緒,每個執行緒對list進行分組,分組後依次插入到資料庫A、B表中。