1. 程式人生 > 其它 >java執行緒池拒絕策略使用實踐

java執行緒池拒絕策略使用實踐

執行緒池是開發過程中使用頻率較高的一個併發元件之一,本篇會結合踩刀哥之前的實踐經驗來分享一下執行緒池拒絕策略的真實使用場景,至於執行緒池內部原理只會簡單介紹,有需要的可以自行上網學習。

執行緒池工作機制

這裡用一個例子來描述下執行緒池的工作機制,2015年公司boss創立公司,創立初期公司業務比較少,boss一個人(corePoolSize=1)乾的有條不紊,沒過多久,業務量上來了,他一個人幹不過來,分身乏力,那怎麼辦呢?其實很簡單,排隊唄,就這樣boss將待辦的任務都新增到需求池(BlockingQueue)裡面,boss又開始愉快的工作,但是客戶的耐心終歸有限,過了幾天發現自己交給我們公司的業務還沒完成,客戶一氣之下打電話給boss“我的活你幹完沒有,沒幹的話就停下來(shutdown/shutdownNow

)吧,我找別人了”,這時候boss慌了,流著淚點上一根菸,在網上發了招聘,就這樣幹活的人又多了起來(addWorker),但是員工終歸不是無限的,當活太多的時候,boss還是會拒絕接一些活(RejectedExecutionHandler)。公司在boss的帶領下沉浮五載,本以為2020年可以大幹一場,卻偏偏趕上了新冠,復工日期一拖再拖,客戶需求一少再少,唯獨公司養的員工沒少,這是公司目前最大的開支了。長痛不如短痛,boss們研究了一個政策,如果員工一個月(keepAliveTime=一個月)沒有活幹,那麼就會被辭退(空閒執行緒被清理),一段時間以後不少員工被辭退了,只剩下核心人員。
畫個簡圖幫助理解,如下:

主角登場

之前的鋪墊都是為了引出RejectedExecutionHandler,現在我們來聊聊RejectedExecutionHandler的真實使用場景,先看看RejectedExecutionHandler的定義。


/**
 * A handler for tasks that cannot be executed by a {@link ThreadPoolExecutor}.
 *
 * @since 1.5
 * @author Doug Lea
 */
public interface RejectedExecutionHandler {

    /**
     * Method that may be invoked by a {@link ThreadPoolExecutor} when
     * {@link ThreadPoolExecutor#execute execute} cannot accept a
     * task.  This may occur when no more threads or queue slots are
     * available because their bounds would be exceeded, or upon
     * shutdown of the Executor.
     *
     * <p>In the absence of other alternatives, the method may throw
     * an unchecked {@link RejectedExecutionException}, which will be
     * propagated to the caller of {@code execute}.
     *
     * @param r the runnable task requested to be executed
     * @param executor the executor attempting to execute this task
     * @throws RejectedExecutionException if there is no remedy
     */
    void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}

很顯然它是一個介面,只有一個方法rejectedExecution,當執行緒池拒絕接受任務的時候會呼叫它,RejectedExecutionHandler一般由構造ThreadPoolExecutor物件的時候傳入,如果沒有傳入會預設使用AbortPolicy。

jdk目前已提供四種RejectedExecutionHandler的實現供開發者使用,大多數情況下已夠用,少數情況下使用者可以選擇自定義,jdk提供的四種RejectedExecutionHandler實現如下:

1.AbortPolicy:中止策略,丟擲RejectedExecutionException異常由使用者處理;

2.CallerRunsPolicy:佔用呼叫者的執行緒來執行被拒絕的任務;

3.DiscardOldestPolicy:將最早入佇列的任務丟棄,然後重新提交被拒絕的任務(這裡有可能依然不成功);

4.DiscardPolicy:拋棄策略,簡單的拋棄,和AbortPolicy比較相似,區別是前者對於使用者無感知;

實踐場景之-AbortPolicy

在踩刀哥過往的工作中有這麼一個需求,使用者支付以後給使用者push訊息,這裡就用到了執行緒池來處理這塊業務,虛擬碼如下:

ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(...);
    try{
      threadPoolExecutor.execute(new Runnable() {
        @Override
        public void run() {
          //1. 呼叫推送服務push msg
        }
      });
    }catch (Exception RejectedExecutionException){
      //2. 記錄日誌
    }

前面說過,如果構造ThreadPoolExecutor時沒有傳遞RejectedExecutionHandler,jdk預設會使用AbortPolicy,它內部會丟擲RejectedExecutionException,所以呼叫者需要捕獲這個異常做相應的處理,因為當時1.0的需求比較簡單,所以只是簡單了記錄了日誌,後來產品提出對於這種失敗的情況需要做補償,進而引出下面的第二個使用場景。

實踐場景之-自定義RejectedExecutionHandler

前面提到產品希望對於這種被拒絕的push任務需要做補償,具體的補償邏輯為:如果當時被拒絕了,那就每隔2s重試一次,一共重試2次。我當時的處理措施是,如果execute失敗了那就將任務放到redis中,非同步取出重試,程式碼怎麼寫呢,第一版是這麼寫的:


ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(...);
    try{
      threadPoolExecutor.execute(new Runnable() {
        @Override
        public void run() {
          //1. 呼叫推送服務push msg
        }
      });
    }catch (Exception RejectedExecutionException){
      //2. 將任務新增到redis中
    }
    
    //3 定時任務掃描redis,然後新增到threadPoolExecutor中

看著確實也沒有問題,也能實現功能,但是這種寫法顯得不太優雅,ThreadPoolExecutor對於拒絕處理這塊採用了策略設計模式來優化程式碼,讓邏輯更清晰,而我現在的寫法將任務處理和拒絕處理揉在了一起,違背了原來的設計,所以決定進行改造,改造後如下:


ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor( ..., new RejectedExecutionHandler() {
    @Override
      public void rejectedExecution(Runnable task, ThreadPoolExecutor executor) {
        //1. 記錄日誌
        log.warn...
        //2 將task插入的redis中
        redis.lpush...
       
      }
});

拒絕處理的邏輯被封裝到自定義的拒絕處理器當中,邏輯更清晰,表達能力更強。

實踐場景之-CallerRunsPolicy

之前做過一個http推送平臺,大體工作流程如下:

1.生產者將推送任務插入資料庫中;

2.推送平臺起一個非同步執行緒去獲取待推送任務;

3.將第2步中得到的推送任務丟到執行緒池裡面去推送。

簡單來說就是一個生產者消費者模型,推送的時候發現某些下游的介面響應時間較長,經常將執行緒池佔滿,所以就希望DelayQueuePollingTask這個執行緒能感知到這一情況,當執行緒池滿的時候停止去資料庫獲取待推送任務,所以就將RejectedExecutionHandler設定為CallerRunsPolicy,現在可以達到如下效果:

1.生產者將推送任務插入資料庫中;

2.推送平臺起一個非同步執行緒去獲取待推送任務;

3.將第2步中得到的推送任務丟到執行緒池裡面去推送;

4.執行緒池如果滿就由DelayQueuePollingTask這個Thread自己執行推送任務,這樣就可以停止去資料庫獲取待推送任務,DelayQueuePollingTask也不至於閒著沒事,還可以分擔任務。