1. 程式人生 > >【搞定Java8新特性】之Java7/8 中的 HashMap 和 ConcurrentHashMap 全解析

【搞定Java8新特性】之Java7/8 中的 HashMap 和 ConcurrentHashMap 全解析

本文轉載自:https://blog.csdn.net/a724888/article/details/68936953

本文目錄:

1、Java7 中的HashMap

1.1、put過程分析

陣列初始化

計算具體陣列位置

新增節點到連結串列中

陣列擴容

補充:HashMap擴容機制: JDK1.7與1.8的區別

1.2、get過程分析

getEntry(key):

2、Java7 ConcurrentHashMap

初始化

2.1、put 過程分析

初始化槽: ensureSegment

獲取寫入鎖: scanAndLockForPut

擴容: rehash

2.2、get 過程分析

2.3、size()操作

 不同之處

併發問題分析

3、Java8 HashMap

3.1、put 過程分析

陣列擴容

3.2、get 過程分析

4、Java8 ConcurrentHashMap

初始化

4.1、put 過程分析

初始化陣列:initTable

連結串列轉紅黑樹: treeifyBin

擴容:tryPresize

資料遷移:transfer

4.2、get 過程分析


在Java併發程式設計的系列文章中,詳細的講解了基於Java7的ConcurrentHashMap(點選檢視),但是Java8對ConcurrentHashMap做了較大的改動,因此本文用來講解四個模組:

1、Java7 中的HashMap;

2、Java7 中的ConcurrentHashMap;

3、Java8 中的HashMap;

4、Java8 中的ConcurrentHashMap。

閱讀前提:本文分析的是原始碼,所以至少讀者要熟悉它們的介面使用,同時,對於併發,讀者至少要知道 CAS、ReentrantLock、UNSAFE 操作這幾個基本的知識,文中不會對這些知識進行介紹。Java8 用到了紅黑樹,不過本文不會進行展開,感興趣的讀者請自行查詢相關資料。 

1、Java7 中的HashMap

HashMap 是最簡單的,一來我們非常熟悉,二來就是它不支援併發操作,所以原始碼也非常簡單。

首先,我們用下面這張圖來介紹 HashMap 的結構。

這個僅僅是示意圖,因為沒有考慮到陣列要擴容的情況,具體的後面再說。

大方向上,HashMap 裡面是一個陣列,然後陣列中每個元素是一個單向連結串列

上圖中,每個綠色的實體是巢狀類 Entry 的例項,Entry 包含四個屬性:key, value, hash 值和用於單向連結串列的 next。

capacity:當前陣列容量,始終保持 2^n,可以擴容,擴容後陣列大小為當前的 2 倍。

loadFactor:負載因子,預設為 0.75。

threshold:擴容的閾值,等於 capacity * loadFactor

1.1、put過程分析

還是比較簡單的,跟著程式碼走一遍吧。

public V put(K key, V value) {
    // 當插入第一個元素的時候,需要先初始化陣列大小
    if (table == EMPTY_TABLE) {
        inflateTable(threshold);
    }
    // 如果 key 為 null,感興趣的可以往裡看,最終會將這個 entry 放到 table[0] 中
    if (key == null)
        return putForNullKey(value);
    // 1. 求 key 的 hash 值
    int hash = hash(key);
    // 2. 找到對應的陣列下標
    int i = indexFor(hash, table.length);
    // 3. 遍歷一下對應下標處的連結串列,看是否有重複的 key 已經存在,
    //    如果有,直接覆蓋,put 方法返回舊值就結束了
    for (Entry<K,V> e = table[i]; e != null; e = e.next) {
        Object k;
        if (e.hash == hash && ((k = e.key) == key || key.equals(k))) {
            V oldValue = e.value;
            e.value = value;
            e.recordAccess(this);
            return oldValue;
        }
    }

    modCount++;
    // 4. 不存在重複的 key,將此 entry 新增到連結串列中,細節後面說
    addEntry(hash, key, value, i);
    return null;
}
  • 陣列初始化

在第一個元素插入 HashMap 的時候做一次陣列的初始化,就是先確定初始的陣列大小,並計算陣列擴容的閾值。

private void inflateTable(int toSize) {
    // 保證陣列大小一定是 2 的 n 次方。
    // 比如這樣初始化:new HashMap(20),那麼處理成初始陣列大小是 32
    int capacity = roundUpToPowerOf2(toSize);
    // 計算擴容閾值:capacity * loadFactor
    threshold = (int) Math.min(capacity * loadFactor, MAXIMUM_CAPACITY + 1);
    // 算是初始化陣列吧
    table = new Entry[capacity];
    initHashSeedAsNeeded(capacity); //ignore
}

這裡有一個將陣列大小保持為 2 的 n 次方的做法,Java7 和 Java8 的 HashMap 和 ConcurrentHashMap 都有相應的要求,只不過實現的程式碼稍微有些不同,後面再看到的時候就知道了。

  • 計算具體陣列位置

這個簡單,我們自己也能 YY 一個:使用 key 的 hash 值對陣列長度進行取模就可以了。

static int indexFor(int hash, int length) {
    // assert Integer.bitCount(length) == 1 : "length must be a non-zero power of 2";
    return hash & (length-1);
}

這個方法很簡單,簡單說就是取 hash 值的低 n 位。如在陣列長度為 32 的時候,其實取的就是 key 的 hash 值的低 5 位,作為它在陣列中的下標位置。

  • 新增節點到連結串列中

找到陣列下標後,會先進行 key 判重,如果沒有重複,就準備將新值放入到連結串列的表頭

void addEntry(int hash, K key, V value, int bucketIndex) {
    // 如果當前 HashMap 大小已經達到了閾值,並且新值要插入的陣列位置已經有元素了,那麼要擴容
    if ((size >= threshold) && (null != table[bucketIndex])) {
        // 擴容,後面會介紹一下
        resize(2 * table.length);
        // 擴容以後,重新計算 hash 值
        hash = (null != key) ? hash(key) : 0;
        // 重新計算擴容後的新的下標
        bucketIndex = indexFor(hash, table.length);
    }
    // 往下看
    createEntry(hash, key, value, bucketIndex);
}
// 這個很簡單,其實就是將新值放到連結串列的表頭,然後 size++
void createEntry(int hash, K key, V value, int bucketIndex) {
    Entry<K,V> e = table[bucketIndex];
    table[bucketIndex] = new Entry<>(hash, key, value, e);
    size++;
}

這個方法的主要邏輯就是先判斷是否需要擴容,需要的話先擴容,然後再將這個新的資料插入到擴容後的陣列的相應位置處的連結串列的表頭。

  • 陣列擴容

前面我們看到,在插入新值的時候,如果當前的 size 已經達到了閾值,並且要插入的陣列位置上已經有元素,那麼就會觸發擴容,擴容後,陣列大小為原來的 2 倍。

void resize(int newCapacity) {
    Entry[] oldTable = table;
    int oldCapacity = oldTable.length;
    if (oldCapacity == MAXIMUM_CAPACITY) {
        threshold = Integer.MAX_VALUE;
        return;
    }
    // 新的陣列
    Entry[] newTable = new Entry[newCapacity];
    // 將原來陣列中的值遷移到新的更大的陣列中
    transfer(newTable, initHashSeedAsNeeded(newCapacity));
    table = newTable;
    threshold = (int)Math.min(newCapacity * loadFactor, MAXIMUM_CAPACITY + 1);
}

擴容就是用一個新的大陣列替換原來的小陣列,並將原來陣列中的值遷移到新的陣列中。

由於是雙倍擴容,遷移過程中,會將原來 table[i] 中的連結串列的所有節點,分拆到新的陣列的 newTable[i] 和 newTable[i + oldLength] 位置上。如原來陣列長度是 16,那麼擴容後,原來 table[0] 處的連結串列中的所有元素會被分配到新陣列中 newTable[0] 和 newTable[16] 這兩個位置。程式碼比較簡單,這裡就不展開了。


這篇文章沒有講清楚1.7和1.8hashmap擴容的區別,這裡特地補充:


補充:HashMap擴容機制: JDK1.7與1.8的區別

擴容(resize)就是重新計算容量,向HashMap物件裡不停的新增元素,而HashMap物件內部的陣列無法裝載更多的元素時,物件就需要擴大陣列的長度,以便能裝入更多的元素。當然Java裡的陣列是無法自動擴容的,方法是使用一個新的陣列代替已有的容量小的陣列,就像我們用一個小桶裝水,如果想裝更多的水,就得換大水桶。

我們分析下resize的原始碼,鑑於JDK1.8融入了紅黑樹,較複雜,為了便於理解我們仍然使用JDK1.7的程式碼,好理解一些,本質上區別不大,具體區別後文再說。

void resize(int newCapacity) { // 傳入新的容量

    Entry[] oldTable = table; // 引用擴容前的Entry陣列

    int oldCapacity = oldTable.length;

    if (oldCapacity == MAXIMUM_CAPACITY) { // 擴容前的陣列大小如果已經達到最大(2^30)了

        threshold = Integer.MAX_VALUE; // 修改閾值為int的最大值(2^31-1),這樣以後就不會擴容了

        return;
    }

    Entry[] newTable = new Entry[newCapacity]; // 初始化一個新的Entry陣列

    transfer(newTable); // !!將資料轉移到新的Entry數組裡

    table = newTable; // HashMap的table屬性引用新的Entry陣列

    threshold = (int) (newCapacity * loadFactor);// 修改閾值
}

這裡就是使用一個容量更大的陣列來代替已有的容量小的陣列,transfer()方法將原有Entry陣列的元素拷貝到新的Entry數組裡。

void transfer(Entry[] newTable) {

    Entry[] src = table; // src引用了舊的Entry陣列

    int newCapacity = newTable.length;

    for (int j = 0; j < src.length; j++) { // 遍歷舊的Entry陣列

        Entry<K, V> e = src[j]; // 取得舊Entry陣列的每個元素

        if (e != null) {

            src[j] = null;// 釋放舊Entry陣列的物件引用(for迴圈後,舊的Entry陣列不再引用任何物件)

            do {

                Entry<K, V> next = e.next;

                int i = indexFor(e.hash, newCapacity); // !!重新計算每個元素在陣列中的位置
                e.next = newTable[i]; // 標記[1]

                newTable[i] = e; // 將元素放在陣列上

                e = next; // 訪問下一個Entry鏈上的元素

            } while (e != null);
        }
    }
}

newTable[i]的引用賦給了e.next,也就是使用了單鏈表的頭插入方式,同一位置上新元素總會被放在連結串列的頭部位置;這樣先放在一個索引上的元素終會被放到Entry鏈的尾部(如果發生了hash衝突的話),這一點和Jdk1.8有區別,下文詳解。在舊陣列中同一條Entry鏈上的元素,通過重新計算索引位置後,有可能被放到了新陣列的不同位置上。

下面舉個例子說明下擴容過程。假設了我們的hash演算法就是簡單的用key mod 一下表的大小(也就是陣列的長度)。其中的雜湊桶陣列table的size=2, 所以key = 3、7、5,put順序依次為 5、7、3。在mod 2以後都衝突在table[1]這裡了。這裡假設負載因子 loadFactor=1,即當鍵值對的實際大小size 大於 table的實際大小時進行擴容。接下來的三個步驟是雜湊桶陣列 resize成4,然後所有的Node重新rehash的過程。

下面我們講解下JDK1.8做了哪些優化。經過觀測可以發現,我們使用的是2次冪的擴充套件(指長度擴為原來2倍),所以,元素的位置要麼是在原位置,要麼是在原位置再移動2次冪的位置。看下圖可以明白這句話的意思,n為table的長度,圖(a)表示擴容前的key1和key2兩種key確定索引位置的示例,圖(b)表示擴容後key1和key2兩種key確定索引位置的示例,其中hash1是key1對應的雜湊與高位運算結果。

元素在重新計算hash之後,因為n變為2倍,那麼n-1的mask範圍在高位多1bit(紅色),因此新的index就會發生這樣的變化:

因此,我們在擴充HashMap的時候,不需要像JDK1.7的實現那樣重新計算hash,只需要看看原來的hash值新增的那個bit是1還是0就好了,是0的話索引沒變,是1的話索引變成“原索引+oldCap”,可以看看下圖為16擴充為32的resize示意圖:

這個設計確實非常的巧妙,既省去了重新計算hash值的時間,而且同時,由於新增的1bit是0還是1可以認為是隨機的,因此resize的過程,均勻的把之前的衝突的節點分散到新的bucket了。這一塊就是JDK1.8新增的優化點。有一點注意區別,JDK1.7中rehash的時候,舊連結串列遷移新連結串列的時候,如果在新表的陣列索引位置相同,則連結串列元素會倒置,但是從上圖可以看出,JDK1.8不會倒置。有興趣的同學可以研究下JDK1.8的resize原始碼,寫的很贊,如下:

final Node<K,V>[] resize() {

        Node<K,V>[] oldTab = table;

        int oldCap = (oldTab == null) ? 0 : oldTab.length;

        int oldThr = threshold;

        int newCap, newThr = 0;

        if (oldCap > 0) {

            // 超過最大值就不再擴充了,就只好隨你碰撞去吧
            if (oldCap >= MAXIMUM_CAPACITY) {
                threshold = Integer.MAX_VALUE;
                return oldTab;
            }

            // 沒超過最大值,就擴充為原來的2倍
            else if ((newCap = oldCap << 1) < MAXIMUM_CAPACITY &&
                oldCap >= DEFAULT_INITIAL_CAPACITY)
                newThr = oldThr << 1; // double threshold
            }

            else if (oldThr > 0) // initial capacity was placed in threshold
                newCap = oldThr;
            else {               // zero initial threshold signifies using defaults
                newCap = DEFAULT_INITIAL_CAPACITY;
                newThr = (int)(DEFAULT_LOAD_FACTOR * DEFAULT_INITIAL_CAPACITY);
            }

            // 計算新的resize上限
            if (newThr == 0) {
                float ft = (float)newCap * loadFactor;
                newThr = (newCap < MAXIMUM_CAPACITY && ft < (float)MAXIMUM_CAPACITY ? (int)ft : Integer.MAX_VALUE);
            }
            threshold = newThr;
		    
            @SuppressWarnings({"rawtypes","unchecked"})
            Node<K,V>[] newTab = (Node<K,V>[])new Node[newCap];
            table = newTab;
            if (oldTab != null) {
                // 把每個bucket都移動到新的buckets中
                for (int j = 0; j < oldCap; ++j) {
                    Node<K,V> e;
                    if ((e = oldTab[j]) != null) {
                        oldTab[j] = null;
                        if (e.next == null)
                            newTab[e.hash & (newCap - 1)] = e;
                        else if (e instanceof TreeNode)
                            ((TreeNode<K,V>)e).split(this, newTab, j, oldCap);
                        else { // 連結串列優化重hash的程式碼塊
                            Node<K,V> loHead = null, loTail = null;
                            Node<K,V> hiHead = null, hiTail = null;
                            Node<K,V> next;

                            do {
                                next = e.next;
                                // 原索引
                                if ((e.hash & oldCap) == 0) {
                                    if (loTail == null)
                                        loHead = e;
                                    else
                                        loTail.next = e;
                                    loTail = e;
                                }

                                // 原索引+oldCap
                                else {
                                    if (hiTail == null)
                                        hiHead = e;
                                    else
                                        hiTail.next = e;
                                    hiTail = e;
                                }
                            } while ((e = next) != null);

                            // 原索引放到bucket裡
                            if (loTail != null) {
                                loTail.next = null;
                                newTab[j] = loHead;
                            }
                            // 原索引+oldCap放到bucket裡
                            if (hiTail != null) {
                                hiTail.next = null;
                                newTab[j + oldCap] = hiHead;
                            }
                        }
                    }
                }
         }
         return newTab;
}
  • HashMap執行緒安全經典問題:併發修改時的死鎖

另外這裡要引入一個經典的問題,多執行緒put操作時可能導致的遍歷死鎖問題。

在多執行緒使用場景中,應該儘量避免使用執行緒不安全的HashMap,而使用執行緒安全的ConcurrentHashMap。那麼為什麼說HashMap是執行緒不安全的,下面舉例子說明在併發的多執行緒使用場景中使用HashMap可能造成死迴圈。程式碼例子如下(便於理解,仍然使用JDK1.7的環境):

public class HashMapInfiniteLoop { 
    private static HashMap<Integer,String> map = new HashMap<Integer,String>(2,0.75f); 

    public static void main(String[] args) { 

        map.put(5, "C"); 

        new Thread("Thread1") { 
            public void run() { 
                map.put(7, "B"); 
                System.out.println(map); 
            }; 
        }.start(); 
        
        new Thread("Thread2") { 
            public void run() { 
                map.put(3, "A); 
                System.out.println(map); 
            }; 
        }.start();       
    } 
}

其中,map初始化為一個長度為2的陣列,loadFactor=0.75,threshold=2*0.75=1,也就是說當put第二個key的時候,map就需要進行resize。

通過設定斷點讓執行緒1和執行緒2同時debug到transfer方法(3.3小節程式碼塊)的首行。注意此時兩個執行緒已經成功新增資料。放開thread1的斷點至transfer方法的“Entry next = e.next;” 這一行;然後放開執行緒2的的斷點,讓執行緒2進行resize。結果如下圖。

注意,Thread1的 e 指向了key(3),而next指向了key(7),其線上程二rehash後,指向了執行緒二重組後的連結串列。

執行緒一被排程回來執行,先是執行 newTalbe[i] = e, 然後是e = next,導致了e指向了key(7),而下一次迴圈的next = e.next導致了next指向了key(3)。

e.next = newTable[i] 導致 key(3).next 指向了 key(7)。注意:此時的key(7).next 已經指向了key(3), 環形連結串列就這樣出現了。

於是,當我們用執行緒一呼叫map.get(11)時,悲劇就出現了——Infinite Loop。

1.2、get過程分析

相對於 put 過程,get 過程是非常簡單的。

  1. 根據 key 計算 hash 值。
  2. 找到相應的陣列下標:hash & (length - 1)。
  3. 遍歷該陣列位置處的連結串列,直到找到相等(==或equals)的 key。
public V get(Object key) {
    // 之前說過,key 為 null 的話,會被放到 table[0],所以只要遍歷下 table[0] 處的連結串列就可以了
    if (key == null)
        return getForNullKey();
    // 
    Entry<K,V> entry = getEntry(key);

    return null == entry ? null : entry.getValue();
}
  • getEntry(key):

final Entry<K,V> getEntry(Object key) {
    if (size == 0) {
        return null;
    }

    int hash = (key == null) ? 0 : hash(key);
    // 確定陣列下標,然後從頭開始遍歷連結串列,直到找到為止
    for (Entry<K,V> e = table[indexFor(hash, table.length)];
         e != null;
         e = e.next) {
        Object k;
        if (e.hash == hash &&
            ((k = e.key) == key || (key != null && key.equals(k))))
            return e;
    }
    return null;
}

2、Java7 ConcurrentHashMap

ConcurrentHashMap 和 HashMap 思路是差不多的,但是因為它支援併發操作,所以要複雜一些。

整個 ConcurrentHashMap 由一個個 Segment 組成,Segment 代表”部分“或”一段“的意思,所以很多地方都會將其描述為分段鎖。注意,行文中,我很多地方用了“”來代表一個 segment。

簡單理解就是,ConcurrentHashMap 是一個 Segment 陣列,Segment 通過繼承 ReentrantLock 來進行加鎖,所以每次需要加鎖的操作鎖住的是一個 segment,這樣只要保證每個 Segment 是執行緒安全的,也就實現了全域性的執行緒安全。

concurrencyLevel:並行級別、併發數、Segment 數,怎麼翻譯不重要,理解它。預設是 16,也就是說 ConcurrentHashMap 有 16 個 Segments,所以理論上,這個時候,最多可以同時支援 16 個執行緒併發寫,只要它們的操作分別分佈在不同的 Segment 上。這個值可以在初始化的時候設定為其他值,但是一旦初始化以後,它是不可以擴容的。

再具體到每個 Segment 內部,其實每個 Segment 很像之前介紹的 HashMap,不過它要保證執行緒安全,所以處理起來要麻煩些。

  • 初始化

initialCapacity:初始容量,這個值指的是整個 ConcurrentHashMap 的初始容量,實際操作的時候需要平均分給每個 Segment。

loadFactor:負載因子,之前我們說了,Segment 陣列不可以擴容,所以這個負載因子是給每個 Segment 內部使用的。

public ConcurrentHashMap(int initialCapacity,
                         float loadFactor, int concurrencyLevel) {
    if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
        throw new IllegalArgumentException();
    if (concurrencyLevel > MAX_SEGMENTS)
        concurrencyLevel = MAX_SEGMENTS;
    // Find power-of-two sizes best matching arguments
    int sshift = 0;
    int ssize = 1;
    // 計算並行級別 ssize,因為要保持並行級別是 2 的 n 次方
    while (ssize < concurrencyLevel) {
        ++sshift;
        ssize <<= 1;
    }
    // 我們這裡先不要那麼燒腦,用預設值,concurrencyLevel 為 16,sshift 為 4
    // 那麼計算出 segmentShift 為 28,segmentMask 為 15,後面會用到這兩個值
    this.segmentShift = 32 - sshift;
    this.segmentMask = ssize - 1;

    if (initialCapacity > MAXIMUM_CAPACITY)
        initialCapacity = MAXIMUM_CAPACITY;

    // initialCapacity 是設定整個 map 初始的大小,
    // 這裡根據 initialCapacity 計算 Segment 陣列中每個位置可以分到的大小
    // 如 initialCapacity 為 64,那麼每個 Segment 或稱之為"槽"可以分到 4 個
    int c = initialCapacity / ssize;
    if (c * ssize < initialCapacity)
        ++c;
    // 預設 MIN_SEGMENT_TABLE_CAPACITY 是 2,這個值也是有講究的,因為這樣的話,對於具體的槽上,
    // 插入一個元素不至於擴容,插入第二個的時候才會擴容
    int cap = MIN_SEGMENT_TABLE_CAPACITY; 
    while (cap < c)
        cap <<= 1;

    // 建立 Segment 陣列,
    // 並建立陣列的第一個元素 segment[0]
    Segment<K,V> s0 =
        new Segment<K,V>(loadFactor, (int)(cap * loadFactor),
                         (HashEntry<K,V>[])new HashEntry[cap]);
    Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];
    // 往陣列寫入 segment[0]
    UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0]
    this.segments = ss;
}

初始化完成,我們得到了一個 Segment 陣列。

我們就當是用 new ConcurrentHashMap() 無參建構函式進行初始化的,那麼初始化完成後:

  • Segment 陣列長度為 16,不可以擴容
  • Segment[i] 的預設大小為 2,負載因子是 0.75,得出初始閾值為 1.5,也就是以後插入第一個元素不會觸發擴容,插入第二個會進行第一次擴容
  • 這裡初始化了 segment[0],其他位置還是 null,至於為什麼要初始化 segment[0],後面的程式碼會介紹
  • 當前 segmentShift 的值為 32 - 4 = 28,segmentMask 為 16 - 1 = 15,姑且把它們簡單翻譯為移位數掩碼,這兩個值馬上就會用到

2.1、put 過程分析

我們先看 put 的主流程,對於其中的一些關鍵細節操作,後面會進行詳細介紹。

public V put(K key, V value) {
    Segment<K,V> s;
    if (value == null)
        throw new NullPointerException();
    // 1. 計算 key 的 hash 值
    int hash = hash(key);
    // 2. 根據 hash 值找到 Segment 陣列中的位置 j
    //    hash 是 32 位,無符號右移 segmentShift(28) 位,剩下高 4 位,
    //    然後和 segmentMask(15) 做一次與操作,也就是說 j 是 hash 值的高 4 位,也就是槽的陣列下標
    int j = (hash >>> segmentShift) & segmentMask;
    // 剛剛說了,初始化的時候初始化了 segment[0],但是其他位置還是 null,
    // ensureSegment(j) 對 segment[j] 進行初始化
    if ((s = (Segment<K,V>)UNSAFE.getObject          // nonvolatile; recheck
         (segments, (j << SSHIFT) + SBASE)) == null) //  in ensureSegment
        s = ensureSegment(j);
    // 3. 插入新值到 槽 s 中
    return s.put(key, hash, value, false);
}

第一層皮很簡單,根據 hash 值很快就能找到相應的 Segment,之後就是 Segment 內部的 put 操作了。

Segment 內部是由 陣列+連結串列 組成的。

final V put(K key, int hash, V value, boolean onlyIfAbsent) {
    // 在往該 segment 寫入前,需要先獲取該 segment 的獨佔鎖
    //    先看主流程,後面還會具體介紹這部分內容
    HashEntry<K,V> node = tryLock() ? null :
        scanAndLockForPut(key, hash, value);
    V oldValue;
    try {
        // 這個是 segment 內部的陣列
        HashEntry<K,V>[] tab = table;
        // 再利用 hash 值,求應該放置的陣列下標
        int index = (tab.length - 1) & hash;
        // first 是陣列該位置處的連結串列的表頭
        HashEntry<K,V> first = entryAt(tab, index);

        // 下面這串 for 迴圈雖然很長,不過也很好理解,想想該位置沒有任何元素和已經存在一個連結串列這兩種情況
        for (HashEntry<K,V> e = first;;) {
            if (e != null) {
                K k;
                if ((k = e.key) == key ||
                    (e.hash == hash && key.equals(k))) {
                    oldValue = e.value;
                    if (!onlyIfAbsent) {
                        // 覆蓋舊值
                        e.value = value;
                        ++modCount;
                    }
                    break;
                }
                // 繼續順著連結串列走
                e = e.next;
            }
            else {
                // node 到底是不是 null,這個要看獲取鎖的過程,不過和這裡都沒有關係。
                // 如果不為 null,那就直接將它設定為連結串列表頭;如果是null,初始化並設定為連結串列表頭。
                if (node != null)
                    node.setNext(first);
                else
                    node = new HashEntry<K,V>(hash, key, value, first);

                int c = count + 1;
                // 如果超過了該 segment 的閾值,這個 segment 需要擴容
                if (c > threshold && tab.length < MAXIMUM_CAPACITY)
                    rehash(node); // 擴容後面也會具體分析
                else
                    // 沒有達到閾值,將 node 放到陣列 tab 的 index 位置,
                    // 其實就是將新的節點設定成原連結串列的表頭
                    setEntryAt(tab, index, node);
                ++modCount;
                count = c;
                oldValue = null;
                break;
            }
        }
    } finally {
        // 解鎖
        unlock();
    }
    return oldValue;
}

整體流程還是比較簡單的,由於有獨佔鎖的保護,所以 segment 內部的操作並不複雜。至於這裡面的併發問題,我們稍後再進行介紹。

到這裡 put 操作就結束了,接下來,我們說一說其中幾步關鍵的操作。

  • 初始化槽: ensureSegment

ConcurrentHashMap 初始化的時候會初始化第一個槽 segment[0],對於其他槽來說,在插入第一個值的時候進行初始化。

這裡需要考慮併發,因為很可能會有多個執行緒同時進來初始化同一個槽 segment[k],不過只要有一個成功了就可以。

private Segment<K,V> ensureSegment(int k) {
    final Segment<K,V>[] ss = this.segments;
    long u = (k << SSHIFT) + SBASE; // raw offset
    Segment<K,V> seg;
    if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) {
        // 這裡看到為什麼之前要初始化 segment[0] 了,
        // 使用當前 segment[0] 處的陣列長度和負載因子來初始化 segment[k]
        // 為什麼要用“當前”,因為 segment[0] 可能早就擴容過了
        Segment<K,V> proto = ss[0];
        int cap = proto.table.length;
        float lf = proto.loadFactor;
        int threshold = (int)(cap * lf);

        // 初始化 segment[k] 內部的陣列
        HashEntry<K,V>[] tab = (HashEntry<K,V>[])new HashEntry[cap];
        if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))
            == null) { // 再次檢查一遍該槽是否被其他執行緒初始化了。

            Segment<K,V> s = new Segment<K,V>(lf, threshold, tab);
            // 使用 while 迴圈,內部用 CAS,當前執行緒成功設值或其他執行緒成功設值後,退出
            while ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))
                   == null) {
                if (UNSAFE.compareAndSwapObject(ss, u, null, seg = s))
                    break;
            }
        }
    }
    return seg;
}

總的來說,ensureSegment(int k) 比較簡單,對於併發操作使用 CAS 進行控制。

我沒搞懂這裡為什麼要搞一個 while 迴圈,CAS 失敗不就代表有其他執行緒成功了嗎,為什麼要再進行判斷?

----- 如果當前執行緒 CAS 失敗,這裡的 while 迴圈是為了將 seg 賦值返回。

  • 獲取寫入鎖: scanAndLockForPut

前面我們看到,在往某個 segment 中 put 的時候,首先會呼叫 node = tryLock() ? null : scanAndLockForPut(key, hash, value),也就是說先進行一次 tryLock() 快速獲取該 segment 的獨佔鎖,如果失敗,那麼進入到 scanAndLockForPut 這個方法來獲取鎖。

下面我們來具體分析這個方法中是怎麼控制加鎖的。

private HashEntry<K,V> scanAndLockForPut(K key, int hash, V value) {
    HashEntry<K,V> first = entryForHash(this, hash);
    HashEntry<K,V> e = first;
    HashEntry<K,V> node = null;
    int retries = -1; // negative while locating node

    // 迴圈獲取鎖
    while (!tryLock()) {
        HashEntry<K,V> f; // to recheck first below
        if (retries < 0) {
            if (e == null) {
                if (node == null) // speculatively create node
                    // 進到這裡說明陣列該位置的連結串列是空的,沒有任何元素
                    // 當然,進到這裡的另一個原因是 tryLock() 失敗,所以該槽存在併發,不一定是該位置
                    node = new HashEntry<K,V>(hash, key, value, null);
                retries = 0;
            }
            else if (key.equals(e.key))
                retries = 0;
            else
                // 順著連結串列往下走
                e = e.next;
        }
        // 重試次數如果超過 MAX_SCAN_RETRIES(單核1多核64),那麼不搶了,進入到阻塞佇列等待鎖
        //    lock() 是阻塞方法,直到獲取鎖後返回
        else if (++retries > MAX_SCAN_RETRIES) {
            lock();
            break;
        }
        else if ((retries & 1) == 0 &&
                 // 這個時候是有大問題了,那就是有新的元素進到了連結串列,成為了新的表頭
                 //     所以這邊的策略是,相當於重新走一遍這個 scanAndLockForPut 方法
                 (f = entryForHash(this, hash)) != first) {
            e = first = f; // re-traverse if entry changed
            retries = -1;
        }
    }
    return node;
}

這個方法有兩個出口,一個是 tryLock() 成功了,迴圈終止,另一個就是重試次數超過了 MAX_SCAN_RETRIES,進到 lock() 方法,此方法會阻塞等待,直到成功拿到獨佔鎖。

這個方法就是看似複雜,但是其實就是做了一件事,那就是獲取該 segment 的獨佔鎖,如果需要的話順便例項化了一下 node。

  • 擴容: rehash

重複一下,segment 陣列不能擴容,擴容是 segment 陣列某個位置內部的陣列 HashEntry\[] 進行擴容,擴容後,容量為原來的 2 倍。

首先,我們要回顧一下觸發擴容的地方,put 的時候,如果判斷該值的插入會導致該 segment 的元素個數超過閾值,那麼先進行擴容,再插值,讀者這個時候可以回去 put 方法看一眼。

該方法不需要考慮併發,因為到這裡的時候,是持有該 segment 的獨佔鎖的。

// 方法引數上的 node 是這次擴容後,需要新增到新的陣列中的資料。
private void rehash(HashEntry<K,V> node) {
    HashEntry<K,V>[] oldTable = table;
    int oldCapacity = oldTable.length;
    // 2 倍
    int newCapacity = oldCapacity << 1;
    threshold = (int)(newCapacity * loadFactor);
    // 建立新陣列
    HashEntry<K,V>[] newTable =
        (HashEntry<K,V>[]) new HashEntry[newCapacity];
    // 新的掩碼,如從 16 擴容到 32,那麼 sizeMask 為 31,對應二進位制 ‘000...00011111’
    int sizeMask = newCapacity - 1;

    // 遍歷原陣列,老套路,將原陣列位置 i 處的連結串列拆分到 新陣列位置 i 和 i+oldCap 兩個位置
    for (int i = 0; i < oldCapacity ; i++) {
        // e 是連結串列的第一個元素
        HashEntry<K,V> e = oldTable[i];
        if (e != null) {
            HashEntry<K,V> next = e.next;
            // 計算應該放置在新陣列中的位置,
            // 假設原陣列長度為 16,e 在 oldTable[3] 處,那麼 idx 只可能是 3 或者是 3 + 16 = 19
            int idx = e.hash & sizeMask;
            if (next == null)   // 該位置處只有一個元素,那比較好辦
                newTable[idx] = e;
            else { // Reuse consecutive sequence at same slot
                // e 是連結串列表頭
                HashEntry<K,V> lastRun = e;
                // idx 是當前連結串列的頭結點 e 的新位置
                int lastIdx = idx;

                // 下面這個 for 迴圈會找到一個 lastRun 節點,這個節點之後的所有元素是將要放到一起的
                for (HashEntry<K,V> last = next;
                     last != null;
                     last = last.next) {
                    int k = last.hash & sizeMask;
                    if (k != lastIdx) {
                        lastIdx = k;
                        lastRun = last;
                    }
                }
                // 將 lastRun 及其之後的所有節點組成的這個連結串列放到 lastIdx 這個位置
                newTable[lastIdx] = lastRun;
                // 下面的操作是處理 lastRun 之前的節點,
                //    這些節點可能分配在另一個連結串列中,也可能分配到上面的那個連結串列中
                for (HashEntry<K,V> p = e; p != lastRun; p = p.next) {
                    V v = p.value;
                    int h = p.hash;
                    int k = h & sizeMask;
                    HashEntry<K,V> n = newTable[k];
                    newTable[k] = new HashEntry<K,V>(h, p.key, v, n);
                }
            }
        }
    }
    // 將新來的 node 放到新陣列中剛剛的 兩個連結串列之一 的 頭部
    int nodeIndex = node.hash & sizeMask; // add the new node
    node.setNext(newTable[nodeIndex]);
    newTable[nodeIndex] = node;
    table = newTable;
}

這裡的擴容比之前的 HashMap 要複雜一些,程式碼難懂一點。上面有兩個挨著的 for 迴圈,第一個 for 有什麼用呢?

仔細一看發現,如果沒有第一個 for 迴圈,也是可以工作的,但是,這個 for 迴圈下來,如果 lastRun 的後面還有比較多的節點,那麼這次就是值得的。因為我們只需要克隆 lastRun 前面的節點,後面的一串節點跟著 lastRun 走就是了,不需要做任何操作。

我覺得 Doug Lea 的這個想法也是挺有意思的,不過比較壞的情況就是每次 lastRun 都是連結串列的最後一個元素或者很靠後的元素,那麼這次遍歷就有點浪費了。不過 Doug Lea 也說了,根據統計,如果使用預設的閾值,大約只有 1/6 的節點需要克隆

2.2、get 過程分析

相對於 put 來說,get 真的不要太簡單。

  1. 計算 hash 值,找到 segment 陣列中的具體位置,或我們前面用的“槽”
  2. 槽中也是一個數組,根據 hash 找到陣列中具體的位置
  3. 到這裡是連結串列了,順著連結串列進行查詢即可
public V get(Object key) {
    Segment<K,V> s; // manually integrate access methods to reduce overhead
    HashEntry<K,V>[] tab;
    // 1. hash 值
    int h = hash(key);
    long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
    // 2. 根據 hash 找到對應的 segment
    if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null &&
        (tab = s.table) != null) {
        // 3. 找到segment 內部陣列相應位置的連結串列,遍歷
        for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile
                 (tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
             e != null; e = e.next) {
            K k;
            if ((k = e.key) == key || (e.hash == h && key.equals(k)))
                return e.value;
        }
    }
    return null;
}

2.3、size()操作

put、remove和get操作只需要關心一個Segment,而size操作需要遍歷所有的Segment才能算出整個Map的大小。一個簡單的方案是,先鎖住所有Sgment,計算完後再解鎖。但這樣做,在做size操作時,不僅無法對Map進行寫操作,同時也無法進行讀操作,不利於對Map的並行操作。

為更好支援併發操作,ConcurrentHashMap會在不上鎖的前提逐個Segment計算3次size,如果某相鄰兩次計算獲取的所有Segment的更新次數(每個Segment都與HashMap一樣通過modCount跟蹤自己的修改次數,Segment每修改一次其modCount加一)相等,說明這兩次計算過程中無更新操作,則這兩次計算出的總size相等,可直接作為最終結果返回。如果這三次計算過程中Map有更新,則對所有Segment加鎖重新計算Size。該計算方法程式碼如下

public int size() {
    final Segment<K, V>[] segments = this.segments;
    int size;
    boolean overflow; // true if size overflows 32 bits
    long sum; // sum of modCounts
    long last = 0L; // previous sum
    int retries = -1; // first iteration isn't retry

    try {
        for (;;) {
            if (retries++ == RETRIES_BEFORE_LOCK) {
                for (int j = 0; j < segments.length; ++j)
                    ensureSegment(j).lock(); // force creation
                }
                sum = 0L;
                size = 0;
                overflow = false;

                for (int j = 0; j < segments.length; ++j) {
                    Segment<K, V> seg = segmentAt(segments, j);
                    if (seg != null) {
                        sum += seg.modCount;
                        int c = seg.count;
                        if (c < 0 || (size += c) < 0)
                            overflow = true;
                    }
                }
                if (sum == last)
                    break;
                last = sum;
        }
    } finally {
        if (retries > RETRIES_BEFORE_LOCK) {
        for (int j = 0; j < segments.length; ++j)
            segmentAt(segments, j).unlock();
        }
    }
    return overflow ? Integer.MAX_VALUE : size;
}
  •  不同之處

JDK1.7中ConcurrentHashMap與HashMap相比,有以下不同點

  • ConcurrentHashMap執行緒安全,而HashMap非執行緒安全
  • HashMap允許Key和Value為null,而ConcurrentHashMap不允許
  • HashMap不允許通過Iterator遍歷的同時通過HashMap修改,而ConcurrentHashMap允許該行為,並且該更新對後續的遍歷可見
  • 併發問題分析

現在我們已經說完了 put 過程和 get 過程,我們可以看到 get 過程中是沒有加鎖的,那自然我們就需要去考慮併發問題。

新增節點的操作 put 和刪除節點的操作 remove 都是要加 segment 上的獨佔鎖的,所以它們之間自然不會有問題,我們需要考慮的問題就是 get 的時候在同一個 segment 中發生了 put 或 remove 操作。

1、put 操作的執行緒安全性。

  • 初始化槽,這個我們之前就說過了,使用了 CAS 來初始化 Segment 中的陣列。
  • 新增節點到連結串列的操作是插入到表頭的,所以,如果這個時候 get 操作在連結串列遍歷的過程已經到了中間,是不會影響的。當然,另一個併發問題就是 get 操作在 put 之後,需要保證剛剛插入表頭的節點被讀取,這個依賴於 setEntryAt 方法中使用的 UNSAFE.putOrderedObject。
  • 擴容。擴容是新建立了陣列,然後進行遷移資料,最後面將 newTable 設定給屬性 table。所以,如果 get 操作此時也在進行,那麼也沒關係,如果 get 先行,那麼就是在舊的 table 上做查詢操作;而 put 先行,那麼 put 操作的可見性保證就是 table 使用了 volatile 關鍵字。

2、remove 操作的執行緒安全性。

  • remove 操作我們沒有分析原始碼,所以這裡說的讀者感興趣的話還是需要到原始碼中去求實一下的。
  • get 操作需要遍歷連結串列,但是 remove 操作會"破壞"連結串列。
  • 如果 remove 破壞的節點 get 操作已經過去了,那麼這裡不存在任何問題。
  • 如果 remove 先破壞了一個節點,分兩種情況考慮。 1、如果此節點是頭結點,那麼需要將頭結點的 next 設定為陣列該位置的元素,table 雖然使用了 volatile 修飾,但是 volatile 並不能提供陣列內部操作的可見性保證,所以原始碼中使用了 UNSAFE 來運算元組,請看方法 setEntryAt。2、如果要刪除的節點不是頭結點,它會將要刪除節點的後繼節點接到前驅節點中,這裡的併發保證就是 next 屬性是 volatile 的。

3、Java8 HashMap

Java8 對 HashMap 進行了一些修改,最大的不同就是利用了紅黑樹,所以其由 陣列+連結串列+紅黑樹 組成。

根據 Java7 HashMap 的介紹,我們知道,查詢的時候,根據 hash 值我們能夠快速定位到陣列的具體下標,但是之後的話,需要順著連結串列一個個比較下去才能找到我們需要的,時間複雜度取決於連結串列的長度,為 O(n)

為了降低這部分的開銷,在 Java8 中,當連結串列中的元素達到了 8 個時,會將連結串列轉換為紅黑樹,在這些位置進行查詢的時候可以降低時間複雜度為 O(logN)

來一張圖簡單示意一下吧:

注意,上圖是示意圖,主要是描述結構,不會達到這個狀態的,因為這麼多資料的時候早就擴容了。

下面,我們還是用程式碼來介紹吧,個人感覺,Java8 的原始碼可讀性要差一些,不過精簡一些。

Java7 中使用 Entry 來代表每個 HashMap 中的資料節點,Java8 中使用 Node,基本沒有區別,都是 key,value,hash 和 next 這四個屬性,不過,Node 只能用於連結串列的情況,紅黑樹的情況需要使用 TreeNode

我們根據陣列元素中,第一個節點資料型別是 Node 還是 TreeNode 來判斷該位置下是連結串列還是紅黑樹的。

3.1、put 過程分析

public V put(K key, V value) {
    return putVal(hash(key), key, value, false, true);
}

// 第三個引數 onlyIfAbsent 如果是 true,那麼只有在不存在該 key 時才會進行 put 操作
// 第四個引數 evict 我們這裡不關心
final V putVal(int hash, K key, V value, boolean onlyIfAbsent,
               boolean evict) {
    Node<K,V>[] tab; Node<K,V> p; int n, i;
    // 第一次 put 值的時候,會觸發下面的 resize(),類似 java7 的第一次 put 也要初始化陣列長度
    // 第一次 resize 和後續的擴容有些不一樣,因為這次是陣列從 null 初始化到預設的 16 或自定義的初始容量
    if ((tab = table) == null || (n = tab.length) == 0)
        n = (tab = resize()).length;
    // 找到具體的陣列下標,如果此位置沒有值,那麼直接初始化一下 Node 並放置在這個位置就可以了
    if ((p = tab[i = (n - 1) & hash]) == null)
        tab[i] = newNode(hash, key, value, null);

    else {// 陣列該位置有資料
        Node<K,V> e; K k;
        // 首先,判斷該位置的第一個資料和我們要插入的資料,key 是不是"相等",如果是,取出這個節點
        if (p.hash == hash &&
            ((k = p.key) == key || (key != null && key.equals(k))))
            e = p;
        // 如果該節點是代表紅黑樹的節點,呼叫紅黑樹的插值方法,本文不展開說紅黑樹
        else if (p instanceof TreeNode)
            e = ((TreeNode<K,V>)p).putTreeVal(this, tab, hash, key, value);
        else {
            // 到這裡,說明陣列該位置上是一個連結串列
            for (int binCount = 0; ; ++binCount) {
                // 插入到連結串列的最後面(Java7 是插入到連結串列的最前面)
                if ((e = p.next) == null) {
                    p.next = newNode(hash, key, value, null);
                    // TREEIFY_THRESHOLD 為 8,所以,如果新插入的值是連結串列中的第 8 個
                    // 會觸發下面的 treeifyBin,也就是將連結串列轉換為紅黑樹
                    if (binCount >= TREEIFY_THRESHOLD - 1) // -1 for 1st
                        treeifyBin(tab, hash);
                    break;
                }
                // 如果在該連結串列中找到了"相等"的 key(== 或 equals)
                if (e.hash == hash &&
                    ((k = e.key) == key || (key != null && key.equals(k))))
                    // 此時 break,那麼 e 為連結串列中[與要插入的新值的 key "相等"]的 node
                    break;
                p = e;
            }
        }
        // e!=null 說明存在舊值的key與要插入的key"相等"
        // 對於我們分析的put操作,下面這個 if 其實就是進行 "值覆蓋",然後返回舊值
        if (e != null) {
            V oldValue = e.value;
            if (!onlyIfAbsent || oldValue == null)
                e.value = value;
            afterNodeAccess(e);
            return oldValue;
        }
    }
    ++modCount;
    // 如果 HashMap 由於新插入這個值導致 size 已經超過了閾值,需要進行擴容
    if (++size > threshold)
        resize();
    afterNodeInsertion(evict);
    return null;
}

和 Java7 稍微有點不一樣的地方就是,Java7 是先擴容後插入新值的,Java8 先插值再擴容,不過這個不重要。

  • 陣列擴容

resize() 方法用於初始化陣列陣列擴容,每次擴容後,容量為原來的 2 倍,並進行資料遷移。

final Node<K,V>[] resize() {
    Node<K,V>[] oldTab = table;
    int oldCap = (oldTab == null) ? 0 : oldTab.length;
    int oldThr = threshold;
    int newCap, newThr = 0;
    if (oldCap > 0) { // 對應陣列擴容
        if (oldCap >= MAXIMUM_CAPACITY) {
            threshold = Integer.MAX_VALUE;
            return oldTab;
        }
        // 將陣列大小擴大一倍
        else if ((newCap = oldCap << 1) < MAXIMUM_CAPACITY &&
                 oldCap >= DEFAULT_INITIAL_CAPACITY)
            // 將閾值擴大一倍
            newThr = oldThr << 1; // double threshold
    }
    else if (oldThr > 0) // 對應使用 new HashMap(int initialCapacity) 初始化後,第一次 put 的時候
        newCap = oldThr;
    else {// 對應使用 new HashMap() 初始化後,第一次 put 的時候
        newCap = DEFAULT_INITIAL_CAPACITY;
        newThr = (int)(DEFAULT_LOAD_FACTOR * DEFAULT_INITIAL_CAPACITY);
    }

    if (newThr == 0) {
        float ft = (float)newCap * loadFactor;
        newThr = (newCap < MAXIMUM_CAPACITY && ft < (float)MAXIMUM_CAPACITY ?
                  (int)ft : Integer.MAX_VALUE);
    }
    threshold = newThr;

    // 用新的陣列大小初始化新的陣列
    Node<K,V>[] newTab = (Node<K,V>[])new Node[newCap];
    table = newTab; // 如果是初始化陣列,到這裡就結束了,返回 newTab 即可

    if (oldTab != null) {
        // 開始遍歷原陣列,進行資料遷移。
        for (int j = 0; j < oldCap; ++j) {
            Node<K,V> e;
            if ((e = oldTab[j]) != null) {
                oldTab[j] = null;
                // 如果該陣列位置上只有單個元素,那就簡單了,