1. 程式人生 > >ConcurrentHashMap JDK 1.6 原始碼分析

ConcurrentHashMap JDK 1.6 原始碼分析

前言

前段時間把 JDK 1.6 中的 HashMap 主要的一些操作原始碼分析了一次。既然把 HashMap 原始碼分析了, 就順便把 JDK 1.6 中 ConcurrentHashMap 的主要一些操作原始碼分析一下。因為其中有很多思想是值得我們去借鑑的。 ConcurrentHashMap 中的分段鎖。這個思想在 JDK 1.8 中 為了優化 JUC 下的原子鎖 CAS 高併發情況下導致自旋次數太多效率低下。引用 Adder 。其中就是借鑑了分段鎖的思想。AtomicLong 對比 LongAdder。 有興趣可以檢視。

準備

如果有人問你瞭解 ConcurrentHashMap 嗎? 你可以這樣回答,瞭解。 ConcurrentHashMap

是為了 取代 HashMap非執行緒安全的,一種執行緒安全實現類。它有一個 Segment 陣列,Segment 本身就是相當於一個 HashMap物件。裡面是一個 HashEntry 陣列,陣列中的每一個 HashEntry 都是一個鍵值對,也是一個連結串列的表頭。如果別人問你,那 ConcurrentHashMap get 或者 put 一個物件的時候是怎麼操作的 ,你該怎麼回答。emmm..... 繼續往下看。會有你要的答案。

建構函式

分析原始碼,先從建構函式開始。直接研究帶所有引數的構造方法,其他一些過載的構造方法,最裡面還是呼叫了該構造方法。在看構造方法之前,需要 明白 sshift 是表示併發數的2的幾次方 比如併發數是16 那麼他的值就是 4 。ssize 是 segment

陣列的大小。

 public ConcurrentHashMap(int initialCapacity,
                             float loadFactor, int concurrencyLevel) {
        if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
            throw new IllegalArgumentException();

        if (concurrencyLevel > MAX_SEGMENTS)
            concurrencyLevel = MAX_SEGMENTS;

        // Find power-of-two sizes best matching arguments
        int sshift = 0;
        int ssize = 1;
        while (ssize < concurrencyLevel) {
            ++sshift;
            ssize <<= 1;
        }
        // 用來與 key的hashCode >>> 運算 獲取HashCode的高位
        segmentShift = 32 - sshift;
        // 高位與 它做與運算 eg 假如 預設的建立該物件 那麼 segmentShift = 28  segmentMask=15(二進位制為1111) 假如現在put一個值 他的key的HashCode值為2的32次方 那麼 他在segment裡面的定位時 2的32次方 無符號 高位補零 右移28個 那麼就等於 10000(二進位制) 等於 16  與 1111 做與運算 等於0 也就是定位在 segment[0]上 。
        segmentMask = ssize - 1;


        // segment陣列大小為 16 
        this.segments = Segment.newArray(ssize);

        if (initialCapacity > MAXIMUM_CAPACITY)
            initialCapacity = MAXIMUM_CAPACITY;
        int c = initialCapacity / ssize;
        if (c * ssize < initialCapacity)
            ++c;
        // segment陣列中 每個HashEntry陣列的大小,
        int cap = 1;
        while (cap < c)
            cap <<= 1;
        // 為segment陣列中的每個HashEntry陣列初始化大小,每個semengt中只有一個HashEntry陣列。如果你設定的 ConcurrentHashMap 初始化大小為16的話,則 segment陣列中每個的HashEntry的大小為1,如果你初始化他的大小為28 的話。它會根據上面的運算,cap的大小為2,也就是segment陣列中的每個HashEntry陣列的大小為2 ,總的大小為32。
        for (int i = 0; i < this.segments.length; ++i)
            this.segments[i] = new Segment<K,V>(cap, loadFactor);
    }

上面的註釋應該都挺清楚了,要注意的是 ConcurrentHashMap的大小 是所有 Segment 陣列中每個HashEntry陣列的大小相加的和。

put 方法

ConcurrentHashMap 每次 put 的時候都是需要加鎖的,只不過會鎖住他所在的那個Segment陣列位置。其他的不鎖,這也就是分段鎖,預設支援16個併發。說起put,以陣列的形式儲存的資料,就會涉及到擴容。這樣是接下來需要好好討論的一個事情。

    public V put(K key, V value) {
    // key value 不能為null
        if (value == null)
            throw new NullPointerException();
        // 獲取hash值
        int hash = hash(key.hashCode());
        // 先獲取hash二進位制數的高位與15的二進位制做與運算,得到segment陣列的位置。
        return segmentFor(hash).put(key, hash, value, false);
    }

    V put(K key, int hash, V value, boolean onlyIfAbsent) {
        // 鎖住
            lock();
            try {
                int c = count;
                if (c++ > threshold) // ensure capacity
                // 擴容操作
                    rehash();
                    // 獲取 Segment陣列中的其中的HashEntry陣列
                HashEntry<K,V>[] tab = table;
                // 獲取在在HashEntry陣列中的位置。
                int index = hash & (tab.length - 1);
                HashEntry<K,V> first = tab[index];
                HashEntry<K,V> e = first;
                // 判斷是否是該key。
                while (e != null && (e.hash != hash || !key.equals(e.key)))
                    e = e.next;

                V oldValue;
                // 如果存在該key的資料 ,那麼更新該值 返回舊值
                if (e != null) {
                    oldValue = e.value;
                    if (!onlyIfAbsent)
                        e.value = value;
                }
                else {
                    oldValue = null;
                    ++modCount;
                    //頭插法插入 tab[index] 
                    tab[index] = new HashEntry<K,V>(key, hash, first, value);
                    count = c; // write-volatile
                }
                return oldValue;
            } finally {
                unlock();
            }
        }

        // 看下擴容操作的細節
          void rehash() {
            HashEntry<K,V>[] oldTable = table;
            int oldCapacity = oldTable.length;
            if (oldCapacity >= MAXIMUM_CAPACITY)
                return;

            // HashEntry陣列,新的陣列為它的兩倍
            HashEntry<K,V>[] newTable = HashEntry.newArray(oldCapacity<<1);
            // 閾值
            threshold = (int)(newTable.length * loadFactor);
            //他的二進位制新增以為 原來他的大小為3 那麼二進位制就是11 現在就為 7 二進位制為 111
            int sizeMask = newTable.length - 1;
            for (int i = 0; i < oldCapacity ; i++) {
               
                // 舊的HashEntry。
                HashEntry<K,V> e = oldTable[i];

                if (e != null) {
                    // 下一個 該HashEntry陣列上的 HashEntry是否為連結串列,有下一個值。
                    HashEntry<K,V> next = e.next;
                    // 該HashEntry的新位置 如果高位為1 那麼他在HashEntry陣列中的位置就是老的HashEntry陣列中的加上這個倍數。舉個例子
                    // 假如e.hash 原來的的二進位制位...111 老的HashEntry陣列的大小為 4 那麼e.hash和 4-1 也就是3 做與運算 得到的值也就是二進位制的11
                    // 值位3 現在新的HashEntry陣列的大小為 8 也就是 e.hash 和 8-1 做與運算 得到的值 也就是二進位制位 111 位 7 。
                    int idx = e.hash & sizeMask;

                    //  沒有的話就直接放入該位置了,如果有的話往下看:
                    if (next == null)
                        newTable[idx] = e;

                    else {
                        HashEntry<K,V> lastRun = e;
                        // 假如idx 等於 7
                        int lastIdx = idx;
                        // 找到最後一個 HashEntry中的位置,並且後面的HashEntry的位置都是一樣的。舉個例子
                        // 假如這個連結串列中的所有HashEntry的Hash值為 1-5-1-5-5-5 。那麼最後lastIdx = 5 也就是1-5-1後面的這個5 。lastRun 為 1-5-1後面的這個5的HashEnrty。
                        for (HashEntry<K,V> last = next;
                             last != null;
                             last = last.next) {
                            int k = last.hash & sizeMask;
                            if (k != lastIdx) {
                                lastIdx = k;
                                //     
                                lastRun = last;
                            }
                        }
                        // 將 lastRun 複製給 這個新的Table 那麼後面還有 5-5-5 這些的就不用移動了 直接帶過來了。 這就是上面那個for迴圈做的事情
                        newTable[lastIdx] = lastRun;

                        // 對前面的 1-5-1做操作了 1就是在新HashEntry書中的1的位置 5的後就是頭插法 ,查到新HashEntry的頭部了
                        for (HashEntry<K,V> p = e; p != lastRun; p = p.next) {
                            int k = p.hash & sizeMask;
                            HashEntry<K,V> n = newTable[k];
                            newTable[k] = new HashEntry<K,V>(p.key, p.hash,
                                                             n, p.value);
                        }
                    }
                }
            }
            table = newTable;
        }

其實put 方法中有點難理解的就是 把查詢到後面如果有所有相同的 HashEntrykey的位置是一樣的話,就不用額外的進行Hash重新定位了。不知道我描述的清不清楚。如果還有不清楚的話,可以私信一下我。

get 方法

ConcurrentHashMapget 方法是不會加鎖的,如果get的值為null的時候,這個時候會對這個HashEntry進行加鎖。預防此時併發出現的問題。

    public V get(Object key) {
        //定位Segment陣列中的HashEntry陣列為位置
        int hash = hash(key.hashCode());
        return segmentFor(hash).get(key, hash);
    }

     V get(Object key, int hash) {
            // 曾經put進去過,也就是裡面有值
            if (count != 0) { // read-volatile
                // 定位HashEntry陣列中的HashEntry。
                HashEntry<K,V> e = getFirst(hash);
                while (e != null) {
                    if (e.hash == hash && key.equals(e.key)) {
                        V v = e.value;
                        if (v != null)
                            return v;
                        return readValueUnderLock(e); // recheck
                    }
                    e = e.next;
                }
            }
            return null;
        }

ConcurrentHashMapget方法是比較簡單的。看一看就知道了。

總結

這一遍ConcurrentHashMap原始碼分析,可以說是自己寫了大半個月吧。好早之前就準備寫了。總是寫一點,然後就停筆了。加上自己換了公司的原因,又忙上了一段時間,導致一拖再落。哇,嚴重拖延症患者。上面自己也是全部透徹之後寫下來的,如果有些表達不夠清晰的還得多加包涵,如果有不同的可以下方瀏覽討論一下。上面很多關鍵的程式碼我都寫上了註釋,可以配合著註釋,然後自己對原始碼進行研究,檢視,如果還有不是很透徹的話,自己多翻一翻其他人寫的。最近一直在寫LeetCode上的動態規劃這些演算法題。其實也就是抄一遍。等以後有了感悟再來寫這一些吧