ConcurrentHashMap原理分析(三)-計數
概述
由於ConcurrentHashMap是一個高併發的集合,集合中增刪就比較頻繁,那計數就變成了一個問題,如果使用向AtomicInteger這樣型別的變數來計數,雖然可以保證原子性,但是太多執行緒去競爭CAS,自旋也挺浪費時間的,所以ConcurrentHashMap使用了一種類似LongAddr的資料結構去計數,其實LongAddr是繼承Striped64,有關於這個類的原理大家可以參考這篇文章:併發之STRIPED64(累加器)和 LONGADDER,大家瞭解了這個類的原理,理解ConcurrentHashMap計數就沒有一點壓力了,因為兩者在程式碼實現上基本一樣。
ConcurrentHashMap計數原理
private transient volatile long baseCount; private transient volatile CounterCell[] counterCells; @sun.misc.Contended static final class CounterCell { volatile long value; CounterCell(long x) { value = x; } }
ConcurrentHashMap就是依託上面三個東東進行計數的,那下面就詳細解釋一下這三個東東。
- baseCount:最基礎的計數,比如只有一個執行緒put操作,只需要通過CAS修改baseCount就可以了。
- counterCells:這是一個數組,裡面放著CounterCell物件,這個類裡面就一個屬性,其使用方法是,在高併發的時候,多個執行緒都要進行計數,每個執行緒有一個探針hash值,通過這個hash值定位到陣列桶的位置,如果這個位置有值就通過CAS修改CounterCell的value(如果修改失敗,就換一個再試),如果沒有,就建立一個CounterCell物件。
- 最後通過把桶中的所有物件的value值和baseCount求和得到總值,程式碼如下。
final long sumCount() { CounterCell[] as = counterCells; CounterCell a;//baseCount作為基礎值 long sum = baseCount; if (as != null) { //遍歷陣列 for (int i = 0; i < as.length; ++i) { if ((a = as[i]) != null) //對每個value累加 sum += a.value; } } return sum; }
通過上面的分析,相信大家已經瞭解了高併發計數的方法,在上面的介紹中提到一點,每個執行緒的探針hash值,大家先有個印象,一會分析程式碼的時候會使用這個,其實這個值很有趣。
addCount()方法
又到了這個方法,在上篇文章中分析擴容的時候也分析過這個方法,不過分析的是一部分,現在分析另一部分。
private final void addCount(long x, int check) { CounterCell[] as; long b, s; //如果陣列還沒有建立,就直接對baseCount進行CAS操作 if ((as = counterCells) != null || !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) { CounterCell a; long v; int m; //設定沒有衝突標誌位為true boolean uncontended = true; //第一個條件as == null成立說明,counterCells陣列沒有建立,而且通過CAS修改baseCount失敗,說明有別的執行緒競爭CAS //a = as[ThreadLocalRandom.getProbe() & m]) == null,說明陣列是建立了,但是通過探針hash定位的桶中沒有物件 //如果有物件,執行最後一個,進行CAS修改CounterCell物件,如果也失敗了,就要進入下面的方法 if (as == null || (m = as.length - 1) < 0 || (a = as[ThreadLocalRandom.getProbe() & m]) == null || !(uncontended = U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) { //進行更精確的處理 fullAddCount(x, uncontended); return; } if (check <= 1) return; s = sumCount(); } //省略部分程式碼 }
上面整體過程還是很好理解的,就是我在上面介紹計數原理中說的步驟,但是上面有一個地方需要注意下就是:
a = as[ThreadLocalRandom.getProbe() & m]) == null
這裡的ThreadLocalRandom.getProbe(),看著名字大家應該可以猜到應該是要獲取一個隨機值,因為有Random嘛,其實這裡就是在獲取一個隨機值。那既然是要獲取一個隨機值,為什麼要使用ThreadLocalRandom,而不直接使用Random,那看到ThreadLocal,大家也應該可以想到就是這個隨機值的獲取執行緒之間是隔離的,每個執行緒獲取自己的隨機值,互相之間沒有影響,為什麼要互相之間沒有影響呢?因為Random要實現隨機,有一個關鍵的東西就是種子(seed),具體過程如下:
- 初始化Random的時候,如果沒有傳seed,就根據時間戳進行一些計算初始化一個種子
- 如果某個執行緒需要獲取隨機數,先通過CAS更新種子seed1 = function1(seed)
- 根據seed1,計算隨機數 = function2(seed1)
- 上面的兩個function是固定的,就是說如果初始種子一樣,兩個不同的Random物件生成隨機數會完全一樣
上面的過程咋一看沒啥問題,其實最大問題就是第二步那個CAS,在高併發的時候效率會很差,所以這裡才使用了ThreadLocalRandom,相當於每個執行緒都有一個Random,都有自己的種子,這樣就不會存在多執行緒競爭修改種子。想要詳細瞭解ThreadLocalRandom,參考:併發包中ThreadLocalRandom類原理剖析
fullAddCount()方法
其實這個方法沒什麼好分析的,其實就是Striped64#longAccumulate()方法,據我的觀察好像一行不差,完全一樣,這裡還是分析下吧。
1 private final void fullAddCount(long x, boolean wasUncontended) { 2 int h; 3 //上面我貼出來了介紹ThreadLocalRandom的文章,這裡如果是首次獲取,其實就是0 4 if ((h = ThreadLocalRandom.getProbe()) == 0) { 5 //如果為0,就初始化,這裡其實就是把種子和隨機數設定到(Thread)執行緒中 6 ThreadLocalRandom.localInit(); // force initialization 7 h = ThreadLocalRandom.getProbe(); 8 wasUncontended = true; 9 } 10 boolean collide = false; // True if last slot nonempty 11 12 //死迴圈,保證計數一定成功 13 for (;;) { 14 CounterCell[] as; CounterCell a; int n; long v; 15 //說明陣列已經初始化,在後面有判斷陣列沒有初始化的情況 16 if ((as = counterCells) != null && (n = as.length) > 0) { 17 //這裡是不是和ConcurrentHashMap定位桶的位置很像,其實是一摸一樣的 18 //說明陣列中這個位置沒有元素 19 if ((a = as[(n - 1) & h]) == null) { 20 //這個欄位保證陣列新增節點,擴容只有一個執行緒在進行,防止多執行緒併發 21 //這裡限制一個執行緒處理只是在陣列新增節點和擴容的時候,修改物件的值並不需要限制這個變數 22 if (cellsBusy == 0) { // Try to attach new Cell 23 CounterCell r = new CounterCell(x); // Optimistic create 24 25 //如果為0表示沒有別的執行緒在修改陣列,通過CAS修改為1,表示當前執行緒在修改陣列 26 if (cellsBusy == 0 && 27 U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) { 28 boolean created = false; 29 try { // Recheck under lock 30 CounterCell[] rs; int m, j; 31 //再次校驗,確保陣列沒有變化 32 //rs[j = (m - 1) & h] == null,再次確認該位置是否為null,防止別的執行緒插入了 33 if ((rs = counterCells) != null && 34 (m = rs.length) > 0 && 35 rs[j = (m - 1) & h] == null) { 36 //插入陣列 37 rs[j] = r; 38 created = true; 39 } 40 } finally { 41 //釋放CAS鎖 42 cellsBusy = 0; 43 } 44 if (created) 45 //如果新節點插入成功,表示計數已經成功,這裡直接break了 46 break; 47 //如果失敗會一直重試 48 continue; // Slot is now non-empty 49 } 50 } 51 collide = false; 52 } 53 else if (!wasUncontended) // CAS already known to fail 54 wasUncontended = true; // Continue after rehash 55 56 //定位到桶中有值,然後通過CAS修改其值 57 else if (U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x)) 58 break; 59 //下面的兩個elseif其實是為了防止陣列一直擴容使用的,陣列的最大容量就是CPU的核數 60 //因為核數就是併發數,陣列太大沒有意義,沒有那麼多執行緒可以同時操作 61 //就是說上面的新建節點或者CAS修改值事變了,就會到這裡,然後攔截住,不讓執行擴容 62 else if (counterCells != as || n >= NCPU) 63 collide = false; // At max size or stale 64 else if (!collide) 65 collide = true; 66 //先競爭到CAS鎖,然後執行擴容 67 else if (cellsBusy == 0 && 68 U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) { 69 try { 70 if (counterCells == as) {// Expand table unless stale 71 72 //每次擴容成原來的兩倍 73 CounterCell[] rs = new CounterCell[n << 1]; 74 //複製元素,看過ConcurrentHashMap的擴容,再看這個,簡直就跟一個大學生看小學數學題一樣,