1. 程式人生 > >ConcurrentHashMap 源碼淺析 1.8

ConcurrentHashMap 源碼淺析 1.8

作者 alt 追加 ceo warnings tile ech 正在 左移

一、簡介

前面的一篇文章我們介紹了ConcurrentHashMap1.7版本版本的源碼介紹,我們知道1.7版本的ConcurrentHashMap采用的是分段鎖的思想,提高了鎖的數量,提高了並發的特性,但是也有其局限性,例如就是並發的數量也就是鎖的數量是不可改變的等;我們今天要介紹的1.8版本的ConcurrentHashMap其實也是采用了多鎖的思想,不過在1.8中沒有了segments這些東西了,每次鎖住的數組中的一個元素或者桶(其實也就是數組或者樹的頭結點),然後鎖也和1.7發生變了,使用的是Synchronized鎖,1.8中的鎖是隨著數組的長度發生變化的,提升了並發的數量的靈活性,還有就是1.8的數據結構也發生了一些變化,采用的是數組+鏈表+紅黑樹(鏈表到達閾值會樹化),結構如下圖所示:

技術分享圖片

二、基本成員
先介紹一些基本成員,只有了解了這些成員的概念後,才能去更好的去理解方法。
ConcurrentHashMap的一些成員變量

/** node數組的最大容量 2^30 */
    private static final int MAXIMUM_CAPACITY = 1 << 30;

   /** 默認初始化值16,必須是2的冥 */
    private static final int DEFAULT_CAPACITY = 16;

   /** 虛擬機限制的最大數組長度,在ArrayList中有說過,jdk1.8新引入的,需要與toArrar()相關方法關聯 */
    static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;

   /** 並發數量,1.7遺留,兼容以前版本 */
    private static final int DEFAULT_CONCURRENCY_LEVEL = 16;

    /** 負載因子,兼容以前版本,構造方法中指定的參數是不會被用作loadFactor的,為了計算方便,統一使用 n - (n >> 2) 代替浮點乘法 *0.75 */
    private static final float LOAD_FACTOR = 0.75f;

    /** 鏈表轉紅黑樹,閾值>=8 */
    static final int TREEIFY_THRESHOLD = 8;

    /** 樹轉鏈表閥值,小於等於6(tranfer時,lc、hc=0兩個計數器分別++記錄原bin、新binTreeNode數量,
     *  <=UNTREEIFY_THRESHOLD 則untreeify(lo))
     */
    static final int UNTREEIFY_THRESHOLD = 6;

    /** 鏈表轉紅黑樹的閾值,64(map容量小於64時,鏈表轉紅黑樹時先進行擴容) */
    static final int MIN_TREEIFY_CAPACITY = 64;

/** 下面這三個和多線程協助擴容有關 */

   /** // 擴容操作中,transfer這個步驟是允許多線程的,這個常量表示一個線程執行transfer時,最少要對連續的16個hash桶進行transfer
    //     (不足16就按16算,多控制下正負號就行)
    private static final int MIN_TRANSFER_STRIDE = 16;

    /** 生成sizeCtl所使用的bit位數 */
    private static int RESIZE_STAMP_BITS = 16;

    /** 參與擴容的最大線程數 */
    private static final int MAX_RESIZERS = (1 << (32 - RESIZE_STAMP_BITS)) - 1;

    /**  移位量,把生成戳移位後保存在sizeCtl中當做擴容線程計數的基數,相反方向移位後能夠反解出生成
     戳 */
    private static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS;

    /*
     * Encodings for Node hash fields. See above for explanation.
     */
    static final int MOVED     = -1; // 表示正在轉移
    static final int TREEBIN   = -2; // 表示已經轉換為樹
    static final int RESERVED  = -3; // hash for transient reservations
    static final int HASH_BITS = 0x7fffffff; // usable bits of normal node hash

    /** 可用處理器數量 */
    static final int NCPU = Runtime.getRuntime().availableProcessors();

         /** 用於存放node數組 */
    transient volatile Node<K,V>[] table;

        /**
     * baseCount為並發低時,直接使用cas設置成功的值
     * 並發高,cas競爭失敗,把值放在counterCells數組裏面的counterCell裏面
     * 所以map.size = baseCount + (每個counterCell裏面的值累加)
     */
    private transient volatile long baseCount;

        /**
     * 控制標識符,用來控制table的初始化和擴容的操作,不同的值有不同的含義
     * 當為負數時:-1代表正在初始化,-N就代表在擴容,-N-RS-2就代表有多少個線程在協助擴容
     * 當為0時:代表當時的table還沒有被初始化
     * 當為正數時:表示初始化或者下一次進行擴容的大小
     */
    private transient volatile int sizeCtl;

        /**
     * 通過cas實現的鎖,0 無鎖,1 有鎖
     */
    private transient volatile int cellsBusy;

    /**
     * counterCells數組,具體的值在每個counterCell裏面
     */
    private transient volatile CounterCell[] counterCells;

Node,內部類,主要用於存儲鍵值,有ForwardingNode、ReservationNode、TreeNode和TreeBin四個子類,具體在後面代碼用到的時候講。

static class Node<K,V> implements Entry<K,V> {
        final int hash;
        final K key;
        // val和next 在擴容時可能發生變化,加上volatile關鍵字,提供可見性與重排序
        volatile V val;
        volatile Node<K,V> next;

                // 不允許修改val
        public final V setValue(V value) {
            throw new UnsupportedOperationException();
        }

三、主要方法
上面介紹了一些基本成員,主要介紹一些常用方法,只有紅黑樹相關方法占時不講(還沒有搞明白紅黑樹,嘿嘿)。
①、構造方法

**
     * 無參構造
     */
    public ConcurrentHashMap() {
    }

    /**
     * 指定初始化大小的構造,不能小於0
     * @param initialCapacity
     */
    public ConcurrentHashMap(int initialCapacity) {
        if (initialCapacity < 0)
            throw new IllegalArgumentException();
        // cap必須是2的n次方,
        int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ?
                MAXIMUM_CAPACITY :
                tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1));
        this.sizeCtl = cap;
    }

②、初始化方法(采用延遲初始化,在put方法裏面)

private final Node<K,V>[] initTable() {
        Node<K,V>[] tab; int sc;
        while ((tab = table) == null || tab.length == 0) { // 空的table 才能初始化
            if ((sc = sizeCtl) < 0) // 表示其它線程正在初始化或者擴容
                // 當前線程把執行權交給其它線程(擁有相同優先級的線程),然後變成可運行狀態
                Thread.yield(); // lost initialization race; just spin
            else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) { // 原子操作表示把SIZECTL設置為-1,正在初始化
                try {
                    if ((tab = table) == null || tab.length == 0) {
                        int n = (sc > 0) ? sc : DEFAULT_CAPACITY; // 初始化大小
                        @SuppressWarnings("unchecked")
                        Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n]; // 初始化
                        table = tab = nt;
                        sc = n - (n >>> 2); // 下一次擴容閾值 n*0,75
                    }
                } finally {
                    sizeCtl = sc;
                }
                break;
            }
        }
        return tab;
    }

③、put方法(重要,其中的擴容方法比較難理解)
分析:
1)、我們可以通過源碼判斷key和value不允許為null。
2)、需要判斷table有沒有初始化,沒有調用initTable初始化,然後接著循環。
3)、判斷key的hash(調用spread方法)的位置有沒有值,證明是第一個,使用cas設置,為什麽cas,可能不止一個線程。
4)、判斷當前線程的hash是不是MOVED,其實就是節點是不是ForwardingNode節點,ForwardingNode代表正在擴容,至於為什麽會是ForwardingNode,這個在擴容的方法裏面再講,如果是ForwardingNode節點就協助擴容,也就是當前也去擴容,然後擴容完畢,在執行循環,協助擴容執行helpTransfer方法。
5)、如果不是擴容、table也初始化了和hash位置也有值了,那證明當前hash的位置是鏈表或者樹,接下來鎖住這個節點,進行鏈表或者樹的節點的追加,如果存在相同的key,就替換,最後釋放鎖。
6)、判斷鏈表的節點數,有沒有大於等於8,滿足就樹化,調用treeifyBin方法,這個方法會在樹化前判斷大於等於64嗎,沒有就擴容,調用tryPresize方法,有就樹化。
7)、修改節點的數量,調用addCount方法。

public V put(K key, V value) {
        return putVal(key, value, false);
    }

    /** Implementation for put and putIfAbsent */
    final V putVal(K key, V value, boolean onlyIfAbsent) {
        // 不允許key或者value 為null
        if (key == null || value == null) throw new NullPointerException();
        // 獲取hash
        int hash = spread(key.hashCode());
        int binCount = 0;
        // 遍歷table,死循環,直到插入成功
        for (Node<K,V>[] tab = table;;) {
            Node<K,V> f; int n, i, fh;
            if (tab == null || (n = tab.length) == 0) // table 還沒有初始化
                tab = initTable(); // 初始化
            else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { // 當前位置為空 ,直接插入
                if (casTabAt(tab, i, null, // 使用cas來進行設置
                        new Node<K,V>(hash, key, value, null)))
                    break;                   // no lock when adding to empty bin
            }
            else if ((fh = f.hash) == MOVED) // 如果在進行擴容,則先進行協助擴容
                tab = helpTransfer(tab, f);
            else {
                V oldVal = null;
                // 沒有在擴容,頭結點也不是空,
                // 鎖住鏈表或者樹的頭節點
                synchronized (f) {
                    if (tabAt(tab, i) == f) {
                        if (fh >= 0) { // 普通Node的hash值為key的hash值大於零,而ForwardingNode的是-1,TreeBin是-2
                            binCount = 1;
                            // 遍歷鏈表
                            for (Node<K,V> e = f;; ++binCount) {
                                K ek;
                                if (e.hash == hash &&
                                        ((ek = e.key) == key ||
                                                (ek != null && key.equals(ek)))) { // 找到了相同的key
                                    oldVal = e.val;
                                    if (!onlyIfAbsent)
                                        e.val = value; // 替換value 結束循環
                                    break;
                                }
                                Node<K,V> pred = e;
                                if ((e = e.next) == null) { // 找到最後一個節點
                                    pred.next = new Node<K,V>(hash, key,
                                            value, null); // 把當前節點設置為最後一個節點的next
                                    break;
                                }
                            }
                        }
                        else if (f instanceof TreeBin) { // 如果是樹結構
                            Node<K,V> p;
                            binCount = 2;
                            if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                    value)) != null) { // 樹節點插入,存在就替換
                                oldVal = p.val;
                                if (!onlyIfAbsent)
                                    p.val = value;
                            }
                        }
                    }
                }
                if (binCount != 0) {
                    if (binCount >= TREEIFY_THRESHOLD) // 如果鏈表大於等於8,樹化
                        treeifyBin(tab, i); // 樹化
                    if (oldVal != null) // 證明存在相同的key,是替換return舊值
                        return oldVal;
                    break;
                }
            }
        }
        addCount(1L, binCount); //數量加1
        return null;
    }

註:上面的put方法用到了initTable、helpTransfer、treeifyBin、tryPresize和addCount方法,接下來我們按照程序的流程講解下這個幾個方法。
initTable方法,這個在前文講過了。
helpTransfer方法幫助擴容

分析:
1)、這裏得講一下resizeStamp方法Integer.numberOfLeadingZeros(n) | (1 << (RESIZE_STAMP_BITS - 1)),其實這個方法就是獲取table的length的二進制的最高位的前面0的個數,然後|上2^15,舉個例子吧,假如現在的length為16,那麽二進制是多少了16是2^4,所以二進制10000,所以其實就是27 | 2 ^ 15,這裏還得順便講一下MAXIMUM_CAPACITY = 1 << 30(為什麽是2^30了),因為這是正數的最大值,那麽再給這個數加上一些值會發生什麽了(其實這可能就是為什麽要去二進制最高位前面0的個數的原因)。
2)、怎麽判斷的擴容已經開始了,我們知道sizectl為-1是代表正在初始化,大於0表示已經初始化,如下方法也判斷了size小於0,那麽什麽時候sizectl還會為負數了,其實開始擴容的時候(參考addCount和tryPresize方法,方方法裏面都有(rs << RESIZE_STAMP_SHIFT) + 2,這裏rs就是上面的resizeStamp的返回值,其實就是左移16位,int的正數的最大值是2^30,再給他加值會變成負數,對rs右移在累加顯然已經大於了2^30,所以他是負數),由此判斷出擴容已經開始。
3)、什麽時候協助擴容了,當前是擴容開始了,但是還沒結束,所以下面的滿足sc小於的裏面的第一if就是判斷擴容有沒有完成,第二if就是使用cas加入協助擴容的過程。
4)、transfer方法,我們在後面詳解。

/**
     * 幫助擴容
     */
    final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {
        Node<K,V>[] nextTab; int sc;
        // 原table不等於空,當前節點必須是fwd節點,nextTab已經初始化
        if (tab != null && (f instanceof ForwardingNode) &&
                (nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {
            // 其實就是去tab.length二進制最高位前面有多少個0,然後 | 1 << 15
            int rs = resizeStamp(tab.length);
            // nextTab和成員變量一樣,table也一樣,sizeCtl<0,表示在擴容
            while (nextTab == nextTable && table == tab &&
                    (sc = sizeCtl) < 0) {
                // sc >>> RESIZE_STAMP_SHIFT) != rs 說明擴容完畢或者有其它協助擴容者
                // sc == rs + 1 表示只剩下最後一個擴容線程了,其它都擴容完畢了
                // transferIndex <= 0 擴容結束了
                // sc == rs + MAX_RESIZERS 到達最大值
                if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                        sc == rs + MAX_RESIZERS || transferIndex <= 0)
                    break;
                // 當前線程參加擴容,sc+1
                if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {
                    transfer(tab, nextTab);
                    break;
                }
            }
            return nextTab;
        }
        return table;
    }

treeifyBin方法,在table的length小於64時會調用tryPresize 先進行擴容,調用tryPresize方法,在下文會進行解釋。

private final void treeifyBin(Node<K,V>[] tab, int index) {
        Node<K,V> b; int n, sc;
        if (tab != null) {
            if ((n = tab.length) < MIN_TREEIFY_CAPACITY) // 當前table的length小於64,就擴容
                tryPresize(n << 1);
            else if ((b = tabAt(tab, index)) != null && b.hash >= 0) {
                synchronized (b) {
                    if (tabAt(tab, index) == b) {
                        TreeNode<K,V> hd = null, tl = null;
                        for (Node<K,V> e = b; e != null; e = e.next) {
                            TreeNode<K,V> p =
                                    new TreeNode<K,V>(e.hash, e.key, e.val,
                                            null, null);
                            if ((p.prev = tl) == null)
                                hd = p;
                            else
                                tl.next = p;
                            tl = p;
                        }
                        setTabAt(tab, index, new TreeBin<K,V>(hd));
                    }
                }
            }
        }
    }

tryPresize 方法,這裏的邏輯和helpTransfer都差不多,至於transfer方法主菜我們在後面上

private final void tryPresize(int size) {
        // 計算擴容的size
        int c = (size >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY :
                tableSizeFor(size + (size >>> 1) + 1);
        int sc;
        // 證明table已經初始了或者還沒有初始化
        while ((sc = sizeCtl) >= 0) {
            Node<K,V>[] tab = table; int n;
            if (tab == null || (n = tab.length) == 0) {// 證明還沒有初始化,需要初始化
                n = (sc > c) ? sc : c;
                if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
                    try {
                        if (table == tab) {
                            @SuppressWarnings("unchecked")
                            Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                            table = nt;
                            sc = n - (n >>> 2);
                        }
                    } finally {
                        sizeCtl = sc;
                    }
                }
            }
            // 大於最大容量返回
            else if (c <= sc || n >= MAXIMUM_CAPACITY)
                break;
            // 已經初始化,並且沒有大於最大容量
            else if (tab == table) {
                int rs = resizeStamp(n);
                // 判斷是否需要協助擴容
                if (sc < 0) {
                    Node<K,V>[] nt;
                    if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                            sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
                            transferIndex <= 0)
                        break;
                    if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
                        transfer(tab, nt);
                }
                // 開始擴容
                else if (U.compareAndSwapInt(this, SIZECTL, sc,
                        (rs << RESIZE_STAMP_SHIFT) + 2))
                    transfer(tab, null);
            }
        }
    }

transfer擴容方法,比較難理解的一個方法
分析:
1)、看stride這個參數其實就是算每個線程處理的數量,和CPU有關,最小是16.
2)、初始化一個原來二倍的新table就是 nextTable,然後這個過程可能會出錯,n<<1可能為負數,設置nextTable和transferIndex,其中transferIndex就是原table的長度。
3)、初始化一個ForwardingNode節點在後面會用到。
4)、死循環for,這個循環就是為每個線程分配任務,然後每個線程處理各自的任務,倒敘分配,舉個例子,加入table.length=32,現在的stride為16,第一個線程其實就是32到16(不包含32,因為是索引),第二個線程就是0-15,參考這一段代碼((U.compareAndSwapInt (this, TRANSFERINDEX, nextIndex,nextBound = (nextIndex > stride ?nextIndex - stride : 0)))),然後遍歷每個段,處理節點,知道處理完成,具體邏輯參考代碼註釋。

/**
     * 擴容方法
     */
    private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
        int n = tab.length, stride;
        // n >>> 3(也就是除以8) / cpu個數,每個cpu的每個線程負責的遷移的數量
        // 這樣的目的是為了每個cpu處理的桶一樣多,避免出現任務轉移不均勻的現象,如果桶少的話,默認一個cpu(一個線程)處理16個桶
        if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
            stride = MIN_TRANSFER_STRIDE; // subdivide range
        // 擴容table 沒有初始化
        if (nextTab == null) {            // initiating
            try {
                @SuppressWarnings("unchecked")
                Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1]; //  初始化原來的length兩倍的table
                nextTab = nt;
            } catch (Throwable ex) {      // try to cope with OOME
                // 初始化失敗,使用integer的最大值
                sizeCtl = Integer.MAX_VALUE;
                return; // 結束
            }
            // 更新成員變量
            nextTable = nextTab;
            // 更新轉移下標,就是運來的table的length
            transferIndex = n;
        }
        // 新table的length
        int nextn = nextTab.length;
        // 創建一個fwd節點,用於占位.當別的節點發現這個槽位中有fwd節點時,則跳過這個節點
        // 它的hash為MOVED
        ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
        // 首次推進為true,如果為true說明需要再次推進一個目標(i--),反之如果是false,那麽就不能推進下標,需要將當前的下標處理完畢
        boolean advance = true;
        // 完成狀態,如果為true,就結束方法
        boolean finishing = false; // to ensure sweep before committing nextTab
        // 死循環,因為是倒著遍歷,所以i是點前線程的最大位置(i---),bound是邊界,也就是區間裏面的最小值
        for (int i = 0, bound = 0;;) {
            Node<K,V> f; int fh;
            // 如果當前線程可以向後推進,這個循環就是控制i遞減.同時每個線程都會進入這裏取得自己需要轉移的桶的下標區間
            // 1. true
            while (advance) {
                int nextIndex, nextBound;
                // 1. -1 >= 0,false
                if (--i >= bound || finishing)
                    advance = false;
                    //transferIndex <= 0 說明已經沒有需要遷移的桶了
                // 1.nextIndex = 16 <= 0
                else if ((nextIndex = transferIndex) <= 0) {
                    i = -1;
                    advance = false;
                }
                //更新 transferIndex
                //為當前線程分配任務,處理的桶結點區間為(nextBound,nextIndex)
                // 1.16 > 16 ? 16 -16 : 0 區間 16 到0
                else if (U.compareAndSwapInt
                        (this, TRANSFERINDEX, nextIndex,
                                nextBound = (nextIndex > stride ?
                                        nextIndex - stride : 0))) {
                    bound = nextBound; // 0
                    i = nextIndex - 1;// 15
                    advance = false;
                }
            }
            // i = 15 nextn = 32

            // i < 0 ,表示數據遷移已經完成
            // i >= n 和 i + n >= nextn 表示最後一個線程也執行完成了,擴容完成了
            //  第二個if裏面的i=n
            if (i < 0 || i >= n || i + n >= nextn) {
                int sc;
                if (finishing) { // 完成擴容
                    nextTable = null; // 刪除成員變量
                    table = nextTab; // 更新table
                    sizeCtl = (n << 1) - (n >>> 1); // 更新閾值
                    return;
                }
                if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) { // 表示一個線程退出擴容
                    if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT) // 說明還有其他線程正在擴容
                        return; // 當前線程結束
                    // 當前線程為最後一個線程,負責在檢查一個整個隊列
                    finishing = advance = true; //
                    i = n; // recheck before commit
                }
            }
            // 待遷移桶為null,用cas把當前節點設置為ForwardingNode節點,表示已經處理
            else if ((f = tabAt(tab, i)) == null) //  第一個線程 獲取i處的數據為null,
                advance = casTabAt(tab, i, null, fwd);// 設置當前節點為 fwd 節點
            else if ((fh = f.hash) == MOVED) // 如果當前節點為 MOVED,說明已經處理過了,直接跳過
                advance = true; // already processed
            else {
                // 節點不為空,鎖住i位置的頭結點
                synchronized (f) {
                    if (tabAt(tab, i) == f) {
                        Node<K,V> ln, hn;
                        if (fh >= 0) { //  表示是鏈表
                            int runBit = fh & n; // fn表示f.hash & n ,表示獲取原來table的位置
                            Node<K,V> lastRun = f; // lastRun = f
                            for (Node<K,V> p = f.next; p != null; p = p.next) {
                                int b = p.hash & n; // 獲取節點的位置
                                if (b != runBit) { // 如果這個節點和上一個節點的位置不一樣,記錄節點和位置
                                    runBit = b; // 當前節點的位置
                                    lastRun = p; // 當前節點
                                }
                            }
                            // 不管runBit有沒有發生變化,只可能是0或者n,
                            // ln表示的不變化的節點
                            // hn表示的是變化節點的位置
                            if (runBit == 0) { // 如果是0,那麽ln=lastRun就是位置沒有變的這條鏈 hn=null變化鏈需要遍歷重組
                                ln = lastRun;
                                hn = null;
                            }
                            else { // 如果當前節點不是0,hn=lastRun這個變化鏈,ln=null沒有變化的鏈需要遍歷重組
                                hn = lastRun;
                                ln = null;
                            }
                            for (Node<K,V> p = f; p != lastRun; p = p.next) {
                                int ph = p.hash; K pk = p.key; V pv = p.val;
                                if ((ph & n) == 0)
                                    ln = new Node<K,V>(ph, pk, pv, ln);
                                else
                                    hn = new Node<K,V>(ph, pk, pv, hn);
                            }
                            // 原來位置
                            setTabAt(nextTab, i, ln);
                            // 變化位置
                            setTabAt(nextTab, i + n, hn);
                           // 原來table的位置設置fwd節點,表示擴容
                            setTabAt(tab, i, fwd);
                            advance = true;
                        }
                        else if (f instanceof TreeBin) {
                            TreeBin<K,V> t = (TreeBin<K,V>)f;
                            TreeNode<K,V> lo = null, loTail = null;
                            TreeNode<K,V> hi = null, hiTail = null;
                            int lc = 0, hc = 0;
                            for (Node<K,V> e = t.first; e != null; e = e.next) {
                                int h = e.hash;
                                TreeNode<K,V> p = new TreeNode<K,V>
                                        (h, e.key, e.val, null, null);
                                if ((h & n) == 0) {
                                    if ((p.prev = loTail) == null)
                                        lo = p;
                                    else
                                        loTail.next = p;
                                    loTail = p;
                                    ++lc;
                                }
                                else {
                                    if ((p.prev = hiTail) == null)
                                        hi = p;
                                    else
                                        hiTail.next = p;
                                    hiTail = p;
                                    ++hc;
                                }
                            }
                            ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
                                    (hc != 0) ? new TreeBin<K,V>(lo) : t;
                            hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
                                    (lc != 0) ? new TreeBin<K,V>(hi) : t;
                            setTabAt(nextTab, i, ln);
                            setTabAt(nextTab, i + n, hn);
                            setTabAt(tab, i, fwd);
                            advance = true;
                        }
                    }
                }
            }
        }
    }

addCount主要是用於修改map的size,和擴容用的,這裏我們只看後半部分擴容部分,修改count部分在map的size方法講解

private final void addCount(long x, int check) {
        CounterCell[] as; long b, s;
        // 嘗試使用cas更新baseCount失敗
        if ((as = counterCells) != null ||
                !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
            CounterCell a; long v; int m;
            boolean uncontended = true;
            // 在counterCells沒有初始化,或者嘗試cas更新當前線程的CounterCell失敗時
            // 調用fullAddCount更新
            if (as == null || (m = as.length - 1) < 0 ||
                    (a = as[ThreadLocalRandom.getProbe() & m]) == null ||
                    !(uncontended =
                            U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
                fullAddCount(x, uncontended);
                return;
            }
            if (check <= 1)
                return;
            s = sumCount();
        }

        // check >= 0,新加入一個值
        if (check >= 0) {
            Node<K,V>[] tab, nt; int n, sc;
            // s代表了 現在map的數據量
            // sc= 12 ,證明剛剛初始化,沒有進行擴容
            while (s >= (long)(sc = sizeCtl) && (tab = table) != null && // 當前容量大於sc,table已經有值,table的cap小於最大cap
                    (n = tab.length) < MAXIMUM_CAPACITY) {
                int rs = resizeStamp(n); // 這一步不好理解,
                // Integer.numberOfLeadingZeros(n) 其實就是最高位前面有多少個0,n代表table的長度
                // | (1 << (RESIZE_STAMP_BITS - 1)) 2^15 二進制16位,第16位1,其余15位0
                // 其實就是相加
                // 表示正在擴容
                if (sc < 0) {

                    // sc >>> RESIZE_STAMP_SHIFT  // 擴容結束
                    // 只有最後一個線程在擴容
                    // sc == rs + MAX_RESIZERS  達到最大數
                    if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                            sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
                            transferIndex <= 0)
                        break;
                    // sc 加1,表示有一個線程,協助參加了擴容
                    if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
                        transfer(tab, nt);
                }
                // 表示沒有正在進行擴容,開始擴容
                else if (U.compareAndSwapInt(this, SIZECTL, sc,
                        (rs << RESIZE_STAMP_SHIFT) + 2))
                    transfer(tab, null);
                s = sumCount();
            }
        }
    }

④、get方法(get方法的邏輯相對就要簡單點了,請看代碼註釋)

// get方法
    public V get(Object key) {
        Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
        // 獲取hash值
        int h = spread(key.hashCode());
        // table不為null,table已經初始化,通過hash查找的node不為nul
        if ((tab = table) != null && (n = tab.length) > 0 &&
                (e = tabAt(tab, (n - 1) & h)) != null) {
            if ((eh = e.hash) == h) { // hash相等
                if ((ek = e.key) == key || (ek != null && key.equals(ek))) // 找到了相同的key
                    return e.val; // 返回當前e的value
            }
            else if (eh < 0) // hash小於0,說明是特殊節點(TreeBin或ForwardingNode)調用find
                return (p = e.find(h, key)) != null ? p.val : null;
            // 不是上面的情況,那就是鏈表了,遍歷鏈表
            while ((e = e.next) != null) {
                if (e.hash == h &&
                        ((ek = e.key) == key || (ek != null && key.equals(ek))))
                    return e.val;
            }
        }
        return null;
    }

⑤、size方法(和1.7的處理方式截然不同)

public int size() {
        long n = sumCount();
        return ((n < 0L) ? 0 :
                (n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE :
                        (int)n);
    }

        final long sumCount() {
        CounterCell[] as = counterCells; CounterCell a;
        long sum = baseCount;
        if (as != null) {
            for (int i = 0; i < as.length; ++i) {
                if ((a = as[i]) != null)
                    sum += a.value;
            }
        }
        return sum;
    }

CounterCell[]這個數組就是記錄的map的count,這裏就不得不講一下addCount方法的前半部分,只有理解了每次添加一個元素,count是怎麽處理的,才能明白為什麽要有這個數組
CounterCell 內部類

static final class CounterCell {
        volatile long value;
        CounterCell(long x) { value = x; }
    }

addCount方法,只看前半部分,我們可以看出其實修改map的count,先是使用cas修改basecount,然後可能存在多個線程同時修改,所以會失敗,失敗就用CounterCell[]數組處理,調用fullAddCount方法。

/**
     *
     * @param x 1L
     * @param check 默認值是0,等於0時,代表插入為null
     *     不等於0時,check等於2代表了樹,其它代表了鏈表
     */
    private final void addCount(long x, int check) {
        CounterCell[] as; long b, s;
        // 嘗試使用cas更新baseCount失敗
        if ((as = counterCells) != null ||
                !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
            CounterCell a; long v; int m;
            boolean uncontended = true;
            // 在counterCells沒有初始化,或者嘗試cas更新當前線程的CounterCell失敗時
            // 調用fullAddCount更新
            if (as == null || (m = as.length - 1) < 0 ||
                    (a = as[ThreadLocalRandom.getProbe() & m]) == null ||
                    !(uncontended =
                            U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
                fullAddCount(x, uncontended);
                return;
            }
            if (check <= 1)
                return;
            s = sumCount();
        }
        }
    }

fullAddCount,主要用來記錄競爭導致的basecount修改失敗的這些操作,其實主要就是把這些失敗的次數記錄在CounterCell[]數組裏面,然後在統計size時,就是basecount+CounterCell[]裏面的次數。
分析:CounterCell[]可以看做是一個map,因為好多處理方法和map類似,我們先來看下數組的長度,默認是2,其實是可以擴容的,每次擴容2倍(擴容不能超過cpu的數量),然後怎麽插入值了,和map類似都需要確定位置,那麽怎麽確定位置了,map是通過key的hashcode,而這個數組是通過一個並發隨機數ThreadLocalRandom來說生成一個隨機數,然後通過這隨機數&數組的長度減一確定位置,是不是很map一樣,還有就是他沒有鏈表這個概念,那沖突了怎麽辦了,其實就是累加,這個數組還有鎖的概念就是cellsBusy,因為這裏可能也是多個線程來執行,等於零就表示沒有鎖,等於一就表示有鎖,在插入新值、擴容和創建數組這些操作都需要獲取鎖,具體的方法的概念就到這裏,具體的邏輯參考代碼註釋。

/**
     *
     * @param x 需要更新的值
     * @param wasUncontended 是否發生競爭
     */
    private final void fullAddCount(long x, boolean wasUncontended) {
        int h;
        // 初始化一個隨機值
        // ThreadLocalRandom是JDK 7之後提供並發產生隨機數,能夠解決多個線程發生的競爭爭奪。
        if ((h = ThreadLocalRandom.getProbe()) == 0) {
            // 為當前線程初始化一個隨機值
            ThreadLocalRandom.localInit();      // force initialization
            // 獲取這個值
            h = ThreadLocalRandom.getProbe();
            // 由於重新生成了probe,未沖突標誌位設置為true
            wasUncontended = true;
        }

       // 沖突標誌位,決定了擴容還是不擴容
        boolean collide = false;                // True if last slot nonempty
        for (;;) {
            CounterCell[] as; CounterCell a; int n; long v;
            // counterCells數組已經被初始化了
            if ((as = counterCells) != null && (n = as.length) > 0) {
                // 求在counterCells中的位置,與hash一樣求%,因為counterCells數組長度是2的冥
                if ((a = as[(n - 1) & h]) == null) { // 當前位置沒有CounterCell
                    if (cellsBusy == 0) {// Try to attach new Cell
                        // 創建新的CounterCell
                        CounterCell r = new CounterCell(x); // Optimistic create
                        if (cellsBusy == 0 &&   // cellsBusy=0還沒有加鎖,使用cas進行加鎖,cellsBusy設置為1
                                U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
                            boolean created = false;
                            try {               // Recheck under lock
                                CounterCell[] rs; int m, j;
                                if ((rs = counterCells) != null &&
                                        (m = rs.length) > 0 &&
                                        rs[j = (m - 1) & h] == null) {
                                    rs[j] = r; // 放進數組
                                    created = true;
                                }
                            } finally {
                                cellsBusy = 0;
                            }
                            // 操作成功,退出死循環
                            if (created)
                                break;
                            continue;           // Slot is now non-empty
                        }
                    }
                    collide = false;
                }
                // 在調用fullAddCount之前就發生了競爭
                // 然後wasUncontended=true,未發生競爭,然後重新循環更新
                else if (!wasUncontended)       // CAS already known to fail
                    wasUncontended = true;      // Continue after rehash
                // 當前位置的CounterCell不為空,進行累加
                else if (U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))
                    break;
                // 數組被擴容了
                // 數組大於了cpu數量
                // 設置沖突標誌, collide = false,防止擴容
                else if (counterCells != as || n >= NCPU)
                    collide = false;
                // 設置沖突標誌,重新執行循環
                // 如果下次循環執行到該分支,並且沖突標誌仍然為true
                // 那麽會跳過該分支,到下一個分支進行擴容// At max size or stale

                // 這個位置決定了擴容還是不擴容 false就不擴容,true就擴容
                else if (!collide)
                    collide = true;
                // 擴容,CAS設置cellsBusy值
                else if (cellsBusy == 0 &&
                        U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
                    try {
                        if (counterCells == as) {// Expand table unless stale
                            // 容量擴大一倍
                            CounterCell[] rs = new CounterCell[n << 1];
                            for (int i = 0; i < n; ++i)
                                rs[i] = as[i];
                            counterCells = rs;
                        }
                    } finally {
                        cellsBusy = 0;
                    }
                    collide = false;
                    continue;                   // Retry with expanded table
                }
                // 為當前線程重新計算probe
                h = ThreadLocalRandom.advanceProbe(h);
            }
            // 證明數組為空
            // 獲取鎖,初始化數組
            else if (cellsBusy == 0 && counterCells == as &&
                    U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
                boolean init = false;
                try {                           // Initialize table
                    if (counterCells == as) { // 沒有被其它線程初始化
                        // 初始化,默認長度2
                        CounterCell[] rs = new CounterCell[2];
                        // 創建新的CounterCell(x),位置為rs[h&(2-1)]
                        rs[h & 1] = new CounterCell(x);
                        // 賦值給成員變量counterCells
                        counterCells = rs;
                        // 初始化成功
                        init = true;
                    }
                } finally {
                    // 釋放鎖
                    cellsBusy = 0;
                }
                if (init)
                    // 結束循環
                    break;
            }
            // 證明在CounterCell上也存在競爭,那麽嘗試對baseCount進行更新
            else if (U.compareAndSwapLong(this, BASECOUNT, v = baseCount, v + x))
                break;                          // Fall back on using base
        }
    }

四、總結

本文主要基於jdk1.8介紹了ConcurrentHashMap的一部分常用方法,主要講了get、put和size者三個常用方法,其中比較難理解的是put方法,其中在擴容和協助擴容作者的設計讓人眼前一亮,大師就是大師,絕對值得你去一探究竟,還有就是size也采用了分治的思想(不知道這個詞合適不,個人理解),就是統計累加count時,沒有競爭的單獨處理,有競爭的單獨處理,而沒有采用自旋,極大的提升了效率;1.8和1.7 的區別很大,首先數據結構發生變化,其次鎖也發生了變化、擴容側率和size的統計等;最後附上學習學習Map和ConcurrentHashMap的一點小建議,如果想從1.7和1.8兩個版本看,建議從這個方向(jdk版本一樣,數據結構基本沒有發生變化)HashMap 1.7>>ConcurrentHashMap 1.7>>HashMap 1.8>>[ConcurrentHashMap 1.8]()
參考 《Java 並發編程的藝術》

ConcurrentHashMap 源碼淺析 1.8