1. 程式人生 > >知識梳理之Java併發包相關(java.util.concurrent/Blocking/Concurrent/ThreadPoolExecutors/CopyOnWrite)

知識梳理之Java併發包相關(java.util.concurrent/Blocking/Concurrent/ThreadPoolExecutors/CopyOnWrite)

Java併發包提供了哪些併發工具類?

我們通常所說的併發包也就是 java.util.concurrent 及其子包,集中了 Java 併發的各種基礎工具類,具體主要包括幾個方面:

  • 提供了比 synchronized 更加高階的各種同步結構,包括 CountDownLatch、CyclicBarrier、Semaphore 等,可以實現更加豐富的多執行緒操作,比如利用 Semaphore 作為資源控制器,限制同時進行工作的執行緒數量。

  • 各種執行緒安全的容器,比如最常見的 ConcurrentHashMap、有序的 ConcunrrentSkipListMap,或者通過類似快照機制,實現執行緒安全的動態陣列 CopyOnWriteArrayList 等。

  • 各種併發佇列實現,如各種 BlockedQueue 實現,比較典型的 ArrayBlockingQueue、 SynchorousQueue 或針對特定場景的 PriorityBlockingQueue 等。

  • 強大的 Executor 框架,可以建立各種不同型別的執行緒池,排程任務執行等,絕大部分情況下,不再需要自己從頭實現執行緒池和任務排程器。

java.util.concurrent包基本概念,強一致性和弱一致性

java.util.concurrent 包提供的容器(Queue、List、Set)、Map,從命名上可以大概區分為 Concurrent、CopyOnWrite和 Blocking* 等三類,同樣是執行緒安全容器,可以簡單認為:

Concurrent 型別沒有類似 CopyOnWrite 之類容器相對較重的修改開銷。

但是,凡事都是有代價的,Concurrent 往往提供了較低的遍歷一致性。你可以這樣理解所謂的弱一致性,例如,當利用迭代器遍歷時,如果容器發生修改,迭代器仍然可以繼續進行遍歷。

弱一致性的另外一個體現是,size 等操作準確性是有限的,未必是 100% 準確。與此同時,讀取的效能具有一定的不確定性。

同步容器常見的行為“fast-fail”是強一致性的提現,也就是檢測到容器在遍歷過程中發生了修改,則丟擲 ConcurrentModificationException,不再繼續遍歷。

多執行緒程式設計的目的

  • 利用多執行緒提高程式的擴充套件能力,以達到業務對吞吐量的要求。

  • 協調執行緒間排程、互動,以完成業務邏輯。

  • 執行緒間傳遞資料和狀態,這同樣是實現業務邏輯的需要。

同步結構

  1. CountDownLatch,允許一個或多個執行緒等待某些操作完成。

  2. CyclicBarrier,一種輔助性的同步結構,允許多個執行緒等待到達某個屏障。

  3. Semaphore,Java 版本的訊號量實現。

CountDownLatch 是不可以重置的,所以無法重用;而 CyclicBarrier 則沒有這種限制,可以重用。

CountDownLatch 的基本操作組合是 countDown/await。呼叫 await 的執行緒阻塞等待 countDown 足夠的次數,不管你是在一個執行緒還是多個執行緒裡 countDown,只要次數足夠即可。所以就像 Brain Goetz 說過的,CountDownLatch 操作的是事件。

CyclicBarrier 的基本操作組合,則就是 await,當所有的夥伴(parties)都呼叫了 await,才會繼續進行任務,並自動進行重置。注意,正常情況下,CyclicBarrier 的重置都是自動發生的,如果我們呼叫 reset 方法,但還有執行緒在等待,就會導致等待執行緒被打擾,丟擲 BrokenBarrierException 異常。CyclicBarrier 側重點是執行緒,而不是呼叫事件,它的典型應用場景是用來等待併發執行緒結束

ConcurrentHashMap和ConcurrentSkipListMap

這裡寫圖片描述

如果我們的應用側重於 Map 放入或者獲取的速度,而不在乎順序,大多推薦使用 ConcurrentHashMap,反之則使用 ConcurrentSkipListMap;如果我們需要對大量資料進行非常頻繁地修改,ConcurrentSkipListMap 也可能表現出優勢。

為什麼併發容器裡面沒有 ConcurrentTreeMap 呢?

這是因為 TreeMap 要實現高效的執行緒安全是非常困難的,它的實現基於複雜的紅黑樹。為保證訪問效率,當我們插入或刪除節點時,會移動節點進行平衡操作,這導致在併發場景中難以進行合理粒度的同步。而 SkipList 結構則要相對簡單很多,通過層次結構提高訪問速度,雖然不夠緊湊,空間使用有一定提高(O(nlogn)),但是在增刪元素時執行緒安全的開銷要好很多

CopyOnWriteArraySet 和CopyOnWriteArrayList

關於兩個 CopyOnWrite 容器,其實 CopyOnWriteArraySet 是通過包裝了 CopyOnWriteArrayList 來實現的,所以在學習時,我們可以專注於理解一種。

CopyOnWrite 到底是什麼意思呢?

它的原理是,任何修改操作,如 add、set、remove,都會拷貝原陣列,修改後替換原來的陣列,通過這種防禦性的方式,實現另類的執行緒安全。請看下面的程式碼片段。

public boolean add(E e) {
    synchronized (lock) {
        Object[] elements = getArray();
        int len = elements.length;
           // 拷貝
        Object[] newElements = Arrays.copyOf(elements, len + 1);
        newElements[len] = e;
           // 替換
        setArray(newElements);
        return true;
            }
}
final void setArray(Object[] a) {
    array = a;
}

所以這種資料結構,相對比較適合讀多寫少的操作,不然修改的開銷還是非常明顯的。

併發包中的 ConcurrentLinkedQueue 和 LinkedBlockingQueue 有什麼區別?

有時候我們把併發包下面的所有容器都習慣叫作併發容器,但是嚴格來講,類似 ConcurrentLinkedQueue 這種“Concurrent*”容器,才是真正代表併發,它們的區別:

  • Concurrent 型別基於 lock-free,在常見的多執行緒訪問場景,一般可以提供較高吞吐量。

  • 而 LinkedBlockingQueue 內部則是基於鎖,並提供了 BlockingQueue 的等待性方法。

BlockingQueue是否有界(Bounded、Unbounded)

ArrayBlockingQueue 是最典型的的有界佇列,其內部以 final 的陣列儲存資料,陣列的大小就決定了佇列的邊界,所以我們在建立 ArrayBlockingQueue 時,都要指定容量,如
public ArrayBlockingQueue(int capacity, boolean fair)
LinkedBlockingQueue,容易被誤解為無邊界,但其實其行為和內部程式碼都是基於有界的邏輯實現的,只不過如果我們沒有在建立佇列時就指定容量,那麼其容量限制就自動被設定為 Integer.MAX_VALUE,成為了無界佇列。

SynchronousQueue,這是一個非常奇葩的佇列實現,每個刪除操作都要等待插入操作,反之每個插入操作也都要等待刪除動作。那麼這個佇列的容量是多少呢?是 1 嗎?其實不是的,其內部容量是 0。

PriorityBlockingQueue 是無邊界的優先佇列,雖然嚴格意義上來講,其大小總歸是要受系統資源影響。

DelayedQueue 和 LinkedTransferQueue 同樣是無邊界的佇列。對於無邊界的佇列,有一個自然的結果,就是 put 操作永遠也不會發生其他 BlockingQueue 的那種等待情況。

以 LinkedBlockingQueue、ArrayBlockingQueue 和 SynchronousQueue的選擇

根據需求可以從很多方面考量:

考慮應用場景中對佇列邊界的要求。ArrayBlockingQueue 是有明確的容量限制的,而 LinkedBlockingQueue 則取決於我們是否在建立時指定,SynchronousQueue 則乾脆不能快取任何元素。

從空間利用角度,陣列結構的 ArrayBlockingQueue 要比 LinkedBlockingQueue 緊湊,因為其不需要建立所謂節點,但是其初始分配階段就需要一段連續的空間,所以初始記憶體需求更大。

通用場景中,LinkedBlockingQueue 的吞吐量一般優於 ArrayBlockingQueue,因為它實現了更加細粒度的鎖操作。

ArrayBlockingQueue 實現比較簡單,效能更好預測,屬於表現穩定的“選手”。

如果我們需要實現的是兩個執行緒之間接力性(handoff)的場景,按照專欄上一講的例子,你可能會選擇 CountDownLatch,但是SynchronousQueue也是完美符合這種場景的,而且執行緒間協調和資料傳輸統一起來,程式碼更加規範。

可能令人意外的是,很多時候 SynchronousQueue 的效能表現,往往大大超過其他實現,尤其是在佇列元素較小的場景。

Java 併發類庫提供的執行緒池有哪幾種? 分別有什麼特點?

Executors 目前提供了 5 種不同的執行緒池建立配置:

  1. newCachedThreadPool(),它是一種用來處理大量短時間工作任務的執行緒池,具有幾個鮮明特點:它會試圖快取執行緒並重用,當無快取執行緒可用時,就會建立新的工作執行緒;如果執行緒閒置的時間超過 60 秒,則被終止並移出快取;長時間閒置時,這種執行緒池,不會消耗什麼資源。其內部使用 SynchronousQueue 作為工作佇列。

  2. newFixedThreadPool(int nThreads),重用指定數目(nThreads)的執行緒,其背後使用的是無界的工作佇列,任何時候最多有 nThreads 個工作執行緒是活動的。這意味著,如果任務數量超過了活動佇列數目,將在工作佇列中等待空閒執行緒出現;如果有工作執行緒退出,將會有新的工作執行緒被建立,以補足指定的數目 nThreads。

  3. newSingleThreadExecutor(),它建立的是個 ScheduledExecutorService,也就是可以進行定時或週期性的工作排程。工作執行緒數目被限制為 1,所以它保證了所有任務的都是被順序執行,最多會有一個任務處於活動狀態,並且不允許使用者改動執行緒池例項,因此可以避免其改變執行緒數目。

  4. newScheduledThreadPool(int corePoolSize),同樣是 ScheduledExecutorService,區別在於它會保持 corePoolSize 個工作執行緒。

  5. newWorkStealingPool(int parallelism),這是一個經常被人忽略的執行緒池,Java 8 才加入這個建立方法,其內部會構建ForkJoinPool,利用Work-Stealing演算法,並行地處理任務,不保證處理順序。

這裡寫圖片描述

在大多數應用場景下,使用 Executors 提供的 5 個靜態工廠方法就足夠了,但是仍然可能需要直接利用 ThreadPoolExecutor 等建構函式建立,這就要求你對執行緒構造方式有進一步的瞭解,你需要明白執行緒池的設計和結構。

另外,執行緒池這個定義就是個容易讓人誤解的術語,因為 ExecutorService 除了通常意義上“池”的功能,還提供了更全面的執行緒管理、任務提交等方法。

Executor 是一個基礎的介面,其初衷是將任務提交和任務執行細節解耦,這一點可以體會其定義的唯一方法。

void execute(Runnable command);

Executor 的設計是源於 Java 早期執行緒 API 使用的教訓,開發者在實現應用邏輯時,被太多執行緒建立、排程等不相關細節所打擾。就像我們進行 HTTP 通訊,如果還需要自己操作 TCP 握手,開發效率低下,質量也難以保證。

ExecutorService 則更加完善,不僅提供 service 的管理功能,比如 shutdown 等方法,也提供了更加全面的提交任務機制,如返回Future而不是 void 的 submit 方法。注意,這個例子輸入的可是Callable,它解決了 Runnable 無法返回結果的困擾。

<T> Future<T> submit(Callable<T> task);

Java 標準類庫提供了幾種基礎實現,比如ThreadPoolExecutor、ScheduledThreadPoolExecutor、ForkJoinPool。這些執行緒池的設計特點在於其高度的可調節性和靈活性,以儘量滿足複雜多變的實際應用場景.

ThreadPoolExecutor

Java中對於執行緒池的支援,來自ThreadPoolExecutor。一些應用伺服器也確實是使用的ThreadPoolExecutor來實現執行緒池。

ScheduledThreadPoolExecutor

ScheduledThreadPoolExecutor是一個使用執行緒池執行定時任務的類,相較於Java中提供的另一個執行定時任務的類Timer,其主要有如下兩個優點:

  • 使用多執行緒執行任務,不用擔心任務執行時間過長而導致任務相互阻塞的情況,Timer是單執行緒執行的,因而會出現這個問題;

  • 不用擔心任務執行過程中,如果執行緒失活,其會新建執行緒執行任務,Timer類的單執行緒掛掉之後是不會重新建立執行緒執行後續任務的。

除去上述兩個優點外,ScheduledThreadPoolExecutor還提供了非常靈活的API,用於執行任務。其任務的執行策略主要分為兩大類,在一定延遲之後只執行一次某個任務,在一定延遲之後週期性的執行某個任務。

ForkJoinPool

在Java 7中引入了一種新的執行緒池:ForkJoinPool。

它同ThreadPoolExecutor一樣,也實現了Executor和ExecutorService介面。它使用了一個無限佇列來儲存需要執行的任務,而執行緒的數量則是通過建構函式傳入,如果沒有向建構函式中傳入希望的執行緒數量,那麼當前計算機可用的CPU數量會被設定為執行緒數量作為預設值。

ForkJoinPool主要用來使用分治法(Divide-and-Conquer Algorithm)來解決問題。典型的應用比如快速排序演算法。這裡的要點在於,ForkJoinPool需要使用相對少的執行緒來處理大量的任務。比如要對1000萬個資料進行排序,那麼會將這個任務分割成兩個500萬的排序任務和一個針對這兩組500萬資料的合併任務。以此類推,對於500萬的資料也會做出同樣的分割處理,到最後會設定一個閾值來規定當資料規模到多少時,停止這樣的分割處理。比如,當元素的數量小於10時,會停止分割,轉而使用插入排序對它們進行排序。

那麼到最後,所有的任務加起來會有大概2000000+個。問題的關鍵在於,對於一個任務而言,只有當它所有的子任務完成之後,它才能夠被執行。

所以當使用ThreadPoolExecutor時,使用分治法會存在問題,因為ThreadPoolExecutor中的執行緒無法像任務佇列中再新增一個任務並且在等待該任務完成之後再繼續執行。而使用ForkJoinPool時,就能夠讓其中的執行緒建立新的任務,並掛起當前的任務,此時執行緒就能夠從佇列中選擇子任務執行。