ConcurrentHashMap JDK1.7和JDK1.8區別
前言
以前寫過介紹HashMap的文章,文中提到過HashMap在put的時候,插入的元素超過了容量(由負載因子決定)的範圍就會觸發擴容操作,就是rehash,這個會重新將原陣列的內容重新hash到新的擴容陣列中,在多執行緒的環境下,存在同時其他的元素也在進行put操作,如果hash值相同,可能出現同時在同一陣列下用連結串列表示,造成閉環,導致在get時會出現死迴圈,所以HashMap是執行緒不安全的。
我們來了解另一個鍵值儲存集合HashTable,它是執行緒安全的,它在所有涉及到多執行緒操作的都加上了synchronized關鍵字來鎖住整個table,這就意味著所有的執行緒都在競爭一把鎖,在多執行緒的環境下,它是安全的,但是無疑是效率低下的。
其實HashTable有很多的優化空間,鎖住整個table這麼粗暴的方法可以變相的柔和點,比如在多執行緒的環境下,對不同的資料集進行操作時其實根本就不需要去競爭一個鎖,因為他們不同hash值,不會因為rehash造成執行緒不安全,所以互不影響,這就是鎖分離技術,將鎖的粒度降低,利用多個鎖來控制多個小的table,這就是這篇文章的主角ConcurrentHashMap JDK1.7版本的核心思想
ConcurrentHashMap
JDK1.7的實現
在JDK1.7版本中,ConcurrentHashMap的資料結構是由一個Segment陣列和多個HashEntry組成,如下圖所示:
Segment陣列的意義就是將一個大的table分割成多個小的table來進行加鎖,也就是上面的提到的鎖分離技術,而每一個Segment元素儲存的是HashEntry陣列+連結串列,這個和HashMap的資料儲存結構一樣
初始化
ConcurrentHashMap的初始化是會通過位與運算來初始化Segment的大小,用ssize來表示,如下所示
-
int sshift =
0;
-
int
ssize =
1;
-
while (ssize < concurrencyLevel) {
-
++sshift;
-
ssize <<=
1;
-
}
如上所示,因為ssize用位於運算來計算(ssize <<=1
),所以Segment的大小取值都是以2的N次方,無關concurrencyLevel的取值,當然concurrencyLevel最大隻能用16位的二進位制來表示,即65536,換句話說,Segment的大小最多65536個,沒有指定concurrencyLevel元素初始化,Segment的大小ssize預設為16
每一個Segment元素下的HashEntry的初始化也是按照位於運算來計算,用cap來表示,如下所示
-
int cap =
1;
-
while (cap < c)
-
cap <<=
1;
如上所示,HashEntry大小的計算也是2的N次方(cap <<=1), cap的初始值為1,所以HashEntry最小的容量為2
put操作
對於ConcurrentHashMap的資料插入,這裡要進行兩次Hash去定位資料的儲存位置
static class Segment<K,V> extends ReentrantLock implements Serializable {
從上Segment的繼承體系可以看出,Segment實現了ReentrantLock,也就帶有鎖的功能,當執行put操作時,會進行第一次key的hash來定位Segment的位置,如果該Segment還沒有初始化,即通過CAS操作進行賦值,然後進行第二次hash操作,找到相應的HashEntry的位置,這裡會利用繼承過來的鎖的特性,在將資料插入指定的HashEntry位置時(連結串列的尾端),會通過繼承ReentrantLock的tryLock()方法嘗試去獲取鎖,如果獲取成功就直接插入相應的位置,如果已經有執行緒獲取該Segment的鎖,那當前執行緒會以自旋的方式去繼續的呼叫tryLock()方法去獲取鎖,超過指定次數就掛起,等待喚醒
get操作
ConcurrentHashMap的get操作跟HashMap類似,只是ConcurrentHashMap第一次需要經過一次hash定位到Segment的位置,然後再hash定位到指定的HashEntry,遍歷該HashEntry下的連結串列進行對比,成功就返回,不成功就返回null
size操作
計算ConcurrentHashMap的元素大小是一個有趣的問題,因為他是併發操作的,就是在你計算size的時候,他還在併發的插入資料,可能會導致你計算出來的size和你實際的size有相差(在你return size的時候,插入了多個數據),要解決這個問題,JDK1.7版本用兩種方案
-
try {
-
for (;;) {
-
if (retries++ == RETRIES_BEFORE_LOCK) {
-
for (
int j =
0; j < segments.length; ++j) ensureSegment(j).lock();
// force creation
-
}
-
sum =
0L;
-
size =
0;
-
overflow =
false;
-
for (
int j =
0; j < segments.length; ++j) {
-
Segment<K,V> seg = segmentAt(segments, j);
-
if (seg !=
null) { sum += seg.modCount;
int c = seg.count;
if (c <
0 || (size += c) <
0)
-
overflow =
true;
-
} }
-
if (sum == last)
break;
-
last = sum; } }
-
finally {
-
if (retries > RETRIES_BEFORE_LOCK) {
-
for (
int j =
0; j < segments.length; ++j)
-
segmentAt(segments, j).unlock();
-
}
-
}
1.第一種方案他會使用不加鎖的模式去嘗試多次計算ConcurrentHashMap的size,最多三次,比較前後兩次計算的結果,結果一致就認為當前沒有元素加入,計算的結果是準確的
2.第二種方案是如果第一種方案不符合,他就會給每個Segment加上鎖,然後計算ConcurrentHashMap的size返回
JDK1.8的實現
JDK1.8的實現已經摒棄了Segment的概念,而是直接用Node陣列+連結串列+紅黑樹的資料結構來實現,併發控制使用Synchronized和CAS來操作,整個看起來就像是優化過且執行緒安全的HashMap,雖然在JDK1.8中還能看到Segment的資料結構,但是已經簡化了屬性,只是為了相容舊版本
在深入JDK1.8的put和get實現之前要知道一些常量設計和資料結構,這些是構成ConcurrentHashMap實現結構的基礎,下面看一下基本屬性:
-
// node陣列最大容量:2^30=1073741824
-
private
static
final
int MAXIMUM_CAPACITY =
1 <<
30;
-
// 預設初始值,必須是2的幕數
-
private
static
final
int DEFAULT_CAPACITY =
16;
-
//陣列可能最大值,需要與toArray()相關方法關聯
-
static
final
int MAX_ARRAY_SIZE = Integer.MAX_VALUE -
8;
-
//併發級別,遺留下來的,為相容以前的版本
-
private
static
final
int DEFAULT_CONCURRENCY_LEVEL =
16;
-
// 負載因子
-
private
static
final
float LOAD_FACTOR =
0.75f;
-
// 連結串列轉紅黑樹閥值,> 8 連結串列轉換為紅黑樹
-
static
final
int TREEIFY_THRESHOLD =
8;
-
//樹轉連結串列閥值,小於等於6(tranfer時,lc、hc=0兩個計數器分別++記錄原bin、新binTreeNode數量,<=UNTREEIFY_THRESHOLD 則untreeify(lo))
-
static
final
int UNTREEIFY_THRESHOLD =
6;
-
static
final
int MIN_TREEIFY_CAPACITY =
64;
-
private
static
final
int MIN_TRANSFER_STRIDE =
16;
-
private
static
int RESIZE_STAMP_BITS =
16;
-
// 2^15-1,help resize的最大執行緒數
-
private
static
final
int MAX_RESIZERS = (
1 << (
32 - RESIZE_STAMP_BITS)) -
1;
-
// 32-16=16,sizeCtl中記錄size大小的偏移量
-
private
static
final
int RESIZE_STAMP_SHIFT =
32 - RESIZE_STAMP_BITS;
-
// forwarding nodes的hash值
-
static
final
int MOVED = -
1;
-
// 樹根節點的hash值
-
static
final
int TREEBIN = -
2;
-
// ReservationNode的hash值
-
static
final
int RESERVED = -
3;
-
// 可用處理器數量
-
static
final
int NCPU = Runtime.getRuntime().availableProcessors();
-
//存放node的陣列
-
transient
volatile Node<K,V>[] table;
-
/*控制識別符號,用來控制table的初始化和擴容的操作,不同的值有不同的含義
-
*當為負數時:-1代表正在初始化,-N代表有N-1個執行緒正在 進行擴容
-
*當為0時:代表當時的table還沒有被初始化
-
*當為正數時:表示初始化或者下一次進行擴容的大小*/
-
private
transient
volatile
int sizeCtl;
基本屬性定義了ConcurrentHashMap的一些邊界以及操作時的一些控制,下面看一些內部的一些結構組成,這些是整個ConcurrentHashMap整個資料結構的核心
Node
Node是ConcurrentHashMap儲存結構的基本單元,繼承於HashMap中的Entry,用於儲存資料,原始碼如下
-
static
class Node<K,V> implements Map.Entry<K,V> {
-
//連結串列的資料結構
-
final
int hash;
-
final K key;
-
//val和next都會在擴容時發生變化,所以加上volatile來保持可見性和禁止重排序
-
volatile V val;
-
volatile Node<K,V> next;
-
Node(
int hash, K key, V val, Node<K,V> next) {
-
this.hash = hash;
-
this.key = key;
-
this.val = val;
-
this.next = next;
-
}
-
public final K getKey() {
return key; }
-
public final V getValue() {
return val; }
-
public final int hashCode() {
return key.hashCode() ^ val.hashCode(); }
-
public final String toString(){
return key +
"=" + val; }
-
//不允許更新value
-
public final V setValue(V value) {
-
throw
new UnsupportedOperationException();
-
}
-
public final boolean equals(Object o) {
-
Object k, v, u; Map.Entry<?,?> e;
-
return ((o
instanceof Map.Entry) &&
-
(k = (e = (Map.Entry<?,?>)o).getKey()) !=
null &&
-
(v = e.getValue()) !=
null &&
-
(k == key || k.equals(key)) &&
-
(v == (u = val) || v.equals(u)));
-
}
-
//用於map中的get()方法,子類重寫
-
Node<K,V> find(int h, Object k) {
-
Node<K,V> e =
this;
-
if (k !=
null) {
-
do {
-
K ek;
-
if (e.hash == h &&
-
((ek = e.key) == k || (ek !=
null && k.equals(ek))))
-
return e;
-
}
while ((e = e.next) !=
null);
-
}
-
return
null;
-
}
-
}
Node資料結構很簡單,從上可知,就是一個連結串列,但是隻允許對資料進行查詢,不允許進行修改
TreeNode
TreeNode繼承與Node,但是資料結構換成了二叉樹結構,它是紅黑樹的資料的儲存結構,用於紅黑樹中儲存資料,當連結串列的節點數大於8時會轉換成紅黑樹的結構,他就是通過TreeNode作為儲存結構代替Node來轉換成黑紅樹原始碼如下
-
static
final
class TreeNode<K,V> extends Node<K,V> {
-
//樹形結構的屬性定義
-
TreeNode<K,V> parent;
// red-black tree links
-
TreeNode<K,V> left;
-
TreeNode<K,V> right;
-
TreeNode<K,V> prev;
// needed to unlink next upon deletion
-
boolean red;
//標誌紅黑樹的紅節點
-
TreeNode(
int hash, K key, V val, Node<K,V> next,
-
TreeNode<K,V> parent) {
-
super(hash, key, val, next);
-
this.parent = parent;
-
}
-
Node<K,V> find(int h, Object k) {
-
return findTreeNode(h, k,
null);
-
}
-
//根據key查詢 從根節點開始找出相應的TreeNode,
-
final TreeNode<K,V> findTreeNode(int h, Object k, Class<?> kc) {
-
if (k !=
null) {
-
TreeNode<K,V> p =
this;
-
do {
-
int ph, dir; K pk; TreeNode<K,V> q;
-
TreeNode<K,V> pl = p.left, pr = p.right;
-
if ((ph = p.hash) > h)
-
p = pl;
-
else
if (ph < h)
-
p = pr;
-
else
if ((pk = p.key) == k || (pk !=
null && k.equals(pk)))
-
return p;
-
else
if (pl ==
null)
-
p = pr;
-
else
if (pr ==
null)
-
p = pl;
-
else
if ((kc !=
null ||
-
(kc = comparableClassFor(k)) !=
null) &&
-
(dir = compareComparables(kc, k, pk)) !=
0)
-
p = (dir <
0) ? pl : pr;
-
else
if ((q = pr.findTreeNode(h, k, kc)) !=
null)
-
return q;
-
else
-
p = pl;
-
}
while (p !=
null);
-
}
-
return
null;
-
}
-
}
TreeBin
TreeBin從字面含義中可以理解為儲存樹形結構的容器,而樹形結構就是指TreeNode,所以TreeBin就是封裝TreeNode的容器,它提供轉換黑紅樹的一些條件和鎖的控制,部分原始碼結構如下
-
static
final
class TreeBin<K,V> extends Node<K,V> {
-
//指向TreeNode列表和根節點
-
TreeNode<K,V> root;
-
volatile TreeNode<K,V> first;
-
volatile Thread waiter;
-
volatile
int lockState;
-
// 讀寫鎖狀態
-
static
final
int WRITER =
1;
// 獲取寫鎖的狀態
-
static
final
int WAITER =
2;
// 等待寫鎖的狀態
-
static
final
int READER =
4;
// 增加資料時讀鎖的狀態
-
/**
-
* 初始化紅黑樹
-
*/
-
TreeBin(TreeNode<K,V> b) {
-
super(TREEBIN,
null,
null,
null);
-
this.first = b;
-
TreeNode<K,V> r =
null;
-
for (TreeNode<K,V> x = b, next; x !=
null; x = next) {
-
next = (TreeNode<K,V>)x.next;
-
x.left = x.right =
null;
-
if (r ==
null) {
-
x.parent =
null;
-
x.red =
false;
-
r = x;
-
}
-
else {
-
K k = x.key;
-