1. 程式人生 > 資訊 >美國油價上漲刺激新能源車銷量:推廣電車已成國策,但想買到也不是那麼容易

美國油價上漲刺激新能源車銷量:推廣電車已成國策,但想買到也不是那麼容易

ConcurrentHashMap

HashMap通常的實現方式是“陣列+連結串列”,這種方式被稱為“拉鍊法”。ConcurrentHashMap在這個 基本原理之上進行了各種優化

首先是所有資料都放在一個大的HashMap中;其次是引入了紅黑樹,原理如下:

  • 如果頭節點是Node型別,則尾隨它的就是一個普通的連結串列;如果頭節點是TreeNode型別,它的後 面就是一顆紅黑樹,TreeNode是Node的子類
  • 連結串列和紅黑樹之間可以相互轉換:初始的時候是連結串列,當連結串列中的元素超過某個閾值時,把連結串列轉 換成紅黑樹;反之,當紅黑樹中的元素個數小於某個閾值時,再轉換為連結串列。
  • 這種設計優化:
    • 使用紅黑樹,當一個槽裡有很多元素時,其查詢和更新速度會比連結串列快很多,Hash衝突的問 題由此得到較好的解決。
    • 加鎖的粒度,並非整個ConcurrentHashMap,而是對每個頭節點分別加鎖,即併發度,就是 Node陣列的長度,初始長度為16
    • 併發擴容,這是難度最大的。當一個執行緒要擴容Node陣列的時候,其他執行緒還要讀寫,因此 處理過程很複雜,後面會詳細分析
  • 所以這種設計一方面降低了Hash衝突,另一方面也提升了併發度

原始碼解析

1、構造方法

public ConcurrentHashMap(int initialCapacity,//初始容量
                             float loadFactor, //載入因子
                         int concurrencyLevel) {//級別
        if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0)
            throw new IllegalArgumentException();
        if (initialCapacity < concurrencyLevel)   // Use at least as many bins
            initialCapacity = concurrencyLevel;   // as estimated threads
        long size = (long)(1.0 + (long)initialCapacity / loadFactor);
        int cap = (size >= (long)MAXIMUM_CAPACITY) ?
            MAXIMUM_CAPACITY : tableSizeFor((int)size);
        this.sizeCtl = cap;
    }
  • 引數:初始容量,載入因子,併發級別
  • 如果初始化大小小於併發級別,則讓他們相等
  • 判斷size大小
  • cap代表數字長度的值
    • 變數cap就是Node陣列的長度,保持為2的整數次方。tableSizeFor(...)方法是根 據傳入的初始容量,計算出一個合適的陣列長度。具體而言:1.5倍的初始容量+1,再往上取最接近的2 的整數次方,作為陣列長度cap的初始值
  • 這裡的 sizeCtl,其含義是用於控制在初始化或者併發擴容時候的執行緒數,只不過其初始值設定成 cap。

2、初始化

​ 構造方法裡只計算了陣列的初始大小,並沒有對陣列進行初始化。當多個執行緒都往裡面放 入元素的時候,再進行初始化。這就存在一個問題:多個執行緒重複初始化

private final Node<K,V>[] initTable() {
        Node<K,V>[] tab; int sc;
        while ((tab = table) == null || tab.length == 0) {
            if ((sc = sizeCtl) < 0)
                Thread.yield();   // 自旋等待
            else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {// 重點:將sizeCtl設定為-1
                try {
                    if ((tab = table) == null || tab.length == 0) {
                        int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
                        @SuppressWarnings("unchecked")
                        Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];// 初始化
                        table = tab = nt;
                        // sizeCtl不是陣列長度,因此初始化成功後,就不再等於陣列長度,而是n-(n>>>2)=0.75n,表示下一次擴容的閾值:n-n/4
                        sc = n - (n >>> 2);
                    }
                } finally {
                    sizeCtl = sc;// 設定sizeCtl的值為sc。
                }
                break;
            }
        }
        return tab;
    }
  • 首先需要初始化一個table,當sizeCtl等於0時,自旋等待
  • 若條件發生變化,則將sizeCtl設定為-1(併發,那個執行緒獲取-1,就可以進行下面的操作)
  • 初始化建立陣列,長度為n
  • sizeCtl不是陣列長度,因此初始化成功後,就不再等於陣列長度,而是n-(n>>>2)=0.75n,表示下一次擴容的閾值:n-n/4
  • 設定sizeCtl的值為sc,返回Node陣列

通過上面的程式碼可以看到,多個執行緒的競爭是通過對sizeCtl進行CAS操作實現的。如果某個執行緒成 功地把 sizeCtl 設定為-1,它就擁有了初始化的權利,進入初始化的程式碼模組,等到初始化完成,再把 sizeCtl設定回去;其他執行緒則一直執行while迴圈,自旋等待,直到陣列不為null,即當初始化結束時, 退出整個方法

3、put方法

public V put(K key, V value) {
        return putVal(key, value, false);
    }
final V putVal(K key, V value, boolean onlyIfAbsent) {
        if (key == null || value == null) throw new NullPointerException();
        int hash = spread(key.hashCode());
        int binCount = 0;
        for (Node<K,V>[] tab = table;;) {
            Node<K,V> f; int n, i, fh;
            // 分支1:整個陣列初始化
            if (tab == null || (n = tab.length) == 0)
                tab = initTable();
            // 分支2:第i個元素初始化
            else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
                if (casTabAt(tab, i, null,
                             new Node<K,V>(hash, key, value, null)))
                    break;                   // no lock when adding to empty bin
            }
            // 分支3:擴容
            else if ((fh = f.hash) == MOVED)
                tab = helpTransfer(tab, f);
            // 分支4:放入元素
            else {
                V oldVal = null;
                // 加鎖
                synchronized (f) {
                    // 連結串列
                    if (tabAt(tab, i) == f) {
                        if (fh >= 0) {
                            binCount = 1;
                            for (Node<K,V> e = f;; ++binCount) {
                                K ek;
                                if (e.hash == hash &&
                                    ((ek = e.key) == key ||
                                     (ek != null && key.equals(ek)))) {
                                    oldVal = e.val;
                                    if (!onlyIfAbsent)
                                        e.val = value;
                                    break;
                                }
                                Node<K,V> pred = e;
                                if ((e = e.next) == null) {
                                    pred.next = new Node<K,V>(hash, key,
                                                              value, null);
                                    break;
                                }
                            }
                        }
                        // 紅黑樹
                        else if (f instanceof TreeBin) {
                            Node<K,V> p;
                            binCount = 2;
                            if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                           value)) != null) {
                                oldVal = p.val;
                                if (!onlyIfAbsent)
                                    p.val = value;
                            }
                        }
                    }
                }
                // 如果是連結串列,上面的binCount會一直累加
                if (binCount != 0) {
                    if (binCount >= TREEIFY_THRESHOLD)
                        treeifyBin(tab, i);// 超出閾值,轉換為紅黑樹
                    if (oldVal != null)
                        return oldVal;
                    break;
                }
            }
        }
        addCount(1L, binCount);// 總元素個數累加1
        return null;
    }
  • onlyIfAbsent:表示不存在的時候才向陣列中放
  • 若等於分支1.則呼叫初始化陣列方法initTable()
  • 若為分支2,第i個元素初始化(表示這個槽點沒有資料,所以直接將該元素放到頭節點返回)
  • 若為分支3,說明這個槽點正在擴容,helpTransfer方法幫助擴容
  • 最後到分支4,就是把元素放入槽內。槽內可能是一個連結串列,也可能是一棵紅黑樹,通過頭節點的型別 可以判斷是哪一種。第4個分支是包裹在synchronized (f)裡面的,f對應的陣列下標位置的頭節點, 意味著每個陣列元素有一把鎖,併發度等於陣列的長度
  • 上面的binCount表示連結串列的元素個數,當這個數目超過TREEIFY_THRESHOLD=8時,把連結串列轉換成 紅黑樹,也就是 treeifyBin(tab,i)方法。但在這個方法內部,不一定需要進行紅黑樹轉換,可能只做 擴容操作。

4、擴容

擴容是ConcurrentHashMap中實現最複雜的一環

從上面的put方法原始碼中,可知當binCount大於等於閾值8,變成紅黑樹,所以擴容從treeifyBin方法讀起

private final void treeifyBin(Node<K,V>[] tab, int index) {
        Node<K,V> b; int n, sc;
        if (tab != null) {
            if ((n = tab.length) < MIN_TREEIFY_CAPACITY)
                //陣列長度小於閾值64,不做紅黑樹轉換,直接擴容
                tryPresize(n << 1);
            else if ((b = tabAt(tab, index)) != null && b.hash >= 0) {
                // 連結串列轉換為紅黑樹
                synchronized (b) {
                    if (tabAt(tab, index) == b) {
                        TreeNode<K,V> hd = null, tl = null;
                        // 遍歷連結串列,初始化紅黑樹
                        for (Node<K,V> e = b; e != null; e = e.next) {
                            TreeNode<K,V> p =
                                new TreeNode<K,V>(e.hash, e.key, e.val,
                                                  null, null);
                            if ((p.prev = tl) == null)
                                hd = p;
                            else
                                tl.next = p;
                            tl = p;
                        }
                        setTabAt(tab, index, new TreeBin<K,V>(hd));
                    }
                }
            }
        }
    }
  • 判斷tab不等於null,說明槽點資料已經有了
  • 陣列長度小於閾值64,不做紅黑樹轉換,直接擴容
    • MIN_TREEIFY_CAPACITY=64(陣列的長度):儲存箱可樹化的最小表容量,這個值至少是4被閾值(TREEIFY_THRESHOLD=8)大小,以避免調整大小和樹化閾值之間的衝突
  • 若陣列長度大於閾值64時,則轉換為紅黑樹

在treeifyBin方法中,內部呼叫了方法tryPresize方法

private final void tryPresize(int size) {
        int c = (size >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY :
            tableSizeFor(size + (size >>> 1) + 1);
        int sc;
        while ((sc = sizeCtl) >= 0) {
            Node<K,V>[] tab = table; int n;
            if (tab == null || (n = tab.length) == 0) {
                n = (sc > c) ? sc : c;
                if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
                    try {
                        if (table == tab) {
                            @SuppressWarnings("unchecked")
                            Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                            table = nt;
                            sc = n - (n >>> 2);
                        }
                    } finally {
                        sizeCtl = sc;
                    }
                }
            }
            else if (c <= sc || n >= MAXIMUM_CAPACITY)
                break;
            else if (tab == table) {
                int rs = resizeStamp(n);
                if (sc < 0) {
                    Node<K,V>[] nt;
                    if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                        sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
                        transferIndex <= 0)
                        break;
                    if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
                        transfer(tab, nt);
                }
                else if (U.compareAndSwapInt(this, SIZECTL, sc,
                                             (rs << RESIZE_STAMP_SHIFT) + 2))
                    transfer(tab, null);
            }
        }
    }
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
        int n = tab.length, stride;
        if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
            stride = MIN_TRANSFER_STRIDE; // 計算步長
        if (nextTab == null) {            // 初始化新的HashMap
            try {
                @SuppressWarnings("unchecked")
                Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1]; //擴容兩倍
                nextTab = nt;
            } catch (Throwable ex) {      // try to cope with OOME
                sizeCtl = Integer.MAX_VALUE;
                return;
            }
            nextTable = nextTab;
            //初始的transferIndex為舊HashMap的陣列長度
            transferIndex = n;
        }
        int nextn = nextTab.length;
        ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
        boolean advance = true;
        boolean finishing = false; // to ensure sweep before committing nextTab
    // 此處,i為遍歷下標,bound為邊界
    // 如果成功獲取一個任務,則i=nextIndex-1
    //bound=nextIndex-stride;
    //如果獲取不到,則i=0,bound=0
        for (int i = 0, bound = 0;;) {
            Node<K,V> f; int fh;
            // advance表示在從i=transferIndex-1遍歷到bound位置的過程中,是否一直繼續
            while (advance) {
                int nextIndex, nextBound;
                // 以下是哪個分支中的advance都是false,表示如果三個分支都不執行,才可以一直while迴圈
				// 目的在於當對transferIndex執行CAS操作不成功的時候,需要自旋,以期獲取一個stride的遷移任務。
                if (--i >= bound || finishing)
                    // 對陣列遍歷,通過這裡的--i進行。如果成功執行了--i,就不需要繼續while迴圈了,因為advance只能進一步。
                    advance = false;
                else if ((nextIndex = transferIndex) <= 0) {
                    i = -1;
                    advance = false;
                }
                // 對transferIndex執行CAS操作,即為當前執行緒分配1個stride。
				// CAS操作成功,執行緒成功獲取到一個stride的遷移任務;
				// CAS操作不成功,執行緒沒有搶到任務,會繼續執行while迴圈,自旋。
                else if (U.compareAndSwapInt
                         (this, TRANSFERINDEX, nextIndex,
                          nextBound = (nextIndex > stride ?
                                       nextIndex - stride : 0))) {
                    bound = nextBound;
                    i = nextIndex - 1;
                    advance = false;
                }
            }
            // i越界,整個HashMap遍歷完成
            if (i < 0 || i >= n || i + n >= nextn) {
                int sc;
                // finishing表示整個HashMap擴容完成
                if (finishing) {
                    nextTable = null;
                    // 將nextTab賦值給當前table
                    table = nextTab;
                    sizeCtl = (n << 1) - (n >>> 1);
                    return;
                }
                if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
                    if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
                        return;
                    finishing = advance = true;
                    i = n; // recheck before commit
                }
            }
            // tab[i]遷移完畢,賦值一個ForwardingNode
            else if ((f = tabAt(tab, i)) == null)
                advance = casTabAt(tab, i, null, fwd);
            // tab[i]的位置已經在遷移過程中
            else if ((fh = f.hash) == MOVED)
                advance = true; // already processed
            else {
                // 對tab[i]進行遷移操作,tab[i]可能是一個連結串列或者紅黑樹
                synchronized (f) {
                    if (tabAt(tab, i) == f) {
                        Node<K,V> ln, hn;
                        // 連結串列
                        if (fh >= 0) {
                            int runBit = fh & n;
                            Node<K,V> lastRun = f;
                            for (Node<K,V> p = f.next; p != null; p = p.next) {
                                int b = p.hash & n;
                                if (b != runBit) {
                                    runBit = b;
                                    // 表示lastRun之後的所有元素,hash值都是一樣的
									// 記錄下這個最後的位置
                                    lastRun = p;
                                }
                            }
                            if (runBit == 0) {// 連結串列遷移的優化做法
                                ln = lastRun;
                                hn = null;
                            }
                            else {
                                hn = lastRun;
                                ln = null;
                            }
                            for (Node<K,V> p = f; p != lastRun; p = p.next) {
                                int ph = p.hash; K pk = p.key; V pv = p.val;
                                if ((ph & n) == 0)
                                    ln = new Node<K,V>(ph, pk, pv, ln);
                                else
                                    hn = new Node<K,V>(ph, pk, pv, hn);
                            }
                            setTabAt(nextTab, i, ln);
                            setTabAt(nextTab, i + n, hn);
                            setTabAt(tab, i, fwd);
                            advance = true;
                        }
                        else if (f instanceof TreeBin) {// 紅黑樹,遷移做法和連結串列類似
                            TreeBin<K,V> t = (TreeBin<K,V>)f;
                            TreeNode<K,V> lo = null, loTail = null;
                            TreeNode<K,V> hi = null, hiTail = null;
                            int lc = 0, hc = 0;
                            for (Node<K,V> e = t.first; e != null; e = e.next) {
                                int h = e.hash;
                                TreeNode<K,V> p = new TreeNode<K,V>
                                    (h, e.key, e.val, null, null);
                                if ((h & n) == 0) {
                                    if ((p.prev = loTail) == null)
                                        lo = p;
                                    else
                                        loTail.next = p;
                                    loTail = p;
                                    ++lc;
                                }
                                else {
                                    if ((p.prev = hiTail) == null)
                                        hi = p;
                                    else
                                        hiTail.next = p;
                                    hiTail = p;
                                    ++hc;
                                }
                            }
                            ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
                                (hc != 0) ? new TreeBin<K,V>(lo) : t;
                            hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
                                (lc != 0) ? new TreeBin<K,V>(hi) : t;
                            setTabAt(nextTab, i, ln);
                            setTabAt(nextTab, i + n, hn);
                            setTabAt(tab, i, fwd);
                            advance = true;
                        }
                    }
                }
            }
        }
    }
  • 可以看到,擴容首先是先嚐試擴容,呼叫第一個方法tryPresize,也就是嘗試預先調整表的大小以容納給定數量的元素

  • 具體擴容的方法在末尾呼叫的第二個方法transfer

  • 擴容的基本原理:

    • 首先建立一個新的HashMap,其陣列長度是原來陣列的倆倍

    • 將old的元素逐個遷移過來,這點從引數中可以看出,tab是擴容前的HashMap,nextTab是擴容後的hashMap,當nextTab==null時,就會對nextTab進行初始化,容量為2倍

    • 上述步驟就會遇到一個問題,併發問題,如下圖:

      • 舊的的陣列長度為N,每個執行緒擴容一段,一段的長度用量用變數stride(步長)來表示,transferIndex表示整個陣列擴容的進度

      • stride(步長)的計算公式:在單核模式下直接等於0,因為單核模式下沒有辦 法多個執行緒並行擴容,只需要1個執行緒來擴容整個陣列;在多核模式下為 (n>>> 3)/NCPU,並且保證步長的最小值是 16。顯然,需要的執行緒個數約為n/stride

        if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
        
      • transferIndex是ConcurrentHashMap的一個成員變數,記錄了擴容的進度。初始值為n,從大到 小擴容,每次減stride個位置,最終減至n<=0,表示整個擴容完成。因此,從[0,transferIndex-1]的 位置表示還沒有分配到執行緒擴容的部分,從[transfexIndex,n-1]的位置表示已經分配給某個執行緒進行擴 容,當前正在擴容中,或者已經擴容成功。

      • 因為transferIndex會被多個執行緒併發修改,每次減stride,所以需要通過CAS進行操作,如下面的程式碼 所示

        else if (U.compareAndSwapInt
                                 (this, TRANSFERINDEX, nextIndex,
                                  nextBound = (nextIndex > stride ?
                                               nextIndex - stride : 0))) 
        
      • 在擴容未完成之前,有的陣列下標對應的槽已經遷移到了新的HashMap裡面,有的還在舊的 HashMap 裡面。這個時候,所有呼叫 get(k,v)的執行緒還是會訪問舊 HashMap,怎麼處理 呢?

      • 解決方案:當Node[0]已經遷移成功,而其他Node還在遷移過程中時, 如果有執行緒要讀取Node[0]的資料,就會訪問失敗。為此,新建一個ForwardingNode,即轉 發節點,在這個節點裡面記錄的是新的 ConcurrentHashMap 的引用。這樣,當執行緒訪問到 ForwardingNode之後,會去查詢新的ConcurrentHashMap

      • 因為陣列的長度 tab.length 是2的整數次方,每次擴容又是2倍。而 Hash 函式是 hashCode%tab.length,等價於hashCode&(tab.length-1)。這意味著:處於第i個位置的 元素,在新的Hash表的陣列中一定處於第i個或者第i+n個位置,舉個簡單的例 子:假設陣列長度是8,擴容之後是16

        • 若hashCode=5,5&8=0,擴容後,5&16=0,位置保持不變;

        • 若hashCode=24,24&8=0,擴容後,24&16=8,後移8個位置;

        • 若hashCode=25,25&8=1,擴容後,25&16=9,後移8個位置;

        • 若hashCode=39,39&8=7,擴容後,39&8=7,位置保持不變;

        • 正因為有這樣的規律,所以如下有程式碼:

          setTabAt(nextTab, i, ln);
                                      setTabAt(nextTab, i + n, hn);
                                      setTabAt(tab, i, fwd);
          
          • 也就是把tab[i]位置的連結串列或紅黑樹重新組裝成兩部分,一部分連結到nextTab[i]的位置,一部分鏈 接到nextTab[i+n]的位置。然後把tab[i]的位置指向一個ForwardingNode節點

          • 同時,當tab[i]後面是連結串列時,使用類似於JDK 7中在擴容時的優化方法,從lastRun往後的所有節 點,不需依次拷貝,而是直接連結到新的連結串列頭部。從lastRun往前的所有節點,需要依次拷貝

          • 瞭解了核心的遷移函式transfer(tab,nextTab),再回頭看tryPresize(int size)函式。這個函 數的輸入是整個Hash表的元素個數,在函式裡面,根據需要對整個Hash表進行擴容。想要看明白這個 函式,需要透徹地理解sizeCtl變數,下面這段註釋摘自原始碼

            /**
             * Table initialization and resizing control.  When negative, the
             * table is being initialized or resized: -1 for initialization,
             * else -(1 + the number of active resizing threads).  Otherwise,
             * when table is null, holds the initial table size to use upon
             * creation, or 0 for default. After initialization, holds the
             * next element count value upon which to resize the table.
             */
            
            • 當sizeCtl=-1時,表示整個HashMap正在初始化;
            • 當sizeCtl=某個其他負數時,表示多個執行緒在對HashMap做併發擴容;
            • 當sizeCtl=cap時,tab=null,表示未初始之前的初始容量(如上面的建構函式所示);
            • 擴容成功之後,sizeCtl儲存的是下一次要擴容的閾值,即上面初始化程式碼中的n-(n>>>2) =0.75n。
            • 所以,sizeCtl變數在Hash表處於不同狀態時,表達不同的含義。明白了這個道理,再來看上面的 tryPresize(int size)函式。
          • tryPresize(int size)是根據期望的元素個數對整個Hash表進行擴容,核心是呼叫transfer函式。 在第一次擴容的時候,sizeCtl會被設定成一個很大的負數U.compareAndSwapInt(this,SIZECTL, sc,(rs << RESIZE_STAMP_SHIFT)+2);之後每一個執行緒擴容的時候,sizeCtl 就加 1, U.compareAndSwapInt(this,SIZECTL,sc,sc+1),待擴容完成之後,sizeCtl減1。