1. 程式人生 > >[技術分享]-ConcurrentHashMap在jdk1.8中的改進

[技術分享]-ConcurrentHashMap在jdk1.8中的改進

一、簡單回顧ConcurrentHashMap在jdk1.7中的設計

  • 與Hashtable不同的是,ConcurrentHashMap使用的是分段鎖技術,將ConcurrentHashMap容器的資料分段儲存,每一段資料分配一個Segment,當執行緒佔用一個Segment時,其他執行緒可以訪問其他段的資料.(每個segment都是一個鎖). 與hashtable相比,這麼設計的目的是對於put, remove等操作,可以減少併發衝突,對不屬於同一個片段的節點可以併發操作,大大提高了效能.

Segment : 可重入鎖(在JAVA環境下 ReentrantLock 和synchronized 都是 可重入鎖),繼承ReentrantLock, 也稱之為桶( 本質上Segment類就是一個小的hashmap,裡面table陣列儲存了各個節點的資料,繼承了ReentrantLock, 可以作為互斥鎖使用 ) 

每個Segment守護著一個HashEntry數組裡的元素,當對HashEntry陣列的資料進行修改時,必須首先獲得它對應的Segment鎖。

HashEntry : 主要儲存鍵值對, 這裡也可以叫節點

image


HashEntry原始碼:

static final class HashEntry<K,V> {
        final int hash;
        final K key;
        volatile V value;
        volatile HashEntry<K,V> next;

其中,volatile關鍵字保證了多執行緒讀取的時候一定是最新值。

ConcurrentHashMap包含一個Segment陣列,每個Segment包含一個HashEntry陣列,當修改HashEntry陣列,採用開鏈法處理衝突,所以它的每個HashEntry元素又是連結串列結構的元素。

ConcurrentHashMap構造方法

public ConcurrentHashMap(int initialCapacity,
                             float loadFactor, int concurrencyLevel) {
        if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
            throw new IllegalArgumentException();
        if (concurrencyLevel > MAX_SEGMENTS)
            concurrencyLevel = MAX_SEGMENTS;   //1
        int sshift = 0;
        int ssize = 1;
        while (ssize < concurrencyLevel) {
            ++sshift;
            ssize <<= 1;    //2
        }
        this.segmentShift = 32 - sshift;  //3
        this.segmentMask = ssize - 1;   //4
        if (initialCapacity > MAXIMUM_CAPACITY)
            initialCapacity = MAXIMUM_CAPACITY;
        int c = initialCapacity / ssize;
        if (c * ssize < initialCapacity)
            ++c;
        int cap = MIN_SEGMENT_TABLE_CAPACITY;
        while (cap < c)
            cap <<= 1;
        Segment<K,V> s0 =
            new Segment<K,V>(loadFactor, (int)(cap * loadFactor),
                             (HashEntry<K,V>[])new HashEntry[cap]);//5
        Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize]; //6
        UNSAFE.putOrderedObject(ss, SBASE, s0); 
        this.segments = ss;
    }
整個初始化是通過引數initialCapacity(初始容量),loadFactor(增長因子)和concurrencyLevel(併發等級)來初始化segmentShift(段偏移量)、segmentMask(段掩碼)和segment陣列。注意以下兩點:
1: 最大的併發等級不能超過MAX_SEGMENTS 1<<16(65535),(如果你傳入的是15 就是向上取2的4次方倍 也就是16.

2:segmentShift和segmentMask在定位segment使用,segmentShift = 32 - ssize向左移位的次數,segmentMask = ssize - 1。ssize的最大長度是65536,對應的 segmentShift最大值為16,segmentMask最大值是65535,對應的二進位制16位全為1;

初始化segment: 1:初始化每個segment的HashEntry長度;2:建立segment陣列和segment[0]。
注:HashEntry長度cap同樣也是2的N次方,預設情況,ssize = 16,initialCapacity = 16,loadFactor = 0.75f,那麼cap = 1,threshold = (int) cap * loadFactor = 0。

ConcurrentHashMap Get操作

public V get(Object key) {
        Segment<K,V> s; 
        HashEntry<K,V>[] tab;
        int h = hash(key);  //1
        long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
        if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null &&  //2
            (tab = s.table) != null) {
            for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile
                     (tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
                 e != null; e = e.next) {
                K k;
                if ((k = e.key) == key || (e.hash == h && key.equals(k)))
                    return e.value;
            }
        }
        return null;
    }
根據key計算hash值;根據計算出的hash值定位segment 如果segment不為null segment.table也不為null 跳轉進裡面的迴圈
( 通過hash值定位segment中對應的HashEntry 遍歷HashEntry,如果key存在,返回key對應的value 如果不存在則返回null )


ConcurrentHashMap Put操作

public V put(K key, V value) {
        Segment<K,V> s;
        if (value == null)
            throw new NullPointerException();
        int hash = hash(key);
        int j = (hash >>> segmentShift) & segmentMask;
        if ((s = (Segment<K,V>)UNSAFE.getObject          
             (segments, (j << SSHIFT) + SBASE)) == null) 
            s = ensureSegment(j);
        return s.put(key, hash, value, false);
    }
1.判斷值是否為null
2.計算hash值
3.定位segment 如果不存在,則建立
4.呼叫segment的put方法

Segment的put方法

final V put(K key, int hash, V value, boolean onlyIfAbsent) {
            HashEntry<K,V> node = tryLock() ? null :
                scanAndLockForPut(key, hash, value);  //1
            V oldValue;
            try {
                HashEntry<K,V>[] tab = table;
                int index = (tab.length - 1) & hash;
                HashEntry<K,V> first = entryAt(tab, index);  //2
                for (HashEntry<K,V> e = first;;) { //3
                    if (e != null) {
                        K k;
                        if ((k = e.key) == key ||
                            (e.hash == hash && key.equals(k))) {
                            oldValue = e.value;
                            if (!onlyIfAbsent) {
                                e.value = value;
                                ++modCount;
                            }
                            break;
                        }
                        e = e.next;
                    }
                    else {
                        if (node != null)
                            node.setNext(first);
                        else
                            node = new HashEntry<K,V>(hash, key, value, first);
                        int c = count + 1;
                        if (c > threshold && tab.length < MAXIMUM_CAPACITY)
                            rehash(node);
                        else
                            setEntryAt(tab, index, node);
                        ++modCount;
                        count = c;
                        oldValue = null;
                        break;
                    }
                }
            } finally {
                unlock();
            }
            return oldValue;
        }
1. 獲取鎖 ,保證執行緒安全
2. 定位到具體的HashEntry
3. 遍歷HashEntry連結串列,如果key已存在 再判斷傳入的onlyIfAbsent的值 ,再決定是否覆蓋舊值.
4. 最後釋放鎖,返回舊值.

二、ConcurrentHashMap在jdk1.8中做了兩方面的改進

改進一:取消segments欄位,直接採用transient volatile HashEntry<K,V>[] table儲存資料,採用table陣列元素作為鎖,從而實現了對每一行資料進行加鎖,進一步減少併發衝突的概率。

改進二:將原先table陣列+單向連結串列的資料結構,變更為table陣列+單向連結串列+紅黑樹的結構。對於hash表來說,最核心的能力在於將key hash之後能均勻的分佈在陣列中。如果hash之後雜湊的很均勻,那麼table陣列中的每個佇列長度主要為0或者1。但實際情況並非總是如此理想,雖然ConcurrentHashMap類預設的載入因子為0.75,但是在資料量過大或者運氣不佳的情況下,還是會存在一些佇列長度過長的情況,如果還是採用單向列表方式,那麼查詢某個節點的時間複雜度為O(n);因此,對於個數超過8(預設值)的列表,jdk1.8中採用了紅黑樹的結構,那麼查詢的時間複雜度可以降低到O(logN),以此改進效能。

final V putVal(K key, V value, boolean onlyIfAbsent) {
    if (key == null || value == null) throw new NullPointerException();
    int hash = spread(key.hashCode());
    int binCount = 0;
    for (Node<K,V>[] tab = table;;) {
        Node<K,V> f; int n, i, fh;
        // 如果table為空,初始化;否則,根據hash值計算得到陣列索引i,如果tab[i]為空,直接新建節點Node即可。注:tab[i]實質為連結串列或者紅黑樹的首節點。
        if (tab == null || (n = tab.length) == 0)
            tab = initTable();
        else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
            if (casTabAt(tab, i, null,
                         new Node<K,V>(hash, key, value, null)))
                break;                   
        }
        // 如果tab[i]不為空並且hash值為MOVED,說明該連結串列正在進行transfer操作,返回擴容完成後的table。
        else if ((fh = f.hash) == MOVED)
            tab = helpTransfer(tab, f);
        else {
            V oldVal = null;
            // 針對首個節點進行加鎖操作,而不是segment,進一步減少執行緒衝突
            synchronized (f) {
                if (tabAt(tab, i) == f) {
                    if (fh >= 0) {
                        binCount = 1;
                        for (Node<K,V> e = f;; ++binCount) {
                            K ek;
                            // 如果在連結串列中找到值為key的節點e,直接設定e.val = value即可。
                            if (e.hash == hash &&
                                ((ek = e.key) == key ||
                                 (ek != null && key.equals(ek)))) {
                                oldVal = e.val;
                                if (!onlyIfAbsent)
                                    e.val = value;
                                break;
                            }
                            // 如果沒有找到值為key的節點,直接新建Node並加入連結串列即可。
                            Node<K,V> pred = e;
                            if ((e = e.next) == null) {
                                pred.next = new Node<K,V>(hash, key,
                                                          value, null);
                                break;
                            }
                        }
                    }
                    // 如果首節點為TreeBin型別,說明為紅黑樹結構,執行putTreeVal操作。
                    else if (f instanceof TreeBin) {
                        Node<K,V> p;
                        binCount = 2;
                        if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                       value)) != null) {
                            oldVal = p.val;
                            if (!onlyIfAbsent)
                                p.val = value;
                        }
                    }
                }
            }
            if (binCount != 0) {
                // 如果節點數>=8,那麼轉換連結串列結構為紅黑樹結構。
                if (binCount >= TREEIFY_THRESHOLD)
                    treeifyBin(tab, i);
                if (oldVal != null)
                    return oldVal;
                break;
            }
        }
    }
    // 計數增加1,有可能觸發transfer操作(擴容)。
    addCount(1L, binCount);
    return null;
}