1. 程式人生 > 其它 >java併發包研究之-ConcurrentHashMap

java併發包研究之-ConcurrentHashMap

HashMap是非執行緒安全的,HashTable是執行緒安全的。

那個時候沒怎麼寫Java程式碼,所以根本就沒有聽說過ConcurrentHashMap,只知道面試的時候就記住這句話就行了…至於為什麼是執行緒安全的,內部怎麼實現的,通通不瞭解。

今天我們將深入剖析一個比HashTable效能更優的執行緒安全的Map類,它就是ConcurrentHashMap,本文基於Java 7的原始碼做剖析

ConcurrentHashMap的目的

多執行緒環境下,使用Hashmap進行put操作會引起死迴圈,導致CPU利用率接近100%,所以在併發情況下不能使用HashMap。雖然已經有一個執行緒安全的HashTable,但是HashTable容器使用synchronized(他的get和put方法的實現程式碼如下)來保證執行緒安全,線上程競爭激烈的情況下HashTable的效率非常低下。因為當一個執行緒訪問HashTable的同步方法時,訪問其他同步方法的執行緒就可能會進入阻塞或者輪訓狀態。如執行緒1使用put進行新增元素,執行緒2不但不能使用put方法新增元素,並且也不能使用get方法來獲取元素,所以競爭越激烈效率越低。

 /**
     * Returns the value to which the specified key is mapped,
     * or {@code null} if this map contains no mapping for the key.
     *
     * <p>More formally, if this map contains a mapping from a key
     * {@code k} to a value {@code v} such that {@code key.equals(k)},
     * then this method returns {@code v}; otherwise it returns
     * {@code null}.  (There can be at most one such mapping.)
     *
     * @throws NullPointerException if the specified key is null
     */
    public V get(Object key) {
        Segment<K,V> s; // manually integrate access methods to reduce overhead
        HashEntry<K,V>[] tab;
        int h = hash(key);
        long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
        if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null &&
            (tab = s.table) != null) {
            for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile
                     (tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
                 e != null; e = e.next) {
                K k;
                if ((k = e.key) == key || (e.hash == h && key.equals(k)))
                    return e.value;
            }
        }
        return null;
    }

在這麼惡劣的環境下,ConcurrentHashMap應運而生。

實現原理

ConcurrentHashMap使用分段鎖技術,將資料分成一段一段的儲存,然後給每一段資料配一把鎖,當一個執行緒佔用鎖訪問其中一個段資料的時候,其他段的資料也能被其他執行緒訪問,能夠實現真正的併發訪問。如下圖是ConcurrentHashMap的內部結構圖:

從圖中可以看到,ConcurrentHashMap內部分為很多個Segment,每一個Segment擁有一把鎖,然後每個Segment(繼承ReentrantLock)下面包含很多個HashEntry列表陣列。對於一個key,需要經過三次(為什麼要hash三次下文會詳細講解)hash操作,才能最終定位這個元素的位置,這三次hash分別為:

  1. 對於一個key,先進行一次hash操作,得到hash值h1,也即h1 = hash1(key);
  2. 將得到的h1的高几位進行第二次hash,得到hash值h2,也即h2 = hash2(h1高几位),通過h2能夠確定該元素的放在哪個Segment;
  3. 將得到的h1進行第三次hash,得到hash值h3,也即h3 = hash3(h1),通過h3能夠確定該元素放置在哪個HashEntry。

鎖分段技術

HashTable容器在競爭激烈的併發環境下表現出效率低下的原因是所有訪問HashTable的執行緒都必須競爭同一把鎖,那假如容器裡有多把鎖,每一把鎖用於鎖容器其中一部分資料,那麼當多執行緒訪問容器裡不同資料段的資料時,執行緒間就不會存在鎖競爭,從而可以有效的提高併發訪問效率,這就是ConcurrentHashMap所使用的鎖分段技術,首先將資料分成一段一段的儲存,然後給每一段資料配一把鎖,當一個執行緒佔用鎖訪問其中一個段資料的時候,其他段的資料也能被其他執行緒訪問。

ConcurrentHashMap的結構

我們通過ConcurrentHashMap的類圖來分析ConcurrentHashMap的結構。

ConcurrentHashMap是由Segment陣列結構和HashEntry陣列結構組成。Segment是一種可重入鎖ReentrantLock,在ConcurrentHashMap裡扮演鎖的角色,HashEntry則用於儲存鍵值對資料。一個ConcurrentHashMap裡包含一個Segment陣列,Segment的結構和HashMap類似,是一種陣列和連結串列結構, 一個Segment裡包含一個HashEntry陣列,每個HashEntry是一個連結串列結構的元素, 每個Segment守護者一個HashEntry數組裡的元素,當對HashEntry陣列的資料進行修改時,必須首先獲得它對應的Segment鎖。

初始化

先看看ConcurrentHashMap的初始化做了哪些事情,建構函式的原始碼如下:

  /**
     * Creates a new, empty map with the specified initial
     * capacity, load factor and concurrency level.
     *
     * @param initialCapacity the initial capacity. The implementation
     * performs internal sizing to accommodate this many elements.
     * @param loadFactor  the load factor threshold, used to control resizing.
     * Resizing may be performed when the average number of elements per
     * bin exceeds this threshold.
     * @param concurrencyLevel the estimated number of concurrently
     * updating threads. The implementation performs internal sizing
     * to try to accommodate this many threads.
     * @throws IllegalArgumentException if the initial capacity is
     * negative or the load factor or concurrencyLevel are
     * nonpositive.
     */
    @SuppressWarnings("unchecked")
    public ConcurrentHashMap(int initialCapacity,
                             float loadFactor, int concurrencyLevel) {
        if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
            throw new IllegalArgumentException();
        if (concurrencyLevel > MAX_SEGMENTS)
            concurrencyLevel = MAX_SEGMENTS;
        // Find power-of-two sizes best matching arguments
        int sshift = 0;
        int ssize = 1;
        while (ssize < concurrencyLevel) {
            ++sshift;
            ssize <<= 1;
        }
        this.segmentShift = 32 - sshift;
        this.segmentMask = ssize - 1;
        if (initialCapacity > MAXIMUM_CAPACITY)
            initialCapacity = MAXIMUM_CAPACITY;
        int c = initialCapacity / ssize;
        if (c * ssize < initialCapacity)
            ++c;
        int cap = MIN_SEGMENT_TABLE_CAPACITY;
        while (cap < c)
            cap <<= 1;
        // create segments and segments[0]
        Segment<K,V> s0 =
            new Segment<K,V>(loadFactor, (int)(cap * loadFactor),
                             (HashEntry<K,V>[])new HashEntry[cap]);
        Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];
        UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0]
        this.segments = ss;
    }

傳入的引數有initialCapacity,loadFactor,concurrencyLevel這三個。

  • initialCapacity表示新建立的這個ConcurrentHashMap的初始容量,也就是上面的結構圖中的Entry數量。預設值為static final int DEFAULT_INITIAL_CAPACITY = 16;
  • loadFactor表示負載因子,就是當ConcurrentHashMap中的元素個數大於loadFactor * 最大容量時就需要rehash,擴容。預設值為static final float DEFAULT_LOAD_FACTOR = 0.75f;
  • concurrencyLevel表示併發級別,這個值用來確定Segment的個數,Segment的個數是大於等於concurrencyLevel的第一個2的n次方的數。比如,如果concurrencyLevel為12,13,14,15,16這些數,則Segment的數目為16(2的4次方)。預設值為static final int DEFAULT_CONCURRENCY_LEVEL = 16;。理想情況下ConcurrentHashMap的真正的併發訪問量能夠達到concurrencyLevel,因為有concurrencyLevel個Segment,假如有concurrencyLevel個執行緒需要訪問Map,並且需要訪問的資料都恰好分別落在不同的Segment中,則這些執行緒能夠無競爭地自由訪問(因為他們不需要競爭同一把鎖),達到同時訪問的效果。這也是為什麼這個引數起名為“併發級別”的原因。

初始化的一些動作:

  1. 驗證引數的合法性,如果不合法,直接丟擲異常。
  2. concurrencyLevel也就是Segment的個數不能超過規定的最大Segment的個數,預設值為static final int MAX_SEGMENTS = 1 << 16;,如果超過這個值,設定為這個值。
  3. 然後使用迴圈找到大於等於concurrencyLevel的第一個2的n次方的數ssize,這個數就是Segment陣列的大小,並記錄一共向左按位移動的次數sshift,並令segmentShift = 32 - sshift,並且segmentMask的值等於ssize - 1,segmentMask的各個二進位制位都為1,目的是之後可以通過key的hash值與這個值做&運算確定Segment的索引。
  4. 檢查給的容量值是否大於允許的最大容量值,如果大於該值,設定為該值。最大容量值為static final int MAXIMUM_CAPACITY = 1 << 30;
  5. 然後計算每個Segment平均應該放置多少個元素,這個值c是向上取整的值。比如初始容量為15,Segment個數為4,則每個Segment平均需要放置4個元素。
  6. 最後建立一個Segment例項,將其當做Segment陣列的第一個元素。

put操作

put操作的原始碼如下:

  /**
     * Maps the specified key to the specified value in this table.
     * Neither the key nor the value can be null.
     *
     * <p> The value can be retrieved by calling the <tt>get</tt> method
     * with a key that is equal to the original key.
     *
     * @param key key with which the specified value is to be associated
     * @param value value to be associated with the specified key
     * @return the previous value associated with <tt>key</tt>, or
     *         <tt>null</tt> if there was no mapping for <tt>key</tt>
     * @throws NullPointerException if the specified key or value is null
     */
    @SuppressWarnings("unchecked")
    public V put(K key, V value) {
        Segment<K,V> s;
        if (value == null)
            throw new NullPointerException();
        int hash = hash(key);
        int j = (hash >>> segmentShift) & segmentMask;
        if ((s = (Segment<K,V>)UNSAFE.getObject          // nonvolatile; recheck
             (segments, (j << SSHIFT) + SBASE)) == null) //  in ensureSegment
            s = ensureSegment(j);
        return s.put(key, hash, value, false);
    }

操作步驟如下:

  1. 判斷value是否為null,如果為null,直接丟擲異常。
  2. key通過一次hash運算得到一個hash值。(這個hash運算下文詳說)
  3. 將得到hash值向右按位移動segmentShift位,然後再與segmentMask做&運算得到segment的索引j。
    在初始化的時候我們說過segmentShift的值等於32-sshift,例如concurrencyLevel等於16,則sshift等於4,則segmentShift為28。hash值是一個32位的整數,將其向右移動28位就變成這個樣子:
    0000 0000 0000 0000 0000 0000 0000 xxxx,然後再用這個值與segmentMask做&運算,也就是取最後四位的值。這個值確定Segment的索引。
  4. 使用Unsafe的方式從Segment陣列中獲取該索引對應的Segment物件。
  5. 向這個Segment物件中put值,這個put操作也基本是一樣的步驟(通過&運算獲取HashEntry的索引,然後set)。

get操作

get操作的原始碼如下:

 /**
     * Returns the value to which the specified key is mapped,
     * or {@code null} if this map contains no mapping for the key.
     *
     * <p>More formally, if this map contains a mapping from a key
     * {@code k} to a value {@code v} such that {@code key.equals(k)},
     * then this method returns {@code v}; otherwise it returns
     * {@code null}.  (There can be at most one such mapping.)
     *
     * @throws NullPointerException if the specified key is null
     */
    public V get(Object key) {
        Segment<K,V> s; // manually integrate access methods to reduce overhead
        HashEntry<K,V>[] tab;
        int h = hash(key);
        long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
        if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null &&
            (tab = s.table) != null) {
            for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile
                     (tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
                 e != null; e = e.next) {
                K k;
                if ((k = e.key) == key || (e.hash == h && key.equals(k)))
                    return e.value;
            }
        }
        return null;
    }

操作步驟為:

  1. 和put操作一樣,先通過key進行兩次hash確定應該去哪個Segment中取資料。
  2. 使用Unsafe獲取對應的Segment,然後再進行一次&運算得到HashEntry連結串列的位置,然後從連結串列頭開始遍歷整個連結串列(因為Hash可能會有碰撞,所以用一個連結串列儲存),如果找到對應的key,則返回對應的value值,如果連結串列遍歷完都沒有找到對應的key,則說明Map中不包含該key,返回null。

size操作

size操作與put和get操作最大的區別在於,size操作需要遍歷所有的Segment才能算出整個Map的大小,而put和get都只關心一個Segment。假設我們當前遍歷的Segment為SA,那麼在遍歷SA過程中其他的Segment比如SB可能會被修改,於是這一次運算出來的size值可能並不是Map當前的真正大小。所以一個比較簡單的辦法就是計算Map大小的時候所有的Segment都Lock住,不能更新(包含put,remove等等)資料,計算完之後再Unlock。這是普通人能夠想到的方案,但是牛逼的作者還有一個更好的Idea:先給3次機會,不lock所有的Segment,遍歷所有Segment,累加各個Segment的大小得到整個Map的大小,如果某相鄰的兩次計算獲取的所有Segment的更新的次數(每個Segment都有一個modCount變數,這個變數在Segment中的Entry被修改時會加一,通過這個值可以得到每個Segment的更新操作的次數)是一樣的,說明計算過程中沒有更新操作,則直接返回這個值。如果這三次不加鎖的計算過程中Map的更新次數有變化,則之後的計算先對所有的Segment加鎖,再遍歷所有Segment計算Map大小,最後再解鎖所有Segment。原始碼如下:

    /**
     * Returns the number of key-value mappings in this map.  If the
     * map contains more than <tt>Integer.MAX_VALUE</tt> elements, returns
     * <tt>Integer.MAX_VALUE</tt>.
     *
     * @return the number of key-value mappings in this map
     */
    public int size() {
        // Try a few times to get accurate count. On failure due to
        // continuous async changes in table, resort to locking.
        final Segment<K,V>[] segments = this.segments;
        int size;
        boolean overflow; // true if size overflows 32 bits
        long sum;         // sum of modCounts
        long last = 0L;   // previous sum
        int retries = -1; // first iteration isn't retry
        try {
            for (;;) {
                if (retries++ == RETRIES_BEFORE_LOCK) {
                    for (int j = 0; j < segments.length; ++j)
                        ensureSegment(j).lock(); // force creation
                }
                sum = 0L;
                size = 0;
                overflow = false;
                for (int j = 0; j < segments.length; ++j) {
                    Segment<K,V> seg = segmentAt(segments, j);
                    if (seg != null) {
                        sum += seg.modCount;
                        int c = seg.count;
                        if (c < 0 || (size += c) < 0)
                            overflow = true;
                    }
                }
                if (sum == last)
                    break;
                last = sum;
            }
        } finally {
            if (retries > RETRIES_BEFORE_LOCK) {
                for (int j = 0; j < segments.length; ++j)
                    segmentAt(segments, j).unlock();
            }
        }
        return overflow ? Integer.MAX_VALUE : size;
    }

舉個例子:

一個Map有4個Segment,標記為S1,S2,S3,S4,現在我們要獲取Map的size。計算過程是這樣的:第一次計算,不對S1,S2,S3,S4加鎖,遍歷所有的Segment,假設每個Segment的大小分別為1,2,3,4,更新操作次數分別為:2,2,3,1,則這次計算可以得到Map的總大小為1+2+3+4=10,總共更新操作次數為2+2+3+1=8;第二次計算,不對S1,S2,S3,S4加鎖,遍歷所有Segment,假設這次每個Segment的大小變成了2,2,3,4,更新次數分別為3,2,3,1,因為兩次計算得到的Map更新次數不一致(第一次是8,第二次是9)則可以斷定這段時間Map資料被更新,則此時應該再試一次;第三次計算,不對S1,S2,S3,S4加鎖,遍歷所有Segment,假設每個Segment的更新操作次數還是為3,2,3,1,則因為第二次計算和第三次計算得到的Map的更新操作的次數是一致的,就能說明第二次計算和第三次計算這段時間內Map資料沒有被更新,此時可以直接返回第三次計算得到的Map的大小。最壞的情況:第三次計算得到的資料更新次數和第二次也不一樣,則只能先對所有Segment加鎖再計算最後解鎖。

containsValue操作

containsValue操作採用了和size操作一樣的想法:

 /**
     * Returns <tt>true</tt> if this map maps one or more keys to the
     * specified value. Note: This method requires a full internal
     * traversal of the hash table, and so is much slower than
     * method <tt>containsKey</tt>.
     *
     * @param value value whose presence in this map is to be tested
     * @return <tt>true</tt> if this map maps one or more keys to the
     *         specified value
     * @throws NullPointerException if the specified value is null
     */
    public boolean containsValue(Object value) {
        // Same idea as size()
        if (value == null)
            throw new NullPointerException();
        final Segment<K,V>[] segments = this.segments;
        boolean found = false;
        long last = 0;
        int retries = -1;
        try {
            outer: for (;;) {
                if (retries++ == RETRIES_BEFORE_LOCK) {
                    for (int j = 0; j < segments.length; ++j)
                        ensureSegment(j).lock(); // force creation
                }
                long hashSum = 0L;
                int sum = 0;
                for (int j = 0; j < segments.length; ++j) {
                    HashEntry<K,V>[] tab;
                    Segment<K,V> seg = segmentAt(segments, j);
                    if (seg != null && (tab = seg.table) != null) {
                        for (int i = 0 ; i < tab.length; i++) {
                            HashEntry<K,V> e;
                            for (e = entryAt(tab, i); e != null; e = e.next) {
                                V v = e.value;
                                if (v != null && value.equals(v)) {
                                    found = true;
                                    break outer;
                                }
                            }
                        }
                        sum += seg.modCount;
                    }
                }
                if (retries > 0 && sum == last)
                    break;
                last = sum;
            }
        } finally {
            if (retries > RETRIES_BEFORE_LOCK) {
                for (int j = 0; j < segments.length; ++j)
                    segmentAt(segments, j).unlock();
            }
        }
        return found;
    }

關於hash

大家一定還記得使用一個key定位Segment之前進行過一次hash操作吧?這次hash的作用是什麼呢?看看hash的原始碼:

 /**
     * Applies a supplemental hash function to a given hashCode, which
     * defends against poor quality hash functions.  This is critical
     * because ConcurrentHashMap uses power-of-two length hash tables,
     * that otherwise encounter collisions for hashCodes that do not
     * differ in lower or upper bits.
     */
    private int hash(Object k) {
        int h = hashSeed;

        if ((0 != h) && (k instanceof String)) {
            return sun.misc.Hashing.stringHash32((String) k);
        }

        h ^= k.hashCode();

        // Spread bits to regularize both segment and index locations,
        // using variant of single-word Wang/Jenkins hash.
        h += (h <<  15) ^ 0xffffcd7d;
        h ^= (h >>> 10);
        h += (h <<   3);
        h ^= (h >>>  6);
        h += (h <<   2) + (h << 14);
        return h ^ (h >>> 16);
    }

原始碼中的註釋是這樣的:

Applies a supplemental hash function to a given hashCode, which defends against poor quality hash functions. This is critical because ConcurrentHashMap uses power-of-two length hash tables, that otherwise encounter collisions for hashCodes that do not differ in lower or upper bits.

這裡用到了Wang/Jenkins hash演算法的變種,主要的目的是為了減少雜湊衝突,使元素能夠均勻的分佈在不同的Segment上,從而提高容器的存取效率。假如雜湊的質量差到極點,那麼所有的元素都在一個Segment中,不僅存取元素緩慢,分段鎖也會失去意義。

舉個簡單的例子:

         System.out.println(Integer.parseInt("0001111", 2) & 15);//0001111
         System.out.println(Integer.parseInt("0011111", 2) & 15);//0001111
         System.out.println(Integer.parseInt("0111111", 2) & 15);//0001111
         System.out.println(Integer.parseInt("1111111", 2) & 15);//0001111

這些數字得到的hash值都是一樣的,全是15,所以如果不進行第一次預hash,發生衝突的機率還是很大的,但是如果我們先把上例中的二進位制數字使用hash()函式先進行一次預hash,得到的結果是這樣的:

0100|0111|0110|0111|1101|1010|0100|1110
1111|0111|0100|0011|0000|0001|1011|1000
0111|0111|0110|1001|0100|0110|0011|1110
1000|0011|0000|0000|1100|1000|0001|1010

上面這個例子引用自:InfoQ
可以看到每一位的資料都散開了,並且ConcurrentHashMap中是使用預hash值的高位參與運算的。比如之前說的先將hash值向右按位移動28位,再與15做&運算,得到的結果都別為:4,15,7,8,沒有衝突!

注意事項

  • ConcurrentHashMap中的key和value值都不能為null。
  • ConcurrentHashMap的整個操作過程中大量使用了Unsafe類來獲取Segment/HashEntry,這裡Unsafe的主要作用是提供原子操作。Unsafe這個類比較恐怖,破壞力極強,一般場景不建議使用,如果有興趣可以到這裡做詳細的瞭解Java中鮮為人知的特性
  • ConcurrentHashMap是執行緒安全的類並不能保證使用了ConcurrentHashMap的操作都是執行緒安全的!

宣告

http://qifuguang.me/2015/09/10/[Java併發包學習八]深度剖析ConcurrentHashMap/

http://www.infoq.com/cn/articles/ConcurrentHashMap/ -- 參考info