1. 程式人生 > 其它 >執行緒池詳解

執行緒池詳解

簡述

  ThreadPoolExecutor是java執行緒池的一種

底層 

  ThreadPoolExecutor 使用 int 的變數ctl的高 3 位來表示執行緒池狀態,低29位表示執行緒數量

  這些資訊儲存在一個原子變數 ctl 中,目的是將執行緒池狀態與執行緒個數合二為一,這樣就可以用一次 cas 原子操作進行賦值

構造方法

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue
<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)

  corePoolSize 核心執行緒數目 (最多保留的執行緒數)

  maximumPoolSize 最大執行緒數目

  keepAliveTime 生存時間 - 針對救急執行緒

  unit 時間單位 - 針對救急執行緒

  workQueue 阻塞佇列

  threadFactory 執行緒工廠 - 可以為執行緒建立時起個好名字

  handler 拒絕策略類

核心執行緒與救急執行緒

  當核心執行緒都在執行任務時,新來的任務會進入阻塞佇列,等待執行緒執行。當阻塞佇列滿了之後,後面再來的任務就會嘗試尋找救急執行緒執行。當救急執行緒也都在執行任務時,執行緒池才會啟動拒絕策略。當然救急執行緒平常是用不到的,所以我們設定keepAliveTime救急執行緒的生存時間來定時地回收執行緒。

拒絕策略

  JDK的執行緒池有四種拒絕策略:

AbortPolicy(預設) 丟擲異常
DiscardOldestPolicy 丟棄佇列中最前面的任務,execute該執行緒。進入佇列
DiscardPolicy 丟棄
CallerRunsPolicy 當前執行緒直接執行run方法

  值得一提的是dubbo在丟擲異常的同時會記錄日誌,netty會新建立一個執行緒執行任務,ActiveMQ會在一定時間內重複嘗試,PinPoint會使用了一個拒絕策略鏈,會逐一嘗試策略鏈中每種拒絕策略

工廠方法建立執行緒池

  JDK Executors 類中提供了眾多工廠方法來建立各種用途的執行緒池

newFixedThreadPool

  這種建立方式的特點是沒有救急執行緒被建立,因此也無需指定超時時間。阻塞佇列是無界的,可以放置任意數量的任務。適用於任務量已知,相對耗時的任務。

public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
            0L, TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<Runnable>());
}

newCachedThreadPool

  核心執行緒數是 0, 最大執行緒數是 Integer.MAX_VALUE,救急執行緒的空閒生存時間是 60s,意味著該執行緒池裡全部都是救急執行緒。該執行緒池的救急執行緒可以無限建立,佇列採用了 SynchronousQueue 實現特點是,它沒有容量,沒有執行緒來取是放不進去的

public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
            60L, TimeUnit.SECONDS,
            new SynchronousQueue<Runnable>());
}

  整個執行緒池表現為執行緒數會根據任務量不斷增長,沒有上限,當任務執行完畢,空閒 1分鐘後釋放執行緒。 適合任務數比較密集,但每個任務執行時間較短的情況

newSingleThreadExecutor

  希望多個任務排隊執行。執行緒數固定為 1,任務數多於 1 時,會放入無界佇列排隊。任務執行完畢,這唯一的執行緒也不會被釋放。自己建立一個單執行緒序列執行任務,如果任務執行失敗而終止那麼沒有任何補救措施,而執行緒池還會新建一個執行緒,保證池的正常工作

public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                    0L, TimeUnit.MILLISECONDS,
                    new LinkedBlockingQueue<Runnable>()));
}

執行緒池提交任務

// 執行任務
void execute(Runnable command);
// 提交任務 task,用返回值 Future 獲得任務執行結果 <T> Future<T> submit(Callable<T> task);
// 提交 tasks 中所有任務 <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException;
// 提交 tasks 中所有任務,帶超時時間 <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException;
// 提交 tasks 中所有任務,哪個任務先成功執行完畢,返回此任務執行結果,其它任務取消 <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException;
// 提交 tasks 中所有任務,哪個任務先成功執行完畢,返回此任務執行結果,其它任務取消,帶超時時間 <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;

關閉執行緒池 

  使用shutdown方法關閉執行緒池,執行緒池狀態變為 SHUTDOWN,不會接收新任務,但已提交任務會執行完、

public void shutdown() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        checkShutdownAccess();
        // 修改執行緒池狀態
        advanceRunState(SHUTDOWN);
        // 僅會打斷空閒執行緒
        interruptIdleWorkers();
        onShutdown(); // 擴充套件點 ScheduledThreadPoolExecutor
    } finally {
        mainLock.unlock();
    }
    // 嘗試終結(沒有執行的執行緒可以立刻終結,如果還有執行的執行緒也不會等)
    tryTerminate();
}

  使用shutdownNow關閉執行緒池,執行緒池狀態變為 STOP,不會接收新任務,會將佇列中的任務返回,並用 interrupt 的方式中斷正在執行的任務

帶任務排程的執行緒池

  ScheduledExecutorService為我們提供了一個能進行任務排程的執行緒池,構造方法傳入初始執行緒的數量。

  它的schedule能夠以定時器的方式延遲執行一個任務,引數為runnable或callable介面、延遲時間、時間單位

ScheduledExecutorService pool = Executors.newScheduledThreadPool(5);
pool.schedule(()->{
    System.out.println("task 1");
},1000, TimeUnit.SECONDS);
pool.shutdown();

  它以固定時間頻率執行任務,使用scheduleAtFixedRate即可,引數為任務、第一次執行的延遲時間、每次執行成功的間隔時間、時間單位。需要注意的是,如果執行時間大於間隔時間,則下一次執行就不會進行間隔,兩個任務就會靠在一起執行。

ScheduledExecutorService pool = Executors.newScheduledThreadPool(5);
pool.scheduleAtFixedRate(()->{
    System.out.println("task 2");
},1,1,TimeUnit.SECONDS);
pool.shutdown();

  scheduleWithFixedDelay引數一樣,這個方法就不會考慮執行時間,每兩個任務之間的間隔時間都是恆定的

  另外如果呼叫了執行緒池的結束方法,則會停止定時執行的機制。

cpu密集型運算

  指經常用到cpu的任務,執行緒池執行緒數通常採用 cpu 核數 + 1 能夠實現最優的 CPU 利用率,+1 是保證當執行緒由於頁缺失故障(作業系統)或其它原因導致暫停時,額外的這個執行緒就能頂上去,保證 CPU 時鐘週期不被浪費

I/O 密集型運算

  CPU 不總是處於繁忙狀態,例如,當你執行業務計算時,這時候會使用 CPU 資源,但當你執行 I/O 操作時、遠端RPC 呼叫時,包括進行資料庫操作時,這時候 CPU 就閒下來了,你可以利用多執行緒提高它的利用率。 

  經驗公式如下:
執行緒數 = 核數 * 期望 CPU 利用率 * 總時間(CPU計算時間+等待時間) / CPU 計算時間

執行緒池的異常處理

  線上程池中執行的任務如果出現了異常,執行緒池會怎麼處理呢?答案是不會處理。執行緒池中執行的任務如果出現了異常,則會統一把異常交給afterExecute方法進行處理,但afterExecute方法是個空實現。也就是說,啥也不幹。這也很好理解,設計執行緒池的時候怎麼知道你具體執行的異常怎麼處理呢。

  第一種方法是直接在任務里加trycatch程式碼塊,但這樣子程式碼就會很醜。

  第二種方法是設定每一個執行緒的異常處理類來達到異常處理的效果

ExecutorService pool = Executors.newFixedThreadPool(5);
Runnable task1 = new Runnable(){
    @Override
    public void run() {
        System.out.println("task1");
    }
};
Thread t1 = new Thread(task1);
t1.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
    @Override
    public void uncaughtException(Thread t, Throwable e) {
        System.out.println("異常處理");
    }
});
pool.execute(t1);

  第三種方法是用submit提交callable任務,用futuretask的get方法返回異常資訊,再進行統一處理

一點一點積累,一點一點蛻變!