1. 程式人生 > >死磕 java執行緒系列之執行緒池深入解析——體系結構

死磕 java執行緒系列之執行緒池深入解析——體系結構

(手機橫屏看原始碼更方便)


注:java原始碼分析部分如無特殊說明均基於 java8 版本。

簡介

Java的執行緒池是塊硬骨頭,對執行緒池的原始碼做深入研究不僅能提高對Java整個併發程式設計的理解,也能提高自己在面試中的表現,增加被錄取的可能性。

本系列將分成很多個章節,本章作為執行緒池的第一章將對整個執行緒池體系做一個總覽。

體系結構

上圖列舉了執行緒池中非常重要的介面和類:

(1)Executor,執行緒池頂級介面;

(2)ExecutorService,執行緒池次級介面,對Executor做了一些擴充套件,增加一些功能;

(3)ScheduledExecutorService,對ExecutorService做了一些擴充套件,增加一些定時任務相關的功能;

(4)AbstractExecutorService,抽象類,運用模板方法設計模式實現了一部分方法;

(5)ThreadPoolExecutor,普通執行緒池類,這也是我們通常所說的執行緒池,包含最基本的一些執行緒池操作相關的方法實現;

(6)ScheduledThreadPoolExecutor,定時任務執行緒池類,用於實現定時任務相關功能;

(7)ForkJoinPool,新型執行緒池類,java7中新增的執行緒池類,基於工作竊取理論實現,運用於大任務拆小任務、任務無限多的場景;

(8)Executors,執行緒池工具類,定義了一些快速實現執行緒池的方法(謹慎使用);

Executor

執行緒池頂級介面,只定義了一個執行無返回值任務的方法。

public interface Executor {
    // 執行無返回值任務【本篇文章由公眾號“彤哥讀原始碼”原創】
    void execute(Runnable command);
}

ExecutorService

執行緒池次級介面,對Executor做了一些擴充套件,主要增加了關閉執行緒池、執行有返回值任務、批量執行任務的方法。

public interface ExecutorService extends Executor {
    // 關閉執行緒池,不再接受新任務,但已經提交的任務會執行完成
    void shutdown();

    // 立即關閉執行緒池,嘗試停止正在執行的任務,未執行的任務將不再執行
    // 被迫停止及未執行的任務將以列表的形式返回
    List<Runnable> shutdownNow();

    // 檢查執行緒池是否已關閉
    boolean isShutdown();

    // 檢查執行緒池是否已終止,只有在shutdown()或shutdownNow()之後呼叫才有可能為true
    boolean isTerminated();
    
    // 在指定時間內執行緒池達到終止狀態了才會返回true
    boolean awaitTermination(long timeout, TimeUnit unit)
        throws InterruptedException;
    
    // 執行有返回值的任務,任務的返回值為task.call()的結果
    <T> Future<T> submit(Callable<T> task);

    // 執行有返回值的任務,任務的返回值為這裡傳入的result
    // 當然只有當任務執行完成了呼叫get()時才會返回
    <T> Future<T> submit(Runnable task, T result);
    
    // 執行有返回值的任務,任務的返回值為null
    // 當然只有當任務執行完成了呼叫get()時才會返回
    Future<?> submit(Runnable task);

    // 批量執行任務,只有當這些任務都完成了這個方法才會返回
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
        throws InterruptedException;

    // 在指定時間內批量執行任務,未執行完成的任務將被取消
    // 這裡的timeout是所有任務的總時間,不是單個任務的時間
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                  long timeout, TimeUnit unit)
        throws InterruptedException;
    
    // 返回任意一個已完成任務的執行結果,未執行完成的任務將被取消
    <T> T invokeAny(Collection<? extends Callable<T>> tasks)
        throws InterruptedException, ExecutionException;

    // 在指定時間內如果有任務已完成,則返回任意一個已完成任務的執行結果,未執行完成的任務將被取消
    <T> T invokeAny(Collection<? extends Callable<T>> tasks,
                    long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}

ScheduledExecutorService

對ExecutorService做了一些擴充套件,增加一些定時任務相關的功能,主要包含兩大類:執行一次,重複多次執行。

public interface ScheduledExecutorService extends ExecutorService {

    // 在指定延時後執行一次
    public ScheduledFuture<?> schedule(Runnable command,
                                       long delay, TimeUnit unit);
    // 在指定延時後執行一次
    public <V> ScheduledFuture<V> schedule(Callable<V> callable,
                                           long delay, TimeUnit unit);
                                           
    // 在指定延時後開始執行,並在之後以指定時間間隔重複執行(間隔不包含任務執行的時間)
    // 相當於之後的延時以任務開始計算【本篇文章由公眾號“彤哥讀原始碼”原創】
    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                                  long initialDelay,
                                                  long period,
                                                  TimeUnit unit);

    // 在指定延時後開始執行,並在之後以指定延時重複執行(間隔包含任務執行的時間)
    // 相當於之後的延時以任務結束計算
    public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
                                                     long initialDelay,
                                                     long delay,
                                                     TimeUnit unit);

}

AbstractExecutorService

抽象類,運用模板方法設計模式實現了一部分方法,主要為執行有返回值任務、批量執行任務的方法。

public abstract class AbstractExecutorService implements ExecutorService {

    protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
        return new FutureTask<T>(runnable, value);
    }

    protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
        return new FutureTask<T>(callable);
    }

    public Future<?> submit(Runnable task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<Void> ftask = newTaskFor(task, null);
        execute(ftask);
        return ftask;
    }

    public <T> Future<T> submit(Runnable task, T result) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task, result);
        execute(ftask);
        return ftask;
    }

    public <T> Future<T> submit(Callable<T> task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task);
        execute(ftask);
        return ftask;
    }

    public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
        throws InterruptedException, ExecutionException {
        // 略...
    }

    public <T> T invokeAny(Collection<? extends Callable<T>> tasks,
                           long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException {
        // 略...
    }

    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
        throws InterruptedException {
        // 略...
    }

    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                         long timeout, TimeUnit unit)
        throws InterruptedException {
        // 略...
    }

}

可以看到,這裡的submit()方法對傳入的任務都包裝成了FutureTask來進行處理,這是什麼東西呢?歡迎關注後面的章節。

ThreadPoolExecutor

普通執行緒池類,這也是我們通常所說的執行緒池,包含最基本的一些執行緒池操作相關的方法實現。

執行緒池的主要實現邏輯都在這裡面,比如執行緒的建立、任務的處理、拒絕策略等,我們後面單獨分析這個類。

ScheduledThreadPoolExecutor

定時任務執行緒池類,用於實現定時任務相關功能,將任務包裝成定時任務,並按照定時策略來執行,我們後面單獨分析這個類。

問題:你知道定時任務執行緒池類使用的是什麼佇列嗎?

ForkJoinPool

新型執行緒池類,java7中新增的執行緒池類,這個執行緒池與Go中的執行緒模型特別類似,都是基於工作竊取理論,特別適合於處理歸併排序這種先分後合的場景。

Executors

執行緒池工具類,定義了一系列快速實現執行緒池的方法——newXXX(),不過阿里手冊是不建議使用這個類來新建執行緒池的,彤哥我並不這麼認為,只要能掌握其原始碼,知道其利敝偶爾還是可以用的,後面我們再來說這個事。

彩蛋

無彩蛋不歡,今天的問題是定時任務執行緒池用的是哪種佇列來實現的?

答:延時佇列。定時任務執行緒池中並沒有直接使用併發集合中的DelayQueue,而是自己又實現了一個DelayedWorkQueue,不過跟DelayQueue的實現原理是一樣的。

延時佇列使用什麼資料結構來實現的呢?

答:堆(DelayQueue中使用的是優先順序佇列,而優先順序佇列使用的堆;DelayedWorkQueue直接使用的堆)。

關於延時佇列、優先順序佇列和堆的相關內容點選下面的連結直達:

死磕 java集合之DelayQueue原始碼分析

死磕 java集合之PriorityQueue原始碼分析

拜託,面試別再問我堆(排序)了!


歡迎關注我的公眾號“彤哥讀原始碼”,檢視更多原始碼系列文章, 與彤哥一起暢遊原始碼的海洋。

相關推薦

java執行系列執行深入解析——體系結構

(手機橫屏看原始碼更方便) 注:java原始碼分析部分如無特殊說明均基於 java8 版本。 簡介 Java的執行緒池是塊硬骨頭,對執行緒池的原始碼做深入研究不僅能提高對Java整個併發程式設計的理解,也能提高自己在面試中的表現,增加被錄取的可能性。 本系列將分成很多個章節,本章作為執行緒池的第一章將對

java concurrent包系列(六)基於AQS解析訊號量Semaphore

Semaphore 之前分析AQS的時候,內部有兩種模式,獨佔模式和共享模式,前面的ReentrantLock都是使用獨佔模式,而Semaphore同樣作為一個基於AQS實現的併發元件,它是基於共享模式實現的,我們先看看它的使用場景 Semaphore共享鎖的基本使用 假設有20個人去銀行櫃面辦理業務,

java執行系列執行模型

(2)執行緒模型有哪些? (3)各語言使用的是哪種執行緒模型? 簡介 在Java中,我們平時所說的併發程式設計、多執行緒、共享資源等概念都是與執行緒相關的,這裡所說的執行緒實際上應該叫作“使用者執行緒”,而對應到作業系統,還有另外一種執行緒叫作“核心執行緒”。 使用者執行緒位於核心之上,它的管理無需核心支援

java執行系列執行深入解析——生命週期

(手機橫屏看原始碼更方便) 注:java原始碼分析部分如無特殊說明均基於 java8 版本。 注:執行緒池原始碼部分如無特殊說明均指ThreadPoolExecutor類。 簡介 上一章我們一起重溫了下執行緒的生命週期(六種狀態還記得不?),但是你知不知道其實執行緒池也是有生命週期的呢?! 問題 (1)

java執行系列執行深入解析——普通任務執行流程

(手機橫屏看原始碼更方便) 注:java原始碼分析部分如無特殊說明均基於 java8 版本。 注:執行緒池原始碼部分如無特殊說明均指ThreadPoolExecutor類。 簡介 前面我們一起學習了Java中執行緒池的體系結構、構造方法和生命週期,本章我們一起來學習執行緒池中普通任務到底是怎麼執行的。

java執行系列執行深入解析——未來任務執行流程

(手機橫屏看原始碼更方便) 注:java原始碼分析部分如無特殊說明均基於 java8 版本。 注:執行緒池原始碼部分如無特殊說明均指ThreadPoolExecutor類。 簡介 前面我們一起學習了執行緒池中普通任務的執行流程,但其實執行緒池中還有一種任務,叫作未來任務(future task),使用它

java執行系列執行深入解析——定時任務執行流程

(手機橫屏看原始碼更方便) 注:java原始碼分析部分如無特殊說明均基於 java8 版本。 注:本文基於ScheduledThreadPoolExecutor定時執行緒池類。 簡介 前面我們一起學習了普通任務、未來任務的執行流程,今天我們再來學習一種新的任務——定時任務。 定時任務是我們經常會用到的一

java concurrent包系列(一)從樂觀鎖、悲觀鎖到AtomicInteger的CAS演算法

前言 Java中有各式各樣的鎖,主流的鎖和概念如下: 這篇文章主要是為了讓大家通過樂觀鎖和悲觀鎖出發,理解CAS演算法,因為CAS是整個Concurrent包的基礎。 樂觀鎖和悲觀鎖 首先,java和資料庫中都有這種概念,他是一種從執行緒同步的角度上看的一種廣義上的概念: 悲觀鎖:悲觀的認為自己在使用資料的

java concurrent包系列(三)基於ReentrantLock理解AQS的條件佇列

基於Codition分析AQS的條件佇列 前言 上一篇我們講了AQS中的同步佇列佇列,現在我們研究一下條件佇列。 在java中最常見的加鎖方式就是synchorinzed和Reentrantlock,我們都說Reentrantlock比synchorinzed更加靈活,其實就靈活在Reentrantlock中

java concurrent包系列(五)基於AQS的條件佇列把LinkedBlockingQueue“扒光”

LinkedBlockingQueue的基礎 LinkedBlockingQueue是一個基於連結串列的阻塞佇列,實際使用上與ArrayBlockingQueue完全一樣,我們只需要把之前烤雞的例子中的Queue物件替換一下即可。如果對於ArrayBlockingQueue不熟悉,可以去看看https://

java執行系列建立執行的8種方式

問題 (1)建立執行緒有哪幾種方式? (2)它們分別有什麼運用場景? 簡介 建立執行緒,是多執行緒程式設計中最基本的操作,彤哥總結了一下,大概有8種建立執行緒的方式,你知道嗎? 繼承Thread類並重寫run()方法 public class CreatingThread01 extends Thread

java執行系列自己動手寫一個執行

歡迎關注我的公眾號“彤哥讀原始碼”,檢視更多原始碼系列文章, 與彤哥一起暢遊原始碼的海洋。 (手機橫屏看原始碼更方便) 問題 (1)自己動手寫一個執行緒池需要考慮哪些因素? (2)自己動手寫的執行緒池如何測試? 簡介 執行緒池是Java併發程式設計中經常使用到的技術,那麼自己如何動手寫一個執行緒池呢?本

java執行系列自己動手寫一個執行(續)

(手機橫屏看原始碼更方便) 問題 (1)自己動手寫的執行緒池如何支援帶返回值的任務呢? (2)如果任務執行的過程中丟擲異常了該

java執行系列終篇

(手機橫屏看原始碼更方便) 簡介 執行緒系列我們基本就學完了,這一個系列我們基本都是圍繞著執行緒池在講,其實關於執行緒還有很多東西可以講,後面有機會我們再補充進來。當然,如果你有什麼好的想法,也可以公從號右下角聯絡我。 重要知識點 直接上圖,看著這張圖我相信你能夠回憶起很多東西,也可以看著這張圖來自己提

Java併發】-----J.U.CAQS:阻塞和喚醒執行

此篇部落格所有原始碼均來自JDK 1.8 線上程獲取同步狀態時如果獲取失敗,則加入CLH同步佇列,通過通過自旋的方式不斷獲取同步狀態,但是在自旋的過程中則需要判斷當前執行緒是否需要阻塞,其主要方法在acquireQueued(): if (sho

【紮實基本功】Java基礎教程系列執行

1. 多執行緒的概念 1.1 程序、執行緒、多程序的概念 程序:正在進行中的程式(直譯)。 執行緒是程式執行的一條路徑, 一個程序中可以包含多條執行緒。 一個應用程式可以理解成就是一個程序。 多執行緒併發執行可以提高程式的效率, 可以同時完成多項工作。 1.

Java執行系列--“JUC執行”01 執行架構

概要 前面分別介紹了”Java多執行緒基礎”、”JUC原子類”和”JUC鎖”。本章介紹JUC的最後一部分的內容——執行緒池。內容包括: 執行緒池架構圖 執行緒池示例 執行緒池架構圖 執行緒池的架構圖如下: 1、Executor

Java執行系列--“JUC執行”05 執行原理(四)

概要 本章介紹執行緒池的拒絕策略。內容包括: 拒絕策略介紹 拒絕策略對比和示例 拒絕策略介紹 執行緒池的拒絕策略,是指當任務新增到執行緒池中被拒絕,而採取的處理措施。 當任務新增到執行緒池中之所以被拒絕,可能是由於:第一,執行緒池異常關閉。第二,任務數量

Java 併發程式設計系列帶你瞭解多執行

早期的計算機不包含作業系統,它們從頭到尾執行一個程式,這個程式可以訪問計算機中的所有資源。在這種情況下,每次都只能執行一個程式,對於昂貴的計算機資源來說是一種嚴重的浪費。 作業系統出現後,計算機可以執行多個程式,不同的程式在單獨的程序中執行。作業系統負責為各個獨

java執行系列模式|第一篇-Guarded Suspension pattern

Guarded Suspension pattern模式 作者注:該系列文章基於《java執行緒設計模式》撰寫,只用於學習和交流。 定義:多執行緒執行,當前執行緒沒有達到警戒條件時,執行緒會進入等待直到