1. 程式人生 > 實用技巧 >ConcurrentHashMap原理分析(一)-綜述

ConcurrentHashMap原理分析(一)-綜述

概述

  ConcurrentHashMap,一個執行緒安全的高效能集合,儲存結構和HashMap一樣,都是採用陣列進行分桶,之後再每個桶中掛一個連結串列,當連結串列長度大於8的時候轉為紅黑樹,其實現執行緒安全的基本原理是採用CAS + synchronized組合,當陣列的桶中沒有元素時採用CAS插入,相反,則採用synchronized加鎖插入,除此之外在擴容和記錄size方面也做了很多的優化,擴容允許多個執行緒共同協助擴容,而記錄size的方式則採用類似LongAddr的方式,提高併發性,本片文章是介紹ConcurrentHashMap的第一篇,主要介紹下其結構,put()、get()方法,後面幾篇文章會介紹其他方法。

ConcurrentHashMap儲存結構

從上圖可以清晰的看到其儲存結構是採用陣列 + 連結串列 + 紅黑樹的結構,下面就介紹一下每一種儲存結構在程式碼中的表現形式。

陣列

    transient volatile Node<K,V>[] table;
    private transient volatile Node<K,V>[] nextTable;

可以看到陣列中存的是Node,Node就是構成連結串列的節點。第二個nextTable是擴容之後的陣列,在擴容的時候會使用。

連結串列

static class Node<K,V> implements
Map.Entry<K,V> { final int hash; final K key; volatile V val; volatile Node<K,V> next; Node(int hash, K key, V val, Node<K,V> next) { this.hash = hash; this.key = key; this.val = val; this.next = next; }
//省略部分程式碼 }

一個典型的單鏈表儲存結構,裡面儲存著key,val,以及這個key對應的hash值,next表示指向下一個Node。

紅黑樹

static final class TreeNode<K,V> extends Node<K,V> {
        TreeNode<K,V> parent;  // red-black tree links
        TreeNode<K,V> left;
        TreeNode<K,V> right;
        TreeNode<K,V> prev;    // needed to unlink next upon deletion
        boolean red;

        TreeNode(int hash, K key, V val, Node<K,V> next,
                 TreeNode<K,V> parent) {
            super(hash, key, val, next);
            this.parent = parent;
        }
//省略部分程式碼
}

TreeNode是構成紅黑樹的節點,其繼承了Node節點,用於儲存key,val,hash等值。但是在陣列中並不直接儲存TreeNode,一開始在沒看原始碼之前,我以為陣列中儲存的是紅黑樹的根節點,其實不是,是下面這個東東。

static final class TreeBin<K,V> extends Node<K,V> {
        TreeNode<K,V> root;
        volatile TreeNode<K,V> first;
        volatile Thread waiter;
        volatile int lockState;
        // values for lockState
        static final int WRITER = 1; // set while holding write lock
        static final int WAITER = 2; // set when waiting for write lock
        static final int READER = 4; // increment value for setting read lock
//省略部分程式碼
)

這個類封裝了TreeNode,而且提供了連結串列轉紅黑樹,以及紅黑樹的增刪改查方法。

其他節點

 static final class ForwardingNode<K,V> extends Node<K,V> {
        final Node<K,V>[] nextTable;
        ForwardingNode(Node<K,V>[] tab) {
            super(MOVED, null, null, null);
            this.nextTable = tab;
        }
//省略部分程式碼
}

這個節點正常情況下在ConcurrentHashMap中是不存在的,只有當擴容的時候才會存在,該節點中有一個nextTable欄位,用於指向擴容之後的陣列,其使用方法是這樣的,擴容的時候需要把舊陣列的資料拷貝到新陣列,當某個桶中的資料被拷貝完成之後,就把舊陣列的該桶標記為ForwardingNode,當別的執行緒訪問到這個桶,發現被標記為ForwardingNode就知道該桶已經被copy到了新陣列,之後就可以根據這個做相應的處理。

ConcurrentHashMap關鍵屬性分析

這些屬性先有個印象,都會在之後的原始碼中使用,不用現在就搞明白。

    //最大容量
    private static final int MAXIMUM_CAPACITY = 1 << 30;
    //預設初始化容量
    private static final int DEFAULT_CAPACITY = 16;
    //負載因子
    private static final float LOAD_FACTOR = 0.75f;
    //連結串列轉為紅黑樹的臨界值
    static final int TREEIFY_THRESHOLD = 8;
    //紅黑樹轉為連結串列的臨界值
    static final int UNTREEIFY_THRESHOLD = 6;
    //當容量大於64時,連結串列才會轉為紅黑樹,否則,即便連結串列長度大於8,也不會轉,而是會擴容
    static final int MIN_TREEIFY_CAPACITY = 64;
    //以上的幾個屬性和HashMap一模一樣


    //擴容相關,每個執行緒負責最小桶個數
    private static final int MIN_TRANSFER_STRIDE = 16;
    //擴容相關,為了計算sizeCtl
    private static int RESIZE_STAMP_BITS = 16;
    //最大輔助擴容執行緒數量
    private static final int MAX_RESIZERS = (1 << (32 - RESIZE_STAMP_BITS)) - 1;
    //擴容相關,為了計算sizeCtl
    private static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS;
    //下面幾個是狀態值
    //MOVED表示正在擴容
    static final int MOVED     = -1; // hash for forwarding nodes
    //-2表示紅黑樹標識
    static final int TREEBIN   = -2; // hash for roots of trees
    static final int RESERVED  = -3; // hash for transient reservations
    //計算Hash值使用
    static final int HASH_BITS = 0x7fffffff; // usable bits of normal node hash
    //可用CPU核數
    static final int NCPU = Runtime.getRuntime().availableProcessors();
    //用於記錄容器中插入的元素數量
    private transient volatile long baseCount;
    //這個sizeCtl非常重要,基本上在程式碼中到處可以看到它的身影,後面會單獨分析一下
    private transient volatile int sizeCtl;
    //擴容相關
    private transient volatile int transferIndex;
    //計算容器size相關
    private transient volatile int cellsBusy;
    //計算容器size相關,在介紹相關程式碼的時候詳細介紹
    private transient volatile CounterCell[] counterCells;

上面的最開始的幾個屬性應該很好理解,後面的幾個屬性可能不知道有什麼用,沒關係,等到介紹相關程式碼的時候都會介紹的,這裡著重介紹下sizeCtl,這個欄位控制著擴容和table初始化,在不同的地方有不同的用處,下面列舉一下其每個標識的意思:

  • 負數代表正在進行初始化或擴容操作
  • -1代表正在初始化
  • -N 表示,這個高16位表示當前擴容的標誌,每次擴容都會生成一個不一樣的標誌,低16位表示參與擴容的執行緒數量
  • 正數或0,0代表hash表還沒有被初始化,正數表示達到這個值需要擴容,其實就等於(容量 * 負載因子)

CAS操作

上面介紹了ConcurrentHashMap是通過CAS + synchronized保證執行緒安全的,那CAS操作有哪些,如下:

    
//獲取陣列中對應索引的值
static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) { return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE); }    //修改陣列對應索引的值,這個是真正的CAS操作 static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i, Node<K,V> c, Node<K,V> v) { return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v); } //設定陣列對應索引的值 static final <K,V> void setTabAt(Node<K,V>[] tab, int i, Node<K,V> v) { U.putObjectVolatile(tab, ((long)i << ASHIFT) + ABASE, v); }

上面三個方法,我看很多文章把這三個方法都歸類為CAS操作,其實第一個和第三個我覺得並不是,比如第一個方法,只是強制從主記憶體獲取資料,第三個方法是修改完資料之後強制重新整理到主記憶體,同時通知其他執行緒失效,只是為了保證可見性,而且這兩個要求被修改的物件一定要被volatile修飾,這也是上面在介紹table的時候被volatile修飾的原因。

put()方法

put方法實際呼叫的是putVal()方法,下面分析下putVal方法。

 1 final V putVal(K key, V value, boolean onlyIfAbsent) {
 2         if (key == null || value == null) throw new NullPointerException();
 3         //這個計算hash值的方法和hashMap不同
 4         int hash = spread(key.hashCode());
 5         //記錄連結串列節點個數
 6         int binCount = 0;
 7         //這個死迴圈的作用是為了保證CAS一定可以成功,否則就一直重試
 8         for (Node<K,V>[] tab = table;;) {
 9             Node<K,V> f; int n, i, fh;
10             //如果table還沒有初始化,初始化
11             if (tab == null || (n = tab.length) == 0)
12                 //初始化陣列,後面會分析,說明1
13                 tab = initTable();
14             //如果通過hash值定位到桶的位置為null,直接通過CAS插入,上面死迴圈就是為了這裡
15             else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
16                 if (casTabAt(tab, i, null,
17                              new Node<K,V>(hash, key, value, null)))
18                     break;                   // no lock when adding to empty bin
19             }
20             //如果發現節點的Hash值為MOVED,協助擴容,至於為什麼hash值會為MOVEN,後面會說明,說明2
21             else if ((fh = f.hash) == MOVED)
22                 //協助擴容,在講解擴容的時候再講解
23                 tab = helpTransfer(tab, f);
24             else {
25                 //到這裡說明桶中有值
26                 V oldVal = null;
27                 //不管是連結串列還是紅黑樹都加鎖處理,防止別的執行緒修改
28                 synchronized (f) {
29                     //這裡直接從主記憶體重新獲取,雙重檢驗,防止已經被別的執行緒修改了
30                     if (tabAt(tab, i) == f) {
31                         //fh >= 0,說明是連結串列,為什麼fh>=0就是連結串列,這個就是hash值計算的神奇的地方,所有的key的hash都是大於等於0的,
32                         //紅黑樹的hash值為-2,至於為什麼為-2後面會說明,說明3
33                         if (fh >= 0) {
34                             //這裡就開始記錄連結串列中節點個數了,為了轉為紅黑樹做好記錄
35                             binCount = 1;
36                             //for迴圈遍歷連結串列
37                             for (Node<K,V> e = f;; ++binCount) {
38                                 K ek;
39                                 //如果key相同,就替換value
40                                 if (e.hash == hash &&
41                                     ((ek = e.key) == key ||
42                                      (ek != null && key.equals(ek)))) {
43                                     oldVal = e.val;
44                                     //這個引數傳的是false
45                                     if (!onlyIfAbsent)
46                                         e.val = value;
47                                     break;
48                                 }
49                                 //遍歷沒有發現有相同key的,就掛在連結串列的末尾
50                                 Node<K,V> pred = e;
51                                 if ((e = e.next) == null) {
52                                     pred.next = new Node<K,V>(hash, key,
53                                                               value, null);
54                                     break;
55                                 }
56                             }
57                         }
58                         //如果是紅黑樹,這裡就是上面介紹的,陣列中存的不是TreeNode,而是TreeBin
59                         else if (f instanceof TreeBin) {
60                             Node<K,V> p;
61                             binCount = 2;
62                             //向紅黑樹插入
63                             if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
64                                                            value)) != null) {
65                                 oldVal = p.val;
66                                 if (!onlyIfAbsent)
67                                     p.val = value;
68                             }
69                         }
70                     }
71                 }
72                 if (binCount != 0) {
73                     //如果連結串列長度大於等於8,轉為紅黑樹,至於怎麼轉在介紹紅黑樹部分的時候再詳細說
74                     if (binCount >= TREEIFY_THRESHOLD)
75                         treeifyBin(tab, i);
76                     if (oldVal != null)
77                         return oldVal;
78                     break;
79                 }
80             }
81         }
82         //計算size++,不過是執行緒安全的方式,這裡這篇文章先不介紹,之後會專門介紹
83         addCount(1L, binCount);
84         return null;
85     }

整個過程梳理如下:

  1. 陣列沒有初始化就先初始化陣列
  2. 計算當前插入的key的hash值
  3. 根據第二步的hash值定位到桶的位置,如果為null,直接CAS自旋插入
  4. 如果是連結串列就遍歷連結串列,有相同的key就替換,沒有就插入到連結串列尾部
  5. 如果是紅黑樹直接插入
  6. 判斷連結串列長度是否超過8,超過就轉為紅黑樹
  7. ConcurrentHashMap元素個數加1

上面程式碼中標紅的地方說明:

說明一:initTable()

private final Node<K,V>[] initTable() {
        Node<K,V>[] tab; int sc;
        while ((tab = table) == null || tab.length == 0) {
            //如果這個值小於零,說明有別的執行緒在初始化
            if ((sc = sizeCtl) < 0)
                //讓出CPU時間,注意這時執行緒依然是RUNNABLE狀態
                //這裡使用yield沒有風險,因為即便這個執行緒又競爭到CPU,再次迴圈到這裡它還會讓出CPU的
                Thread.yield(); // lost initialization race; just spin
            //初始狀態SIZECTL為0,通過CAS修改為-1
            else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
                try {
                    if ((tab = table) == null || tab.length == 0) {
                        int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
                        @SuppressWarnings("unchecked")
                        //初始化
                        Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                        table = tab = nt;
                        //擴容點,比如n = 16,最後計算出來的sc = 12
                        sc = n - (n >>> 2);
                    }
                } finally {
                    sizeCtl = sc;
                }
                break;
            }
        }
        return tab;
    }

說明二:擴容狀態為什麼hash為MOVEN

//構造方法,裡面使用super,也就是他的父類Node的構造方法   
 ForwardingNode(Node<K,V>[] tab) {
            super(MOVED, null, null, null);
            this.nextTable = tab;
        }

上面介紹ForwardingNode的時候說過,這個是擴容的時候,如果這個桶處理過了就設定為該節點,這個類的構造方法可以看出,它會把hash值設定為MOVEN狀態。

說明三:紅黑樹TreeBin的hash值為什麼為-2

TreeBin(TreeNode<K,V> b) {
            super(TREEBIN, null, null, null);
            this.first = b;
            TreeNode<K,V> r = null;
            for (TreeNode<K,V> x = b, next; x != null; x = next) {
                next = (TreeNode<K,V>)x.next;
                x.left = x.right = null;
                if (r == null) {
                    x.parent = null;
                    x.red = false;
                    r = x;
                }
//省略部分程式碼
}

這個是TreeBin的構造方法,這個super同樣是Node的構造方法,hash值為TREEBIN = -2

get()方法

 public V get(Object key) {
        Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
//計算key的hash值
int h = spread(key.hashCode());
//陣列不為空,獲取對應桶的值
if ((tab = table) != null && (n = tab.length) > 0 && (e = tabAt(tab, (n - 1) & h)) != null) {
//獲取到,直接返回value
if ((eh = e.hash) == h) { if ((ek = e.key) == key || (ek != null && key.equals(ek))) return e.val; }
//小於0,就是上面介紹的TREEBIN狀態,是紅黑樹,在紅黑樹中查詢
else if (eh < 0) return (p = e.find(h, key)) != null ? p.val : null;
//連結串列的處理方法,一個一個遍歷
while ((e = e.next) != null) { if (e.hash == h && ((ek = e.key) == key || (ek != null && key.equals(ek)))) return e.val; } } return null; }

get方法很簡單,就是去各個資料結構中找,不過紅黑樹的遍歷還是要好好看看的,這裡先不分析,紅黑樹這玩意為了實現自平衡,定義了很多的限制條件,實現起來的複雜度真是爆炸,之後文章會分析,不過程式碼看的我都快吐了,哈哈哈。

    

總結

本篇文章就先分析到這,不然就太長了,本文介紹了ConcurrentHashMap的儲存結構,節點構成,以及初始化方法,put和get方法,整體來說這部分比較簡單,ConcurrentHashMap複雜的部分是擴容和計數,當然我自己覺得紅黑樹部分是最複雜的,後面再慢慢介紹。