1. 程式人生 > >Java7 ConcurrentHashMap源碼深入解析

Java7 ConcurrentHashMap源碼深入解析

都沒有 oca service != bre sock 可見性 des tor

ConcurrentHashMap 和 HashMap 思路是差不多的,但是因為它支持並發操作,所以要復雜一些。

整個 ConcurrentHashMap 由一個個 Segment 組成,Segment 代表”部分“或”一段“的意思,所以很多地方都會將其描述為分段鎖。註意,行文中,我很多地方用了“槽”來代表一個 segment。

簡單理解就是,ConcurrentHashMap 是一個 Segment 數組,Segment 通過繼承 ReentrantLock 來進行加鎖,所以每次需要加鎖的操作鎖住的是一個 segment,這樣只要保證每個 Segment 是線程安全的,也就實現了全局的線程安全。

技術分享圖片
3
concurrencyLevel:並行級別、並發數、Segment 數,怎麽翻譯不重要,理解它。默認是 16,也就是說 ConcurrentHashMap 有 16 個 Segments,所以理論上,這個時候,最多可以同時支持 16 個線程並發寫,只要它們的操作分別分布在不同的 Segment 上。這個值可以在初始化的時候設置為其他值,但是一旦初始化以後,它是不可以擴容的。

再具體到每個 Segment 內部,其實每個 Segment 很像之前介紹的 HashMap,不過它要保證線程安全,所以處理起來要麻煩些。

初始化
initialCapacity:初始容量,這個值指的是整個 ConcurrentHashMap 的初始容量,實際操作的時候需要平均分給每個 Segment。

loadFactor:負載因子,之前我們說了,Segment 數組不可以擴容,所以這個負載因子是給每個 Segment 內部使用的。

public ConcurrentHashMap(int initialCapacity,
float loadFactor, int concurrencyLevel) {
if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
throw new IllegalArgumentException();
if (concurrencyLevel > MAX_SEGMENTS)

concurrencyLevel = MAX_SEGMENTS;
// Find power-of-two sizes best matching arguments
int sshift = 0;
int ssize = 1;
// 計算並行級別 ssize,因為要保持並行級別是 2 的 n 次方
while (ssize < concurrencyLevel) {
++sshift;
ssize <<= 1;
}
// 我們這裏先不要那麽燒腦,用默認值,concurrencyLevel 為 16,sshift 為 4
// 那麽計算出 segmentShift 為 28,segmentMask 為 15,後面會用到這兩個值
this.segmentShift = 32 - sshift;
this.segmentMask = ssize - 1;
if (initialCapacity > MAXIMUM_CAPACITY)
initialCapacity = MAXIMUM_CAPACITY;
// initialCapacity 是設置整個 map 初始的大小,
// 這裏根據 initialCapacity 計算 Segment 數組中每個位置可以分到的大小
// 如 initialCapacity 為 64,那麽每個 Segment 或稱之為"槽"可以分到 4 個
int c = initialCapacity / ssize;
if (c ssize < initialCapacity)
++c;
// 默認 MIN_SEGMENT_TABLE_CAPACITY 是 2,這個值也是有講究的,因為這樣的話,對於具體的槽上,
// 插入一個元素不至於擴容,插入第二個的時候才會擴容
int cap = MIN_SEGMENT_TABLE_CAPACITY;
while (cap < c)
cap <<= 1;
// 創建 Segment 數組,
// 並創建數組的第一個元素 segment[0]
Segment<K,V> s0 =
new Segment<K,V>(loadFactor, (int)(cap
loadFactor),
(HashEntry<K,V>[])new HashEntry[cap]);
Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];
// 往數組寫入 segment[0]
UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0]
this.segments = ss;
}
初始化完成,我們得到了一個 Segment 數組。

我們就當是用 new ConcurrentHashMap() 無參構造函數進行初始化的,那麽初始化完成後:

Segment 數組長度為 16,不可以擴容

Segment[i] 的默認大小為 2,負載因子是 0.75,得出初始閾值為 1.5,也就是以後插入第一個元素不會觸發擴容,插入第二個會進行第一次擴容

這裏初始化了 segment[0],其他位置還是 null,至於為什麽要初始化 segment[0],後面的代碼會介紹

當前 segmentShift 的值為 32 - 4 = 28,segmentMask 為 16 - 1 = 15,姑且把它們簡單翻譯為移位數和掩碼,這兩個值馬上就會用到

put 過程分析
我們先看 put 的主流程,對於其中的一些關鍵細節操作,後面會進行詳細介紹。

public V put(K key, V value) {
Segment<K,V> s;
if (value == null)
throw new NullPointerException();
// 1. 計算 key 的 hash 值
int hash = hash(key);
// 2. 根據 hash 值找到 Segment 數組中的位置 j
// hash 是 32 位,無符號右移 segmentShift(28) 位,剩下低 4 位,
// 然後和 segmentMask(15) 做一次與操作,也就是說 j 是 hash 值的最後 4 位,也就是槽的數組下標
int j = (hash >>> segmentShift) & segmentMask;
// 剛剛說了,初始化的時候初始化了 segment[0],但是其他位置還是 null,
// ensureSegment(j) 對 segment[j] 進行初始化
if ((s = (Segment<K,V>)UNSAFE.getObject // nonvolatile; recheck
(segments, (j << SSHIFT) + SBASE)) == null) // in ensureSegment
s = ensureSegment(j);
// 3. 插入新值到 槽 s 中
return s.put(key, hash, value, false);
}
第一層皮很簡單,根據 hash 值很快就能找到相應的 Segment,之後就是 Segment 內部的 put 操作了。

Segment 內部是由 數組+鏈表 組成的。

final V put(K key, int hash, V value, boolean onlyIfAbsent) {
// 在往該 segment 寫入前,需要先獲取該 segment 的獨占鎖
// 先看主流程,後面還會具體介紹這部分內容
HashEntry<K,V> node = tryLock() ? null :
scanAndLockForPut(key, hash, value);
V oldValue;
try {
// 這個是 segment 內部的數組
HashEntry<K,V>[] tab = table;
// 再利用 hash 值,求應該放置的數組下標
int index = (tab.length - 1) & hash;
// first 是數組該位置處的鏈表的表頭
HashEntry<K,V> first = entryAt(tab, index);
// 下面這串 for 循環雖然很長,不過也很好理解,想想該位置沒有任何元素和已經存在一個鏈表這兩種情況
for (HashEntry<K,V> e = first;;) {
if (e != null) {
K k;
if ((k = e.key) == key ||
(e.hash == hash && key.equals(k))) {
oldValue = e.value;
if (!onlyIfAbsent) {
// 覆蓋舊值
e.value = value;
++modCount;
}
break;
}
// 繼續順著鏈表走
e = e.next;
}
else {
// node 到底是不是 null,這個要看獲取鎖的過程,不過和這裏都沒有關系。
// 如果不為 null,那就直接將它設置為鏈表表頭;如果是null,初始化並設置為鏈表表頭。
if (node != null)
node.setNext(first);
else
node = new HashEntry<K,V>(hash, key, value, first);
int c = count + 1;
// 如果超過了該 segment 的閾值,這個 segment 需要擴容
if (c > threshold && tab.length < MAXIMUM_CAPACITY)
rehash(node); // 擴容後面也會具體分析
else
// 沒有達到閾值,將 node 放到數組 tab 的 index 位置,
// 其實就是將新的節點設置成原鏈表的表頭
setEntryAt(tab, index, node);
++modCount;
count = c;
oldValue = null;
break;
}
}
} finally {
// 解鎖
unlock();
}
return oldValue;
}
整體流程還是比較簡單的,由於有獨占鎖的保護,所以 segment 內部的操作並不復雜。至於這裏面的並發問題,我們稍後再進行介紹。

到這裏 put 操作就結束了,接下來,我們說一說其中幾步關鍵的操作。

初始化槽: ensureSegment
ConcurrentHashMap 初始化的時候會初始化第一個槽 segment[0],對於其他槽來說,在插入第一個值的時候進行初始化。

這裏需要考慮並發,因為很可能會有多個線程同時進來初始化同一個槽 segment[k],不過只要有一個成功了就可以。

private Segment<K,V> ensureSegment(int k) {
final Segment<K,V>[] ss = this.segments;
long u = (k << SSHIFT) + SBASE; // raw offset
Segment<K,V> seg;
if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) {
// 這裏看到為什麽之前要初始化 segment[0] 了,
// 使用當前 segment[0] 處的數組長度和負載因子來初始化 segment[k]
// 為什麽要用“當前”,因為 segment[0] 可能早就擴容過了
Segment<K,V> proto = ss[0];
int cap = proto.table.length;
float lf = proto.loadFactor;
int threshold = (int)(cap * lf);
// 初始化 segment[k] 內部的數組
HashEntry<K,V>[] tab = (HashEntry<K,V>[])new HashEntry[cap];
if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))
== null) { // 再次檢查一遍該槽是否被其他線程初始化了。
Segment<K,V> s = new Segment<K,V>(lf, threshold, tab);
// 使用 while 循環,內部用 CAS,當前線程成功設值或其他線程成功設值後,退出
while ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))
== null) {
if (UNSAFE.compareAndSwapObject(ss, u, null, seg = s))
break;
}
}
}
return seg;
}
總的來說,ensureSegment(int k) 比較簡單,對於並發操作使用 CAS 進行控制。

我沒搞懂這裏為什麽要搞一個 while 循環,CAS 失敗不就代表有其他線程成功了嗎,為什麽要再進行判斷?

獲取寫入鎖: scanAndLockForPut
前面我們看到,在往某個 segment 中 put 的時候,首先會調用 node = tryLock() ? null : scanAndLockForPut(key, hash, value),也就是說先進行一次 tryLock() 快速獲取該 segment 的獨占鎖,如果失敗,那麽進入到 scanAndLockForPut 這個方法來獲取鎖。

下面我們來具體分析這個方法中是怎麽控制加鎖的。

private HashEntry<K,V> scanAndLockForPut(K key, int hash, V value) {
HashEntry<K,V> first = entryForHash(this, hash);
HashEntry<K,V> e = first;
HashEntry<K,V> node = null;
int retries = -1; // negative while locating node
// 循環獲取鎖
while (!tryLock()) {
HashEntry<K,V> f; // to recheck first below
if (retries < 0) {
if (e == null) {
if (node == null) // speculatively create node
// 進到這裏說明數組該位置的鏈表是空的,沒有任何元素
// 當然,進到這裏的另一個原因是 tryLock() 失敗,所以該槽存在並發,不一定是該位置
node = new HashEntry<K,V>(hash, key, value, null);
retries = 0;
}
else if (key.equals(e.key))
retries = 0;
else
// 順著鏈表往下走
e = e.next;
}
// 重試次數如果超過 MAX_SCAN_RETRIES(單核1多核64),那麽不搶了,進入到阻塞隊列等待鎖
// lock() 是阻塞方法,直到獲取鎖後返回
else if (++retries > MAX_SCAN_RETRIES) {
lock();
break;
}
else if ((retries & 1) == 0 &&
// 這個時候是有大問題了,那就是有新的元素進到了鏈表,成為了新的表頭
// 所以這邊的策略是,相當於重新走一遍這個 scanAndLockForPut 方法
(f = entryForHash(this, hash)) != first) {
e = first = f; // re-traverse if entry changed
retries = -1;
}
}
return node;
}
這個方法有兩個出口,一個是 tryLock() 成功了,循環終止,另一個就是重試次數超過了 MAX_SCAN_RETRIES,進到 lock() 方法,此方法會阻塞等待,直到成功拿到獨占鎖。

這個方法就是看似復雜,但是其實就是做了一件事,那就是獲取該 segment 的獨占鎖,如果需要的話順便實例化了一下 node。

擴容: rehash
重復一下,segment 數組不能擴容,擴容是 segment 數組某個位置內部的數組 HashEntry[] 進行擴容,擴容後,容量為原來的 2 倍。

首先,我們要回顧一下觸發擴容的地方,put 的時候,如果判斷該值的插入會導致該 segment 的元素個數超過閾值,那麽先進行擴容,再插值,讀者這個時候可以回去 put 方法看一眼。

該方法不需要考慮並發,因為到這裏的時候,是持有該 segment 的獨占鎖的 。

// 方法參數上的 node 是這次擴容後,需要添加到新的數組中的數據。
private void rehash(HashEntry<K,V> node) {
HashEntry<K,V>[] oldTable = table;
int oldCapacity = oldTable.length;
// 2 倍
int newCapacity = oldCapacity << 1;
threshold = (int)(newCapacity * loadFactor);
// 創建新數組
HashEntry<K,V>[] newTable =
(HashEntry<K,V>[]) new HashEntry[newCapacity];
// 新的掩碼,如從 16 擴容到 32,那麽 sizeMask 為 31,對應二進制 ‘000...00011111’
int sizeMask = newCapacity - 1;
// 遍歷原數組,老套路,將原數組位置 i 處的鏈表拆分到 新數組位置 i 和 i+oldCap 兩個位置
for (int i = 0; i < oldCapacity ; i++) {
// e 是鏈表的第一個元素
HashEntry<K,V> e = oldTable[i];
if (e != null) {
HashEntry<K,V> next = e.next;
// 計算應該放置在新數組中的位置,
// 假設原數組長度為 16,e 在 oldTable[3] 處,那麽 idx 只可能是 3 或者是 3 + 16 = 19
int idx = e.hash & sizeMask;
if (next == null) // 該位置處只有一個元素,那比較好辦
newTable[idx] = e;
else { // Reuse consecutive sequence at same slot
// e 是鏈表表頭
HashEntry<K,V> lastRun = e;
// idx 是當前鏈表的頭結點 e 的新位置
int lastIdx = idx;
// 下面這個 for 循環會找到一個 lastRun 節點,這個節點之後的所有元素是將要放到一起的
for (HashEntry<K,V> last = next;
last != null;
last = last.next) {
int k = last.hash & sizeMask;
if (k != lastIdx) {
lastIdx = k;
lastRun = last;
}
}
// 將 lastRun 及其之後的所有節點組成的這個鏈表放到 lastIdx 這個位置
newTable[lastIdx] = lastRun;
// 下面的操作是處理 lastRun 之前的節點,
// 這些節點可能分配在另一個鏈表中,也可能分配到上面的那個鏈表中
for (HashEntry<K,V> p = e; p != lastRun; p = p.next) {
V v = p.value;
int h = p.hash;
int k = h & sizeMask;
HashEntry<K,V> n = newTable[k];
newTable[k] = new HashEntry<K,V>(h, p.key, v, n);
}
}
}
}
// 將新來的 node 放到新數組中剛剛的 兩個鏈表之一 的 頭部
int nodeIndex = node.hash & sizeMask; // add the new node
node.setNext(newTable[nodeIndex]);
newTable[nodeIndex] = node;
table = newTable;
}
這裏的擴容比之前的 HashMap 要復雜一些,代碼難懂一點。上面有兩個挨著的 for 循環,第一個 for 有什麽用呢?

仔細一看發現,如果沒有第一個 for 循環,也是可以工作的,但是,這個 for 循環下來,如果 lastRun 的後面還有比較多的節點,那麽這次就是值得的。因為我們只需要克隆 lastRun 前面的節點,後面的一串節點跟著 lastRun 走就是了,不需要做任何操作。

我覺得 Doug Lea 的這個想法也是挺有意思的,不過比較壞的情況就是每次 lastRun 都是鏈表的最後一個元素或者很靠後的元素,那麽這次遍歷就有點浪費了。 不過 Doug Lea 也說了,根據統計,如果使用默認的閾值,大約只有 1/6 的節點需要克隆。

get 過程分析
相對於 put 來說,get 真的不要太簡單。

計算 hash 值,找到 segment 數組中的具體位置,或我們前面用的“槽”

槽中也是一個數組,根據 hash 找到數組中具體的位置

到這裏是鏈表了,順著鏈表進行查找即可

public V get(Object key) {
Segment<K,V> s; // manually integrate access methods to reduce overhead
HashEntry<K,V>[] tab;
// 1. hash 值
int h = hash(key);
long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
// 2. 根據 hash 找到對應的 segment
if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null &&
(tab = s.table) != null) {
// 3. 找到segment 內部數組相應位置的鏈表,遍歷
for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile
(tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
e != null; e = e.next) {
K k;
if ((k = e.key) == key || (e.hash == h && key.equals(k)))
return e.value;
}
}
return null;
}
並發問題分析
現在我們已經說完了 put 過程和 get 過程,我們可以看到 get 過程中是沒有加鎖的,那自然我們就需要去考慮並發問題。

添加節點的操作 put 和刪除節點的操作 remove 都是要加 segment 上的獨占鎖的,所以它們之間自然不會有問題,我們需要考慮的問題就是 get 的時候在同一個 segment 中發生了 put 或 remove 操作。

1、put 操作的線程安全性。

初始化槽,這個我們之前就說過了,使用了 CAS 來初始化 Segment 中的數組。

添加節點到鏈表的操作是插入到表頭的,所以,如果這個時候 get 操作在鏈表遍歷的過程已經到了中間,是不會影響的。當然,另一個並發問題就是 get 操作在 put 之後,需要保證剛剛插入表頭的節點被讀取,這個依賴於 setEntryAt 方法中使用的 UNSAFE.putOrderedObject。

擴容。擴容是新創建了數組,然後進行遷移數據,最後面將 newTable 設置給屬性 table。所以,如果 get 操作此時也在進行,那麽也沒關系,如果 get 先行,那麽就是在舊的 table 上做查詢操作;而 put 先行,那麽 put 操作的可見性保證就是 table 使用了 volatile 關鍵字。

2、remove 操作的線程安全性。

remove 操作我們沒有分析源碼,所以這裏說的讀者感興趣的話還是需要到源碼中去求實一下的。

get 操作需要遍歷鏈表,但是 remove 操作會"破壞"鏈表。

如果 remove 破壞的節點 get 操作已經過去了,那麽這裏不存在任何問題。

如果 remove 先破壞了一個節點,分兩種情況考慮。 1、如果此節點是頭結點,那麽需要將頭結點的 next 設置為數組該位置的元素,table 雖然使用了 volatile 修飾,但是 volatile 並不能提供數組內部操作的可見性保證,所以源碼中使用了 UNSAFE 來操作數組,請看方法 setEntryAt。2、如果要刪除的節點不是頭結點,它會將要刪除節點的後繼節點接到前驅節點中,這裏的並發保證就是 next 屬性是 volatile 的。

出處:https://www.javadoop.com/post/hashmap#Java7%20ConcurrentHashMap

每日一題
技術分享圖片

? 在一個線程中 Sleep ( 1000 )方法,將使得該線程在多少時間後獲得對 CPU 的控制(假設 睡眠 過程中不會有其他事件喚醒該線程)? C

A . 正好 1000 毫秒

B . 1000 毫秒不到

C . =>1000 毫秒

D. 不一定

? 說出 Servlet 的生命周期,並說出 Servlet 和 CGI 的區別

Servlet被服務器實例化後,容器運行其init方法,請求到達時運行其service方法,service方法自動派遣運行與請求對應的doXXX方法(doGet,doPost)等,當服務器決定將實例銷毀的時候調用其destroy方法。
與cgi的區別在於servlet處於服務器進程中,它通過多線程方式運行其service方法,一個實例可以服務於多個請求,並且其實例一般不會銷毀,而CGI對每個請求都產生新的進程,服務完成後就銷毀,所以效率上低於servlet

? 用 socket 通訊寫出客戶端和服務器端的通訊,要求客戶發送數據後能夠回顯相同的數據。
Server.java: 源代碼

import java.net.*;

import java.io.*;

class Server

{

public Server()

{

BufferedReader br = null;

PrintWriter pw = null;

try

{

ServerSocket server = new ServerSocket(8888); // 建立服務器端

Socket socket = server.accept(); // 監聽客戶端

// 得到該連接的輸入流

br = new BufferedReader(new InputStreamReader(socket.getInputStream()));

// 得到該連接的輸出流

pw = new PrintWriter(socket.getOutputStream(),true);

// 先讀後寫

String data = br.readLine();

System.out.println(data); // 輸出到控制臺

pw.println(data); // 轉發給客戶端

}catch(Exception e)

{

e.printStackTrace();

}

finally

{

try

{

// 關閉讀寫流

br.close();

pw.close();

}catch(Exception e)

{}

}

}

public static void main(String[] args)

{

Server server = new Server();

}

}

Client.java: 源代碼

import java.net.*;

import java.io.*;

class Client

{

public Client()

{

BufferedReader br = null;

PrintWriter pw = null;

try

{

Socket socket = new Socket("localhost",8888); // 與服務器建立連接,服務器要先啟

// 得到 Socket 的輸入與輸出流

br = new BufferedReader(new InputStreamReader(socket.getInputStream()));

pw = new PrintWriter(socket.getOutputStream(),true);

// 先寫後讀

pw.println("Client: 你好! ");

String data = null;

while(true)

{

data = br.readLine();

if(data!=null) break;

}

System.out.println(data);

}catch(Exception e)

{

e.printStackTrace();

}

finally

{

try

{

br.close();

pw.close();

}catch(Exception e)

{}

}

}

public static void main(String[] args)

{

Client c = new Client();

}

}

Java7 ConcurrentHashMap源碼深入解析