1. 程式人生 > >ConcurrentHashMap 詳解一

ConcurrentHashMap 詳解一

本文程式碼來自JDK8

  1. ConcurrentHashMap 實現了執行緒安全;
  2. 雖然可以通過 Hashtable 或者 Collections.synchronizedMap 來生成一個執行緒安全的 Map 例項, 但這是全域性鎖方式, 效能不行;
  3. ConcurrentHashMap 的儲存方式和 HashMap 十分類似, 都是用陣列 + 連結串列 + 紅黑樹的結構, 差別在於一些操作需要做執行緒同步處理;
  4. 在 JDK7 ConcurrentHashMap 使用 Segment 分段鎖的方式實現執行緒安全, 而在 JDK8 就拋棄這種做法, 採用 CAS 演算法來保證執行緒安全, 這裡就不展開討論分段鎖的方式了, 有興趣可以去找 JDK7 的原始碼分析.

記憶體模型

要了解 Java 的執行緒安全, 首先要知道它的記憶體模型. 記憶體模型是一個概念, 簡單來講就是它定義了虛擬機器裡多執行緒如何訪問變數. 這裡涉及到兩個概念: 主記憶體和工作記憶體. 執行緒之間的共享變數都是儲存在主記憶體, 每個執行緒如果想要訪問變數, 那麼需要先把變數從主記憶體載入到私有的工作記憶體, 然後對工作記憶體的變數進行操作, 最後再更新到主記憶體中. 注意區分這裡的記憶體模型跟硬體記憶體不是一碼事, 簡單理解為主記憶體和工作記憶體都可以包含 CPU 暫存器, CPU 快取, RAM.

volatile

根據記憶體模型, 假設有一個成員變數 value, 執行緒 A 修改了它, 實際是修改執行緒 A 私有的工作記憶體裡面的 value 的副本, 還沒有更新到主記憶體的 value, 因此執行緒 B 讀取的 value 還是舊的 value, 這樣就不同步了.
而使用 volatile 修飾的成員變數 value 有個特點, 那就是假如執行緒 A 修改了這個變數的值, 那麼它會通知其他執行緒應該去主記憶體讀取新的 value, 這就是可見性.

CAS

CAS 全稱是比較並交換, 是一種樂觀鎖機制, 它要求變數對其他執行緒是可見的, 所以需要用 volatile 修飾, 當然也不是一定要這樣, 也可以用 synchronized 來實現, 畢竟這只是個演算法, 實現方式多種多樣. 涉及到三個引數: 記憶體值, 期待值, 更新值. 演算法有三個步驟:

  1. 讀取記憶體值;
  2. 比較記憶體值和期待值;
  3. 兩者相同就把更新值更新上去.

這三個步驟是一個原子操作, 實際是通過 CPU 匯流排加鎖或者快取加鎖方式來實現, 我也沒有具體深入瞭解, 只知道是個原子操作就行.
在 Java 中 CAS 的實現是在 Unsafe 類中, 它有個本地方法 public final native boolean compareAndSwapInt(Object var1, long var2, int var4, int var5);

, var1 表示物件, var2 表示物件地址, var4 表示期待值, var5 表示更新值.


ConcurrentHashMap 的插入方式跟 HashMap 是一樣的, 唯一的區別就是多了執行緒安全處理, 所以接下來主要是對執行緒安全詳細講解.

變數

  1. transient volatile Node<K,V>[] table;
    使用 volatile 修飾的陣列, 針對該陣列引用具有可見性, 對於陣列元素沒有可見性. 為了保證陣列元素也有可見性, 這裡就用 volatile 修飾 Node 類的 val 和 next.
  2. private transient volatile Node<K,V>[] nextTable;
    用來擴容時的臨時陣列
  3. private transient volatile int sizeCtl;
    一個控制欄位, -1 表示在初始化陣列, -N 表示有 N-1 個執行緒在擴容, table 初始化後儲存下一個次擴容的大小, 跟閾值一樣. 這裡要說明一下, 雖然都說 -N 表示有 N-1 個執行緒擴容, 我也沒仔細研究是不是這樣, 但是在程式碼裡面是這樣來設定的: 第一個執行緒需要擴容, 它會用一個非常大的負數對 sizeCtl 設定, 此後每多一個執行緒擴容, sizeCtl 就會加一, 然後擴容完成後 sizeCtl 減一, 最後把它設定成新容量的閾值(正數).

初始化

private final Node<K,V>[] initTable() {
    Node<K,V>[] tab; int sc;
    while ((tab = table) == null || tab.length == 0) {
        // sizeCtl 小於 0, 表示陣列正在被其他執行緒進行初始化, 那麼掛起本執行緒
        if ((sc = sizeCtl) < 0)
            Thread.yield(); // lost initialization race; just spin

        // U 是 Unsafe 類的例項, 用來操作 CAS 演算法
        // 這裡使用 CAS 演算法把 sizeCtl 改成 -1
        // 如果修改成功會返回 true, 然後初始化 table
        // 因為這個 CAS 條件, 這裡的初始化只有一個執行緒執行, 其他執行緒是不會進入這裡
        else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
            try {
                if ((tab = table) == null || tab.length == 0) {
                    int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
                    @SuppressWarnings("unchecked")
                    Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                    table = tab = nt;
                    // 相當於 0,75*n
                    sc = n - (n >>> 2);
                }
            } finally {
                sizeCtl = sc;
            }
            break;
        }
    }
    return tab;
}

擴容

ConcurrentHashMap 是支援多執行緒擴容方式, 比如執行緒 A 把舊陣列的索引位置 1 複製到新陣列, 同時執行緒 B 也把舊陣列的索引位置 2 複製到新陣列, 效率更加快. 另外每個執行緒都是負責一整個索引區域, 所以擴容原理是先找出本執行緒應該負責的索引區域, 然後遍歷這個區域, 將其中的節點複製到新陣列.

private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
    int n = tab.length, stride;
    // 單個執行緒允許處理的最少 table 桶首節點數量
    // 即每個執行緒的任務量
    // 大概是根據 CPU 數量來算出, 為什麼這樣算我還沒弄明白
    if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
        stride = MIN_TRANSFER_STRIDE; // subdivide range

    // nextTab 作為臨時陣列先擴容一倍
    if (nextTab == null) {            // initiating
        try {
            @SuppressWarnings("unchecked")
            Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
            nextTab = nt;
        } catch (Throwable ex) {      // try to cope with OOME
            sizeCtl = Integer.MAX_VALUE;
            return;
        }
        nextTable = nextTab;
        transferIndex = n;
    }
    int nextn = nextTab.length;

    // 這是一個特殊的節點, hash 值設定為 -1, 也就是常量 MOVED
    // 擴容過程中遇到索引位置為空就設定成該節點
    // 或者索引位置不為空, 但是已經處理複製後也把索引位置設定為該節點
    // 目的是為了告訴其他執行緒不需要再處理該索引位置
    ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);

    // 表示索引 i 節點是否被複製成功
    boolean advance = true;
    // 表示所有節點複製完成
    boolean finishing = false; // to ensure sweep before committing nextTab
    for (int i = 0, bound = 0;;) {
        Node<K,V> f; int fh;

        // 這個迴圈目的很簡單
        // 首先我們要知道擴容是一批一批的複製到新陣列的
        // 比如把索引範圍 [10, 16) 的節點複製到新陣列
        // 而這裡是逆序擴容, 比如原來陣列範圍是 [0, 16), 首先是對 [10, 16) 進行復制
        // 還有變數 stride 就是區間大小, 比如這裡就是 6
        // 所以這個迴圈目的就是為了找出允許執行緒擴容的索引範圍 [bound, i]
        // 這裡只有更新共享變數 transferIndex 才用到 CAS 演算法, 其他操作就不需要了
        while (advance) {
            int nextIndex, nextBound;
            // 滿足 [bound, i] 這個區間或者已經完成擴容, 跳出這個迴圈
            if (--i >= bound || finishing)
                advance = false;
            // nextIndex 是邊界 i 的臨時儲存, 如果小於 0, 說明沒有要複製的節點了
            // transferIndex 是共享變數, 儲存區間範圍的上限, 初始值是舊陣列長度
            else if ((nextIndex = transferIndex) <= 0) {
                i = -1;
                advance = false;
            }

            // 嘗試更新 transferIndex
            // 如果成功, 當前執行緒就負責複製 [nextBound, nextIndex) 範圍的節點
            // transferIndex 變成 nextBound
            // 注意這裡 i=nextIndex-1, 所以 [nextBound, nextIndex) 也是 [bound, i]
            else if (U.compareAndSwapInt
                     (this, TRANSFERINDEX, nextIndex,
                      nextBound = (nextIndex > stride ?
                                   nextIndex - stride : 0))) {
                bound = nextBound;
                i = nextIndex - 1;
                advance = false;
            }
        }

        // 下面開始複製 [bound, i] 範圍的節點, 逆序複製, 從 i 開始

        // 對於擴容完成處理
        if (i < 0 || i >= n || i + n >= nextn) {
            int sc;
            if (finishing) {
                nextTable = null;
                table = nextTab;
                // sizeCtl 設定為總大小的 0.75
                // 至於這裡為什麼不用 CAS 更新值, 可能是沒有必要吧, 重複更新也沒關係
                sizeCtl = (n << 1) - (n >>> 1);
                return;
            }
            // 擴容完成, sizeCtl 減一      
            if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
                // 擴容前 sizeCtl 會設定成 resizeStamp(n) << RESIZE_STAMP_SHIFT + 2
                // 如果不相等說明有其他執行緒執行擴容完成的操作了, 本執行緒不需要重複操作了
                if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
                    return;
                finishing = advance = true;
                i = n; // recheck before commit
            }
        }

        // 對於 i 的節點為空, 那麼設定指向特殊節點 ForwardingNode
        else if ((f = tabAt(tab, i)) == null)
            advance = casTabAt(tab, i, null, fwd);
        // 當前執行緒判斷到這個節點的 hash 值是 MOVED
        // 說明是特殊節點, 已經有其他執行緒操作了, 可以跳過這個節點
        else if ((fh = f.hash) == MOVED)
            advance = true; // already processed

        // 如果 i 既不是空值, 也不是特殊節點, 說明這是個普通節點
        // 那麼就開始對這個連結串列或者樹進行復制, 首先是把它鎖上, 防止其他執行緒同時操作它
        else {
            synchronized (f) {
                if (tabAt(tab, i) == f) {
                    Node<K,V> ln, hn;

                    // 下面就是對連結串列或者樹複製的過程, 具體可以參考 HashMap
                    // 值得注意的是對於樹結構, 這裡索引位置不是直接指向樹的根節點
                    // 而是用 TreeBin 構造紅黑樹, 然後指向這個 TreeBin
                    // TreeBin 的 hash 值設定為 -2, 而其他節點的 hash 值都是大於 0
                    // 節點 hash 值的計算可以參考 spread 方法
                    // 因此可以通過 hash 值大於等於 0 來判斷是連結串列還是樹結構
                    if (fh >= 0) {
                        int runBit = fh & n;
                        Node<K,V> lastRun = f;
                        for (Node<K,V> p = f.next; p != null; p = p.next) {
                            int b = p.hash & n;
                            if (b != runBit) {
                                runBit = b;
                                lastRun = p;
                            }
                        }
                        if (runBit == 0) {
                            ln = lastRun;
                            hn = null;
                        }
                        else {
                            hn = lastRun;
                            ln = null;
                        }
                        for (Node<K,V> p = f; p != lastRun; p = p.next) {
                            int ph = p.hash; K pk = p.key; V pv = p.val;
                            if ((ph & n) == 0)
                                ln = new Node<K,V>(ph, pk, pv, ln);
                            else
                                hn = new Node<K,V>(ph, pk, pv, hn);
                        }
                        setTabAt(nextTab, i, ln);
                        setTabAt(nextTab, i + n, hn);
                        // 複製完成後用特殊節點代替原來節點
                        setTabAt(tab, i, fwd);
                        advance = true;
                    }
                    else if (f instanceof TreeBin) {
                        TreeBin<K,V> t = (TreeBin<K,V>)f;
                        TreeNode<K,V> lo = null, loTail = null;
                        TreeNode<K,V> hi = null, hiTail = null;
                        int lc = 0, hc = 0;
                        for (Node<K,V> e = t.first; e != null; e = e.next) {
                            int h = e.hash;
                            TreeNode<K,V> p = new TreeNode<K,V>
                                (h, e.key, e.val, null, null);
                            if ((h & n) == 0) {
                                if ((p.prev = loTail) == null)
                                    lo = p;
                                else
                                    loTail.next = p;
                                loTail = p;
                                ++lc;
                            }
                            else {
                                if ((p.prev = hiTail) == null)
                                    hi = p;
                                else
                                    hiTail.next = p;
                                hiTail = p;
                                ++hc;
                            }
                        }
                        // 這裡建立 TreeBin 來構造紅黑樹, 具體構造過程可以參考 HashMap
                        ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
                            (hc != 0) ? new TreeBin<K,V>(lo) : t;
                        hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
                            (lc != 0) ? new TreeBin<K,V>(hi) : t;
                        setTabAt(nextTab, i, ln);
                        setTabAt(nextTab, i + n, hn);
                        // 複製完成後用特殊節點代替原來節點
                        setTabAt(tab, i, fwd);
                        advance = true;
                    }
                }
            }
        }
    }
}

/**
 * 計算 key 的 hash 值, 其中 HASH_BITS 是 0x7fffffff, 所以 hash 值一定大於等於 0
 */
static final int spread(int h) {
    return (h ^ (h >>> 16)) & HASH_BITS;
}