1. 程式人生 > 實用技巧 >ConcurrentHashMap原理分析(三)-計數

ConcurrentHashMap原理分析(三)-計數

概述

  由於ConcurrentHashMap是一個高併發的集合,集合中增刪就比較頻繁,那計數就變成了一個問題,如果使用向AtomicInteger這樣型別的變數來計數,雖然可以保證原子性,但是太多執行緒去競爭CAS,自旋也挺浪費時間的,所以ConcurrentHashMap使用了一種類似LongAddr的資料結構去計數,其實LongAddr是繼承Striped64,有關於這個類的原理大家可以參考這篇文章:併發之STRIPED64(累加器)和 LONGADDER,大家瞭解了這個類的原理,理解ConcurrentHashMap計數就沒有一點壓力了,因為兩者在程式碼實現上基本一樣。

ConcurrentHashMap計數原理

    private transient volatile long baseCount;

    private transient volatile CounterCell[] counterCells;

    @sun.misc.Contended static final class CounterCell {
        volatile long value;
        CounterCell(long x) { value = x; }
    }

ConcurrentHashMap就是依託上面三個東東進行計數的,那下面就詳細解釋一下這三個東東。

  • baseCount:最基礎的計數,比如只有一個執行緒put操作,只需要通過CAS修改baseCount就可以了。
  • counterCells:這是一個數組,裡面放著CounterCell物件,這個類裡面就一個屬性,其使用方法是,在高併發的時候,多個執行緒都要進行計數,每個執行緒有一個探針hash值,通過這個hash值定位到陣列桶的位置,如果這個位置有值就通過CAS修改CounterCell的value(如果修改失敗,就換一個再試),如果沒有,就建立一個CounterCell物件。
  • 最後通過把桶中的所有物件的value值和baseCount求和得到總值,程式碼如下。
    final long sumCount() {
        CounterCell[] as = counterCells; CounterCell a;
        
//baseCount作為基礎值 long sum = baseCount; if (as != null) { //遍歷陣列 for (int i = 0; i < as.length; ++i) { if ((a = as[i]) != null) //對每個value累加 sum += a.value; } } return sum; }

通過上面的分析,相信大家已經瞭解了高併發計數的方法,在上面的介紹中提到一點,每個執行緒的探針hash值,大家先有個印象,一會分析程式碼的時候會使用這個,其實這個值很有趣。

addCount()方法

又到了這個方法,在上篇文章中分析擴容的時候也分析過這個方法,不過分析的是一部分,現在分析另一部分。

private final void addCount(long x, int check) {
        CounterCell[] as; long b, s;
        //如果陣列還沒有建立,就直接對baseCount進行CAS操作
        if ((as = counterCells) != null ||
            !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
            CounterCell a; long v; int m;
            //設定沒有衝突標誌位為true
            boolean uncontended = true;
            //第一個條件as == null成立說明,counterCells陣列沒有建立,而且通過CAS修改baseCount失敗,說明有別的執行緒競爭CAS
            //a = as[ThreadLocalRandom.getProbe() & m]) == null,說明陣列是建立了,但是通過探針hash定位的桶中沒有物件
            //如果有物件,執行最後一個,進行CAS修改CounterCell物件,如果也失敗了,就要進入下面的方法
            if (as == null || (m = as.length - 1) < 0 ||
                (a = as[ThreadLocalRandom.getProbe() & m]) == null ||
                !(uncontended =
                  U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
                //進行更精確的處理
                fullAddCount(x, uncontended);
                return;
            }
            if (check <= 1)
                return;
            s = sumCount();
        }
        //省略部分程式碼
}

上面整體過程還是很好理解的,就是我在上面介紹計數原理中說的步驟,但是上面有一個地方需要注意下就是:

a = as[ThreadLocalRandom.getProbe() & m]) == null 

這裡的ThreadLocalRandom.getProbe(),看著名字大家應該可以猜到應該是要獲取一個隨機值,因為有Random嘛,其實這裡就是在獲取一個隨機值。那既然是要獲取一個隨機值,為什麼要使用ThreadLocalRandom,而不直接使用Random,那看到ThreadLocal,大家也應該可以想到就是這個隨機值的獲取執行緒之間是隔離的,每個執行緒獲取自己的隨機值,互相之間沒有影響,為什麼要互相之間沒有影響呢?因為Random要實現隨機,有一個關鍵的東西就是種子(seed),具體過程如下:

  • 初始化Random的時候,如果沒有傳seed,就根據時間戳進行一些計算初始化一個種子
  • 如果某個執行緒需要獲取隨機數,先通過CAS更新種子seed1 = function1(seed)
  • 根據seed1,計算隨機數 = function2(seed1)
  • 上面的兩個function是固定的,就是說如果初始種子一樣,兩個不同的Random物件生成隨機數會完全一樣

上面的過程咋一看沒啥問題,其實最大問題就是第二步那個CAS,在高併發的時候效率會很差,所以這裡才使用了ThreadLocalRandom,相當於每個執行緒都有一個Random,都有自己的種子,這樣就不會存在多執行緒競爭修改種子。想要詳細瞭解ThreadLocalRandom,參考:併發包中ThreadLocalRandom類原理剖析

fullAddCount()方法

其實這個方法沒什麼好分析的,其實就是Striped64#longAccumulate()方法,據我的觀察好像一行不差,完全一樣,這裡還是分析下吧。

  1 private final void fullAddCount(long x, boolean wasUncontended) {
  2         int h;
  3          //上面我貼出來了介紹ThreadLocalRandom的文章,這裡如果是首次獲取,其實就是0 
  4          if ((h = ThreadLocalRandom.getProbe()) == 0) {
  5             //如果為0,就初始化,這裡其實就是把種子和隨機數設定到(Thread)執行緒中
  6             ThreadLocalRandom.localInit();      // force initialization
  7             h = ThreadLocalRandom.getProbe();
  8             wasUncontended = true;
  9         }
 10         boolean collide = false;                // True if last slot nonempty
 11      
 12         //死迴圈,保證計數一定成功
 13         for (;;) {
 14             CounterCell[] as; CounterCell a; int n; long v;
 15             //說明陣列已經初始化,在後面有判斷陣列沒有初始化的情況
 16             if ((as = counterCells) != null && (n = as.length) > 0) {
 17                 //這裡是不是和ConcurrentHashMap定位桶的位置很像,其實是一摸一樣的
 18                //說明陣列中這個位置沒有元素 
 19                if ((a = as[(n - 1) & h]) == null) {
 20                     //這個欄位保證陣列新增節點,擴容只有一個執行緒在進行,防止多執行緒併發
 21                     //這裡限制一個執行緒處理只是在陣列新增節點和擴容的時候,修改物件的值並不需要限制這個變數
 22                     if (cellsBusy == 0) {            // Try to attach new Cell
 23                         CounterCell r = new CounterCell(x); // Optimistic create
 24 
 25                         //如果為0表示沒有別的執行緒在修改陣列,通過CAS修改為1,表示當前執行緒在修改陣列
 26                         if (cellsBusy == 0 &&
 27                             U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
 28                             boolean created = false;
 29                             try {               // Recheck under lock
 30                                 CounterCell[] rs; int m, j;
 31                                 //再次校驗,確保陣列沒有變化
 32                                 //rs[j = (m - 1) & h] == null,再次確認該位置是否為null,防止別的執行緒插入了
 33                                 if ((rs = counterCells) != null &&
 34                                     (m = rs.length) > 0 &&
 35                                     rs[j = (m - 1) & h] == null) {
 36                                     //插入陣列
 37                                     rs[j] = r;
 38                                     created = true;
 39                                 }
 40                             } finally {
 41                                //釋放CAS鎖
 42                                 cellsBusy = 0;
 43                             }
 44                             if (created)
 45                                 //如果新節點插入成功,表示計數已經成功,這裡直接break了
 46                                 break;
 47                            //如果失敗會一直重試
 48                             continue;           // Slot is now non-empty
 49                         }
 50                     }
 51                     collide = false;
 52                 }
 53                 else if (!wasUncontended)       // CAS already known to fail
 54                     wasUncontended = true;      // Continue after rehash
 55               
 56                 //定位到桶中有值,然後通過CAS修改其值
 57                 else if (U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))
 58                     break;
 59                //下面的兩個elseif其實是為了防止陣列一直擴容使用的,陣列的最大容量就是CPU的核數
 60                //因為核數就是併發數,陣列太大沒有意義,沒有那麼多執行緒可以同時操作
 61                //就是說上面的新建節點或者CAS修改值事變了,就會到這裡,然後攔截住,不讓執行擴容
 62                 else if (counterCells != as || n >= NCPU)
 63                     collide = false;            // At max size or stale
 64                 else if (!collide)
 65                     collide = true;
 66                 //先競爭到CAS鎖,然後執行擴容
 67                 else if (cellsBusy == 0 &&
 68                          U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
 69                     try {
 70                         if (counterCells == as) {// Expand table unless stale
 71                            
 72                             //每次擴容成原來的兩倍
 73                             CounterCell[] rs = new CounterCell[n << 1];
 74                             //複製元素,看過ConcurrentHashMap的擴容,再看這個,簡直就跟一個大學生看小學數學題一樣,