1. 程式人生 > 其它 >併發程式設計-ConcurrentHashMap(一)(剖析CHM原始碼、設計思想、資料結構)

併發程式設計-ConcurrentHashMap(一)(剖析CHM原始碼、設計思想、資料結構)

併發程式設計-ConcurrentHashMap(一)(剖析CHM原始碼、設計思想、資料結構)

本篇來聊聊1.8的ConcurrentHashMap(CHS),關於它的一些設計思想(高低位擴容、分段鎖、紅黑樹、連結串列 so on),資料結構,和原始碼試實現行剖析,本篇會講到前面的一部分程式碼分析,包括(延遲初始化、閾值判斷擴容、以及高低位擴容)

為什麼使用CHS

HashMap】是非執行緒安全的,有時候因為沒有給使用【HashMap】的程式碼段加同步鎖,在1.7的時候居然導致了死鎖,。那我們就使用【Hashtable】,他是可以保證執行緒安全,但是他只是粗暴的加上了synchronized關鍵字,這樣鎖的粒度太粗

(效能很低) 。所以當多執行緒場景下CHS就應運而生

CHS的使用

平常的新增,刪除等一些操作這裡我們不再繼續瞭解,咱們瞭解一下在1.8後加的一些新的方法【computeIfAbsent】【computeIfPresent】【computeIfPresent【compute】【merge】我這裡分別寫了個例子,所以不再贅述

CHS儲存和實現

總體來說,他是基於一個數組進行資料的儲存,陣列中的每個元素都叫做一個節點,並且分為兩種方式,

連結串列】:是用來解決雜湊衝突的,我們在說threadlocal的時候說到過,用的是線性探索進行解決的

紅黑樹】:用來解決連結串列過於長,帶來的時間複雜度增加的問題

,紅黑樹使得時間複雜度從【O(n)->O(logn)】

!!!node長度超過64的時候,連結串列長度超過8的時候,則進行紅黑樹的轉換,當做擴容的時候,紅黑樹會轉換成節點,因為當擴容的時候,資料會進行拉伸,伴隨著,資料結構無法達到紅黑樹的條件,從而轉變

總體視角:

  • 當我們put一個數據的時候,首先會用你put的key的雜湊值計算一個數組下標
  • 如果我們put key的雜湊數值一樣的話,則加入此節點的下面,就變成了一個單項列表。

CHS原始碼分析

我們發現,在new一個chs的時候沒並沒有建立一個新的物件,建立物件和賦值的操作都在put方法中實現的,這就是【延遲初始化】

final
V putVal(K key, V value, boolean onlyIfAbsent) { if (key == null || value == null) throw new NullPointerException(); //使用你傳進來的key去計算一個hash值,後面要用這個計算下標 int hash = spread(key.hashCode()); int binCount = 0; //這裡是個自旋,因為是多執行緒裡面肯定牽扯到cas的操作,那cas可能會失敗,失敗之後肯定要重新把這個數值傳進來,所以使用自旋 for (Node<K,V>[] tab = table;;) { Node<K,V> f; int n, i, fh; //如果table為空則去初始化一個Node<K,V>[] table 這是儲存的容器; if (tab == null || (n = tab.length) == 0) tab = initTable(); // (n - 1) & hash這裡用換算出來的hash值去計算一個下標 else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { //如果當前的算出來的下標處沒有資料則建立一個node並且儲存資料,這裡還是通過cas保障資料安全 if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null))) break; } else if ((fh = f.hash) == MOVED) tab = helpTransfer(tab, f); //當前位置如果儲存資料,則走這裡的邏輯 else { V oldVal = null; //鎖住當前的node節點避免執行緒安全問題,這裡的鎖的粒度很細,因為只是鎖住當前的node,意味著其他的節點,都可以在這個時候同時插入資料,這樣就提升了效能 synchronized (f) { if (tabAt(tab, i) == f) { //這裡是針對連結串列的操作 if (fh >= 0) { binCount = 1; //f代表的是當前node的頭結點,拿到頭節點開始向下遍歷,binCount實際上是在統計連結串列的長度 for (Node<K,V> e = f;; ++binCount) { K ek; //判斷是否存在相同的key,如果存在則進行覆蓋(相同的key後者會覆蓋前者) if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) { oldVal = e.val; if (!onlyIfAbsent) e.val = value; break; } //如果不存在則把當前的key和value新增到連結串列中,這裡使用的是尾插法( = e.next),直接新增到尾部 Node<K,V> pred = e; if ((e = e.next) == null) { pred.next = new Node<K,V>(hash, key, value, null); break; } } } //這裡是針對紅黑樹的操作 else if (f instanceof TreeBin) { Node<K,V> p; binCount = 2; if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key, value)) != null) { oldVal = p.val; if (!onlyIfAbsent) p.val = value; } } } } if (binCount != 0) { //如果連結串列長度大於等於8,則會根據閾值判斷是轉化為紅黑樹還是擴容 if (binCount >= TREEIFY_THRESHOLD) treeifyBin(tab, i); if (oldVal != null) return oldVal; break; } } } addCount(1L, binCount); return null; }

initTable(這是進行容器初始化的操作,因為是多執行緒情況下,所以要加鎖,這是使用CAS充當了鎖)這裡的【sizeCtl】等於是一個狀態機,去標識當前陣列擴容的狀態

private final Node<K,V>[] initTable() {
    Node<K,V>[] tab; int sc;
    //只要容器沒有初始化就不斷進行初始化
    while ((tab = table) == null || tab.length == 0) {
        if ((sc = sizeCtl) < 0)
            Thread.yield(); 
        //因為是多執行緒,所以這裡使用cas保證原子性
        else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
            try {
                if ((tab = table) == null || tab.length == 0) {
                    //這裡的預設長度是16
                    int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
                    @SuppressWarnings("unchecked")
                    Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                    table = tab = nt;
                    //這裡保留擴容的閾值(當陣列長度到達擴容的閾值的時候,就進行擴容)
                    sc = n - (n >>> 2);
                }
            } finally {
                sizeCtl = sc;
            }
            break;
        }
    }
    return tab;
}

treeifyBin(這裡根據閾值判斷,是轉換紅黑樹,還是擴容,前面說了陣列長度到達64的時候才會轉換為紅黑樹

private final void treeifyBin(Node<K,V>[] tab, int index) {
    Node<K,V> b; int n, sc;
    if (tab != null) {
        //如果table長度小於64則進行擴容,否則進行紅黑樹的轉換
        if ((n = tab.length) < MIN_TREEIFY_CAPACITY)
            //進行擴容
            tryPresize(n << 1);
        else if ((b = tabAt(tab, index)) != null && b.hash >= 0) {
            //進行紅黑樹的轉換
            synchronized (b) {
                if (tabAt(tab, index) == b) {
                    TreeNode<K,V> hd = null, tl = null;
                    for (Node<K,V> e = b; e != null; e = e.next) {
                        TreeNode<K,V> p =
                            new TreeNode<K,V>(e.hash, e.key, e.val,
                                              null, null);
                        if ((p.prev = tl) == null)
                            hd = p;
                        else
                            tl.next = p;
                        tl = p;
                    }
                    setTabAt(tab, index, new TreeBin<K,V>(hd));
                }
            }
        }
    }
}

tryPresize(對陣列進行擴容):這裡牽扯到一個【多執行緒併發擴容】,簡而言之,就是允許多個執行緒對陣列協助擴容,

擴容的本質:

  • 每次把陣列的長度擴大到原來的一倍
  • 然後把老的資料遷移到新的陣列
private final void tryPresize(int size) {
    //用於判斷擴容的目標大小
    int c = (size >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY :
        tableSizeFor(size + (size >>> 1) + 1);//這裡就是為了滿足2m³,如果是15那就給你轉化成16(轉換成最近的2m³)
    int sc;
    //說明要做陣列的初始化,因為這個方法在別的地方也有使用,所以要進行判斷
    while ((sc = sizeCtl) >= 0) {
        Node<K,V>[] tab = table; int n;
        //這裡進行陣列的初始化
        if (tab == null || (n = tab.length) == 0) {
            //這裡陣列容量是通過對比的計算出來的,初始容量(我們可以自定義chm的大小)和擴容容量誰的數字大就選擇誰作為陣列的預設長度
            n = (sc > c) ? sc : c;
            if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
                try {
                    if (table == tab) {
                        @SuppressWarnings("unchecked")
                        Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                        table = nt;
                        sc = n - (n >>> 2);
                    }
                } finally {
                    sizeCtl = sc;
                }
            }
        }
        //已經是最大的容量,則不進行擴容了,直接返回
        else if (c <= sc || n >= MAXIMUM_CAPACITY)
            break;
        else if (tab == table) {
            //這裡生成一個擴容戳這是為了保證當前擴容範圍的唯一性(使用多執行緒來執行分段擴容),我們在下面聊聊這個東西,他挺有意思的
int rs = resizeStamp(n); //第一次擴容的時候,不會走這一段邏輯,因為sc不小於1 if (sc < 0) { Node<K,V>[] nt; //表示擴容結束 if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || sc == rs + MAX_RESIZERS || (nt = nextTable) == null || transferIndex <= 0) break; //表示沒有結束,所以每次增加一個執行緒進行擴容,則在【低位】加一,當擴容結束後低位就開始遞減,證明執行緒一個個開始釋放,最終低位全部都是0,則說明擴容完畢 if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
            //這裡實現陣列的資料轉移 transfer(tab, nt); }
        //第一次走這個邏輯,他會把我們上面算出的rs左移16位 就會變成(換算的結果看下面的resizeStamp分析)-> 1000 0000 0001 1011 0000 0000 0000 0000
        //算出來的戳的 【高16位】標識當前的額擴容標記,【低16位】標識當前擴容的執行緒數量
        //然後對換算出來的結果加2 (2的二進位制是10)
        //於是就變成了 1000 000 0001 1011 0000 0000 0000 00
10 (上面講高位標識擴容標記,低位表示擴容的執行緒數量,那這裡就是一個執行緒在參加擴容)
else if (U.compareAndSwapInt(this, SIZECTL, sc, (rs << RESIZE_STAMP_SHIFT) + 2)) transfer(tab, null); } } }

因為是多執行緒共同參與資料的遷移,那我肯定要有一個地方記錄參與這個任務的執行緒數量

【numberOfLeadingZeros】:這個方法會【返回這個資料的二進位制串中從最左邊算起連續的“0”的總數量】,

      • 例如 一個int型別的長度是32 【0000 0000 0000 0000 0000 0000 0000 0000】 現在在17位這裡有一個1,那就在前面有16個0這個時候這個方法就會返回16 【0000 0000 0000 0000 1000 0000 0000 0000】                                    
static final int resizeStamp(int n) {
  //把前面計算出來的二進位制的第16位變成1,比如說現在key計算出來的是16 那轉換出來的二進位制就是10000 因為int是32位所以給他前面補充0就變成了
  //0000 0000 0000 0000 0000 0000 0001 0000 ->那他最高位數前面的數字就有27個0就返回27
  // 27換算出來的二進位制是 【
11011】 給前面補充0之後就變成了 0000 0000 0000 0000 0000 0000 000 1 1011
 
// 給他的第16位變成1那就變成->0000 0000 0000 0000 1000 0000 0001 1011
  return Integer.numberOfLeadingZeros(n) | (1 << (RESIZE_STAMP_BITS - 1));
}