1. 程式人生 > 其它 >併發程式設計-ConcurrentHashMap(二)

併發程式設計-ConcurrentHashMap(二)

併發程式設計-ConcurrentHashMap(二)

昨天說到擴容前面的準備工作,和一系列的判斷,其中我覺得設計精妙的就是他的那個【高低位擴容】,精巧的使用了二進位制,從某種層面講,提升了效能,因為二進位制的那個變數的儲存,就相同於一個容器,如果不使用它,那肯定要new出一個容器進行儲存,這就會佔用記憶體。今天繼續分析,所有關於CHM的東西,今天咱們就會剖析完,let's start with the method named transfer.

transfer()

這裡主要是對資料進行轉移

  • 需要計算當前執行緒的資料遷移空間
  • 建立一個新的陣列,容量為擴容後的大小
  • 實現資料的轉移
    •   如果是紅黑樹
      •   如果資料遷移後,不滿足紅黑樹的條件,則紅黑樹轉化成連結串列
    •   如果是連結串列
      •   相應的閾值轉換成紅黑樹
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
    int n = tab.length, stride;
    // 這裡是計算每個執行緒處理資料的區間大小,最小是16
    if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
        stride = MIN_TRANSFER_STRIDE; 
    
//擴容之後的陣列(在原來的陣列的容量的基礎上擴大了一倍) if (nextTab == null) { // initiating try { @SuppressWarnings("unchecked") Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1]; nextTab = nt; } catch (Throwable ex) { // try to cope with OOME
sizeCtl = Integer.MAX_VALUE; return; } nextTable = nextTab; //這是轉移的索引,每個執行緒所處理的區間數量 transferIndex = n; } int nextn = nextTab.length; //這個表示已經遷移完成的狀態(如果老陣列中的的節點完成了遷移,則需要修改成fwd) ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab); boolean advance = true; boolean finishing = false; for (int i = 0, bound = 0;;) { Node<K,V> f; int fh; while (advance) { int nextIndex, nextBound; if (--i >= bound || finishing) advance = false; else if ((nextIndex = transferIndex) <= 0) { i = -1; advance = false; } //通過迴圈對區間進行計算 假設陣列長度是32 //那第一次計算的區間就是【16(nextBound),31(i)】 第二次計算就是【0,15】 else if (U.compareAndSwapInt (this, TRANSFERINDEX, nextIndex, (nextBound) = (nextIndex > stride ? nextIndex - stride : 0))) { bound = nextBound; i = nextIndex - 1; advance = false; } } //判斷是否擴容結束 if (i < 0 || i >= n || i + n >= nextn) { int sc; if (finishing) { nextTable = null; table = nextTab; sizeCtl = (n << 1) - (n >>> 1); return; } if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) { //因為前面在提到高低位擴容的時候是預設給低位加2的,所以現在減2如果等於初始資料則證明擴容結束 if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT) return; finishing = advance = true; i = n; } } //得到陣列最高位的值,如果當前陣列位置為空,則直接修改成fwd表示陣列遷移完成 else if ((f = tabAt(tab, i)) == null) advance = casTabAt(tab, i, null, fwd); //判斷這個節點是否已經被處理過了,如果是,則進入下一次區間遍歷 else if ((fh = f.hash) == MOVED) advance = true; // already processed else { //針對當前要去遷移的節點加鎖(陣列最大位的節點的位置),其他執行緒呼叫時候,需要等待 synchronized (f) { //下面就是針對不同型別的節點【連結串列/紅黑樹】,做不同的處理了,那這裡我們會遇見一個問題,就是我們的內容存貨從的下標是通過key和老陣列的長度計算出來的,那新的陣列可能會對應不同的hash數值,所以下面有一個變數【runBit】判斷是否我們遷移某些資料或者不遷移 if (tabAt(tab, i) == f) { Node<K,V> ln, hn; if (fh >= 0) { int runBit = fh & n; Node<K,V> lastRun = f; //遍歷當前列表,進行計算(組成兩個鏈路)-找到最早的runBit不產生變化的那個資料(這樣就證明在後續的資料中我都不需要進行遷移),那就把這個資料後面的組成一條鏈路(ln),這個鏈路上的剩餘資料就是需要進行遷移的(因為他們的hash和新陣列的不同)所以剩下的資料就組成一條鏈路(hn) for (Node<K,V> p = f.next; p != null; p = p.next) { int b = p.hash & n; if (b != runBit) { runBit = b; lastRun = p; } } //表示當前位置不用變化 if (runBit == 0) { ln = lastRun; hn = null; } else { hn = lastRun; ln = null; } for (Node<K,V> p = f; p != lastRun; p = p.next) { int ph = p.hash; K pk = p.key; V pv = p.val; if ((ph & n) == 0) ln = new Node<K,V>(ph, pk, pv, ln); else hn = new Node<K,V>(ph, pk, pv, hn); } setTabAt(nextTab, i, ln); setTabAt(nextTab, i + n, hn); setTabAt(tab, i, fwd); advance = true; } else if (f instanceof TreeBin) { TreeBin<K,V> t = (TreeBin<K,V>)f; TreeNode<K,V> lo = null, loTail = null; TreeNode<K,V> hi = null, hiTail = null; int lc = 0, hc = 0; for (Node<K,V> e = t.first; e != null; e = e.next) { int h = e.hash; TreeNode<K,V> p = new TreeNode<K,V> (h, e.key, e.val, null, null); if ((h & n) == 0) { if ((p.prev = loTail) == null) lo = p; else loTail.next = p; loTail = p; ++lc; } else { if ((p.prev = hiTail) == null) hi = p; else hiTail.next = p; hiTail = p; ++hc; } } ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) : (hc != 0) ? new TreeBin<K,V>(lo) : t; hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) : (lc != 0) ? new TreeBin<K,V>(hi) : t; setTabAt(nextTab, i, ln); setTabAt(nextTab, i + n, hn); setTabAt(tab, i, fwd); advance = true; } } } } } }

如果進行元素個數的計算

因為它是一個併發的集合框架,那多執行緒情況下,他是如何保證計算元素個數的準確性呢,這裡面他使用了兩種方法結合的方式,一個是basecount計算總數的變數 另外一種就是名為CounterCell的陣列。

整體流程如下:

  • 每次增加資料的時候對basecount進行增加,如果失敗(那就證明有多個執行緒正在對這個資源共同搶佔)
  • 那就隨機給CounterCell陣列中儲存一個數據,這就削減了basecount的壓力
  • 最後對basecount和CounterCell的資料進行一個累加,從而達到計算總數的效果,這裡都是使用cas保障安全性的
private final void addCount(long x, int check) {
    CounterCell[] as; long b, s;
    //統計元素個數 如果使用BASECOUNT沒有修改成功
    if ((as = counterCells) != null ||
        !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
        CounterCell a; long v; int m;
        boolean uncontended = true;
        if (as == null || (m = as.length - 1) < 0 ||
            //這裡就是隨便找一個或者counterCells中的元素進行累加
            (a = as[ThreadLocalRandom.getProbe() & m]) == null ||
            !(uncontended =
              U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
            //這裡完成元素的累加
            fullAddCount(x, uncontended);
            return;
        }
        if (check <= 1)
            return;
        s = sumCount();
    }
    //是否要進行擴容
    if (check >= 0) {
        Node<K,V>[] tab, nt; int n, sc;
        while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
               (n = tab.length) < MAXIMUM_CAPACITY) {
            int rs = resizeStamp(n);
            if (sc < 0) {
                if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                    sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
                    transferIndex <= 0)
                    break;
                if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
                    transfer(tab, nt);
            }
            else if (U.compareAndSwapInt(this, SIZECTL, sc,
                                         (rs << RESIZE_STAMP_SHIFT) + 2))
                transfer(tab, null);
            s = sumCount();
        }
    }
}

對元素進行累加

// See LongAdder version for explanation
private final void fullAddCount(long x, boolean wasUncontended) {
    int h;
    if ((h = ThreadLocalRandom.getProbe()) == 0) {
        ThreadLocalRandom.localInit();      // force initialization
        h = ThreadLocalRandom.getProbe();
        wasUncontended = true;
    }
    boolean collide = false;                // True if last slot nonempty
    for (;;) {
        CounterCell[] as; CounterCell a; int n; long v;
        if ((as = counterCells) != null && (n = as.length) > 0) {
            if ((a = as[(n - 1) & h]) == null) {
                if (cellsBusy == 0) {            // Try to attach new Cell
                    CounterCell r = new CounterCell(x); // Optimistic create
                    //cellsBusy是一個修改資料時保持原子性的標記
                    if (cellsBusy == 0 &&
                        U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
                        boolean created = false;
                        try {     
                            // Recheck under lock
                            //將初始化的r物件的元素個數放在對應下標的位置    
                            CounterCell[] rs; int m, j;
                            if ((rs = counterCells) != null &&
                                (m = rs.length) > 0 &&
                                rs[j = (m - 1) & h] == null) {
                                rs[j] = r;
                                created = true;
                            }
                        } finally {
                            cellsBusy = 0;
                        }
                        if (created)
                            break;
                        continue;           // Slot is now non-empty
                    }
                }
                collide = false;
            }
            else if (!wasUncontended)       // CAS already known to fail
                wasUncontended = true;      // Continue after rehash
            else if (U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))
                break;
            else if (counterCells != as || n >= NCPU)
                collide = false;            // At max size or stale
            else if (!collide)
                collide = true;
            // 擴容部分 同樣通過cas去獲得鎖 
            else if (cellsBusy == 0 &&
                     U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
                try {
                    if (counterCells == as) {// Expand table unless stale
                        CounterCell[] rs = new CounterCell[n << 1];//把countercell的大小擴大一倍,然後遍歷陣列,把資料新增到新的陣列中
                        for (int i = 0; i < n; ++i)
                            rs[i] = as[i];
                        counterCells = rs;
                    }
                } finally {
                    cellsBusy = 0;
                }
                collide = false;
                continue;                   // Retry with expanded table
            }
            h = ThreadLocalRandom.advanceProbe(h);
        }
        //如果countercell為空 通過CAS(compareAndSwapInt)操作保障執行緒安全性
        else if (cellsBusy == 0 && counterCells == as &&
                 U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
            boolean init = false;
            try {                           // Initialize table
                if (counterCells == as) {
                    //初始化一個長度為2的陣列
                    CounterCell[] rs = new CounterCell[2];
                    //把x(元素的個數)儲存在某個位置
                    rs[h & 1] = new CounterCell(x);
                    //賦值給全域性變數counterCells
                    counterCells = rs;
                    init = true;
                }
            } finally {
                //釋放鎖
                cellsBusy = 0;
            }
            if (init)
                break;
        }
        //當上面的操作都失敗的,那就去修改basecount,因為所有執行緒都去玩counterCells,那basecount就空閒了
        else if (U.compareAndSwapLong(this, BASECOUNT, v = baseCount, v + x))
            break;                          // Fall back on using base
    }
}

連結串列轉換成紅黑樹(這裡牽扯到紅黑樹的知識,會在後續的博文中和大家專門聊)

static final class TreeBin<K,V> extends Node<K,V> {
    TreeNode<K,V> root;
    volatile TreeNode<K,V> first;
    //保留搶到鎖的執行緒
    volatile Thread waiter;
    volatile int lockState;
    static final int WRITER = 1; // set while holding write lock
    static final int WAITER = 2; // set when waiting for write lock
    static final int READER = 4; // increment value for setting read lock

    static int tieBreakOrder(Object a, Object b) {
        int d;
        if (a == null || b == null ||
            (d = a.getClass().getName().
             compareTo(b.getClass().getName())) == 0)
            d = (System.identityHashCode(a) <= System.identityHashCode(b) ?
                 -1 : 1);
        return d;
    }

    //把連結串列轉換成紅黑樹
    TreeBin(TreeNode<K,V> b) {
        super(TREEBIN, null, null, null);
        this.first = b;
        TreeNode<K,V> r = null;
        //初始化紅黑樹 
        for (TreeNode<K,V> x = b, next; x != null; x = next) {
            next = (TreeNode<K,V>)x.next;
            x.left = x.right = null;
            if (r == null) {
                x.parent = null;
                x.red = false;
                r = x;
            }
            //進行新增  這裡我會出一期關於紅黑樹的博文,之後再聊
            else {
                K k = x.key;
                int h = x.hash;
                Class<?> kc = null;
                for (TreeNode<K,V> p = r;;) {
                    int dir, ph;
                    K pk = p.key;
                    if ((ph = p.hash) > h)
                        dir = -1;
                    else if (ph < h)
                        dir = 1;
                    else if ((kc == null &&
                              (kc = comparableClassFor(k)) == null) ||
                             (dir = compareComparables(kc, k, pk)) == 0)
                        dir = tieBreakOrder(k, pk);
                        TreeNode<K,V> xp = p;
                    if ((p = (dir <= 0) ? p.left : p.right) == null) {
                        x.parent = xp;
                        if (dir <= 0)
                            xp.left = x;
                        else
                            xp.right = x;
                        r = balanceInsertion(r, x);
                        break;
                    }
                }
            }
        }
        this.root = r;
        assert checkInvariants(root);
    }

總結(這兩篇聊過的東西)

使用:包含了一些java8的新方法

原理分析:put方法內元素新增,構建陣列

解決hash衝突:使用了鏈式定址法

擴容:資料遷移,多執行緒併發協助遷移,高低位遷移(需要遷移的資料放在高位,不需要遷移的放在低位,然後一次性把這些放在新的陣列中)

元素的統計:使用陣列和basecounter使用分片的思想進行統計

當連結串列長度大於等於8,,並且陣列長度大於等於64的時候,連結串列轉換成紅黑樹