Java集合之三—ConcurrentHashMap
我們上述所講的Map都是非執行緒安全的,這意味著不應該在多個執行緒中對這些Map進行修改操作,輕則會產生資料不一致的問題,甚至還會因為併發插入元素而導致連結串列成環(插入會觸發擴容,而擴容操作需要將原陣列中的元素rehash到新陣列,這時併發操作就有可能產生連結串列的迴圈引用從而成環),這樣在查詢時就會發生死迴圈,影響到整個應用程式。
Collections.synchronizedMap(Map<K,V> m)
可以將一個Map轉換成執行緒安全的實現,其實也就是通過一個包裝類,然後把所有功能都委託給傳入的Map實現,而且包裝類是基於synchronized
關鍵字來保證執行緒安全的(時代的眼淚Hashtable也是基於synchronized
public static <K,V> Map<K,V> synchronizedMap(Map<K,V> m) { return new SynchronizedMap<>(m); } private static class SynchronizedMap<K,V> implements Map<K,V>, Serializable { private static final long serialVersionUID = 1978198479659022715L; private final Map<K,V> m; // Backing Map final Object mutex; // Object on which to synchronize SynchronizedMap(Map<K,V> m) { this.m = Objects.requireNonNull(m); mutex = this; } SynchronizedMap(Map<K,V> m, Object mutex) { this.m = m; this.mutex = mutex; } public int size() { synchronized (mutex) {return m.size();} } public boolean isEmpty() { synchronized (mutex) {return m.isEmpty();} } ............ }
然而ConcurrentHashMap的實現細節遠沒有這麼簡單,因此效能也要高上許多。它沒有使用一個全域性鎖來鎖住自己,而是採用了減少鎖粒度的方法,儘量減少因為競爭鎖而導致的阻塞與衝突,而且ConcurrentHashMap的檢索操作是不需要鎖的。
在Java 7中,ConcurrentHashMap把內部細分成了若干個小的HashMap,稱之為段(Segment),預設被分為16個段。對於一個寫操作而言,會先根據hash code進行定址,得出該Entry應被存放在哪一個Segment,然後只要對該Segment加鎖即可。
理想情況下,一個預設的ConcurrentHashMap可以同時接受16個執行緒進行寫操作(如果都是對不同Segment進行操作的話)。
分段鎖對於size()
這樣的全域性操作來說就沒有任何作用了,想要得出Entry的數量就需要遍歷所有Segment,獲得所有的鎖,然後再統計總數。事實上,ConcurrentHashMap會先試圖使用無鎖的方式統計總數,這個嘗試會進行3次,如果在相鄰的2次計算中獲得的Segment的modCount次數一致,代表這兩次計算過程中都沒有發生過修改操作,那麼就可以當做最終結果返回,否則,就要獲得所有Segment的鎖,重新計算size。
本文主要討論的是Java 8的ConcurrentHashMap,它與Java 7的實現差別較大。完全放棄了段的設計,而是變回與HashMap相似的設計,使用buckets陣列與分離連結法(同樣會在超過閾值時樹化,對於構造紅黑樹的邏輯與HashMap差別不大,只不過需要額外使用CAS來保證執行緒安全),鎖的粒度也被細分到每個陣列元素(個人認為這樣做的原因是因為HashMap在Java 8中也實現了不少優化,即使碰撞嚴重,也能保證一定的效能,而且Segment不僅臃腫還有弱一致性的問題存在),所以它的併發級別與陣列長度相關(Java 7則是與段數相關)。
/**
* The array of bins. Lazily initialized upon first insertion.
* Size is always a power of two. Accessed directly by iterators.
*/
transient volatile Node<K,V>[] table;
定址
ConcurrentHashMap的雜湊函式與HashMap並沒有什麼區別,同樣是把key的hash code的高16位與低16位進行異或運算(因為ConcurrentHashMap的buckets陣列長度也永遠是一個2的N次方),然後將擾亂後的hash code與陣列的長度減一(實際可訪問到的最大索引)進行與運算,得出的結果即是目標所在的位置。
// 2^31 - 1,int型別的最大值
// 該掩碼錶示節點hash的可用位,用來保證hash永遠為一個正整數
static final int HASH_BITS = 0x7fffffff;
static final int spread(int h) {
return (h ^ (h >>> 16)) & HASH_BITS;
}
下面是查詢操作的原始碼,實現比較簡單。
public V get(Object key) {
Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
int h = spread(key.hashCode());
if ((tab = table) != null && (n = tab.length) > 0 &&
(e = tabAt(tab, (n - 1) & h)) != null) {
if ((eh = e.hash) == h) {
// 先嚐試判斷連結串列頭是否為目標,如果是就直接返回
if ((ek = e.key) == key || (ek != null && key.equals(ek)))
return e.val;
}
else if (eh < 0)
// eh < 0代表這是一個特殊節點(TreeBin或ForwardingNode)
// 所以直接呼叫find()進行遍歷查詢
return (p = e.find(h, key)) != null ? p.val : null;
// 遍歷連結串列
while ((e = e.next) != null) {
if (e.hash == h &&
((ek = e.key) == key || (ek != null && key.equals(ek))))
return e.val;
}
}
return null;
}
一個普通的節點(連結串列節點)的hash不可能小於0(已經在spread()
函式中修正過了),所以小於0的只可能是一個特殊節點,它不能用while迴圈中遍歷連結串列的方式來進行遍歷。
TreeBin是紅黑樹的頭部節點(紅黑樹的節點為TreeNode),它本身不含有key與value,而是指向一個TreeNode節點的連結串列與它們的根節點,同時使用CAS(ConcurrentHashMap並不是完全基於互斥鎖實現的,而是與CAS這種樂觀策略搭配使用,以提高效能)實現了一個讀寫鎖,迫使Writer(持有這個鎖)在樹重構操作之前等待Reader完成。
ForwardingNode是一個在資料轉移過程(由擴容引起)中使用的臨時節點,它會被插入到頭部。它與TreeBin(和TreeNode)都是Node類的子類。
為了判斷出哪些是特殊節點,TreeBin和ForwardingNode的hash域都只是一個虛擬值:
static class Node<K,V> implements Map.Entry<K,V> {
final int hash;
final K key;
volatile V val;
volatile Node<K,V> next;
Node(int hash, K key, V val, Node<K,V> next) {
this.hash = hash;
this.key = key;
this.val = val;
this.next = next;
}
public final V setValue(V value) {
throw new UnsupportedOperationException();
}
......
/**
* Virtualized support for map.get(); overridden in subclasses.
*/
Node<K,V> find(int h, Object k) {
Node<K,V> e = this;
if (k != null) {
do {
K ek;
if (e.hash == h &&
((ek = e.key) == k || (ek != null && k.equals(ek))))
return e;
} while ((e = e.next) != null);
}
return null;
}
}
/*
* Encodings for Node hash fields. See above for explanation.
*/
static final int MOVED = -1; // hash for forwarding nodes
static final int TREEBIN = -2; // hash for roots of trees
static final int RESERVED = -3; // hash for transient reservations
static final class TreeBin<K,V> extends Node<K,V> {
....
TreeBin(TreeNode<K,V> b) {
super(TREEBIN, null, null, null);
....
}
....
}
static final class ForwardingNode<K,V> extends Node<K,V> {
final Node<K,V>[] nextTable;
ForwardingNode(Node<K,V>[] tab) {
super(MOVED, null, null, null);
this.nextTable = tab;
}
.....
}
可見性
我們在get()
函式中並沒有發現任何與鎖相關的程式碼,那麼它是怎麼保證執行緒安全的呢?一個操作ConcurrentHashMap.get("a")
,它的步驟基本分為以下幾步:
-
根據雜湊函式計算出的索引訪問table。
-
從table中取出頭節點。
-
遍歷頭節點直到找到目標節點。
-
從目標節點中取出value並返回。
所以只要保證訪問table與節點的操作總是能夠返回最新的資料就可以了。ConcurrentHashMap並沒有採用鎖的方式,而是通過volatile
關鍵字來保證它們的可見性。在上文貼出的程式碼中可以發現,table、Node.val和Node.next都是被volatile
關鍵字所修飾的。
volatile
關鍵字保證了多執行緒環境下變數的可見性與有序性,底層實現基於記憶體屏障(Memory Barrier)。
為了優化效能,現代CPU工作時的指令執行順序與應用程式的程式碼順序其實是不一致的(有些編譯器也會進行這種優化),也就是所謂的亂序執行技術。亂序執行可以提高CPU流水線的工作效率,只要保證資料符合程式邏輯上的正確性即可(遵循
happens-before
原則)。不過如今是多核時代,如果隨便亂序而不提供防護措施那是會出問題的。每一個cpu上都會進行亂序優化,單cpu所保證的邏輯次序可能會被其他cpu所破壞。記憶體屏障就是針對此情況的防護措施。可以認為它是一個同步點(但它本身也是一條cpu指令)。例如在
IA32
指令集架構中引入的SFENCE
指令,在該指令之前的所有寫操作必須全部完成,讀操作仍可以亂序執行。LFENCE
指令則保證之前的所有讀操作必須全部完成,另外還有粒度更粗的MFENCE
指令保證之前的所有讀寫操作都必須全部完成。記憶體屏障就像是一個保護指令順序的柵欄,保護後面的指令不被前面的指令跨越。將記憶體屏障插入到寫操作與讀操作之間,就可以保證之後的讀操作可以訪問到最新的資料,因為屏障前的寫操作已經把資料寫回到記憶體(根據快取一致性協議,不會直接寫回到記憶體,而是改變該cpu私有快取中的狀態,然後通知給其他cpu這個快取行已經被修改過了,之後另一個cpu在讀操作時就可以發現該快取行已經是無效的了,這時它會從其他cpu中讀取最新的快取行,然後之前的cpu才會更改狀態並寫回到記憶體)。
例如,讀一個被volatile
修飾的變數V總是能夠從JMM(Java Memory Model)主記憶體中獲得最新的資料。因為記憶體屏障的原因,每次在使用變數V(通過JVM指令use
,後面說的也都是JVM中的指令而不是cpu)之前都必須先執行load
指令(把從主記憶體中得到的資料放入到工作記憶體),根據JVM的規定,load
指令必須發生在read
指令(從主記憶體中讀取資料)之後,所以每次訪問變數V都會先從主記憶體中讀取。相對的,寫操作也因為記憶體屏障保證的指令順序,每次都會直接寫回到主記憶體。
不過volatile
關鍵字並不能保證操作的原子性,對該變數進行併發的連續操作是非執行緒安全的,所幸ConcurrentHashMap只是用來確保訪問到的變數是最新的,所以也不會發生什麼問題。
出於效能考慮,Doug Lea(java.util.concurrent
包的作者)直接通過Unsafe類來對table進行操作。
Java號稱是安全的程式語言,而保證安全的代價就是犧牲程式設計師自由操控記憶體的能力。像在C/C++中可以通過操作指標變數達到操作記憶體的目的(其實操作的是虛擬地址),但這種靈活性在新手手中也經常會帶來一些愚蠢的錯誤,比如記憶體訪問越界。
Unsafe從字面意思可以看出是不安全的,它包含了許多本地方法(在JVM平臺上執行的其他語言編寫的程式,主要為C/C++,由JNI
實現),這些方法支援了對指標的操作,所以它才被稱為是不安全的。雖然不安全,但畢竟是由C/C++實現的,像一些與作業系統互動的操作肯定是快過Java的,畢竟Java與作業系統之間還隔了一層抽象(JVM),不過代價就是失去了JVM所帶來的多平臺可移植性(本質上也只是一個c/cpp檔案,如果換了平臺那就要重新編譯)。
對table進行操作的函式有以下三個,都使用到了Unsafe(在java.util.concurrent
包隨處可見):
@SuppressWarnings("unchecked")
static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {
// 從tab陣列中獲取一個引用,遵循Volatile語義
// 引數2是一個在tab中的偏移量,用來尋找目標物件
return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);
}
static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i,
Node<K,V> c, Node<K,V> v) {
// 通過CAS操作將tab陣列中位於引數2偏移量位置的值替換為v
// c是期望值,如果期望值與實際值不符,返回false
// 否則,v會成功地被設定到目標位置,返回true
return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v);
}
static final <K,V> void setTabAt(Node<K,V>[] tab, int i, Node<K,V> v) {
// 設定tab陣列中位於引數2偏移量位置的值,遵循Volatile語義
U.putObjectVolatile(tab, ((long)i << ASHIFT) + ABASE, v);
}
初始化
ConcurrentHashMap與HashMap一樣是Lazy的,buckets陣列會在第一次訪問put()
函式時進行初始化,它的預設建構函式甚至是個空函式。
/**
* Creates a new, empty map with the default initial table size (16).
*/
public ConcurrentHashMap() {
}
但是有一點需要注意,ConcurrentHashMap是工作在多執行緒併發環境下的,如果有多個執行緒同時呼叫了put()
函式該怎麼辦?這會導致重複初始化,所以必須要有對應的防護措施。
ConcurrentHashMap聲明瞭一個用於控制table的初始化與擴容的例項變數sizeCtl,預設值為0。當它是一個負數的時候,代表table正處於初始化或者擴容的狀態。-1
表示table正在進行初始化,-N
則表示當前有N-1個執行緒正在進行擴容。
在其他情況下,如果table還未初始化(table == null
),sizeCtl表示table進行初始化的陣列大小(所以從建構函式傳入的initialCapacity在經過計算後會被賦給它)。如果table已經初始化過了,則表示下次觸發擴容操作的閾值,演算法stzeCtl = n - (n >>> 2)
,也就是n的75%,與預設負載因子(0.75)的HashMap一致。
private transient volatile int sizeCtl;
初始化table的操作位於函式initTable()
,原始碼如下:
/**
* Initializes table, using the size recorded in sizeCtl.
*/
private final Node<K,V>[] initTable() {
Node<K,V>[] tab; int sc;
while ((tab = table) == null || tab.length == 0) {
// sizeCtl小於0,這意味著已經有其他執行緒進行初始化了
// 所以當前執行緒讓出CPU時間片
if ((sc = sizeCtl) < 0)
Thread.yield(); // lost initialization race; just spin
// 否則,通過CAS操作嘗試修改sizeCtl
else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
try {
if ((tab = table) == null || tab.length == 0) {
// 預設建構函式,sizeCtl = 0,使用預設容量(16)進行初始化
// 否則,會根據sizeCtl進行初始化
int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = tab = nt;
// 計算閾值,n的75%
sc = n - (n >>> 2);
}
} finally {
// 閾值賦給sizeCtl
sizeCtl = sc;
}
break;
}
}
return tab;
}
sizeCtl是一個volatile
變數,只要有一個執行緒CAS操作成功,sizeCtl就會被暫時地修改為-1,這樣其他執行緒就能夠根據sizeCtl得知table是否已經處於初始化狀態中,最後sizeCtl會被設定成閾值,用於觸發擴容操作。
擴容
ConcurrentHashMap觸發擴容的時機與HashMap類似,要麼是在將連結串列轉換成紅黑樹時判斷table陣列的長度是否小於閾值(64),如果小於就進行擴容而不是樹化,要麼就是在新增元素的時候,判斷當前Entry數量是否超過閾值,如果超過就進行擴容。
private final void treeifyBin(Node<K,V>[] tab, int index) {
Node<K,V> b; int n, sc;
if (tab != null) {
// 小於MIN_TREEIFY_CAPACITY,進行擴容
if ((n = tab.length) < MIN_TREEIFY_CAPACITY)
tryPresize(n << 1);
else if ((b = tabAt(tab, index)) != null && b.hash >= 0) {
synchronized (b) {
// 將連結串列轉換成紅黑樹...
}
}
}
}
...
final V putVal(K key, V value, boolean onlyIfAbsent) {
...
addCount(1L, binCount); // 計數
return null;
}
private final void addCount(long x, int check) {
// 計數...
if (check >= 0) {
Node<K,V>[] tab, nt; int n, sc;
// s(元素個數)大於等於sizeCtl,觸發擴容
while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
(n = tab.length) < MAXIMUM_CAPACITY) {
// 擴容標誌位
int rs = resizeStamp(n);
// sizeCtl為負數,代表正有其他執行緒進行擴容
if (sc < 0) {
// 擴容已經結束,中斷迴圈
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
transferIndex <= 0)
break;
// 進行擴容,並設定sizeCtl,表示擴容執行緒 + 1
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
transfer(tab, nt);
}
// 觸發擴容(第一個進行擴容的執行緒)
// 並設定sizeCtl告知其他執行緒
else if (U.compareAndSwapInt(this, SIZECTL, sc,
(rs << RESIZE_STAMP_SHIFT) + 2))
transfer(tab, null);
// 統計個數,用於迴圈檢測是否還需要擴容
s = sumCount();
}
}
}
可以看到有關sizeCtl的操作牽涉到了大量的位運算,我們先來理解這些位運算的意義。首先是resizeStamp()
,該函式返回一個用於資料校驗的標誌位,意思是對長度為n的table進行擴容。它將n的前導零(最高有效位之前的零的數量)和1 << 15
做或運算,這時低16位的最高位為1,其他都為n的前導零。
static final int resizeStamp(int n) {
// RESIZE_STAMP_BITS = 16
return Integer.numberOfLeadingZeros(n) | (1 << (RESIZE_STAMP_BITS - 1));
}
初始化sizeCtl(擴容操作被第一個執行緒首次進行)的演算法為(rs << RESIZE_STAMP_SHIFT) + 2
,首先RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS = 16
,那麼rs << 16
等於將這個標誌位移動到了高16位,這時最高位為1,所以sizeCtl此時是個負數,然後加二(至於為什麼是2,還記得有關sizeCtl的說明嗎?1代表初始化狀態,所以實際的執行緒個數是要減去1的)代表當前有一個執行緒正在進行擴容,
這樣sizeCtl就被分割成了兩部分,高16位是一個對n的資料校驗的標誌位,低16位表示參與擴容操作的執行緒個數 + 1。
可能會有讀者有所疑惑,更新進行擴容的執行緒數量的操作為什麼是sc + 1
而不是sc - 1
,這是因為對sizeCtl的操作都是基於位運算的,所以不會關心它本身的數值是多少,只關心它在二進位制上的數值,而sc + 1
會在低16位上加1。
tryPresize()
函式跟addCount()
的後半段邏輯類似,不斷地根據sizeCtl判斷當前的狀態,然後選擇對應的策略。
private final void tryPresize(int size) {
// 對size進行修正
int c = (size >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY :
tableSizeFor(size + (size >>> 1) + 1);
int sc;
// sizeCtl是預設值或正整數
// 代表table還未初始化
// 或還沒有其他執行緒正在進行擴容
while ((sc = sizeCtl) >= 0) {
Node<K,V>[] tab = table; int n;
if (tab == null || (n = tab.length) == 0) {
n = (sc > c) ? sc : c;
// 設定sizeCtl,告訴其他執行緒,table現在正處於初始化狀態
if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
try {
if (table == tab) {
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = nt;
// 計算下次觸發擴容的閾值
sc = n - (n >>> 2);
}
} finally {
// 將閾值賦給sizeCtl
sizeCtl = sc;
}
}
}
// 沒有超過閾值或者大於容量的上限,中斷迴圈
else if (c <= sc || n >= MAXIMUM_CAPACITY)
break;
// 進行擴容,與addCount()後半段的邏輯一致
else if (tab == table) {
int rs = resizeStamp(n);
if (sc < 0) {
Node<K,V>[] nt;
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
transferIndex <= 0)
break;
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
transfer(tab, nt);
}
else if (U.compareAndSwapInt(this, SIZECTL, sc,
(rs << RESIZE_STAMP_SHIFT) + 2))
transfer(tab, null);
}
}
}
擴容操作的核心在於資料的轉移,在單執行緒環境下資料的轉移很簡單,無非就是把舊陣列中的資料遷移到新的陣列。但是這在多執行緒環境下是行不通的,需要保證執行緒安全性,在擴容的時候其他執行緒也可能正在新增元素,這時又觸發了擴容怎麼辦?有人可能會說,這不難啊,用一個互斥鎖把資料轉移操作的過程鎖住不就好了?這確實是一種可行的解決方法,但同樣也會帶來極差的吞吐量。
互斥鎖會導致所有訪問臨界區的執行緒陷入阻塞狀態,這會消耗額外的系統資源,核心需要儲存這些執行緒的上下文並放到阻塞佇列,持有鎖的執行緒耗時越長,其他競爭執行緒就會一直被阻塞,因此吞吐量低下,導致響應時間緩慢。而且鎖總是會伴隨著死鎖問題,一旦發生死鎖,整個應用程式都會因此受到影響,所以加鎖永遠是最後的備選方案。
Doug Lea沒有選擇直接加鎖,而是基於CAS實現無鎖的併發同步策略,令人佩服的是他不僅沒有把其他執行緒拒之門外,甚至還邀請它們一起來協助工作。
那麼如何才能讓多個執行緒協同工作呢?Doug Lea把整個table陣列當做多個執行緒之間共享的任務佇列,然後只需維護一個指標,當有一個執行緒開始進行資料轉移,就會先移動指標,表示指標劃過的這片bucket區域由該執行緒負責。
這個指標被宣告為一個volatile
整型變數,它的初始位置位於table的尾部,即它等於table.length
,很明顯這個任務佇列是逆向遍歷的。
/**
* The next table index (plus one) to split while resizing.
*/
private transient volatile int transferIndex;
/**
* 一個執行緒需要負責的最小bucket數
*/
private static final int MIN_TRANSFER_STRIDE = 16;
/**
* The next table to use; non-null only while resizing.
*/
private transient volatile Node<K,V>[] nextTable;
一個已經遷移完畢的bucket會被替換成ForwardingNode節點,用來標記此bucket已經被其他執行緒遷移完畢了。我們之前提到過ForwardingNode,它是一個特殊節點,可以通過hash域的虛擬值來識別它,它同樣重寫了find()
函式,用來在新陣列中查詢目標。
資料遷移的操作位於transfer()
函式,多個執行緒之間依靠sizeCtl與transferIndex指標來協同工作,每個執行緒都有自己負責的區域,一個完成遷移的bucket會被設定為ForwardingNode,其他執行緒遇見這個特殊節點就跳過該bucket,處理下一個bucket。
transfer()
函式可以大致分為三部分,第一部分對後續需要使用的變數進行初始化:
/**
* Moves and/or copies the nodes in each bin to new table. See
* above for explanation.
*/
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
int n = tab.length, stride;
// 根據當前機器的CPU數量來決定每個執行緒負責的bucket數
// 避免因為擴容執行緒過多,反而影響到效能
if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
stride = MIN_TRANSFER_STRIDE; // subdivide range
// 初始化nextTab,容量為舊陣列的一倍
if (nextTab == null) { // initiating
try {
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
nextTab = nt;
} catch (Throwable ex) { // try to cope with OOME
sizeCtl = Integer.MAX_VALUE;
return;
}
nextTable = nextTab;
transferIndex = n; // 初始化指標
}
int nextn = nextTab.length;
ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
boolean advance = true;
boolean finishing = false; // to ensure sweep before committing nextTab
第二部分為當前執行緒分配任務和控制當前執行緒的任務進度,這部分是transfer()
的核心邏輯,描述瞭如何與其他執行緒協同工作:
// i指向當前bucket,bound表示當前執行緒所負責的bucket區域的邊界
for (int i = 0, bound = 0;;) {
Node<K,V> f; int fh;
// 這個迴圈使用CAS不斷嘗試為當前執行緒分配任務
// 直到分配成功或任務佇列已經被全部分配完畢
// 如果當前執行緒已經被分配過bucket區域
// 那麼會通過--i指向下一個待處理bucket然後退出該迴圈
while (advance) {
int nextIndex, nextBound;
// --i表示將i指向下一個待處理的bucket
// 如果--i >= bound,代表當前執行緒已經分配過bucket區域
// 並且還留有未處理的bucket
if (--i >= bound || finishing)
advance = false;
// transferIndex指標 <= 0 表示所有bucket已經被分配完畢
else if ((nextIndex = transferIndex) <= 0) {
i = -1;
advance = false;
}
// 移動transferIndex指標
// 為當前執行緒設定所負責的bucket區域的範圍
// i指向該範圍的第一個bucket,注意i是逆向遍歷的
// 這個範圍為(bound, i),i是該區域最後一個bucket,遍歷順序是逆向的
else if (U.compareAndSwapInt
(this, TRANSFERINDEX, nextIndex,
nextBound = (nextIndex > stride ?
nextIndex - stride : 0))) {
bound = nextBound;
i = nextIndex - 1;
advance = false;
}
}
// 當前執行緒已經處理完了所負責的所有bucket
if (i < 0 || i >= n || i + n >= nextn) {
int sc;
// 如果任務佇列已經全部完成
if (finishing) {
nextTable = null;
table = nextTab;
// 設定新的閾值
sizeCtl = (n << 1) - (n >>> 1);
return;
}
// 工作中的擴容執行緒數量減1
if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
// (resizeStamp << RESIZE_STAMP_SHIFT) + 2代表當前有一個擴容執行緒
// 相對的,(sc - 2) != resizeStamp << RESIZE_STAMP_SHIFT
// 表示當前還有其他執行緒正在進行擴容,所以直接返回
if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
return;
// 否則,當前執行緒就是最後一個進行擴容的執行緒
// 設定finishing標識
finishing = advance = true;
i = n; // recheck before commit
}
}
// 如果待處理bucket是空的
// 那麼插入ForwardingNode,以通知其他執行緒
else if ((f = tabAt(tab, i)) == null)
advance = casTabAt(tab, i, null, fwd);
// 如果待處理bucket的頭節點是ForwardingNode
// 說明此bucket已經被處理過了,跳過該bucket
else if ((fh = f.hash) == MOVED)
advance = true; // already processed
最後一部分是具體的遷移過程(對當前指向的bucket),這部分的邏輯與HashMap類似,拿舊陣列的容量當做一個掩碼,然後與節點的hash進行與操作,可以得出該節點的新增有效位,如果新增有效位為0就放入一個連結串列A,如果為1就放入另一個連結串列B,連結串列A在新陣列中的位置不變(跟在舊陣列的索引一致),連結串列B在新陣列中的位置為原索引加上舊陣列容量。
這個方法減少了rehash的計算量,而且還能達到均勻分佈的目的,如果不能理解請去看本文中HashMap擴容操作的解釋。
else {
// 對於節點的操作還是要加上鎖的
// 不過這個鎖的粒度很小,只鎖住了bucket的頭節點
synchronized (f) {
if (tabAt(tab, i) == f) {
Node<K,V> ln, hn;
// hash code不為負,代表這是條連結串列
if (fh >= 0) {
// fh & n 獲得hash code的新增有效位,用於將連結串列分離成兩類
// 要麼是0要麼是1,關於這個位運算的更多細節
// 請看本文中有關HashMap擴容操作的解釋
int runBit = fh & n;
Node<K,V> lastRun = f;
// 這個迴圈用於記錄最後一段連續的同一類節點
// 這個類別是通過fh & n來區分的
// 這段連續的同類節點直接被複用,不會產生額外的複製
for (Node<K,V> p = f.next; p != null; p = p.next) {
int b = p.hash & n;
if (b != runBit) {
runBit = b;
lastRun = p;
}
}
// 0被放入ln連結串列,1被放入hn連結串列
// lastRun是連續同類節點的起始節點
if (runBit == 0) {
ln = lastRun;
hn = null;
}
else {
hn = lastRun;
ln = null;
}
// 將最後一段的連續同類節點之前的節點按類別複製到ln或hn
// 連結串列的插入方向是往頭部插入的,Node建構函式的第四個引數是next
// 所以就算遇到類別與lastRun一致的節點也只會被插入到頭部
for (Node<K,V> p = f; p != lastRun; p = p.next) {
int ph = p.hash; K pk = p.key; V pv = p.val;
if ((ph & n) == 0)
ln = new Node<K,V>(ph, pk, pv, ln);
else
hn = new Node<K,V>(ph, pk, pv, hn);
}
// ln連結串列被放入到原索引位置,hn放入到原索引 + 舊陣列容量
// 這一點與HashMap一致,如果看不懂請去參考本文對HashMap擴容的講解
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd); // 標記該bucket已被處理
advance = true;
}
// 對紅黑樹的操作,邏輯與連結串列一樣,按新增有效位進行分類
else if (f instanceof TreeBin) {
TreeBin<K,V> t = (TreeBin<K,V>)f;
TreeNode<K,V> lo = null, loTail = null;
TreeNode<K,V> hi = null, hiTail = null;
int lc = 0, hc = 0;
for (Node<K,V> e = t.first; e != null; e = e.next) {
int h = e.hash;
TreeNode<K,V> p = new TreeNode<K,V>
(h, e.key, e.val, null, null);
if ((h & n) == 0) {
if ((p.prev = loTail) == null)
lo = p;
else
loTail.next = p;
loTail = p;
++lc;
}
else {
if ((p.prev = hiTail) == null)
hi = p;
else
hiTail.next = p;
hiTail = p;
++hc;
}
}
// 元素數量沒有超過UNTREEIFY_THRESHOLD,退化成連結串列
ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
(hc != 0) ? new TreeBin<K,V>(lo) : t;
hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
(lc != 0) ? new TreeBin<K,V>(hi) : t;
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
advance = true;
}
計數
在Java 7中ConcurrentHashMap對每個Segment單獨計數,想要得到總數就需要獲得所有Segment的鎖,然後進行統計。由於Java 8拋棄了Segment,顯然是不能再這樣做了,而且這種方法雖然簡單準確但也捨棄了效能。
Java 8聲明瞭一個volatile
變數baseCount用於記錄元素的個數,對這個變數的修改操作是基於CAS的,每當插入元素或刪除元素時都會呼叫addCount()
函式進行計數。
private transient volatile long baseCount;
private final void addCount(long x, int check) {
CounterCell[] as; long b, s;
// 嘗試使用CAS更新baseCount失敗
// 轉用CounterCells進行更新
if ((as = counterCells) != null ||
!U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
CounterCell a; long v; int m;
boolean uncontended = true;
// 在CounterCells未初始化
// 或嘗試通過CAS更新當前執行緒的CounterCell失敗時
// 呼叫fullAddCount(),該函式負責初始化CounterCells和更新計數
if (as == null || (m = as.length - 1) < 0 ||
(a = as[ThreadLocalRandom.getProbe() & m]) == null ||
!(uncontended =
U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
fullAddCount(x, uncontended);
return;
}
if (check <= 1)
return;
// 統計總數
s = sumCount();
}
if (check >= 0) {
// 判斷是否需要擴容,在上文中已經講過了
}
}
counterCells是一個元素為CounterCell的陣列,該陣列的大小與當前機器的CPU數量有關,並且它不會被主動初始化,只有在呼叫fullAddCount()
函式時才會進行初始化。
CounterCell是一個簡單的內部靜態類,每個CounterCell都是一個用於記錄數量的單元:
/**
* Table of counter cells. When non-null, size is a power of 2.
*/
private transient volatile CounterCell[] counterCells;
/**
* A padded cell for distributing counts. Adapted from LongAdder
* and Striped64. See their internal docs for explanation.
*/
@sun.misc.Contended static final class CounterCell {
volatile long value;
CounterCell(long x) { value = x; }
}
註解@sun.misc.Contended
用於解決偽共享問題。所謂偽共享,即是在同一快取行(CPU快取的基本單位)中儲存了多個變數,當其中一個變數被修改時,就會影響到同一快取行內的其他變數,導致它們也要跟著被標記為失效,其他變數的快取命中率將會受到影響。解決偽共享問題的方法一般是對該變數填充一些無意義的佔位資料,從而使它獨享一個快取行。
ConcurrentHashMap的計數設計與LongAdder類似。在一個低併發的情況下,就只是簡單地使用CAS操作來對baseCount進行更新,但只要這個CAS操作失敗一次,就代表有多個執行緒正在競爭,那麼就轉而使用CounterCell陣列進行計數,陣列內的每個ConuterCell都是一個獨立的計數單元。
每個執行緒都會通過ThreadLocalRandom.getProbe() & m
定址找到屬於它的CounterCell,然後進行計數。ThreadLocalRandom是一個執行緒私有的偽隨機數生成器,每個執行緒的probe都是不同的(這點基於ThreadLocalRandom的內部實現,它在內部維護了一個probeGenerator,這是一個型別為AtomicInteger的靜態常量,每當初始化一個ThreadLocalRandom時probeGenerator都會先自增一個常量然後返回的整數即為當前執行緒的probe,probe變數被維護在Thread物件中),可以認為每個執行緒的probe就是它在CounterCell陣列中的hash code。
這種方法將競爭資料按照執行緒的粒度進行分離,相比所有競爭執行緒對一個共享變數使用CAS不斷嘗試在效能上要效率多了,這也是為什麼在高併發環境下LongAdder要優於AtomicInteger的原因。
fullAddCount()
函式根據當前執行緒的probe尋找對應的CounterCell進行計數,如果CounterCell陣列未被初始化,則初始化CounterCell陣列和CounterCell。該函式的實現與Striped64類(LongAdder的父類)的longAccumulate()
函式是一樣的,把CounterCell陣列當成一個散列表,每個執行緒的probe就是hash code,雜湊函式也僅僅是簡單的(n - 1) & probe
。
CounterCell陣列的大小永遠是一個2的n次方,初始容量為2,每次擴容的新容量都是之前容量乘以二,處於效能考慮,它的最大容量上限是機器的CPU數量。
所以說CounterCell陣列的碰撞衝突是很嚴重的,因為它的bucket基數太小了。而發生碰撞就代表著一個CounterCell會被多個執行緒競爭,為了解決這個問題,Doug Lea使用無限迴圈加上CAS來模擬出一個自旋鎖來保證執行緒安全,自旋鎖的實現基於一個被volatile
修飾的整數變數,該變數只會有兩種狀態:0和1,當它被設定為0時表示沒有加鎖,當它被設定為1時表示已被其他執行緒加鎖。這個自旋鎖用於保護初始化CounterCell、初始化CounterCell陣列以及對CounterCell陣列進行擴容時的安全。
CounterCell更新計數是依賴於CAS的,每次迴圈都會嘗試通過CAS進行更新,如果成功就退出無限迴圈,否則就呼叫ThreadLocalRandom.advanceProbe()
函式為當前執行緒更新probe,然後重新開始迴圈,以期望下一次定址到的CounterCell沒有被其他執行緒競爭。
如果連著兩次CAS更新都沒有成功,那麼會對CounterCell陣列進行一次擴容,這個擴容操作只會在當前迴圈中觸發一次,而且只能在容量小於上限時觸發。
fullAddCount()
函式的主要流程如下:
-
首先檢查當前執行緒有沒有初始化過ThreadLocalRandom,如果沒有則進行初始化。ThreadLocalRandom負責更新執行緒的probe,而probe又是在陣列中進行定址的關鍵。
-
檢查CounterCell陣列是否已經初始化,如果已初始化,那麼就根據probe找到對應的CounterCell。如果這個CounterCell等於null,需要先初始化CounterCell,通過把計數增量傳入建構函式,所以初始化只要成功就說明更新計數已經完成了。初始化的過程需要獲取自旋鎖。如果不為null,就按上文所說的邏輯對CounterCell實施更新計數。
-
CounterCell陣列未被初始化,嘗試獲取自旋鎖,進行初始化。陣列初始化的過程會附帶初始化一個CounterCell來記錄計數增量,所以只要初始化成功就表示更新計數完成。
-
如果自旋鎖被其他執行緒佔用,無法進行陣列的初始化,只好通過CAS更新baseCount。
private final void fullAddCount(long x, boolean wasUncontended) {
int h;
// 當前執行緒的probe等於0,證明該執行緒的ThreadLocalRandom還未被初始化
// 以及當前執行緒是第一次進入該函式
if ((h = ThreadLocalRandom.getProbe()) == 0) {
// 初始化ThreadLocalRandom,當前執行緒會被設定一個probe
ThreadLocalRandom.localInit(); // force initialization
// probe用於在CounterCell陣列中定址
h = ThreadLocalRandom.getProbe();
// 未競爭標誌
wasUncontended = true;
}
// 衝突標誌
boolean collide = false; // True if last slot nonempty
for (;;) {
CounterCell[] as; CounterCell a; int n; long v;
// CounterCell陣列已初始化
if ((as = counterCells) != null && (n = as.length) > 0) {
// 如果定址到的Cell為空,那麼建立一個新的Cell
if ((a = as[(n - 1) & h]) == null) {
// cellsBusy是一個只有0和1兩個狀態的volatile整數
// 它被當做一個自旋鎖,0代表無鎖,1代表加鎖
if (cellsBusy == 0) { // Try to attach new Cell
// 將傳入的x作為初始值建立一個新的CounterCell
CounterCell r = new CounterCell(x); // Optimistic create
// 通過CAS嘗試對自旋鎖加鎖
if (cellsBusy == 0 &&
U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
// 加鎖成功,宣告Cell是否建立成功的標誌
boolean created = false;
try { // Recheck under lock
CounterCell[] rs; int m, j;
// 再次檢查CounterCell陣列是否不為空
// 並且定址到的Cell為空
if ((rs = counterCells) != null &&
(m = rs.length) > 0 &&
rs[j = (m - 1) & h] == null) {
// 將之前建立的新Cell放入陣列
rs[j] = r;
created = true;
}
} finally {
// 釋放鎖
cellsBusy = 0;
}
// 如果已經建立成功,中斷迴圈
// 因為新Cell的初始值就是傳入的增量,所以計數已經完畢了
if (created)
break;
// 如果未成功
// 代表as[(n - 1) & h]這個位置的Cell已經被其他執行緒設定
// 那麼就從迴圈頭重新開始
continue; // Slot is now non-empty
}
}
collide = false;
}
// as[(n - 1) & h]非空
// 在addCount()函式中通過CAS更新當前執行緒的Cell進行計數失敗
// 會傳入wasUncontended = false,代表已經有其他執行緒進行競爭
else if (!wasUncontended) // CAS already known to fail
// 設定未競爭標誌,之後會重新計算probe,然後重新執行迴圈
wasUncontended = true; // Continue after rehash
// 嘗試進行計數,如果成功,那麼就退出迴圈
else if (U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))
break;
// 嘗試更新失敗,檢查counterCell陣列是否已經擴容
// 或者容量達到最大值(CPU的數量)
else if (counterCells != as || n >= NCPU)
// 設定衝突標誌,防止跳入下面的擴容分支
// 之後會重新計算probe
collide = false; // At max size or stale
// 設定衝突標誌,重新執行迴圈
// 如果下次迴圈執行到該分支,並且衝突標誌仍然為true
// 那麼會跳過該分支,到下一個分支進行擴容
else if (!collide)
collide = true;
// 嘗試加鎖,然後對counterCells陣列進行擴容
else if (cellsBusy == 0 &&
U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
try {
// 檢查是否已被擴容
if (counterCells == as) {// Expand table unless stale
// 新陣列容量為之前的1倍
CounterCell[] rs = new CounterCell[n << 1];
// 遷移資料到新陣列
for (int i = 0; i < n; ++i)
rs[i] = as[i];
counterCells = rs;
}
} finally {
// 釋放鎖
cellsBusy = 0;
}
collide = false;
// 重新執行迴圈
continue; // Retry with expanded table
}
// 為當前執行緒重新計算probe
h = ThreadLocalRandom.advanceProbe(h);
}
// CounterCell陣列未初始化,嘗試獲取自旋鎖,然後進行初始化
else if (cellsBusy == 0 && counterCells == as &&
U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
boolean init = false;
try { // Initialize table
if (counterCells == as) {
// 初始化CounterCell陣列,初始容量為2
CounterCell[] rs = new CounterCell[2];
// 初始化CounterCell
rs[h & 1] = new CounterCell(x);
counterCells = rs;
init = true;
}
} finally {
cellsBusy = 0;
}
// 初始化CounterCell陣列成功,退出迴圈
if (init)
break;
}
// 如果自旋鎖被佔用,則只好嘗試更新baseCount
else if (U.compareAndSwapLong(this, BASECOUNT, v = baseCount, v + x))
break; // Fall back on using base
}
}
對於統計總數,只要能夠理解CounterCell的思想,就很簡單了。仔細想一想,每次計數的更新都會被分攤在baseCount和CounterCell陣列中的某一CounterCell,想要獲得總數,把它們統計相加就是了。
public int size() {
long n = sumCount();
return ((n < 0L) ? 0 :
(n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE :
(int)n);
}
final long sumCount() {
CounterCell[] as = counterCells; CounterCell a;
long sum = baseCount;
if (as != null) {
for (int i = 0; i < as.length; ++i) {
if ((a = as[i]) != null)
sum += a.value;
}
}
return sum;
}
其實size()
函式返回的總數可能並不是百分百精確的,試想如果前一個遍歷過的CounterCell又進行了更新會怎麼樣?儘管只是一個估算值,但在大多數場景下都還能接受,而且效能上是要比Java 7好上太多了。
新增元素
新增元素的主要邏輯與HashMap沒什麼區別,有所區別的複雜操作如擴容和計數我們上文都已經深入解析過了,所以整體來說putVal()
函式還是比較簡單的,可能唯一需要注意的就是在對節點進行操作的時候需要通過互斥鎖保證執行緒安全,這個互斥鎖的粒度很小,只對需要操作的這個bucket加鎖。
public V put(K key, V value) {
return putVal(key, value, false);
}
/** Implementation for put and putIfAbsent */
final V putVal(K key, V value, boolean onlyIfAbsent) {
if (key == null || value == null) throw new NullPointerException();
int hash = spread(key.hashCode());
int binCount = 0; // 節點計數器,用於判斷是否需要樹化
// 無限迴圈+CAS,無鎖的標準套路
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
// 初始化table
if (tab == null || (n = tab.length) == 0)
tab = initTable();
// bucket為null,通過CAS建立頭節點,如果成功就結束迴圈
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
if (casTabAt(tab, i, null,
new Node<K,V>(hash, key, value, null)))
break; // no lock when adding to empty bin
}
// bucket為ForwardingNode
// 當前執行緒前去協助進行擴容
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f);
else {
V oldVal = null;
synchronized (f) {
if (tabAt(tab, i) == f) {
// 節點是連結串列
if (fh >= 0) {
binCount = 1;
for (Node<K,V> e = f;; ++binCount) {
K ek;
// 找到目標,設定value
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K,V> pred = e;
// 未找到節點,插入新節點到連結串列尾部
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}
// 節點是紅黑樹
else if (f instanceof TreeBin) {
Node<K,V> p;
binCount = 2;
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
// 根據bucket中的節點數決定是否樹化
if (binCount != 0) {
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
// oldVal不等於null,說明沒有新節點
// 所以直接返回,不進行計數
if (oldVal != null)
return oldVal;
break;
}
}
}
// 計數
addCount(1L, binCount);
return null;
}
至於刪除元素的操作位於函式replaceNode(Object key, V value, Object cv)
,當table[key].val
等於期望值cv時(或cv等於null),更新節點的值為value,如果value等於null,那麼刪除該節點。
remove()
函式通過呼叫replaceNode(key, null, null)
來達成刪除目標節點的目的,replaceNode()
的具體實現與putVal()
沒什麼差別,只不過對連結串列的操作有所不同而已,所以就不多敘述了。
平行計算
Java 8除了對ConcurrentHashMap重新設計以外,還引入了基於Lambda表示式的Stream API。它是對集合物件功能上的增強(所以不止ConcurrentHashMap,其他集合也都實現了該API),以一種優雅的方式來批量操作、聚合或遍歷集合中的資料。
最重要的是,它還提供了並行模式,充分利用了多核CPU的優勢實現平行計算。讓我們看看如下的示例程式碼:
public static void main(String[] args) {
ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();
String keys = "ABCDEFG";
for (int i = 1; i <= keys.length(); i++) {
map.put(String.valueOf(keys.charAt(i - 1)), i);
}
map.forEach(2,
(k, v) -> System.out.println("key-" + k + ":value-" + v + ". by thread->" + Thread.currentThread().getName()));
}
這段程式碼通過兩個執行緒(包括主執行緒)並行地遍歷map中的元素,然後輸出到控制檯,輸出如下
key-A:value-1. by thread->main
key-D:value-4. by thread->ForkJoinPool.commonPool-worker-2
key-B:value-2. by thread->main
key-E:value-5. by thread->ForkJoinPool.commonPool-worker-2
key-C:value-3. by thread->main
key-F:value-6. by thread->ForkJoinPool.commonPool-worker-2
key-G:value-7. by thread->ForkJoinPool.commonPool-worker-2
很明顯,有兩個執行緒在進行工作,那麼這是怎麼實現的呢?我們先來看看forEach()
函式:
public void forEach(long parallelismThreshold,
BiConsumer<? super K,? super V> action) {
if (action == null) throw new NullPointerException();
new ForEachMappingTask<K,V>
(null, batchFor(parallelismThreshold), 0, 0, table,
action).invoke();
}
parallelismThreshold
是需要並行執行該操作的執行緒數量,action
則是回撥函式(我們想要執行的操作)。action
的型別為BiConsumer,是一個用於支援Lambda表示式的FunctionalInterface,它接受兩個輸入引數並返回0個結果。
@FunctionalInterface
public interface BiConsumer<T, U> {
/**
* Performs this operation on the given arguments.
*
* @param t the first input argument
* @param u the second input argument
*/
void accept(T t, U u);
看來實現平行計算的關鍵在於ForEachMappingTask物件,通過它的繼承關係結構圖可以發現,ForEachMappingTask其實就是ForkJoinTask。
集合的平行計算是基於Fork/Join框架實現的,工作執行緒交由ForkJoinPool執行緒池維護。它推崇分而治之的思想,將一個大的任務分解成多個小的任務,通過fork()
函式(有點像Linux的fork()
系統呼叫來建立子程序)來開啟一個工作執行緒執行其中一個小任務,通過join()
函式等待工作執行緒執行完畢(需要等所有工作執行緒執行完畢才能合併最終結果),只要所有的小任務都已經處理完成,就代表這個大的任務也完成了。
像上文中的示例程式碼就是將遍歷這個大任務分解成了N個小任務,然後交由兩個工作執行緒進行處理。
static final class ForEachMappingTask<K,V>
extends BulkTask<K,V,Void> {
final BiConsumer<? super K, ? super V> action;
ForEachMappingTask
(BulkTask<K,V,?> p, int b, int i, int f, Node<K,V>[] t,
BiConsumer<? super K,? super V> action) {
super(p, b, i, f, t);
this.action = action;
}
public final void compute() {
final BiConsumer<? super K, ? super V> action;
if ((action = this.action) != null) {
for (int i = baseIndex, f, h; batch > 0 &&
(h = ((f = baseLimit) + i) >>> 1) > i;) {
// 記錄待完成任務的數量
addToPendingCount(1);
// 開啟一個工作執行緒執行任務
// 其餘引數是任務的區間以及table和回撥函式
new ForEachMappingTask<K,V>
(this, batch >>>= 1, baseLimit = h, f, tab,
action).fork();
}
for (Node<K,V> p; (p = advance()) != null; )
// 呼叫回撥函式
action.accept(p.key, p.val);
// 與addToPendingCount()相反
// 它會減少待完成任務的計數器
// 如果該計數器為0,代表所有任務已經完成了
propagateCompletion();
}
}
}
其他平行計算函式的實現也都差不多,只不過具體的Task實現不同,例如search()
:
public <U> U search(long parallelismThreshold,
BiFunction<? super K, ? super V, ? extends U> searchFunction) {
if (searchFunction == null) throw new NullPointerException();
return new SearchMappingsTask<K,V,U>
(null, batchFor(parallelismThreshold), 0, 0, table,
searchFunction, new AtomicReference<U>()).invoke();
}