Java併發程式設計與技術內幕:ConcurrentHashMap原始碼解析
摘要:本文主要講了Java中ConcurrentHashMap 的原始碼
ConcurrentHashMap 是java併發包中非常有用的一個類,在高併發場景用得非常多,它是執行緒安全的。要注意到雖然HashTable雖然也是執行緒安全的,但是它的效能在高併發場景下完全比不上ConcurrentHashMap ,這也是由它們的結構所決定的,可以認為ConcurrentHashMap 是HashTable的加強版,不過這加強版和原來的HashTable有非常大的區別,不僅是在結構上,而且在方法上也有差別。
下面先來簡單說說它們的區別吧HashMap、 HashTable、ConcurrentHashMap 的區別吧
1、HashMap是非執行緒安全,HashTable、ConcurrentHashMap 都是執行緒安全,而且ConcurrentHashMap 、Hashtable不能傳入nul的key或value,HashMap可以。
2、Hashtable是將資料放入到一個Entrty陣列或者它Entrty陣列上一個Entrty的連結串列節點。而ConcurrentHashMap 是由Segment陣列組成,每一個Segment可以看成是一個單獨的Map.然後每個Segment裡又有一個HashEntrty陣列用來存放資料。
3、HashTable的get/put/remove方法都是基於同步的synchronized方法,而ConcurrentHashMap 是基本鎖的機制,並且每次不是鎖全表,而是鎖單獨的一個Segment。所以ConcurrentHashMap 的效能比HashTable好。
4、如果不考慮執行緒安全因素,推薦使用HashMap,因為它效能最好。
首先來看看它包含的結構吧!
這裡看到了一個Segment<K,V>[] segments;陣列。下面再來看看Segment這個類。在下面可以看到Segment這個類繼承了ReentrantLock鎖。所以它也是一個鎖。然後它裡面還有一個HashEntry<K,V>[] table。這是真正用來存放資料的結構。public class ConcurrentHashMap<K, V> extends AbstractMap<K, V> implements ConcurrentMap<K, V>, Serializable { private static final long serialVersionUID = 7249069246763182397L; //預設陣列容量大小 static final int DEFAULT_INITIAL_CAPACITY = 16; //預設裝載因子 static final float DEFAULT_LOAD_FACTOR = 0.75f; //預設層級 static final int DEFAULT_CONCURRENCY_LEVEL = 16; //最大的每一個數組容量大小 static final int MAXIMUM_CAPACITY = 1 << 30; //最大的分組數目 static final int MAX_SEGMENTS = 1 << 16; // slightly conservative //呼叫remove/contain/replace方法時不加鎖的情況下操作重試次數 static final int RETRIES_BEFORE_LOCK = 2; //segments 陣列索引相關 final int segmentMask; //segments 陣列偏移相關 final int segmentShift; //segments陣列,每個segments單獨就可以認為是一個map final Segment<K,V>[] segments;
/**
* Segment內部類,注意它也是一個鎖!可以認為它是一個帶有鎖方法的map
*/
static final class Segment<K,V> extends ReentrantLock implements Serializable {
private static final long serialVersionUID = 2249069246763182397L;
//元素個數
transient volatile int count;
//修改次數
transient int modCount;
//閾值,超過這個數會重新reSize
transient int threshold;
//注意,這裡又一個數組,這個是真正存放資料元素的地方
transient volatile HashEntry<K,V>[] table;
//裝載因子,用來計算threshold
final float loadFactor;
HashEntry它的結構很簡單:
static final class HashEntry<K,V> {
final K key;
final int hash;//用來儲存Segment索引的資訊
volatile V value;
final HashEntry<K,V> next;
HashEntry(K key, int hash, HashEntry<K,V> next, V value) {
this.key = key;
this.hash = hash;
this.next = next;
this.value = value;
}
@SuppressWarnings("unchecked")
static final <K,V> HashEntry<K,V>[] newArray(int i) {
return new HashEntry[i];
}
}
經過上面的分析:可以得出如下的ConcurrentHashMap結構圖
全部原始碼分析:
package java.util.concurrent;
import java.util.concurrent.locks.*;
import java.util.*;
import java.io.Serializable;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
public class ConcurrentHashMap<K, V> extends AbstractMap<K, V>
implements ConcurrentMap<K, V>, Serializable {
private static final long serialVersionUID = 7249069246763182397L;
//預設陣列容量大小
static final int DEFAULT_INITIAL_CAPACITY = 16;
//預設裝載因子
static final float DEFAULT_LOAD_FACTOR = 0.75f;
//預設層級
static final int DEFAULT_CONCURRENCY_LEVEL = 16;
//最大的每一個數組容量大小
static final int MAXIMUM_CAPACITY = 1 << 30;
//最大的分組數目
static final int MAX_SEGMENTS = 1 << 16; // slightly conservative
//呼叫remove/contain/replace方法時不加鎖的情況下操作重試次數
static final int RETRIES_BEFORE_LOCK = 2;
//segments 陣列索引相關
final int segmentMask;
//segments 陣列偏移相關
final int segmentShift;
//segments陣列,每個segments單獨就可以認為是一個map
final Segment<K,V>[] segments;
/**
* 雜湊演算法
*/
private static int hash(int h) {
// Spread bits to regularize both segment and index locations,
// using variant of single-word Wang/Jenkins hash.
h += (h << 15) ^ 0xffffcd7d;
h ^= (h >>> 10);
h += (h << 3);
h ^= (h >>> 6);
h += (h << 2) + (h << 14);
return h ^ (h >>> 16);
}
/**
* 根據雜湊值計算應該落在哪個segments上
*/
final Segment<K,V> segmentFor(int hash) {
return segments[(hash >>> segmentShift) & segmentMask];
}
/**
* 內部類,每個HashEntry都會存入到一個Segment中去
*/
static final class HashEntry<K,V> {
final K key;//關鍵字
final int hash;//雜湊值
volatile V value;//值
final HashEntry<K,V> next;//不同的關鍵字,相再的雜湊值時會組成 一個連結串列
HashEntry(K key, int hash, HashEntry<K,V> next, V value) {
this.key = key;
this.hash = hash;
this.next = next;
this.value = value;
}
@SuppressWarnings("unchecked")
static final <K,V> HashEntry<K,V>[] newArray(int i) {
return new HashEntry[i];
}
}
/**
* Segment內部類,注意它也是一個鎖!可以認為它是一個帶有鎖方法的map
*/
static final class Segment<K,V> extends ReentrantLock implements Serializable {
private static final long serialVersionUID = 2249069246763182397L;
//元素個數
transient volatile int count;
//修改次數
transient int modCount;
//閾值,超過這個數會重新reSize
transient int threshold;
//注意,這裡又一個數組,這個是真正存放資料元素的地方
transient volatile HashEntry<K,V>[] table;
//裝載因子,用來計算threshold
final float loadFactor;
//建構函式,由initialCapacity確定table的大小
Segment(int initialCapacity, float lf) {
loadFactor = lf;
setTable(HashEntry.<K,V>newArray(initialCapacity));
}
@SuppressWarnings("unchecked")
static final <K,V> Segment<K,V>[] newArray(int i) {
return new Segment[i];
}
//設定threshold、table
void setTable(HashEntry<K,V>[] newTable) {
threshold = (int)(newTable.length * loadFactor);//注意,當table的元素個數超過這個時,會觸發reSize;
table = newTable;
}
//取得頭一個
HashEntry<K,V> getFirst(int hash) {
HashEntry<K,V>[] tab = table;
return tab[hash & (tab.length - 1)];
}
//在加鎖情況下讀資料,注意這個類繼續了鎖的方法
V readValueUnderLock(HashEntry<K,V> e) {
lock();
try {
return e.value;
} finally {
unlock();
}
}
//取元素
V get(Object key, int hash) {
if (count != 0) { //注意,沒有加鎖
HashEntry<K,V> e = getFirst(hash);//取得頭一個
while (e != null) { //依次從table中取出元素判斷
if (e.hash == hash && key.equals(e.key)) { //hash和key同時相等才表示存在
V v = e.value;
if (v != null) //有可能在這裡時,運行了刪除元素導致為Null,一般發生比較少
return v;
return readValueUnderLock(e); // 重新在加鎖情況下讀資料
}
e = e.next;
}
}
return null;
}
//是否包含一個元素
boolean containsKey(Object key, int hash) {
if (count != 0) { //
HashEntry<K,V> e = getFirst(hash);
while (e != null) {
if (e.hash == hash && key.equals(e.key))
return true;
e = e.next;
}
}
return false;
}
//是否包含一個元素
boolean containsValue(Object value) {
if (count != 0) { // read-volatile
HashEntry<K,V>[] tab = table;
int len = tab.length;
for (int i = 0 ; i < len; i++) {
for (HashEntry<K,V> e = tab[i]; e != null; e = e.next) { //table陣列迴圈讀數
V v = e.value;
if (v == null) // recheck
v = readValueUnderLock(e);
if (value.equals(v))
return true;
}
}
}
return false;
}
//替換時要加鎖
boolean replace(K key, int hash, V oldValue, V newValue) {
lock();
try {
HashEntry<K,V> e = getFirst(hash);
while (e != null && (e.hash != hash || !key.equals(e.key)))//hash和key要同時相等才表示是找到了這個元素
e = e.next;
boolean replaced = false;
if (e != null && oldValue.equals(e.value)) { //判斷是否要進行替換
replaced = true;
e.value = newValue;
}
return replaced;
} finally {
unlock();
}
}
//替換時要加鎖
V replace(K key, int hash, V newValue) {
lock();
try {
HashEntry<K,V> e = getFirst(hash);
while (e != null && (e.hash != hash || !key.equals(e.key)))
e = e.next;
V oldValue = null;
if (e != null) {
oldValue = e.value;
e.value = newValue;
}
return oldValue;
} finally {
unlock();
}
}
//放入一個元素,onlyIfAbsent如果有false表示替換原來的舊值
V put(K key, int hash, V value, boolean onlyIfAbsent) {
lock();
try {
int c = count;
if (c++ > threshold) // table數組裡的元素超過threshold。觸發rehash,其實也就是擴大table
rehash();
HashEntry<K,V>[] tab = table;
int index = hash & (tab.length - 1);
HashEntry<K,V> first = tab[index];//頭一個
HashEntry<K,V> e = first;
while (e != null && (e.hash != hash || !key.equals(e.key)))//一直不斷判斷不重複才停止
e = e.next;
V oldValue;
if (e != null) { //這個key、hash已經存在,修改原來的
oldValue = e.value;
if (!onlyIfAbsent)
e.value = value; //替換原來的舊值
}
else { //這個key、hash已經不存在,加入一個新的
oldValue = null;
++modCount;
tab[index] = new HashEntry<K,V>(key, hash, first, value);//加入一個新的元素
count = c; // 個數變化
}
return oldValue;
} finally {
unlock();
}
}
//重新雜湊
void rehash() {
HashEntry<K,V>[] oldTable = table;
int oldCapacity = oldTable.length;//舊容量
if (oldCapacity >= MAXIMUM_CAPACITY) //超過預設的最大容量時就退出了
return;
HashEntry<K,V>[] newTable = HashEntry.newArray(oldCapacity<<1);//這裡直接在原來的基礎上擴大1倍
threshold = (int)(newTable.length * loadFactor);//重新計算新的閾值
int sizeMask = newTable.length - 1;
for (int i = 0; i < oldCapacity ; i++) { //下面要做的就是將舊的table上的資料拷貝到新的table
HashEntry<K,V> e = oldTable[i];
if (e != null) { //舊table上該處有資料
HashEntry<K,V> next = e.next;
int idx = e.hash & sizeMask;
// 單個節點key-value
if (next == null)
newTable[idx] = e;
else { //連結串列節點key-value
HashEntry<K,V> lastRun = e;
int lastIdx = idx;
for (HashEntry<K,V> last = next;
last != null;
last = last.next) { //這裡重新計算了連結串列上最後一個節點的位置
int k = last.hash & sizeMask;
if (k != lastIdx) {
lastIdx = k;
lastRun = last;
}
}
newTable[lastIdx] = lastRun;//將原table上對應的連結串列上的最後一個元素放在新table對應連結串列的首位置
for (HashEntry<K,V> p = e; p != lastRun; p = p.next) { //for迴圈依次拷貝連結串列上的資料,注意最後整個連結串列相對原來會倒序排列
int k = p.hash & sizeMask;
HashEntry<K,V> n = newTable[k];
newTable[k] = new HashEntry<K,V>(p.key, p.hash,
n, p.value);//新table資料賦值
}
}
}
}
table = newTable;
}
//刪除一個元素
V remove(Object key, int hash, Object value) {
lock(); //刪除要加鎖
try {
int c = count - 1;
HashEntry<K,V>[] tab = table;
int index = hash & (tab.length - 1);
HashEntry<K,V> first = tab[index];
HashEntry<K,V> e = first;
while (e != null && (e.hash != hash || !key.equals(e.key)))
e = e.next;
V oldValue = null;
if (e != null) { ///找到元素
V v = e.value;
if (value == null || value.equals(v)) { //為null或者value相等時才刪除
oldValue = v;
++modCount;
HashEntry<K,V> newFirst = e.next;
for (HashEntry<K,V> p = first; p != e; p = p.next)
newFirst = new HashEntry<K,V>(p.key, p.hash,
newFirst, p.value);//注意它這裡會倒換原來連結串列的位置
tab[index] = newFirst;
count = c; //記錄數減去1
}
}
return oldValue;
} finally {
unlock();
}
}
//清空整個map
void clear() {
if (count != 0) {
lock();
try {
HashEntry<K,V>[] tab = table;
for (int i = 0; i < tab.length ; i++)
tab[i] = null;//直接賦值為null
++modCount;
count = 0; // write-volatile
} finally {
unlock();
}
}
}
}
//建構函式
public ConcurrentHashMap(int initialCapacity,
float loadFactor, int concurrencyLevel) {
if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0) //loadFactor和initialCapacity都得大於0
throw new IllegalArgumentException();
if (concurrencyLevel > MAX_SEGMENTS)
concurrencyLevel = MAX_SEGMENTS;
// Find power-of-two sizes best matching arguments
int sshift = 0;
int ssize = 1;
while (ssize < concurrencyLevel) {
++sshift;
ssize <<= 1;
}
segmentShift = 32 - sshift;
segmentMask = ssize - 1;
this.segments = Segment.newArray(ssize);
if (initialCapacity > MAXIMUM_CAPACITY)
initialCapacity = MAXIMUM_CAPACITY;
int c = initialCapacity / ssize;
if (c * ssize < initialCapacity)
++c;
int cap = 1;
while (cap < c)
cap <<= 1;
for (int i = 0; i < this.segments.length; ++i)
this.segments[i] = new Segment<K,V>(cap, loadFactor);//為每個segments初始化其裡面的陣列
}
//建構函式
public ConcurrentHashMap(int initialCapacity, float loadFactor) {
this(initialCapacity, loadFactor, DEFAULT_CONCURRENCY_LEVEL);
}
//建構函式
public ConcurrentHashMap(int initialCapacity) {
this(initialCapacity, DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL);
}
//預設建構函式
public ConcurrentHashMap() {
this(DEFAULT_INITIAL_CAPACITY, DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL);
}
//建構函式
public ConcurrentHashMap(Map<? extends K, ? extends V> m) {
this(Math.max((int) (m.size() / DEFAULT_LOAD_FACTOR) + 1,
DEFAULT_INITIAL_CAPACITY),
DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL);
putAll(m);
}
//判斷 是否為空
public boolean isEmpty() {
final Segment<K,V>[] segments = this.segments;//取得整個Segment陣列
int[] mc = new int[segments.length];
int mcsum = 0;
for (int i = 0; i < segments.length; ++i) {
if (segments[i].count != 0) //有一個segments陣列元素個數不為0,那麼整個map肯定不為空
return false;
else
mcsum += mc[i] = segments[i].modCount;//累加總的修改次數
}
if (mcsum != 0) {
for (int i = 0; i < segments.length; ++i) {
if (segments[i].count != 0 ||
mc[i] != segments[i].modCount)//這裡又做了一次mc[i] != segments[i].modCount判斷,因為segments[i].count = 0時才會跳到這裡,不相等那麼肯定是又有元素加入
return false;
}
}
return true;
}
//整個mapr的大小,這裡的大小指的是存放元素的個數
public int size() {
final Segment<K,V>[] segments = this.segments;
long sum = 0;
long check = 0;
int[] mc = new int[segments.length];
//這是裡的for迴圈是嘗試在不加鎖的情況下來獲取整個map的元素個數
for (int k = 0; k < RETRIES_BEFORE_LOCK; ++k) { //這裡RETRIES_BEFORE_LOCK=2,最大會做兩次的迴圈
check = 0;
sum = 0;
int mcsum = 0;
for (int i = 0; i < segments.length; ++i) {
sum += segments[i].count;//累加每一個segments上的count
mcsum += mc[i] = segments[i].modCount;//累加每一個segments上的modCount
}
if (mcsum != 0) { //修改次數不為0,要再做一次判斷前後兩次的modCount,count的累加
for (int i = 0; i < segments.length; ++i) {
check += segments[i].count;
if (mc[i] != segments[i].modCount) { //前後兩次資料發生了變化
check = -1; // 前後兩次取的個數不一到,注意sum還是之前的
break;
}
}
}
if (check == sum) //前後兩次取的元素個數一樣,直接跳出迴圈
break;
}
//這裡會嘗試在加鎖的情況下來獲取整個map的元素個數
if (check != sum) { // 這裡一般check會等於-1才發生
sum = 0;//重新置0
for (int i = 0; i < segments.length; ++i)
segments[i].lock();//每一個segments上鎖
for (int i = 0; i < segments.length; ++i)
sum += segments[i].count;//重新累加
for (int i = 0; i < segments.length; ++i)
segments[i].unlock();//依次釋放鎖
}
if (sum > Integer.MAX_VALUE)
return Integer.MAX_VALUE;//如果大於最大值,返回最大值
else
return (int)sum;
}
//取得一個元素,先是不加鎖情況下去讀
public V get(Object key) {
int hash = hash(key.hashCode());
return segmentFor(hash).get(key, hash);//具體看上面的程式碼註釋
}
//是否包含一個元素,根據key來獲取
public boolean containsKey(Object key) {
int hash = hash(key.hashCode());
return segmentFor(hash).containsKey(key, hash);
}
//是否包含一個元素,根據value來獲取
public boolean containsValue(Object value) {
if (value == null)
throw new NullPointerException();
//取得整個Segment的內容
final Segment<K,V>[] segments = this.segments;
int[] mc = new int[segments.length];
// 嘗試在不加鎖的情況下做判斷
for (int k = 0; k < RETRIES_BEFORE_LOCK; ++k) { //RETRIES_BEFORE_LOCK這裡=2
int sum = 0;
int mcsum = 0;
for (int i = 0; i < segments.length; ++i) {
int c = segments[i].count;//累加個數
mcsum += mc[i] = segments[i].modCount;//累加修改次數
if (segments[i].containsValue(value)) //判斷是否包含
return true;
}
boolean cleanSweep = true;
if (mcsum != 0) { //成立說明發生了改變
for (int i = 0; i < segments.length; ++i) { //再迴圈一次取得segments
int c = segments[i].count;//累加第二次迴圈得到的count
if (mc[i] != segments[i].modCount) { //如果有一個segments前後兩次不一樣,那麼它的元素肯定發生了變化
cleanSweep = false;//
break;//跳出
}
}
}
if (cleanSweep) //為ture表示經過上面的兩次判斷還是無法找到
return false;
}
// cleanSweepo為false時,進行下面。注意,這裡是在加鎖情況下
for (int i = 0; i < segments.length; ++i)
segments[i].lock();//取得每一個segments的鎖
boolean found = false;
try {
for (int i = 0; i < segments.length; ++i) { //每個segments取出來做判斷
if (segments[i].containsValue(value)) {
found = true;
break;
}
}
} finally {
for (int i = 0; i < segments.length; ++i) //依次釋放segments的鎖
segments[i].unlock();
}
return found;
}
//是否包含
public boolean contains(Object value) {
return containsValue(value);
}
//放入一個元素
public V put(K key, V value) {
if (value == null)
throw new NullPointerException();
int hash = hash(key.hashCode());
return segmentFor(hash).put(key, hash, value, false);//放入時先根據key的hash值找到存放 的segments,再呼叫其put方法
}
//放入一個元素,如果key或value為null,那麼為招出一個異常
public V putIfAbsent(K key, V value) {
if (value == null)
throw new NullPointerException();
int hash = hash(key.hashCode());
return segmentFor(hash).put(key, hash, value, true);
}
//放入一個map
public void putAll(Map<? extends K, ? extends V> m) {
for (Map.Entry<? extends K, ? extends V> e : m.entrySet()) //遍歷entrySet取出再放入
put(e.getKey(), e.getValue());
}
//刪除一個元素,根據key
public V remove(Object key) {
int hash = hash(key.hashCode());
return segmentFor(hash).remove(key, hash, null);//根據key的hash值找到存放的segments,再呼叫其remove方法
}
//刪除一個元素,根據key-value
public boolean remove(Object key, Object value) {
int hash = hash(key.hashCode());
if (value == null)
return false;
return segmentFor(hash).remove(key, hash, value) != null;//根據key的hash值找到存放的segments,再呼叫其remove方法
}
//替換元素
public boolean replace(K key, V oldValue, V newValue) {
if (oldValue == null || newValue == null)
throw new NullPointerException();
int hash = hash(key.hashCode());
return segmentFor(hash).replace(key, hash, oldValue, newValue);//根據key的hash值找到存放的segments,再呼叫其replace方法
}
//替換元素
public V replace(K key, V value) {
if (value == null)
throw new NullPointerException();
int hash = hash(key.hashCode());
return segmentFor(hash).replace(key, hash, value);
}
//清空
public void clear() {
for (int i = 0; i < segments.length; ++i)
segments[i].clear();
}
//序列化方法
private void writeObject(java.io.ObjectOutputStream s) throws IOException {
s.defaultWriteObject();
for (int k = 0; k < segments.length; ++k) {
Segment<K,V> seg = segments[k];
seg.lock();//注意加鎖了
try {
HashEntry<K,V>[] tab = seg.table;
for (int i = 0; i < tab.length; ++i) {
for (HashEntry<K,V> e = tab[i]; e != null; e = e.next) {
s.writeObject(e.key);
s.writeObject(e.value);
}
}
} finally {
seg.unlock();
}
}
s.writeObject(null);
s.writeObject(null);
}
//反序列化方法
private void readObject(java.io.ObjectInputStream s)
throws IOException, ClassNotFoundException {
s.defaultReadObject();
for (int i = 0; i < segments.length; ++i) {
segments[i].setTable(new HashEntry[1]);
}
for (;;) {
K key = (K) s.readObject();
V value = (V) s.readObject();
if (key == null)
break;
put(key, value);
}
}
}