1. 程式人生 > >Java 併發工具包-java.util.concurrent-原始碼jdk1.7全面解析

Java 併發工具包-java.util.concurrent-原始碼jdk1.7全面解析

先來看看類圖:


其實從類圖我們能發現concurrent包(除去java.util.concurrent.atomic 和 java.util.concurrent.locks)中的內容並沒有特別多,大概分為四類:BlockingQueue阻塞佇列體系、Executor執行緒組執行框架、Future執行緒返回值體系、其他各種單獨的併發工具等。

我們把它分為7個章節來描述:1、併發容器;2、同步裝置;3、執行器和執行緒池;4、Fork-join框架;5、鎖 Lock;6、原子物件;7、其它。

接下來我們分別概述:

1、併發容器

這些容器的關鍵方法大部分都實現了執行緒安全的功能,卻不使用同步關鍵字(synchronized)。值得注意的是Queue介面本身定義的幾個常用方法的區別,

  1. add方法和offer方法的區別在於超出容量限制時前者丟擲異常,後者返回false;
  2. remove方法和poll方法都從佇列中拿掉元素並返回,但是他們的區別在於空佇列下操作前者丟擲異常,而後者返回null;
  3. element方法和peek方法都返回佇列頂端的元素,但是不把元素從佇列中刪掉,區別在於前者在空佇列的時候丟擲異常,後者返回null。
概述:

1.1阻塞佇列

  • BlockingQueue.class,阻塞佇列介面
  • BlockingDeque.class,雙端阻塞佇列介面
  • ArrayBlockingQueue.class,阻塞佇列,陣列實現
  • LinkedBlockingDeque.class,阻塞雙端佇列,連結串列實現
  • LinkedBlockingQueue.class,阻塞佇列,連結串列實現
  • DelayQueue.class,阻塞佇列,並且元素是Delay的子類,保證元素在達到一定時間後才可以取得到
  • PriorityBlockingQueue.class,優先順序阻塞佇列
  • SynchronousQueue.class,同步佇列,但是佇列長度為0,生產者放入佇列的操作會被阻塞,直到消費者過來取,所以這個佇列根本不需要空間存放元素;有點像一個獨木橋,一次只能一人通過,還不能在橋上停留

1.2非阻塞佇列

  • ConcurrentLinkedDeque.class,非阻塞雙端佇列,連結串列實現
  • ConcurrentLinkedQueue.class,非阻塞佇列,連結串列實現

1.3轉移佇列

  • TransferQueue.class,轉移佇列介面,生產者要等消費者消費的佇列,生產者嘗試把元素直接轉移給消費者
  • LinkedTransferQueue.class,轉移佇列的連結串列實現,它比SynchronousQueue更快

1.4其它容器

  • ConcurrentMap.class,併發Map的介面,定義了putIfAbsent(k,v)、remove(k,v)、replace(k,oldV,newV)、replace(k,v)這四個併發場景下特定的方法
  • ConcurrentHashMap.class,併發HashMap
  • ConcurrentNavigableMap.class,NavigableMap的實現類,返回最接近的一個元素
  • ConcurrentSkipListMap.class,它也是NavigableMap的實現類(要求元素之間可以比較),同時它比ConcurrentHashMap更加scalable——ConcurrentHashMap並不保證它的操作時間,並且你可以自己來調整它的load factor;但是ConcurrentSkipListMap可以保證O(log n)的效能,同時不能自己來調整它的併發引數,只有你確實需要快速的遍歷操作,並且可以承受額外的插入開銷的時候,才去使用它
  • ConcurrentSkipListSet.class,和上面類似,只不過map變成了set
  • CopyOnWriteArrayList.class,copy-on-write模式的array list,每當需要插入元素,不在原list上操作,而是會新建立一個list,適合讀遠遠大於寫並且寫時間並苛刻的場景
  • CopyOnWriteArraySet.class,和上面類似,list變成set而已

1.1、阻塞佇列

  • BlockingQueue.class,阻塞佇列介面

阻塞佇列 BlockingQueue

java.util.concurrent 包裡的 BlockingQueue 介面表示一個執行緒安放入和提取例項的佇列。本小節我將給你演示如何使用這個 BlockingQueue。
本節不會討論如何在 Java 中實現一個你自己的 BlockingQueue。如果你對那個感興趣,參考《Java 併發指南

BlockingQueue 用法

BlockingQueue 通常用於一個執行緒生產物件,而另外一個執行緒消費這些物件的場景。下圖是對這個原理的闡述:

blocking-queue
一個執行緒往裡邊放,另外一個執行緒從裡邊取的一個 BlockingQueue。
一個執行緒將會持續生產新物件並將其插入到佇列之中,直到佇列達到它所能容納的臨界點。也就是說,它是有限的。如果該阻塞佇列到達了其臨界點,負責生產的執行緒將會在往裡邊插入新物件時發生阻塞。它會一直處於阻塞之中,直到負責消費的執行緒從佇列中拿走一個物件。
負責消費的執行緒將會一直從該阻塞佇列中拿出物件。如果消費執行緒嘗試去從一個空的佇列中提取物件的話,這個消費執行緒將會處於阻塞之中,直到一個生產執行緒把一個物件丟進佇列。

BlockingQueue 的方法

BlockingQueue 具有 4 組不同的方法用於插入、移除以及對佇列中的元素進行檢查。如果請求的操作不能得到立即執行的話,每個方法的表現也不同。這些方法如下:
拋異常 特定值 阻塞 超時
插入 add(o) offer(o) put(o) offer(o, timeout, timeunit)
移除 remove(o) poll(o) take(o) poll(timeout, timeunit)
檢查 element(o) peek(o)

四組不同的行為方式解釋:
  1. 拋異常:如果試圖的操作無法立即執行,拋一個異常。
  2. 特定值:如果試圖的操作無法立即執行,返回一個特定的值(常常是 true / false)。
  3. 阻塞:如果試圖的操作無法立即執行,該方法呼叫將會發生阻塞,直到能夠執行。
  4. 超時:如果試圖的操作無法立即執行,該方法呼叫將會發生阻塞,直到能夠執行,但等待時間不會超過給定值。返回一個特定值以告知該操作是否成功(典型的是 true / false)。
無法向一個 BlockingQueue 中插入 null。如果你試圖插入 null,BlockingQueue 將會丟擲一個 NullPointerException。
可以訪問到 BlockingQueue 中的所有元素,而不僅僅是開始和結束的元素。比如說,你將一個物件放入佇列之中以等待處理,但你的應用想要將其取消掉。那麼你可以呼叫諸如 remove(o) 方法來將佇列之中的特定物件進行移除。但是這麼幹效率並不高(譯者注:基於佇列的資料結構,獲取除開始或結束位置的其他物件的效率不會太高),因此你儘量不要用這一類的方法,除非你確實不得不那麼做。

BlockingQueue 的實現

BlockingQueue 是個介面,你需要使用它的實現之一來使用 BlockingQueue。java.util.concurrent 具有以下 BlockingQueue 介面的實現(Java 6):
其他:
  • BlockingDeque.class,雙端阻塞佇列介面

阻塞雙端佇列 BlockingDeque

java.util.concurrent 包裡的 BlockingDeque 介面表示一個執行緒安放入和提取例項的雙端佇列。本小節我將給你演示如何使用 BlockingDeque。
BlockingDeque 類是一個雙端佇列,在不能夠插入元素時,它將阻塞住試圖插入元素的執行緒;在不能夠抽取元素時,它將阻塞住試圖抽取的執行緒。
deque(雙端佇列) 是 "Double Ended Queue" 的縮寫。因此,雙端佇列是一個你可以從任意一端插入或者抽取元素的佇列。

BlockingDeque 的使用

線上程既是一個佇列的生產者又是這個佇列的消費者的時候可以使用到 BlockingDeque。如果生產者執行緒需要在佇列的兩端都可以插入資料,消費者執行緒需要在佇列的兩端都可以移除資料,這個時候也可以使用 BlockingDeque。BlockingDeque 圖解:

blocking-deque
一個 BlockingDeque - 執行緒在雙端佇列的兩端都可以插入和提取元素。
一個執行緒生產元素,並把它們插入到佇列的任意一端。如果雙端佇列已滿,插入執行緒將被阻塞,直到一個移除執行緒從該佇列中移出了一個元素。如果雙端佇列為空,移除執行緒將被阻塞,直到一個插入執行緒向該佇列插入了一個新元素。

BlockingDeque 的方法

BlockingDeque 具有 4 組不同的方法用於插入、移除以及對雙端佇列中的元素進行檢查。如果請求的操作不能得到立即執行的話,每個方法的表現也不同。這些方法如下:
拋異常 特定值 阻塞 超時
插入 addFirst(o) offerFirst(o) putFirst(o) offerFirst(o, timeout, timeunit)
移除 removeFirst(o) pollFirst(o) takeFirst(o) pollFirst(timeout, timeunit)
檢查 getFirst(o) peekFirst(o)


拋異常 特定值 阻塞 超時
插入 addLast(o) offerLast(o) putLast(o) offerLast(o, timeout, timeunit)
移除 removeLast(o) pollLast(o) takeLast(o) pollLast(timeout, timeunit)
檢查 getLast(o) peekLast(o)

四組不同的行為方式解釋:
  1. 拋異常:如果試圖的操作無法立即執行,拋一個異常。
  2. 特定值:如果試圖的操作無法立即執行,返回一個特定的值(常常是 true / false)。
  3. 阻塞:如果試圖的操作無法立即執行,該方法呼叫將會發生阻塞,直到能夠執行。
  4. 超時:如果試圖的操作無法立即執行,該方法呼叫將會發生阻塞,直到能夠執行,但等待時間不會超過給定值。返回一個特定值以告知該操作是否成功(典型的是 true / false)。

BlockingDeque 繼承自 BlockingQueue

BlockingDeque 介面繼承自 BlockingQueue 介面。這就意味著你可以像使用一個 BlockingQueue 那樣使用 BlockingDeque。如果你這麼幹的話,各種插入方法將會把新元素新增到雙端佇列的尾端,而移除方法將會把雙端佇列的首端的元素移除。正如 BlockingQueue 介面的插入和移除方法一樣。
以下是 BlockingDeque 對 BlockingQueue 介面的方法的具體內部實現:
BlockingQueue BlockingDeque
add() addLast()
offer() x 2 offerLast() x 2
put() putLast()
remove() removeFirst()
poll() x 2 pollFirst()
take() takeFirst()
element() getFirst()
peek() peekFirst()

BlockingDeque 的實現

既然 BlockingDeque 是一個介面,那麼你想要使用它的話就得使用它的眾多的實現類的其中一個。java.util.concurrent 包提供了以下 BlockingDeque 介面的實現類:

  • ArrayBlockingQueue.class,阻塞佇列,陣列實現

陣列阻塞佇列 ArrayBlockingQueue

ArrayBlockingQueue 類實現了 BlockingQueue 介面。
ArrayBlockingQueue 是一個有界的阻塞佇列,其內部實現是將物件放到一個數組裡。有界也就意味著,它不能夠儲存無限多數量的元素。它有一個同一時間能夠儲存元素數量的上限。你可以在對其初始化的時候設定這個上限,但之後就無法對這個上限進行修改了(譯者注:因為它是基於陣列實現的,也就具有陣列的特性:一旦初始化,大小就無法修改)。
ArrayBlockingQueue 內部以 FIFO(先進先出)的順序對元素進行儲存。佇列中的頭元素在所有元素之中是放入時間最久的那個,而尾元素則是最短的那個。
  • LinkedBlockingDeque.class,阻塞雙端佇列,連結串列實現

鏈阻塞雙端佇列 LinkedBlockingDeque

LinkedBlockingDeque 類實現了 BlockingDeque 介面。
deque(雙端佇列) 是 "Double Ended Queue" 的縮寫。因此,雙端佇列是一個你可以從任意一端插入或者抽取元素的佇列。(譯者注:唐僧啊,受不了。)
LinkedBlockingDeque 是一個雙端佇列,在它為空的時候,一個試圖從中抽取資料的執行緒將會阻塞,無論該執行緒是試圖從哪一端抽取資料。
  • LinkedBlockingQueue.class,阻塞佇列,連結串列實現

鏈阻塞佇列 LinkedBlockingQueue

LinkedBlockingQueue 類實現了 BlockingQueue 介面。
LinkedBlockingQueue 內部以一個鏈式結構(連結節點)對其元素進行儲存。如果需要的話,這一鏈式結構可以選擇一個上限。如果沒有定義上限,將使用 Integer.MAX_VALUE 作為上限。
LinkedBlockingQueue 內部以 FIFO(先進先出)的順序對元素進行儲存。佇列中的頭元素在所有元素之中是放入時間最久的那個,而尾元素則是最短的那個。
  • DelayQueue.class,阻塞佇列,並且元素是Delay的子類,保證元素在達到一定時間後才可以取得到

DelayQueue

DelayQueue 實現了 BlockingQueue 介面。
DelayQueue 對元素進行持有直到一個特定的延遲到期。注入其中的元素必須實現 java.util.concurrent.Delayed 介面,
  • PriorityBlockingQueue.class,優先順序阻塞佇列

具有優先順序的阻塞佇列 PriorityBlockingQueue

PriorityBlockingQueue 類實現了 BlockingQueue 介面。
PriorityBlockingQueue 是一個無界的併發佇列。它使用了和類 java.util.PriorityQueue 一樣的排序規則。你無法向這個佇列中插入 null 值。
所有插入到 PriorityBlockingQueue 的元素必須實現 java.lang.Comparable 介面。因此該佇列中元素的排序就取決於你自己的 Comparable 實現。
注意 PriorityBlockingQueue 對於具有相等優先順序(compare() == 0)的元素並不強制任何特定行為。
同時注意,如果你從一個 PriorityBlockingQueue 獲得一個 Iterator 的話,該 Iterator 並不能保證它對元素的遍歷是以優先順序為序的。
  • SynchronousQueue.class,同步佇列  但是佇列長度為0,生產者放入佇列的操作會被阻塞,直到消費者過來取,所以這個佇列根本不需要空間存放元素;有點像一個獨木橋,一次只能一人通過,還不能在橋上停留   

SynchronousQueue

SynchronousQueue 類實現了 BlockingQueue 介面。
SynchronousQueue 是一個特殊的佇列,它的內部同時只能夠容納單個元素。如果該佇列已有一元素的話,試圖向佇列中插入一個新元素的執行緒將會阻塞,直到另一個執行緒將該元素從佇列中抽走。同樣,如果該佇列為空,試圖向佇列中抽取一個元素的執行緒將會阻塞,直到另一個執行緒向佇列中插入了一條新的元素。

據此,把這個類稱作一個佇列顯然是誇大其詞了。它更多像是一個匯合點。


1.2、非阻塞佇列

  • ConcurrentLinkedDeque.class,非阻塞雙端佇列,連結串列實現
  • ConcurrentLinkedQueue.class,非阻塞佇列,連結串列實現

1.3、轉移佇列

  • TransferQueue.class,轉移佇列介面,生產者要等消費者消費的佇列,生產者嘗試把元素直接轉移給消費者
  • LinkedTransferQueue.class,轉移佇列的連結串列實現,它比SynchronousQueue更快

1.4、其它容器

  • ConcurrentMap.class,併發Map的介面,定義了putIfAbsent(k,v)、remove(k,v)、replace(k,oldV,newV)、replace(k,v)這四個併發場景下特定的方法
  • ConcurrentHashMap.class,併發HashMap
  • ConcurrentNavigableMap.class,NavigableMap的實現類,返回最接近的一個元素
  • ConcurrentSkipListMap.class,它也是NavigableMap的實現類(要求元素之間可以比較),同時它比ConcurrentHashMap更加scalable——ConcurrentHashMap並不保證它的操作時間,並且你可以自己來調整它的load factor;但是ConcurrentSkipListMap可以保證O(log n)的效能,同時不能自己來調整它的併發引數,只有你確實需要快速的遍歷操作,並且可以承受額外的插入開銷的時候,才去使用它
  • ConcurrentSkipListSet.class,和上面類似,只不過map變成了set
  • CopyOnWriteArrayList.class,copy-on-write模式的array list,每當需要插入元素,不在原list上操作,而是會新建立一個list,適合讀遠遠大於寫並且寫時間並苛刻的場景
  • CopyOnWriteArraySet.class,和上面類似,list變成set而已

併發 Map(對映) ConcurrentMap

java.util.concurrent.ConcurrentMap

java.util.concurrent.ConcurrentMap 介面表示了一個能夠對別人的訪問(插入和提取)進行併發處理的 java.util.Map。
ConcurrentMap 除了從其父介面 java.util.Map 繼承來的方法之外還有一些額外的原子性方法。

ConcurrentMap 的實現

既然 ConcurrentMap 是個介面,你想要使用它的話就得使用它的實現類之一。java.util.concurrent 包具備 ConcurrentMap 介面的以下實現類:
  • ConcurrentHashMap

ConcurrentHashMap

ConcurrentHashMap 和 java.util.HashTable 類很相似,但 ConcurrentHashMap 能夠提供比 HashTable 更好的併發效能。在你從中讀取物件的時候 ConcurrentHashMap 並不會把整個 Map 鎖住。此外,在你向其中寫入物件的時候,ConcurrentHashMap 也不會鎖住整個 Map。它的內部只是把 Map 中正在被寫入的部分進行鎖定。
另外一個不同點是,在被遍歷的時候,即使是 ConcurrentHashMap 被改動,它也不會拋 ConcurrentModificationException。儘管 Iterator 的設計不是為多個執行緒的同時使用。
更多關於 ConcurrentMap 和 ConcurrentHashMap 的細節請參考官方文件。

併發導航對映 ConcurrentNavigableMap

java.util.concurrent.ConcurrentNavigableMap 是一個支援併發訪問的 java.util.NavigableMap,它還能讓它的子 map 具備併發訪問的能力。所謂的 "子 map" 指的是諸如 headMap(),subMap(),tailMap() 之類的方法返回的 map。

NavigableMap 中的方法不再贅述,本小節我們來看一下 ConcurrentNavigableMap 新增的方法。

2、同步裝置

這些類大部分都是幫助做執行緒之間同步的,簡單描述,就像是提供了一個籬笆,執行緒執行到這個籬笆的時候都得等一等,等到條件滿足以後再往後走。

  • CountDownLatch.class,一個執行緒呼叫await方法以後,會阻塞地等待計數器被呼叫countDown直到變成0,功能上和下面的CyclicBarrier有點像
  • CyclicBarrier.class,也是計數等待,只不過它是利用await方法本身來實現計數器“+1”的操作,一旦計數器上顯示的數字達到Barrier可以打破的界限,就會丟擲BrokenBarrierException,執行緒就可以繼續往下執行;請參見我寫過的這篇文章《同步、非同步轉化和任務執行》中的Barrier模式
  • Semaphore.class,功能上很簡單,acquire()和release()兩個方法,一個嘗試獲取許可,一個釋放許可,Semaphore構造方法提供了傳入一個表示該訊號量所具備的許可數量。
  • Exchanger.class,這個類的例項就像是兩列飛馳的火車(執行緒)之間開了一個神奇的小視窗,通過小視窗(exchange方法)可以讓兩列火車安全地交換資料。
  • Phaser.class,功能上和第1、2個差不多,但是可以重用,且更加靈活,稍微有點複雜(CountDownLatch是不斷-1,CyclicBarrier是不斷+1,而Phaser定義了兩個概念,phase和party),我在下面畫了張圖,希望能夠幫助理解:
    • 一個是phase,表示當前在哪一個階段,每碰到一次barrier就會觸發advance操作(觸發前呼叫onAdvance方法),一旦越過這道barrier就會觸發phase+1,這很容易理解;
    • 另一個是party,很多文章說它就是執行緒數,但是其實這並不準確,它更像一個用於判斷advance是否被允許發生的計數器:
      • 任何時候都有一個party的總數,即註冊(registered)的party數,它可以在Phaser構造器裡指定,也可以任意時刻呼叫方法動態增減;
      • 每一個party都有unarrived和arrived兩種狀態,可以通過呼叫arriveXXX方法使得它從unarrived變成arrived;
      • 每一個執行緒到達barrier後會等待(呼叫arriveAndAwaitAdvance方法),一旦所有party都到達(即arrived的party數量等於registered的數量),就會觸發advance操作,同時barrier被打破,執行緒繼續向下執行,party重新變為unarrived狀態,重新等待所有party的到達;
      • 在絕大多數情況下一個執行緒就只負責操控一個party的到達,因此很多文章說party指的就是執行緒,但是這是不準確的,因為一個執行緒完全可以操控多個party,只要它執行多次的arrive方法。

2.1、閉鎖 CountDownLatch

java.util.concurrent.CountDownLatch 是一個併發構造,它允許一個或多個執行緒等待一系列指定操作的完成。
CountDownLatch 以一個給定的數量初始化。countDown() 每被呼叫一次,這一數量就減一。通過呼叫 await() 方法之一,執行緒可以阻塞等待這一數量到達零。

2.2、柵欄 CyclicBarrier
java.util.concurrent.CyclicBarrier 類是一種同步機制,它能夠對處理一些演算法的執行緒實現同步。換句話講,它就是一個所有執行緒必須等待的一個柵欄,直到所有執行緒都到達這裡,然後所有執行緒才可以繼續做其他事情。

2.3、交換機 Exchanger

java.util.concurrent.Exchanger 類表示一種兩個執行緒可以進行互相交換物件的會和點。這種機制圖示如下:

exchanger
兩個執行緒通過一個 Exchanger 交換物件。
交換物件的動作由 Exchanger 的兩個 exchange() 方法的其中一個完成。
2.4、訊號量 Semaphore
java.util.concurrent.Semaphore 類是一個計數訊號量。這就意味著它具備兩個主要方法:
  • acquire()
  • release()
計數訊號量由一個指定數量的 "許可" 初始化。每呼叫一次 acquire(),一個許可會被呼叫執行緒取走。每呼叫一次 release(),一個許可會被返還給訊號量。因此,在沒有任何 release() 呼叫時,最多有 N 個執行緒能夠通過 acquire() 方法,N 是該訊號量初始化時的許可的指定數量。這些許可只是一個簡單的計數器。這裡沒啥奇特的地方。

Semaphore 用法

訊號量主要有兩種用途:
  1. 保護一個重要(程式碼)部分防止一次超過 N 個執行緒進入。
  2. 在兩個執行緒之間傳送訊號。

保護重要部分

如果你將訊號量用於保護一個重要部分,試圖進入這一部分的程式碼通常會首先嚐試獲得一個許可,然後才能進入重要部分(程式碼塊),執行完之後,再把許可釋放掉。

線上程之間傳送訊號

如果你將一個訊號量用於在兩個執行緒之間傳送訊號,通常你應該用一個執行緒呼叫 acquire() 方法,而另一個執行緒呼叫 release() 方法。
如果沒有可用的許可,acquire() 呼叫將會阻塞,直到一個許可被另一個執行緒釋放出來。同理,如果無法往訊號量釋放更多許可時,一個 release() 呼叫也會阻塞。
通過這個可以對多個執行緒進行協調。比如,如果執行緒 1 將一個物件插入到了一個共享列表(list)之後之後呼叫了 acquire(),而執行緒 2 則在從該列表中獲取一個物件之前呼叫了 release(),這時你其實已經建立了一個阻塞佇列。訊號量中可用的許可的數量也就等同於該阻塞佇列能夠持有的元素個數。

公平

沒有辦法保證執行緒能夠公平地可從訊號量中獲得許可。也就是說,無法擔保掉第一個呼叫 acquire() 的執行緒會是第一個獲得一個許可的執行緒。如果第一個執行緒在等待一個許可時發生阻塞,而第二個執行緒前來索要一個許可的時候剛好有一個許可被釋放出來,那麼它就可能會在第一個執行緒之前獲得許可。
如果你想要強制公平,Semaphore 類有一個具有一個布林型別的引數的構造子,通過這個引數以告知 Semaphore 是否要強制公平。強制公平會影響到併發效能,所以除非你確實需要它否則不要啟用它。

更多方法

java.util.concurrent.Semaphore 類還有很多方法,比如:
  • availablePermits()
  • acquireUninterruptibly()
  • drainPermits()
  • hasQueuedThreads()
  • getQueuedThreads()
  • tryAcquire()
  • 等等

這些方法的細節請參考 Java 文件。

3、執行器和執行緒池

3.1、ExecutorService

  • Future.class,非同步計算的結果物件,get方法會阻塞執行緒直至真正的結果返回
  • Callable.class,用於非同步執行的可執行物件,call方法有返回值,它和Runnable介面很像,都提供了在其他執行緒中執行的方法,二者的區別在於:
    • Runnable沒有返回值,Callable有
    • Callable的call方法聲明瞭異常丟擲,而Runnable沒有
  • RunnableFuture.class,實現自Runnable和Future的子介面,成功執行run方法可以完成它自身這個Future並允許訪問其結果,它把任務執行和結果物件放到一起了
  • FutureTask.class,RunnableFuture的實現類,可取消的非同步計算任務,僅在計算完成時才能獲取結果,一旦計算完成,就不能再重新開始或取消計算;它的取消任務方法cancel(boolean mayInterruptIfRunning)接收一個boolean引數表示在取消的過程中是否需要設定中斷
  • Executor.class,執行提交任務的物件,只有一個execute方法
  • Executors.class,輔助類和工廠類,幫助生成下面這些ExecutorService
  • ExecutorService.class,Executor的子介面,管理執行非同步任務的執行器,AbstractExecutorService提供了預設實現
  • AbstractExecutorService.class,ExecutorService的實現類,提供執行方法的預設實現,包括:
    • ① submit的幾個過載方法,返回Future物件,接收Runnable或者Callable引數
    • ② invokeXXX方法,這類方法返回的時候,任務都已結束,即要麼全部的入參task都執行完了,要麼cancel了
  • ThreadPoolExecutor.class,執行緒池,AbstractExecutorService的子類,除了從AbstractExecutorService繼承下來的①、②兩類提交任務執行的方法以外,還有:
    • ③ 實現自Executor介面的execute方法,接收一個Runnable引數,沒有返回值
  • RejectedExecutionHandler.class,當任務無法被執行的時候,定義處理邏輯的地方,前面已經提到過了
  • ThreadFactory.class,執行緒工廠,用於建立執行緒
3.2、ScheduledExecutor
  • Delayed.class,延遲執行的介面,只有long getDelay(TimeUnit unit)這樣一個介面方法
  • ScheduledFuture.class,Delayed和Future的共同子介面
  • RunnableScheduledFuture.class,ScheduledFuture和RunnableFuture的共同子介面,增加了一個方法boolean isPeriodic(),返回它是否是一個週期性任務,一個週期性任務的特點在於它可以反覆執行
  • ScheduledExecutorService.class,ExecutorService的子介面,它允許任務延遲執行,相應地,它返回ScheduledFuture
  • ScheduledThreadPoolExecutor.class,可以延遲執行任務的執行緒池

3.3、CompletionService

  • CompletionService.class,它是對ExecutorService的改進,因為ExecutorService只是負責處理任務並把每個任務的結果物件(Future)給你,卻並沒有說要幫你“管理”這些結果物件,這就意味著你得自己建立一個物件容器存放這些結果物件,很麻煩;CompletionService像是集成了一個Queue的功能,你可以呼叫Queue一樣的方法——poll來獲取結果物件,還有一個方法是take,它和poll差不多,區別在於take方法在沒有結果物件的時候會返回空,而poll方法會block住執行緒直到有結果物件返回
  • ExecutorCompletionService.class,是CompletionService的實現類
4、Fork-join框架

這是一個JDK7引入的並行框架,它把流程劃分成fork(分解)+join(合併)兩個步驟(怎麼那麼像MapReduce?),傳統執行緒池來實現一個並行任務的時候,經常需要花費大量的時間去等待其他執行緒執行任務的完成,但是fork-join框架使用work stealing技術緩解了這個問題:

  1. 每個工作執行緒都有一個雙端佇列,當分給每個任務一個執行緒去執行的時候,這個任務會放到這個佇列的頭部;
  2. 當這個任務執行完畢,需要和另外一個任務的結果執行合併操作,可是那個任務卻沒有執行的時候,不會幹等,而是把另一個任務放到佇列的頭部去,讓它儘快執行;
  3. 當工作執行緒的佇列為空,它會嘗試從其他執行緒的佇列尾部偷一個任務過來;
  4. 取得的任務可以被進一步分解。
  • ForkJoinPool.class,ForkJoin框架的任務池,ExecutorService的實現類
  • ForkJoinTask.class,Future的子類,框架任務的抽象
  • ForkJoinWorkerThread.class,工作執行緒
  • RecursiveTask.class,ForkJoinTask的實現類,compute方法有返回值,下文中有例子
  • RecursiveAction.class,ForkJoinTask的實現類,compute方法無返回值,只需要覆寫compute方法,對於可繼續分解的子任務,呼叫coInvoke方法完成(引數是RecursiveAction子類物件的可變陣列)
4.1概述、

使用 ForkJoinPool 進行分叉和合並

ForkJoinPool 在 Java 7 中被引入。它和 ExecutorService 很相似,除了一點不同。ForkJoinPool 讓我們可以很方便地把任務分裂成幾個更小的任務,這些分裂出來的任務也將會提交給 ForkJoinPool。任務可以繼續分割成更小的子任務,只要它還能分割。可能聽起來有些抽象,因此本節中我們將會解釋 ForkJoinPool 是如何工作的,還有任務分割是如何進行的。

分叉和合並解釋

在我們開始看 ForkJoinPool 之前我們先來簡要解釋一下分叉和合並的原理。
分叉和合並原理包含兩個遞迴進行的步驟。兩個步驟分別是分叉步驟和合並步驟。

分叉

一個使用了分叉和合並原理的任務可以將自己分叉(分割)為更小的子任務,這些子任務可以被併發執行。如下圖所示:
java-fork-and-join-1
通過把自己分割成多個子任務,每個子任務可以由不同的 CPU 並行執行,或者被同一個 CPU 上的不同執行緒執行。
只有當給的任務過大,把它分割成幾個子任務才有意義。把任務分割成子任務有一定開銷,因此對於小型任務,這個分割的消耗可能比每個子任務併發執行的消耗還要大。
什麼時候把一個任務分割成子任務是有意義的,這個界限也稱作一個閥值。這要看每個任務對有意義閥值的決定。很大程度上取決於它要做的工作的種類。

合併

當一個任務將自己分割成若干子任務之後,該任務將進入等待所有子任務的結束之中。
一旦子任務執行結束,該任務可以把所有結果合併到同一個結果。圖示如下:
java-fork-and-join-2
當然,並非所有型別的任務都會返回一個結果。如果這個任務並不返回一個結果,它只需等待所有子任務執行完畢。也就不需要結果的合併啦。
4.2、ForkJoinPool
ForkJoinPool 是一個特殊的執行緒池,它的設計是為了更好的配合 分叉-和-合併 任務分割的工作。ForkJoinPool 也在 java.util.concurrent 包中,其完整類名為 java.util.concurrent.ForkJoinPool。

建立一個 ForkJoinPool

你可以通過其構造子建立一個 ForkJoinPool。作為傳遞給 ForkJoinPool 構造子的一個引數,你可以定義你期望的並行級別。並行級別表示你想要傳遞給 ForkJoinPool 的任務所需的執行緒或 CPU 數量。以下是一個 ForkJoinPool 示例:
[java] view plain copy  print?
  1. ForkJoinPool forkJoinPool = new ForkJoinPool(4);  

這個示例建立了一個並行級別為 4 的 ForkJoinPool。

提交任務到 ForkJoinPool

就像提交任務到 ExecutorService 那樣,把任務提交到 ForkJoinPool。你可以提交兩種型別的任務。一種是沒有任何返回值的(一個 "行動"),另一種是有返回值的(一個"任務")。這兩種型別分別由 RecursiveAction 和 RecursiveTask 表示。接下來介紹如何使用這兩種型別的任務,以及如何對它們進行提交。

4.3、RecursiveAction

RecursiveAction 是一種沒有任何返回值的任務。它只是做一些工作,比如寫資料到磁碟,然後就退出了。
一個 RecursiveAction 可以把自己的工作分割成更小的幾塊,這樣它們可以由獨立的執行緒或者 CPU 執行。
你可以通過繼承來實現一個 RecursiveAction。

相關推薦

Java 併發工具-java.util.concurrent-原始碼jdk1.7全面解析

先來看看類圖: 其實從類圖我們能發現concurrent包(除去java.util.concurrent.atomic 和 java.util.concurrent.locks)中的內容並沒有特別多,大概分為四類:BlockingQueue阻塞佇列體系、Executor

Java 併發工具 java.util.concurrent 使用者指南

譯序 1. java.util.concurrent - Java 併發工具包 Java 5 添加了一個新的包到 Java 平臺,java.util.concurrent 包。這個包包含有一系列能夠讓 Java 的併發程式設計變得更加簡單輕鬆的類。在這個包被新

Java併發工具java.util.concurrent使用者指南

拋異常 特定值 阻塞 超時 插入 add(o) offer(o) put(o) offer(o, timeout, timeunit) 移除 remove(o) poll(o) take(o) poll(timeout, timeuni

Java併發程式設計-併發工具java.util.concurrent使用指南

譯序 1. java.util.concurrent - Java併發工具包 Java 5 添加了一個新的包到 Java 平臺,java.util.concurrent 包。這個包包含有一系列能夠讓 Java 的併發程式設計變得更加簡單輕鬆的類。在這

Java多執行緒(二)Java併發工具concurrent例項簡述

傳統的多執行緒並沒有提供高階特性,例如:訊號量、執行緒池和執行管理器等,而這些特性恰恰有助於建立強大的併發程式。新的Fork/Join框架針對當前的多核系統,也提供了並行程式設計的可能。這塊的內容是java多執行緒資訊量最大的一部分內容,本篇部落格循序漸進的,首

理解Java併發工具執行緒池的設計

為什麼需要執行緒池? 答:主要原因是因為建立一個執行緒開銷太大,尤其是對大量的小任務需要執行這種場景。 在Java裡面建立一個執行緒,需要包含的東西: (1)它為一個執行緒堆疊分配記憶體,該堆疊為每個執行緒方法呼叫儲存一個幀 (2)每個幀由區域性變數陣列,返回值,運算

Java併發工具使用指南(全)

 1. java.util.concurrent - Java 併發工具包 Java 5 添加了一個新的包到 Java 平臺,java.util.concurrent 包。這個包包含有一系列能夠讓 Java 的併發程式設計變得更加簡單輕鬆的類。在這個包被新增以前,

java.util.concurrent 併發工具(二)

一、ThreadPoolExecutor 執行緒池執行者 初始化執行緒池的引數 corePoolSize 核心執行緒大小 maximumPoolSize 最大執行緒大小 keepAliveTime 空餘的多餘執行緒保持時間 unit 時間

圖解java.util.concurrent原始碼(三) Reentrantlock && Semaphore

引言 Reentrantlock和Semaphore分別是AQS在獨佔模式和共享模式下經典的實現,在理解AQS的情況下看這兩個類的程式碼會感到非常簡單,如果還沒理解AQS的話,建議先讀我這個系列的第一篇文章 複習AQS 回憶一下AQS,AQS中維護了一個st

Java例項學習 Java併發程式設計之java.util.concurrent.CopyOnWriteArrayList

CopyOnWriteArrayList   CopyOnWriteArrayList是ArrayList在併發環境下的替代品。CopyOnWriteArrayList通過增加寫時複製語義來避免併發訪問引起的問題,也就是說任何修改操作都會在底層建立一個列表的副本,也就意

Java例項學習 Java併發程式設計之java.util.concurrent.CountDownLatch

import java.util.concurrent.CountDownLatch; /**  * 工人類  */ class Worker {     private String name;        // 名字     private long workDuration;  // 工作持續時間  

java 反射工具reflections

分享圖片 class ref logs annotate .get pan can sys reflections 中包含很多的Scanner ,也就是掃描器,調用對應的方法時需要有配置對應的掃描器,不然程序會拋出異常. 掃描器結構: 使用時,我們主要使用Reflecti

Apache commons(Java常用工具)簡介

機制 encode 解析 help IT PE tom base cit Apache Commons是一個非常有用的工具包,解決各種實際的通用問題,下面是一個簡述表,詳細信息訪問http://jakarta.apache.org/commons/index.html Be

Java併發工具類詳解

在JDK的併發包裡提供了幾個非常有用的併發工具類。CountDownLatch、CyclicBarrier和Semaphore工具類提供了一種併發流程控制的手段,Exchanger工具類則提供了線上程間交換資料的一種手段。本章會配合一些應用場景來介紹如何使用這些工具類。 等待多執行緒完成的Cou

java併發之原子操作類(AtomicLong原始碼分析)和非阻塞演算法

  背景 近年來,在併發演算法領域的大多數研究都側重於非阻塞演算法,這種演算法用底層的原子機器指令(例如比較併發交換指令)代替鎖來確保資料在併發訪問中的一致性。非阻塞演算法被廣泛的用於在作業系統和JVM中實現執行緒/程序排程機制、垃圾回收機制以及鎖和其他併發資料結構。 與基於鎖

什麼是JDK?關於JDK(Java Development Kit)Java開發工具的介紹

什麼是JDK?關於JDK(Java Development Kit)Java開發工具包的介紹 JDK是構建Java應用程式的關鍵平臺部分,JDK的核心是Java編譯器。 JDK是Java程式設計三個核心技術包之一,另外兩個是JVM(Java Virtual Machine)Java虛擬機器和

Java併發工具類——AtomicInteger

基本型別int的遞增等操作並不是執行緒安全的,加上synchronized又會影響效能,因此在併發情況下我們應該使用AtomicInteger,下面通過一個例子驗證一哈。 public class TestAtomicInteger { public static void mai

淺析Java併發工具類Semaphore

淺析Java併發工具類Semaphore 1. 概述 2. 原始碼分析 3. 一個例子 4. 總結 1. 概述        Semaphore類表示訊號量。Semaphore內部主要通

Java併發程式設計中atomic的實現原理

這是一篇來自粉絲的投稿,作者【林灣村龍貓】最近在閱讀Java原始碼,這一篇是他關於併發包中atomic類的原始碼閱讀的總結。Hollis做了一點點修改。 引子 在多執行緒的場景中,我們需要保證資料安全,就會考慮同步的方案,通常會使用synchronized或者lo

Java 併發工具類使用

java.util.concurrent 包從 JDK1.5 開始引入,目的是解決併發程式設計的執行緒安全問題,提供非常有用的併發工具類,包括 CountDownLatch、CyclicBarrier 與 Semaphore 等。 在 concurrent 包下