集合類不安全
集合類不安全
List不安全
多執行緒操作List可能會出現:java.util.ConcurrentModificationException
異常
// java.util.ConcurrentModificationException 併發修改異常! public class ListTest { public static void main(String[] args) { // 併發下 ArrayList 不安全,Synchronized; /** * 解決方案; * 1、List<String> list = new Vector<>(); * 2、List<String> list = Collections.synchronizedList(new ArrayList<>()); * 3、List<String> list = new CopyOnWriteArrayList<>(); */ // CopyOnWrite 寫入時複製 COW 計算機程式設計領域的一種優化策略; // 多個執行緒呼叫的時候,list,讀取的時候,固定的,寫入(覆蓋) // 在寫入的時候避免覆蓋,造成資料問題! // 讀寫分離 // CopyOnWriteArrayList 比 Vector Nb 在哪裡? List<String> list = new CopyOnWriteArrayList<>(); for (int i = 1; i <= 10; i++) { new Thread(()->{ list.add(UUID.randomUUID().toString().substring(0,5)); System.out.pjavarintln(list); },String.valueOf(i)).start(); } } }
CopyOnWriteArrayList的add方法
public boolean add(E e) { final ReentrantLock lock = this.lock; //使用lock鎖 lock.lock(); try { Object[] elements = getArray(); int len = elements.length; //在寫入前先複製List Object[] newElements = Arrays.copyOf(elements, len + 1); newElements[len] = e; setArray(newElements); return true; } finally { lock.unlock(); } }
Set不安全
/** * 同理可證 : ConcurrentModificationException */ public class SetTest { public static void main(String[] args) { Set<String> set = new HashSet<>(); // hashmap // Set<String> set = Collections.synchronizedSet(new HashSet<>()); // Set<String> set = new CopyOnWriteArraySet<>(); for (int i = 1; i <=30 ; i++) { new Thread(()->{ set.add(UUID.randomUUID().toString().substring(0,5)); System.out.println(set); },String.valueOf(i)).start(); } } }
hashSet 底層是什麼?
public HashSet() {
map = new HashMap<>();
}
.....
public boolean add(E e) {
//PRESENT= new Object()
return map.put(e, PRESENT)==null;
}
Map不安全
// ConcurrentModificationException
public class MapTest {
public static void main(String[] args) {
// map 是這樣用的嗎? 不是,工作中不用 HashMap
// 預設等價於什麼? new HashMap<>(16,0.75);
// Map<String, String> map = new HashMap<>();
Map<String, String> map = new ConcurrentHashMap<>();
for (int i = 1; i <=30; i++) {
new Thread(()->{
map.put(Thread.currentThread().getName(),UUID.randomUUID().toString().substring(0,5));
System.out.println(map);
},String.valueOf(i)).start();
}
}
}
ConcurrentHashMap原理
-
鎖分段技術
- HashTable容器在競爭激烈的併發環境下表現出效率低下的原因,是因為所有訪問HashTable的執行緒都必須競爭同一把鎖,那假如容器裡有多把鎖,每一把鎖用於鎖容器其中一部分資料,那麼當多執行緒訪問容器裡不同資料段的資料時,執行緒間就不會存在鎖競爭,從而可以有效的提高併發訪問效率,這就是ConcurrentHashMap所使用的鎖分段技術。
- 首先將資料分成一段一段的儲存,然後給每一段資料配一把鎖,當一個執行緒佔用鎖訪問其中一個段資料的時候,其段的資料也能被其他執行緒訪問。
- 有些方法需要跨段,比如size()和containsValue(),它們可能需要鎖定整個表而而不僅僅是某個段,這需要按順序鎖定所有段,操作完畢後,又按順序釋放所有段的鎖。
- 這裡“按順序”是很重要的,否則極有可能出現死鎖,在ConcurrentHashMap內部,段陣列是final的,並且其成員變數實際上也是final的,
- 但是,僅僅是將陣列宣告為final的並不保證陣列成員也是final的,這需要實現上的保證。這可以確保不會出現死鎖,因為獲得鎖的順序是固定的。
- oncurrentHashMap 類中包含兩個靜態內部類 HashEntry 和 Segment。
- HashEntry 用來封裝對映表的鍵 / 值對;Segment 用來充當鎖的角色,每個 Segment 物件守護整個雜湊對映表的若干個桶。
- 每個桶是由若干個 HashEntry 物件連結起來的連結串列。一個 ConcurrentHashMap 例項中包含由若干個 Segment 物件組成的陣列。
- 每個Segment守護者一個HashEntry數組裡的元素,當對HashEntry陣列的資料進行修改時,必須首先獲得它對應的Segment鎖。
-
HashEntry類
static final class HashEntry<K,V> { final K key; // 宣告 key 為 final 型 final int hash; // 宣告 hash 值為 final 型 volatile V value; // 宣告 value 為 volatile 型 final HashEntry<K,V> next; // 宣告 next 為 final 型 HashEntry(K key, int hash, HashEntry<K,V> next, V value) { this.key = key; this.hash = hash; this.next = next; this.value = value; } }
- 每個HashEntry代表Hash表中的一個節點,在其定義的結構中可以看到,除了value值沒有定義final,其餘的都定義為final型別,我們知道Java中關鍵詞final修飾的域成為最終域。
- 用關鍵詞final修飾的變數一旦賦值,就不能改變,也稱為修飾的標識為常量。這就意味著我們刪除或者增加一個節點的時候,就必須從頭開始重新建立Hash鏈,因為next引用值需要改變。
- 由於 HashEntry 的 next 域為 final 型,所以新節點只能在連結串列的表頭處插入。 例如將A,B,C插入空桶中,插入後的結構為:
-
segment類
static final class Segment<K,V> extends ReentrantLock implements Serializable { private static final long serialVersionUID = 2249069246763182397L; /** * 在本 segment 範圍內,包含的 HashEntry 元素的個數 * 該變數被宣告為 volatile 型,保證每次讀取到最新的資料 */ transient volatile int count; /** *table 被更新的次數 */ transient int modCount; /** * 當 table 中包含的 HashEntry 元素的個數超過本變數值時,觸發 table 的再雜湊 */ transient int threshold; /** * table 是由 HashEntry 物件組成的陣列 * 如果雜湊時發生碰撞,碰撞的 HashEntry 物件就以連結串列的形式連結成一個連結串列 * table 陣列的陣列成員代表雜湊對映表的一個桶 * 每個 table 守護整個 ConcurrentHashMap 包含桶總數的一部分 * 如果併發級別為 16,table 則守護 ConcurrentHashMap 包含的桶總數的 1/16 */ transient volatile HashEntry<K,V>[] table; /** * 裝載因子 */ final float loadFactor; }
- Segment 類繼承於 ReentrantLock 類,從而使得 Segment 物件能充當鎖的角色。每個 Segment 物件用來守護其(成員物件 table 中)包含的若干個桶。
- table 是一個由 HashEntry 物件組成的陣列。table 陣列的每一個數組成員就是雜湊對映表的一個桶。
- 每一個 Segment 物件都有一個 count 物件來表示本 Segment 中包含的 HashEntry 物件的總數。
- 之所以在每個 Segment 物件中包含一個計數器,而不是在 ConcurrentHashMap 中使用全域性的計數器,是為了避免出現“熱點域”而影響 ConcurrentHashMap 的併發性。
-
ConcurrentHashMap 類
- 預設的情況下,每個ConcurrentHashMap 類會建立16個併發的segment,每個segment裡面包含多個Hash表,每個Hash鏈都是有HashEntry節點組成的。
- 如果鍵能均勻雜湊,每個 Segment 大約守護整個散列表中桶總數的 1/16。
public class ConcurrentHashMap<K, V> extends AbstractMap<K, V>
implements ConcurrentMap<K, V>, Serializable {
/**
* 雜湊對映表的預設初始容量為 16,即初始預設為 16 個桶
* 在建構函式中沒有指定這個引數時,使用本引數
*/
static final int DEFAULT_INITIAL_CAPACITY= 16;
/**
* 雜湊對映表的預設裝載因子為 0.75,該值是 table 中包含的 HashEntry 元素的個數與
* table 陣列長度的比值
* 當 table 中包含的 HashEntry 元素的個數超過了 table 陣列的長度與裝載因子的乘積時,
* 將觸發 再雜湊
* 在建構函式中沒有指定這個引數時,使用本引數
*/
static final float DEFAULT_LOAD_FACTOR= 0.75f;
/**
* 散列表的預設併發級別為 16。該值表示當前更新執行緒的估計數
* 在建構函式中沒有指定這個引數時,使用本引數
*/
static final int DEFAULT_CONCURRENCY_LEVEL= 16;
/**
* segments 的掩碼值
* key 的雜湊碼的高位用來選擇具體的 segment
*/
final int segmentMask;
/**
* 偏移量
*/
final int segmentShift;
/**
* 由 Segment 物件組成的陣列
*/
final Segment<K,V>[] segments;
/**
* 建立一個帶有指定初始容量、載入因子和併發級別的新的空對映。
*/
public ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel) {
if(!(loadFactor > 0) || initialCapacity < 0 ||
concurrencyLevel <= 0)
throw new IllegalArgumentException();
if(concurrencyLevel > MAX_SEGMENTS)
concurrencyLevel = MAX_SEGMENTS;
// 尋找最佳匹配引數(不小於給定引數的最接近的 2 次冪)
int sshift = 0;
int ssize = 1;
while(ssize < concurrencyLevel) {
++sshift;
ssize <<= 1;
}
segmentShift = 32 - sshift; // 偏移量值
segmentMask = ssize - 1; // 掩碼值
this.segments = Segment.newArray(ssize); // 建立陣列
if (initialCapacity > MAXIMUM_CAPACITY)
initialCapacity = MAXIMUM_CAPACITY;
int c = initialCapacity / ssize;
if(c * ssize < initialCapacity)
++c;
int cap = 1;
while(cap < c)
cap <<= 1;
// 依次遍歷每個陣列元素
for(int i = 0; i < this.segments.length; ++i)
// 初始化每個陣列元素引用的 Segment 物件
this.segments[i] = new Segment<K,V>(cap, loadFactor);
}
/**
* 建立一個帶有預設初始容量 (16)、預設載入因子 (0.75) 和 預設併發級別 (16)
* 的空雜湊對映表。
*/
public ConcurrentHashMap() {
// 使用三個預設引數,呼叫上面過載的建構函式來建立空雜湊對映表
this(DEFAULT_INITIAL_CAPACITY, DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL);
}
-
用分離鎖實現多個執行緒間的併發寫操作
(1)Put方法的實現
首先,根據 key 計算出對應的 hash 值:
public V put(K key, V value) {
if (value == null) //ConcurrentHashMap 中不允許用 null 作為對映值
throw new NullPointerException();
int hash = hash(key.hashCode()); // 計算鍵對應的雜湊碼
// 根據雜湊碼找到對應的 Segment
return segmentFor(hash).put(key, hash, value, false);
}
根據 hash 值找到對應的 Segment:
/**
* 使用 key 的雜湊碼來得到 segments 陣列中對應的 Segment
*/
final Segment<K,V> segmentFor(int hash) {
// 將雜湊值右移 segmentShift 個位,並在高位填充 0
// 然後把得到的值與 segmentMask 相“與”
// 從而得到 hash 值對應的 segments 陣列的下標值
// 最後根據下標值返回雜湊碼對應的 Segment 物件
return segments[(hash >>> segmentShift) & segmentMask];
}
在這個 Segment 中執行具體的 put 操作:
V put(K key, int hash, V value, boolean onlyIfAbsent) {
lock(); // 加鎖,這裡是鎖定某個 Segment 物件而非整個 ConcurrentHashMap
try {
int c = count;
if (c++ > threshold) // 如果超過再雜湊的閾值
rehash(); // 執行再雜湊,table 陣列的長度將擴充一倍
HashEntry<K,V>[] tab = table;
// 把雜湊碼值與 table 陣列的長度減 1 的值相“與”
// 得到該雜湊碼對應的 table 陣列的下標值
int index = hash & (tab.length - 1);
// 找到雜湊碼對應的具體的那個桶
HashEntry<K,V> first = tab[index];
HashEntry<K,V> e = first;
while (e != null && (e.hash != hash || !key.equals(e.key)))
e = e.next;
V oldValue;
if (e != null) { // 如果鍵 / 值對以經存在
oldValue = e.value;
if (!onlyIfAbsent)
e.value = value; // 設定 value 值
}
else { // 鍵 / 值對不存在
oldValue = null;
++modCount; // 要新增新節點到連結串列中,所以 modCont 要加 1
// 建立新節點,並新增到連結串列的頭部
tab[index] = new HashEntry<K,V>(key, hash, first, value);
count = c; // 寫 count 變數
}
return oldValue;
} finally {
unlock(); // 解鎖
}
}
這裡的加鎖操作是針對(鍵的 hash 值對應的)某個具體的 Segment,鎖定的是該 Segment 而不是整個 ConcurrentHashMap。
因為插入鍵 / 值對操作只是在這個 Segment 包含的某個桶中完成,不需要鎖定整個ConcurrentHashMap。
此時,其他寫執行緒對另外 15 個Segment 的加鎖並不會因為當前執行緒對這個 Segment 的加鎖而阻塞。
同時,所有讀執行緒幾乎不會因本執行緒的加鎖而阻塞(除非讀執行緒剛好讀到這個 Segment 中某個 HashEntry 的 value 域的值為 null,此時需要加鎖後重新讀取該值)。
(2)Get方法的實現
V get(Object key, int hash) {
if(count != 0) { // 首先讀 count 變數
HashEntry<K,V> e = getFirst(hash);
while(e != null) {
if(e.hash == hash && key.equals(e.key)) {
V v = e.value;
if(v != null)
return v;
// 如果讀到 value 域為 null,說明發生了重排序,加鎖後重新讀取
return readValueUnderLock(e);
}
e = e.next;
}
}
return null;
}
V readValueUnderLock(HashEntry<K,V> e) {
lock();
try {
return e.value;
} finally {
unlock();
}
}
ConcurrentHashMap中的讀方法不需要加鎖,所有的修改操作在進行結構修改時都會在最後一步寫count 變數,通過這種機制保證get操作能夠得到幾乎最新的結構更新。
(3)Remove方法的實現
V remove(Object key, int hash, Object value) {
lock(); //加鎖
try{
int c = count - 1;
HashEntry<K,V>[] tab = table;
//根據雜湊碼找到 table 的下標值
int index = hash & (tab.length - 1);
//找到雜湊碼對應的那個桶
HashEntry<K,V> first = tab[index];
HashEntry<K,V> e = first;
while(e != null&& (e.hash != hash || !key.equals(e.key)))
e = e.next;
V oldValue = null;
if(e != null) {
V v = e.value;
if(value == null|| value.equals(v)) { //找到要刪除的節點
oldValue = v;
++modCount;
//所有處於待刪除節點之後的節點原樣保留在連結串列中
//所有處於待刪除節點之前的節點被克隆到新連結串列中
HashEntry<K,V> newFirst = e.next;// 待刪節點的後繼結點
for(HashEntry<K,V> p = first; p != e; p = p.next)
newFirst = new HashEntry<K,V>(p.key, p.hash,
newFirst, p.value);
//把桶連結到新的頭結點
//新的頭結點是原連結串列中,刪除節點之前的那個節點
tab[index] = newFirst;
count = c; //寫 count 變數
}
}
return oldValue;
} finally{
unlock(); //解鎖
}
}
整個操作是在持有段鎖的情況下執行的,空白行之前的行主要是定位到要刪除的節點e。
如果不存在這個節點就直接返回null,否則就要將e前面的結點複製一遍,尾結點指向e的下一個結點。
e後面的結點不需要複製,它們可以重用。
中間那個for迴圈是做什麼用的呢?從程式碼來看,就是將定位之後的所有entry克隆並拼回前面去,但有必要嗎?
每次刪除一個元素就要將那之前的元素克隆一遍?這點其實是由entry的不變性來決定的,仔細觀察entry定義,發現除了value,其他所有屬性都是用final來修飾的,
這意味著在第一次設定了next域之後便不能再改變它,取而代之的是將它之前的節點全都克隆一次。至於entry為什麼要設定為不變性,這跟不變性的訪問不需要同步從而節省時間有關。
(4)containsKey方法的實現,它不需要讀取值。
我們要統計整個ConcurrentHashMap裡元素的大小,就必須統計所有Segment裡元素的大小後求和。
Segment裡的全域性變數count是一個volatile變數,那麼在多執行緒場景下,我們是不是直接把所有Segment的count相加就可以得到整個ConcurrentHashMap大小了呢?
不是的,雖然相加時可以獲取每個Segment的count的最新值,但是拿到之後可能累加前使用的count發生了變化,那麼統計結果就不準了。
所以最安全的做法,是在統計size的時候把所有Segment的put,remove和clean方法全部鎖住,但是這種做法顯然非常低效。
因為在累加count操作過程中,之前累加過的count發生變化的機率非常小,所以ConcurrentHashMap的做法是先嚐試2次通過不鎖住Segment的方式來統計各個Segment大小,如果統計的過程中,容器的count發生了變化,則再採用加鎖的方式來統計所有Segment的大小。
那麼ConcurrentHashMap是如何判斷在統計的時候容器是否發生了變化呢?使用modCount變數,在put , remove和clean方法裡操作元素前都會將變數modCount進行加1,那麼在統計size前後比較modCount是否發生變化,從而得知容器的大小是否發生變化。
-
總結
1.在使用鎖來協調多執行緒間併發訪問的模式下,減小對鎖的競爭可以有效提高併發性。
有兩種方式可以減小對鎖的競爭:
減小請求同一個鎖的頻率。
減少持有鎖的時間。
2.ConcurrentHashMap 的高併發性主要來自於三個方面:
用分離鎖實現多個執行緒間的更深層次的共享訪問。
用 HashEntery 物件的不變性來降低執行讀操作的執行緒在遍歷連結串列期間對加鎖的需求。
通過對同一個 Volatile 變數的寫 / 讀訪問,協調不同執行緒間讀 / 寫操作的記憶體可見性。
使用分離鎖,減小了請求同一個鎖的頻率。
視訊參考https://www.bilibili.com/video/BV1B7411L7tE
上一篇:8鎖問題
下一篇:Callable