ConcurrentHashMap 源碼淺析 1.7
(1) 背景
HashMap死循環:HashMap在並發執行put操作時會引起死循環,是因為多線程會導致HashMap的Entry鏈表形成環形數據結構,一旦形成環形數據結構,Entry的next節點永遠不為空,就會產生死循環獲取Entry.
HashTable效率低下:HashTable容器使用synchronized來保證線程安全,但在線程競爭激烈的情況下HashTable的效率非常低下.因為當一個線程訪問HashTable的同步方法,其它線程也訪問HashTable的同步方法時,會進入阻塞或輪詢狀態.如線程1使用put進行元素添加,線程2不但不能使用put方法添加元素,也不能使用get方法獲取元素,所以競爭越激烈效率越低.(2) 簡介
HashTable容器在競爭激烈的並發環境下表現出效率低下的原因是所有訪問HashTable的線程都必須競爭一把鎖,假如容器裏有多把鎖,每一把鎖用於鎖容器其中一部分數據,那麽多線程訪問容器裏不同的數據段時,線程間不會存在競爭,從而可以有效提高並發訪問效率,這就是ConcurrentHash所使用的鎖分段技術.首先將數據分成一段一段地儲存,然後給每一段配一把鎖,當一個線程占用鎖訪問其中一段數據時,其它段的數據也能被其它線程訪問.
結構
ConcurrentHash是由Segments數組結構和HashEntry數組結構組成.Segment是一種可重入鎖(ReentrantLock),在ConcurrentHashMap裏扮演鎖的色;HashEntry則用於存儲鍵值對數據.一個ConcurrentHashMap裏包含一個Segment組.Segment的結構和HashMap類似,是一種數組加鏈表的結構.一個Segment裏包含一個HashEntry數組,每個HashEntry是一個鏈表結構的元素,每個Segment守護者一個HashEntry數組裏面的元素,當對HashEntry數組的數據進行修改時,必須先獲得與它對應的Segment鎖,如下圖所示.
基本成員
default_initial_capacitymap默認容量,必須是2的冥
/**
* 默認的初始容量 16
*/
static final int DEFAULT_INITIAL_CAPACITY = 16;
default_load_factor默認負載因子(存儲的比例)
/**
* 默認的負載因子
*/
static final float DEFAULT_LOAD_FACTOR = 0.75f;
default_concurrency_level默認並發數量,segments數組量
/** * 默認的並發數量,會影響segments數組的長度 */ static final int DEFAULT_CONCURRENCY_LEVEL = 16;
maximum_capacitymap最大容量
/**
* 最大容量,構造ConcurrentHashMap時指定的值超過,就用該值替換
* ConcurrentHashMap大小必須是2^n,且小於等於2^30
*/
static final int MAXIMUM_CAPACITY = 1 << 30;
min_segment_table_capacityHashEntry[]默認容量
/**
* 每個segment中table數組的長度,必須是2^n,至少為2
*/
static final int MIN_SEGMENT_TABLE_CAPACITY = 2;
max_segments最大並發數,segments數組最大量
/**
* 允許最大segment數量,用於限定concurrencyLevel的邊界,必須是2^n
*/
static final int MAX_SEGMENTS = 1 << 16;
retries_before_lock重試次數,在加鎖之前
/**
* 非鎖定情況下調用size和contains方法的重試次數,避免由於table連續被修改導致無限重試
*/
static final int RETRIES_BEFORE_LOCK = 2;
segmentMask計算segment位置的掩碼(segments.length-1)
/**
* 用於segment的掩碼值,用於與hash的高位進行取&
*/
final int segmentMask;
segmentShift
/**
* 用於算segment位置時,hash參與運算的位數
*/
final int segmentShift;
segmentssegment數組
/**
* segments數組
*/
final Segment<K,V>[] segments;
HashEntry存儲數據的鏈式結構
static final class HashEntry<K,V> {
// hash值
final int hash;
// key
final K key;
// 保證內存可見性,每次從內存中獲取
volatile V value;
volatile HashEntry<K,V> next;
HashEntry(int hash, K key, V value, HashEntry<K,V> next) {
this.hash = hash;
this.key = key;
this.value = value;
this.next = next;
}
/**
* 使用volatile語義寫入next,保證可見性
*/
final void setNext(HashEntry<K,V> n) {
UNSAFE.putOrderedObject(this, nextOffset, n);
}
Segment繼承ReentrantLock鎖,用於存放HashEntry[]
static final class Segment<K,V> extends ReentrantLock implements Serializable {
private static final long serialVersionUID = 2249069246763182397L;
/**
* 對segment加鎖時,在阻塞之前自旋的次數
*
*/
static final int MAX_SCAN_RETRIES =
Runtime.getRuntime().availableProcessors() > 1 ? 64 : 1;
/**
* 每個segment的HashEntry table數組,訪問數組元素可以通過entryAt/setEntryAt提供的volatile語義來完成
* volatile保證可見性
*/
transient volatile HashEntry<K,V>[] table;
/**
* 元素的數量,只能在鎖中或者其他保證volatile可見性之間進行訪問
*/
transient int count;
/**
* 當前segment中可變操作發生的次數,put,remove等,可能會溢出32位
* 它為chm isEmpty() 和size()方法中的穩定性檢查提供了足夠的準確性.
* 只能在鎖中或其他volatile讀保證可見性之間進行訪問
*/
transient int modCount;
/**
* 當table大小超過閾值時,對table進行擴容,值為(int)(capacity *loadFactor)
*/
transient int threshold;
/**
* 負載因子
*/
final float loadFactor;
/**
* 構造方法
*/
Segment(float lf, int threshold, HashEntry<K,V>[] tab) {
this.loadFactor = lf;
this.threshold = threshold;
this.table = tab;
}
有參構造
/**
* ConcurrentHashMap 構造方法
* @param initialCapacity 初始化容量
* @param loadFactor 負載因子
* @param concurrencyLevel 並發segment,segments數組的長度
*/
public ConcurrentHashMap(int initialCapacity,
float loadFactor, int concurrencyLevel) {
if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
throw new IllegalArgumentException();
// 大於最大segments容量,取最大容量
if (concurrencyLevel > MAX_SEGMENTS)
concurrencyLevel = MAX_SEGMENTS;
// Find power-of-two sizes best matching arguments
// 2^sshift = ssize 例如:sshift = 4,ssize = 16
// 根據concurrencyLevel計算出ssize為segments數組的長度
int sshift = 0;
int ssize = 1;
while (ssize < concurrencyLevel) { // 第一次 滿足
++sshift; // 第一次 1
ssize <<= 1; // 第一次 ssize = ssize << 1 (1 * 2^1)
}
// segmentShift和segmentMask的定義
this.segmentShift = 32 - sshift; // 用於計算hash參與運算位數
this.segmentMask = ssize - 1; // segments位置範圍
if (initialCapacity > MAXIMUM_CAPACITY)
initialCapacity = MAXIMUM_CAPACITY;
// 計算每個segment中table的容量
int c = initialCapacity / ssize;
if (c * ssize < initialCapacity)
++c;
// HashEntry[]默認 容量
int cap = MIN_SEGMENT_TABLE_CAPACITY;
// 確保cap是2^n
while (cap < c)
cap <<= 1;
// create segments and segments[0]
// 創建segments並初始化第一個segment數組,其余的segment延遲初始化
Segment<K,V> s0 =
new Segment<K,V>(loadFactor, (int)(cap * loadFactor),
(HashEntry<K,V>[])new HashEntry[cap]);
Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];
UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0]
this.segments = ss;
}
無參構造使用默認參數
public ConcurrentHashMap() {
this(DEFAULT_INITIAL_CAPACITY, DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL);
}
一些UNSAFE方法
HashEntry
setNext
/**
* 使用volatile語義寫入next,保證可見性
*/
final void setNext(HashEntry<K,V> n) {
UNSAFE.putOrderedObject(this, nextOffset, n);
}
entryAt get HashEntry
/**
* 獲取給定table的第i個元素,使用volatile讀語義
*/
static final <K,V> HashEntry<K,V> entryAt(HashEntry<K,V>[] tab, int i) {
return (tab == null) ? null :
(HashEntry<K,V>) UNSAFE.getObjectVolatile
(tab, ((long)i << TSHIFT) + TBASE);
}
setEntryAt set HashEntry
/**
* 設置給定的table的第i個元素,使用volatile寫語義
*/
static final <K,V> void setEntryAt(HashEntry<K,V>[] tab, int i,
HashEntry<K,V> e) {
UNSAFE.putOrderedObject(tab, ((long)i << TSHIFT) + TBASE, e);
}
put 插入元素
ConcurrentHashMap 源碼淺析 1.7