1. 程式人生 > >HashMap和ConcurrentHashMap學習

HashMap和ConcurrentHashMap學習

HashMap:

jdk1.7中HashMap資料結構:

1.7中的原始碼說明:

這是 HashMap 中比較核心的幾個成員變數;看看分別是什麼意思?

  1. 初始化桶大小,因為底層是陣列,所以這是陣列預設的大小。
  2. 桶最大值。
  3. 預設的負載因子(0.75)
  4. table 真正存放資料的陣列。
  5. Map 存放數量的大小。
  6. 桶大小,可在初始化時顯式指定。
  7. 負載因子,可在初始化時顯式指定。

重點解釋下負載因子:
由於給定的 HashMap 的容量大小是固定的,比如預設初始化:

public HashMap() {
    this(DEFAULT_INITIAL_CAPACITY, DEFAULT_LOAD_FACTOR);
}
public HashMap(int initialCapacity, float loadFactor) {
    if (initialCapacity < 0)
        throw new IllegalArgumentException("Illegal initial capacity: " +
                                           initialCapacity);
    if (initialCapacity > MAXIMUM_CAPACITY)
        initialCapacity = MAXIMUM_CAPACITY;
    if (loadFactor <= 0 || Float.isNaN(loadFactor))
        throw new IllegalArgumentException("Illegal load factor: " +
                                           loadFactor);
    this.loadFactor = loadFactor;
    threshold = initialCapacity;
    init();
}

給定的預設容量為 16,負載因子為 0.75。Map 在使用過程中不斷的往裡面存放資料,當數量達到了 16 * 0.75 = 12 就需要將當前 16 的容量進行擴容,而擴容這個過程涉及到 rehash、複製資料等操作,所以非常消耗效能。因此通常建議能提前預估 HashMap 的大小最好,儘量的減少擴容帶來的效能損耗。根據程式碼可以看到其實真正存放資料的是:
transient Entry<K,V>[] table = (Entry<K,V>[]) EMPTY_TABLE;

這個陣列,那麼它又是如何定義的呢?

Entry 是 HashMap 中的一個內部類,從他的成員變數很容易看出:

  • key 就是寫入時的鍵。
  • value 自然就是值。
  • 開始的時候就提到 HashMap 是由陣列和連結串列組成,所以這個 next 就是用於實現連結串列結構。
  • hash 存放的是當前 key 的 hashcode。

put過程:

public V put(K key, V value) {
    // 當插入第一個元素的時候,需要先初始化陣列大小
    if (table == EMPTY_TABLE) {
        inflateTable(threshold);
    }
    // 如果 key 為 null,感興趣的可以往裡看,最終會將這個 entry 放到 table[0] 中
    if (key == null)
        return putForNullKey(value);
    // 1. 求 key 的 hash 值
    int hash = hash(key);
    // 2. 找到對應的陣列下標
    int i = indexFor(hash, table.length);
    // 3. 遍歷一下對應下標處的連結串列,看是否有重複的 key 已經存在,
    //    如果有,直接覆蓋,put 方法返回舊值就結束了
    for (Entry<K,V> e = table[i]; e != null; e = e.next) {
        Object k;
        if (e.hash == hash && ((k = e.key) == key || key.equals(k))) {
            V oldValue = e.value;
            e.value = value;
            e.recordAccess(this);
            return oldValue;
        }
    }
 
    modCount++;
    // 4. 不存在重複的 key,將此 entry 新增到連結串列中,細節後面說
    addEntry(hash, key, value, i);
    return null;
}

陣列初始化:在第一個元素插入 HashMap 的時候做一次陣列的初始化,就是先確定初始的陣列大小,並計算陣列擴容的閾值。

static int indexFor(int hash, int length) {
    // assert Integer.bitCount(length) == 1 : "length must be a non-zero power of 2";
    return hash & (length-1);
}

計算具體陣列位置:這個方法很簡單,簡單說就是取 hash 值的低 n 位。如在陣列長度為 32 的時候,其實取的就是 key 的 hash 值的低 5 位,作為它在陣列中的下標位置。

void addEntry(int hash, K key, V value, int bucketIndex) {
    // 如果當前 HashMap 大小已經達到了閾值,並且新值要插入的陣列位置已經有元素了,那麼要擴容
    if ((size >= threshold) && (null != table[bucketIndex])) {
        // 擴容,後面會介紹一下
        resize(2 * table.length);
        // 擴容以後,重新計算 hash 值
        hash = (null != key) ? hash(key) : 0;
        // 重新計算擴容後的新的下標
        bucketIndex = indexFor(hash, table.length);
    }
    // 往下看
    createEntry(hash, key, value, bucketIndex);
}
// 這個很簡單,其實就是將新值放到連結串列的表頭,然後 size++
void createEntry(int hash, K key, V value, int bucketIndex) {
    Entry<K,V> e = table[bucketIndex];
    table[bucketIndex] = new Entry<>(hash, key, value, e);
    size++;
}

新增節點到連結串列中:找到陣列下標後,會先進行 key 判重,如果沒有重複,就準備將新值放入到連結串列的表頭。這個方法的主要邏輯就是先判斷是否需要擴容,需要的話先擴容,然後再將這個新的資料插入到擴容後的陣列的相應位置處的連結串列的表頭。

void resize(int newCapacity) {
    Entry[] oldTable = table;
    int oldCapacity = oldTable.length;
    if (oldCapacity == MAXIMUM_CAPACITY) {
        threshold = Integer.MAX_VALUE;
        return;
    }
    // 新的陣列
    Entry[] newTable = new Entry[newCapacity];
    // 將原來陣列中的值遷移到新的更大的陣列中
    transfer(newTable, initHashSeedAsNeeded(newCapacity));
    table = newTable;
    threshold = (int)Math.min(newCapacity * loadFactor, MAXIMUM_CAPACITY + 1);
}

陣列擴容:在插入新值的時候,如果當前的 size 已經達到了閾值,並且要插入的陣列位置上已經有元素,那麼就會觸發擴容,擴容後,陣列大小為原來的 2 倍。擴容就是用一個新的大陣列替換原來的小陣列,並將原來陣列中的值遷移到新的陣列中。

總結:

  • 判斷當前陣列是否需要初始化。
  • 如果 key 為空,則 put 一個空值進去。
  • 根據 key 計算出 hashcode。
  • 根據計算出的 hashcode 定位出所在桶。
  • 如果桶是一個連結串列則需要遍歷判斷裡面的 hashcode、key 是否和傳入 key 相等,如果相等則進行覆蓋,並返回原來的值。
  • 如果桶是空的,說明當前位置沒有資料存入;新增一個 Entry 物件寫入當前位置。
  • 當呼叫 addEntry 寫入 Entry 時需要判斷是否需要擴容。如果需要就進行兩倍擴充,並將當前的 key 重新 hash 並定位。而在 createEntry 中會將當前位置的桶傳入到新建的桶中,如果當前桶有值就會在位置形成連結串列。

get過程:

public V get(Object key) {
    if (key == null)
        return getForNullKey();
    Entry<K,V> entry = getEntry(key);
    return null == entry ? null : entry.getValue();
}
final Entry<K,V> getEntry(Object key) {
    if (size == 0) {
        return null;
    }
    int hash = (key == null) ? 0 : hash(key);
    for (Entry<K,V> e = table[indexFor(hash, table.length)];
         e != null;
         e = e.next) {
        Object k;
        if (e.hash == hash &&
            ((k = e.key) == key || (key != null && key.equals(k))))
            return e;
    }
    return null;
}
  • 首先也是根據 key 計算出 hashcode,然後定位到具體的桶中。
  • 判斷該位置是否為連結串列。
  • 不是連結串列就根據 key、key 的 hashcode 是否相等來返回值。
  • 為連結串列則需要遍歷直到 key 及 hashcode 相等時候就返回值。
  • 啥都沒取到就直接返回 null 。

jdk1.8中HashMap資料結構:

Java8 對 HashMap 進行了一些修改,最大的不同就是利用了紅黑樹,所以其由 陣列+連結串列+紅黑樹 組成。根據 Java7 HashMap 的介紹,我們知道,查詢的時候,根據 hash 值我們能夠快速定位到陣列的具體下標,但是之後的話,需要順著連結串列一個個比較下去才能找到我們需要的,時間複雜度取決於連結串列的長度,為 O(n)。為了降低這部分的開銷,在 Java8 中,當連結串列中的元素超過了 8 個以後,會將連結串列轉換為紅黑樹,在這些位置進行查詢的時候可以降低時間複雜度為 O(logN)。

Java7 中使用 Entry 來代表每個 HashMap 中的資料節點,Java8 中使用 Node,基本沒有區別,都是 key,value,hash 和 next 這四個屬性,不過,Node 只能用於連結串列的情況,紅黑樹的情況需要使用 TreeNode。我們根據陣列元素中,第一個節點資料型別是 Node 還是 TreeNode 來判斷該位置下是連結串列還是紅黑樹的。

核心的成員變數:

  • TREEIFY_THRESHOLD 用於判斷是否需要將連結串列轉換為紅黑樹的閾值。
  • HashEntry 修改為 Node
static final int DEFAULT_INITIAL_CAPACITY = 1 << 4; // aka 16
/**
 * The maximum capacity, used if a higher value is implicitly specified
 * by either of the constructors with arguments.
 * MUST be a power of two <= 1<<30.
 */
static final int MAXIMUM_CAPACITY = 1 << 30;
/**
 * The load factor used when none specified in constructor.
 */
static final float DEFAULT_LOAD_FACTOR = 0.75f;
static final int TREEIFY_THRESHOLD = 8;
transient Node<K,V>[] table;
/**
 * Holds cached entrySet(). Note that AbstractMap fields are used
 * for keySet() and values().
 */
transient Set<Map.Entry<K,V>> entrySet;
/**
 * The number of key-value mappings contained in this map.
 */
transient int size;

put過程:

public V put(K key, V value) {
    return putVal(hash(key), key, value, false, true);
}
 
// 第三個引數 onlyIfAbsent 如果是 true,那麼只有在不存在該 key 時才會進行 put 操作
// 第四個引數 evict 我們這裡不關心
final V putVal(int hash, K key, V value, boolean onlyIfAbsent,
               boolean evict) {
    Node<K,V>[] tab; Node<K,V> p; int n, i;
    // 第一次 put 值的時候,會觸發下面的 resize(),類似 java7 的第一次 put 也要初始化陣列長度
    // 第一次 resize 和後續的擴容有些不一樣,因為這次是陣列從 null 初始化到預設的 16 或自定義的初始容量
    if ((tab = table) == null || (n = tab.length) == 0)
        n = (tab = resize()).length;
    // 找到具體的陣列下標,如果此位置沒有值,那麼直接初始化一下 Node 並放置在這個位置就可以了
    if ((p = tab[i = (n - 1) & hash]) == null)
        tab[i] = newNode(hash, key, value, null);
 
    else {// 陣列該位置有資料
        Node<K,V> e; K k;
        // 首先,判斷該位置的第一個資料和我們要插入的資料,key 是不是"相等",如果是,取出這個節點
        if (p.hash == hash &&
            ((k = p.key) == key || (key != null && key.equals(k))))
            e = p;
        // 如果該節點是代表紅黑樹的節點,呼叫紅黑樹的插值方法,本文不展開說紅黑樹
        else if (p instanceof TreeNode)
            e = ((TreeNode<K,V>)p).putTreeVal(this, tab, hash, key, value);
        else {
            // 到這裡,說明陣列該位置上是一個連結串列
            for (int binCount = 0; ; ++binCount) {
                // 插入到連結串列的最後面(Java7 是插入到連結串列的最前面)
                if ((e = p.next) == null) {
                    p.next = newNode(hash, key, value, null);
                    // TREEIFY_THRESHOLD 為 8,所以,如果新插入的值是連結串列中的第 9 個
                    // 會觸發下面的 treeifyBin,也就是將連結串列轉換為紅黑樹
                    if (binCount >= TREEIFY_THRESHOLD - 1) // -1 for 1st
                        treeifyBin(tab, hash);
                    break;
                }
                // 如果在該連結串列中找到了"相等"的 key(== 或 equals)
                if (e.hash == hash &&
                    ((k = e.key) == key || (key != null && key.equals(k))))
                    // 此時 break,那麼 e 為連結串列中[與要插入的新值的 key "相等"]的 node
                    break;
                p = e;
            }
        }
        // e!=null 說明存在舊值的key與要插入的key"相等"
        // 對於我們分析的put操作,下面這個 if 其實就是進行 "值覆蓋",然後返回舊值
        if (e != null) {
            V oldValue = e.value;
            if (!onlyIfAbsent || oldValue == null)
                e.value = value;
            afterNodeAccess(e);
            return oldValue;
        }
    }
    ++modCount;
    // 如果 HashMap 由於新插入這個值導致 size 已經超過了閾值,需要進行擴容
    if (++size > threshold)
        resize();
    afterNodeInsertion(evict);
    return null;
}

Java7 稍微有點不一樣的地方就是,Java7 是先擴容後插入新值的,Java8 先插值再擴容

final Node<K,V>[] resize() {
    Node<K,V>[] oldTab = table;
    int oldCap = (oldTab == null) ? 0 : oldTab.length;
    int oldThr = threshold;
    int newCap, newThr = 0;
    if (oldCap > 0) { // 對應陣列擴容
        if (oldCap >= MAXIMUM_CAPACITY) {
            threshold = Integer.MAX_VALUE;
            return oldTab;
        }
        // 將陣列大小擴大一倍
        else if ((newCap = oldCap << 1) < MAXIMUM_CAPACITY &&
                 oldCap >= DEFAULT_INITIAL_CAPACITY)
            // 將閾值擴大一倍
            newThr = oldThr << 1; // double threshold
    }
    else if (oldThr > 0) // 對應使用 new HashMap(int initialCapacity) 初始化後,第一次 put 的時候
        newCap = oldThr;
    else {// 對應使用 new HashMap() 初始化後,第一次 put 的時候
        newCap = DEFAULT_INITIAL_CAPACITY;
        newThr = (int)(DEFAULT_LOAD_FACTOR * DEFAULT_INITIAL_CAPACITY);
    }
 
    if (newThr == 0) {
        float ft = (float)newCap * loadFactor;
        newThr = (newCap < MAXIMUM_CAPACITY && ft < (float)MAXIMUM_CAPACITY ?
                  (int)ft : Integer.MAX_VALUE);
    }
    threshold = newThr;
 
    // 用新的陣列大小初始化新的陣列
    Node<K,V>[] newTab = (Node<K,V>[])new Node[newCap];
    table = newTab; // 如果是初始化陣列,到這裡就結束了,返回 newTab 即可
 
    if (oldTab != null) {
        // 開始遍歷原陣列,進行資料遷移。
        for (int j = 0; j < oldCap; ++j) {
            Node<K,V> e;
            if ((e = oldTab[j]) != null) {
                oldTab[j] = null;
                // 如果該陣列位置上只有單個元素,那就簡單了,簡單遷移這個元素就可以了
                if (e.next == null)
                    newTab[e.hash & (newCap - 1)] = e;
                // 如果是紅黑樹,具體我們就不展開了
                else if (e instanceof TreeNode)
                    ((TreeNode<K,V>)e).split(this, newTab, j, oldCap);
                else { 
                    // 這塊是處理連結串列的情況,
                    // 需要將此連結串列拆成兩個連結串列,放到新的陣列中,並且保留原來的先後順序
                    // loHead、loTail 對應一條連結串列,hiHead、hiTail 對應另一條連結串列,程式碼還是比較簡單的
                    Node<K,V> loHead = null, loTail = null;
                    Node<K,V> hiHead = null, hiTail = null;
                    Node<K,V> next;
                    do {
                        next = e.next;
                        if ((e.hash & oldCap) == 0) {
                            if (loTail == null)
                                loHead = e;
                            else
                                loTail.next = e;
                            loTail = e;
                        }
                        else {
                            if (hiTail == null)
                                hiHead = e;
                            else
                                hiTail.next = e;
                            hiTail = e;
                        }
                    } while ((e = next) != null);
                    if (loTail != null) {
                        loTail.next = null;
                        // 第一條連結串列
                        newTab[j] = loHead;
                    }
                    if (hiTail != null) {
                        hiTail.next = null;
                        // 第二條連結串列的新的位置是 j + oldCap,這個很好理解
                        newTab[j + oldCap] = hiHead;
                    }
                }
            }
        }
    }
    return newTab;
}

陣列擴容:resize() 方法用於初始化陣列或陣列擴容,每次擴容後,容量為原來的 2 倍,並進行資料遷移。

總結:

  1. 判斷當前桶是否為空,空的就需要初始化(resize 中會判斷是否進行初始化)。
  2. 根據當前 key 的 hashcode 定位到具體的桶中並判斷是否為空,為空表明沒有 Hash 衝突就直接在當前位置建立一個新桶即可。
  3. 如果當前桶有值( Hash 衝突),那麼就要比較當前桶中的 key、key 的 hashcode 與寫入的 key 是否相等,相等就賦值給 e,在第 8 步的時候會統一進行賦值及返回。
  4. 如果當前桶為紅黑樹,那就要按照紅黑樹的方式寫入資料。
  5. 如果是個連結串列,就需要將當前的 key、value 封裝成一個新節點寫入到當前桶的後面(形成連結串列)。
  6. 接著判斷當前連結串列的大小是否大於預設的閾值,大於時就要轉換為紅黑樹。
  7. 如果在遍歷過程中找到 key 相同時直接退出遍歷。
  8. 如果 e != null 就相當於存在相同的 key,那就需要將值覆蓋。
  9. 最後判斷是否需要進行擴容。

get過程:

public V get(Object key) {
    Node<K,V> e;
    return (e = getNode(hash(key), key)) == null ? null : e.value;
}

final Node<K,V> getNode(int hash, Object key) {
    Node<K,V>[] tab; Node<K,V> first, e; int n; K k;
    if ((tab = table) != null && (n = tab.length) > 0 &&
        (first = tab[(n - 1) & hash]) != null) {
        // 判斷第一個節點是不是就是需要的
        if (first.hash == hash && // always check first node
            ((k = first.key) == key || (key != null && key.equals(k))))
            return first;
        if ((e = first.next) != null) {
            // 判斷是否是紅黑樹
            if (first instanceof TreeNode)
                return ((TreeNode<K,V>)first).getTreeNode(hash, key);
 
            // 連結串列遍歷
            do {
                if (e.hash == hash &&
                    ((k = e.key) == key || (key != null && key.equals(k))))
                    return e;
            } while ((e = e.next) != null);
        }
    }
    return null;
}
  • 首先將 key hash 之後取得所定位的桶。
  • 如果桶為空則直接返回 null 。
  • 否則判斷桶的第一個位置(有可能是連結串列、紅黑樹)的 key 是否為查詢的 key,是就直接返回 value。
  • 如果第一個不匹配,則判斷它的下一個是紅黑樹還是連結串列。
  • 紅黑樹就按照樹的查詢方式返回值。
  • 不然就按照連結串列的方式遍歷匹配返回值。

1.7和1.8HashMap的差異:

從這兩個核心方法(get/put)可以看出 1.8 中對大連結串列做了優化,修改為紅黑樹之後查詢效率直接提高到了 O(logn)。但是 HashMap 原有的問題也都存在,比如在併發場景下使用時容易出現死迴圈。

final HashMap<String, String> map = new HashMap<String, String>();
for (int i = 0; i < 1000; i++) {
    new Thread(new Runnable() {
        @Override
        public void run() {
            map.put(UUID.randomUUID().toString(), "");
        }
    }).start();
}

看過上文的還記得在 HashMap 擴容的時候會呼叫 resize() 方法,就是這裡的併發操作容易在一個桶上形成環形連結串列;這樣當獲取一個不存在的 key 時,計算出的 index 正好是環形連結串列的下標就會出現死迴圈。

如圖:

遍歷方式:

Iterator<Map.Entry<String, Integer>> entryIterator = map.entrySet().iterator();
        while (entryIterator.hasNext()) {
            Map.Entry<String, Integer> next = entryIterator.next();
            System.out.println("key=" + next.getKey() + " value=" + next.getValue());
        }
        
Iterator<String> iterator = map.keySet().iterator();
        while (iterator.hasNext()){
            String key = iterator.next();
            System.out.println("key=" + key + " value=" + map.get(key));
        }

強烈建議使用第一種 EntrySet 進行遍歷。第一種可以把 key value 同時取出,第二種還得需要通過 key 取一次 value,效率較低。簡單總結下 HashMap:無論是 1.7 還是 1.8 其實都能看出 JDK 沒有對它做任何的同步操作,所以併發會出問題,甚至 1.7 中出現死迴圈導致系統不可用(1.8 已經修復死迴圈問題)。因此 JDK 推出了專項專用的 ConcurrentHashMap ,該類位於 java.util.concurrent 包下,專門用於解決併發問題。

堅持看到這裡的朋友算是已經把 ConcurrentHashMap 的基礎已經打牢了,下面正式開始分析。

ConcurrentHashMap

1.7中ConcurrentHashMap的資料結構:

ConcurrentHashMap 和 HashMap 思路是差不多的,但是因為它支援併發操作,所以要複雜一些。整個 ConcurrentHashMap 由一個個 Segment 組成,Segment 代表”部分“或”一段“的意思,所以很多地方都會將其描述為分段鎖。注意,行文中,我很多地方用了“槽”來代表一個 segment。簡單理解就是,ConcurrentHashMap 是一個 Segment 陣列,Segment 通過繼承 ReentrantLock 來進行加鎖,所以每次需要加鎖的操作鎖住的是一個 segment,這樣只要保證每個 Segment 是執行緒安全的,也就實現了全域性的執行緒安全。

public ConcurrentHashMap(int initialCapacity,
                         float loadFactor, int concurrencyLevel) {
    if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
        throw new IllegalArgumentException();
    if (concurrencyLevel > MAX_SEGMENTS)
        concurrencyLevel = MAX_SEGMENTS;
    // Find power-of-two sizes best matching arguments
    int sshift = 0;
    int ssize = 1;
    // 計算並行級別 ssize,因為要保持並行級別是 2 的 n 次方
    while (ssize < concurrencyLevel) {
        ++sshift;
        ssize <<= 1;
    }
    // 我們這裡先不要那麼燒腦,用預設值,concurrencyLevel 為 16,sshift 為 4
    // 那麼計算出 segmentShift 為 28,segmentMask 為 15,後面會用到這兩個值
    this.segmentShift = 32 - sshift;
    this.segmentMask = ssize - 1;
 
    if (initialCapacity > MAXIMUM_CAPACITY)
        initialCapacity = MAXIMUM_CAPACITY;
 
    // initialCapacity 是設定整個 map 初始的大小,
    // 這裡根據 initialCapacity 計算 Segment 陣列中每個位置可以分到的大小
    // 如 initialCapacity 為 64,那麼每個 Segment 或稱之為"槽"可以分到 4 個
    int c = initialCapacity / ssize;
    if (c * ssize < initialCapacity)
        ++c;
    // 預設 MIN_SEGMENT_TABLE_CAPACITY 是 2,這個值也是有講究的,因為這樣的話,對於具體的槽上,
    // 插入一個元素不至於擴容,插入第二個的時候才會擴容
    int cap = MIN_SEGMENT_TABLE_CAPACITY; 
    while (cap < c)
        cap <<= 1;
 
    // 建立 Segment 陣列,
    // 並建立陣列的第一個元素 segment[0]
    Segment<K,V> s0 =
        new Segment<K,V>(loadFactor, (int)(cap * loadFactor),
                         (HashEntry<K,V>[])new HashEntry[cap]);
    Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];
    // 往陣列寫入 segment[0]
    UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0]
    this.segments = ss;
}

初始化:初始化完成,我們得到了一個 Segment 陣列。

  • concurrencyLevel:並行級別、併發數、Segment 數,怎麼翻譯不重要,理解它。預設是 16,也就是說 ConcurrentHashMap 有 16 個 Segments,所以理論上,這個時候,最多可以同時支援 16 個執行緒併發寫,只要它們的操作分別分佈在不同的 Segment 上。這個值可以在初始化的時候設定為其他值,但是一旦初始化以後,它是不可以擴容的。再具體到每個 Segment 內部,其實每個 Segment 很像之前介紹的 HashMap,不過它要保證執行緒安全,所以處理起來要麻煩些。
  • initialCapacity:初始容量,這個值指的是整個 ConcurrentHashMap 的初始容量,實際操作的時候需要平均分給每個 Segment。
  • loadFactor:負載因子,之前我們說了,Segment 陣列不可以擴容,所以這個負載因子是給每個 Segment 內部使用的

我們就當是用 new ConcurrentHashMap() 無參建構函式進行初始化的,那麼初始化完成後:

  • Segment 陣列長度為 16,不可以擴容
  • Segment[i] 的預設大小為 2,負載因子是 0.75,得出初始閾值為 1.5,也就是以後插入第一個元素不會觸發擴容,插入第二個會進行第一次擴容
  • 這裡初始化了 segment[0],其他位置還是 null,至於為什麼要初始化 segment[0],後面的程式碼會介紹
  • 當前 segmentShift 的值為 32 – 4 = 28,segmentMask 為 16 – 1 = 15,姑且把它們簡單翻譯為移位數和掩碼,這兩個值馬上就會用到

簡單來說可以這樣理解:

如圖所示,是由 Segment 陣列、HashEntry 組成,和 HashMap 一樣,仍然是陣列加連結串列。

核心成員變數
/**
 * Segment 陣列,存放資料時首先需要定位到具體的 Segment 中。
 */
final Segment<K,V>[] segments;
transient Set<K> keySet;
transient Set<Map.Entry<K,V>> entrySet;

Segment 是 ConcurrentHashMap 的一個內部類,主要的組成如下:
static final class Segment<K,V> extends ReentrantLock implements Serializable {
       private static final long serialVersionUID = 2249069246763182397L;
       
       // 和 HashMap 中的 HashEntry 作用一樣,真正存放資料的桶
       transient volatile HashEntry<K,V>[] table;
       transient int count;
       transient int modCount;
       transient int threshold;
       final float loadFactor;
}

其中 HashEntry 的組成:

和 HashMap 非常類似,唯一的區別就是其中的核心資料如 value ,以及連結串列都是 volatile 修飾的,保證了獲取時的可見性。原理上來說:ConcurrentHashMap 採用了分段鎖技術,其中 Segment 繼承於 ReentrantLock。不會像 HashTable 那樣不管是 put 還是 get 操作都需要做同步處理,理論上 ConcurrentHashMap 支援 CurrencyLevel (Segment 陣列數量)的執行緒併發。每當一個執行緒佔用鎖訪問一個 Segment 時,不會影響到其他的 Segment。

put的過程

public V put(K key, V value) {
    Segment<K,V> s;
    if (value == null)
        throw new NullPointerException();
    // 1. 計算 key 的 hash 值
    int hash = hash(key);
    // 2. 根據 hash 值找到 Segment 陣列中的位置 j
    //    hash 是 32 位,無符號右移 segmentShift(28) 位,剩下低 4 位,
    //    然後和 segmentMask(15) 做一次與操作,也就是說 j 是 hash 值的最後 4 位,也就是槽的陣列下標
    int j = (hash >>> segmentShift) & segmentMask;
    // 剛剛說了,初始化的時候初始化了 segment[0],但是其他位置還是 null,
    // ensureSegment(j) 對 segment[j] 進行初始化
    if ((s = (Segment<K,V>)UNSAFE.getObject          // nonvolatile; recheck
         (segments, (j << SSHIFT) + SBASE)) == null) //  in ensureSegment
        s = ensureSegment(j);
    // 3. 插入新值到 槽 s 中
    return s.put(key, hash, value, false);
}

根據 hash 值很快就能找到相應的 Segment,之後就是 Segment 內部的 put 操作了。

final V put(K key, int hash, V value, boolean onlyIfAbsent) {
    // 在往該 segment 寫入前,需要先獲取該 segment 的獨佔鎖
    //    先看主流程,後面還會具體介紹這部分內容
    HashEntry<K,V> node = tryLock() ? null :
        scanAndLockForPut(key, hash, value);
    V oldValue;
    try {
        // 這個是 segment 內部的陣列
        HashEntry<K,V>[] tab = table;
        // 再利用 hash 值,求應該放置的陣列下標
        int index = (tab.length - 1) & hash;
        // first 是陣列該位置處的連結串列的表頭
        HashEntry<K,V> first = entryAt(tab, index);
 
        // 下面這串 for 迴圈雖然很長,不過也很好理解,想想該位置沒有任何元素和已經存在一個連結串列這兩種情況
        for (HashEntry<K,V> e = first;;) {
            if (e != null) {
                K k;
                if ((k = e.key) == key ||
                    (e.hash == hash && key.equals(k))) {
                    oldValue = e.value;
                    if (!onlyIfAbsent) {
                        // 覆蓋舊值
                        e.value = value;
                        ++modCount;
                    }
                    break;
                }
                // 繼續順著連結串列走
                e = e.next;
            }
            else {
                // node 到底是不是 null,這個要看獲取鎖的過程,不過和這裡都沒有關係。
                // 如果不為 null,那就直接將它設定為連結串列表頭;如果是null,初始化並設定為連結串列表頭。
                if (node != null)
                    node.setNext(first);
                else
                    node = new HashEntry<K,V>(hash, key, value, first);
 
                int c = count + 1;
                // 如果超過了該 segment 的閾值,這個 segment 需要擴容
                if (c > threshold && tab.length < MAXIMUM_CAPACITY)
                    rehash(node); // 擴容後面也會具體分析
                else
                    // 沒有達到閾值,將 node 放到陣列 tab 的 index 位置,
                    // 其實就是將新的節點設定成原連結串列的表頭
                    setEntryAt(tab, index, node);
                ++modCount;
                count = c;
                oldValue = null;
                break;
            }
        }
    } finally {
        // 解鎖
        unlock();
    }
    return oldValue;
}

Segment 內部是由 陣列+連結串列 組成的。整體流程還是比較簡單的,由於有獨佔鎖的保護,所以 segment 內部的操作並不複雜。至於這裡面的併發問題,我們稍後再進行介紹。

到這裡 put 操作就結束了,接下來,我們說一說其中幾步關鍵的操作。

private Segment<K,V> ensureSegment(int k) {
    final Segment<K,V>[] ss = this.segments;
    long u = (k << SSHIFT) + SBASE; // raw offset
    Segment<K,V> seg;
    if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) {
        // 這裡看到為什麼之前要初始化 segment[0] 了,
        // 使用當前 segment[0] 處的陣列長度和負載因子來初始化 segment[k]
        // 為什麼要用“當前”,因為 segment[0] 可能早就擴容過了
        Segment<K,V> proto = ss[0];
        int cap = proto.table.length;
        float lf = proto.loadFactor;
        int threshold = (int)(cap * lf);
 
        // 初始化 segment[k] 內部的陣列
        HashEntry<K,V>[] tab = (HashEntry<K,V>[])new HashEntry[cap];
        if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))
            == null) { // 再次檢查一遍該槽是否被其他執行緒初始化了。
 
            Segment<K,V> s = new Segment<K,V>(lf, threshold, tab);
            // 使用 while 迴圈,內部用 CAS,當前執行緒成功設值或其他執行緒成功設值後,退出
            while ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))
                   == null) {
                if (UNSAFE.compareAndSwapObject(ss, u, null, seg = s))
                    break;
            }
        }
    }
    return seg;
}

初始化槽: ensureSegment:ConcurrentHashMap初始化的時候會初始化第一個槽 segment[0],對於其他槽來說,在插入第一個值的時候進行初始化。這裡需要考慮併發,因為很可能會有多個執行緒同時進來初始化同一個槽 segment[k],不過只要有一個成功了就可以。總的來說,ensureSegment(int k) 比較簡單,對於併發操作使用 CAS 進行控制。

private HashEntry<K,V> scanAndLockForPut(K key, int hash, V value) {
    HashEntry<K,V> first = entryForHash(this, hash);
    HashEntry<K,V> e = first;
    HashEntry<K,V> node = null;
    int retries = -1; // negative while locating node
 
    // 迴圈獲取鎖
    while (!tryLock()) {
        HashEntry<K,V> f; // to recheck first below
        if (retries < 0) {
            if (e == null) {
                if (node == null) // speculatively create node
                    // 進到這裡說明陣列該位置的連結串列是空的,沒有任何元素
                    // 當然,進到這裡的另一個原因是 tryLock() 失敗,所以該槽存在併發,不一定是該位置
                    node = new HashEntry<K,V>(hash, key, value, null);
                retries = 0;
            }
            else if (key.equals(e.key))
                retries = 0;
            else
                // 順著連結串列往下走
                e = e.next;
        }
        // 重試次數如果超過 MAX_SCAN_RETRIES(單核1多核64),那麼不搶了,進入到阻塞佇列等待鎖
        //    lock() 是阻塞方法,直到獲取鎖後返回
        else if (++retries > MAX_SCAN_RETRIES) {
            lock();
            break;
        }
        else if ((retries & 1) == 0 &&
                 // 這個時候是有大問題了,那就是有新的元素進到了連結串列,成為了新的表頭
                 //     所以這邊的策略是,相當於重新走一遍這個 scanAndLockForPut 方法
                 (f = entryForHash(this, hash)) != first) {
            e = first = f; // re-traverse if entry changed
            retries = -1;
        }
    }
    return node;
}

獲取寫入鎖: scanAndLockForPut:前面我們看到,在往某個 segment 中 put 的時候,首先會呼叫 node = tryLock() ? null : scanAndLockForPut(key, hash, value),也就是說先進行一次 tryLock() 快速獲取該 segment 的獨佔鎖,如果失敗,那麼進入到 scanAndLockForPut 這個方法來獲取鎖。這個方法有兩個出口,一個是 tryLock() 成功了,迴圈終止,另一個就是重試次數超過了 MAX_SCAN_RETRIES,進到 lock() 方法,此方法會阻塞等待,直到成功拿到獨佔鎖。這個方法就是看似複雜,但是其實就是做了一件事,那就是獲取該 segment 的獨佔鎖,如果需要的話順便例項化了一下 node。

// 方法引數上的 node 是這次擴容後,需要新增到新的陣列中的資料。
private void rehash(HashEntry<K,V> node) {
    HashEntry<K,V>[] oldTable = table;
    int oldCapacity = oldTable.length;
    // 2 倍
    int newCapacity = oldCapacity << 1;
    threshold = (int)(newCapacity * loadFactor);
    // 建立新陣列
    HashEntry<K,V>[] newTable =
        (HashEntry<K,V>[]) new HashEntry[newCapacity];
    // 新的掩碼,如從 16 擴容到 32,那麼 sizeMask 為 31,對應二進位制 ‘000...00011111’
    int sizeMask = newCapacity - 1;
 
    // 遍歷原陣列,老套路,將原陣列位置 i 處的連結串列拆分到 新陣列位置 i 和 i+oldCap 兩個位置
    for (int i = 0; i < oldCapacity ; i++) {
        // e 是連結串列的第一個元素
        HashEntry<K,V> e = oldTable[i];
        if (e != null) {
            HashEntry<K,V> next = e.next;
            // 計算應該放置在新陣列中的位置,
            // 假設原陣列長度為 16,e 在 oldTable[3] 處,那麼 idx 只可能是 3 或者是 3 + 16 = 19
            int idx = e.hash & sizeMask;
            if (next == null)   // 該位置處只有一個元素,那比較好辦
                newTable[idx] = e;
            else { // Reuse consecutive sequence at same slot
                // e 是連結串列表頭
                HashEntry<K,V> lastRun = e;
                // idx 是當前連結串列的頭結點 e 的新位置
                int lastIdx = idx;
 
                // 下面這個 for 迴圈會找到一個 lastRun 節點,這個節點之後的所有元素是將要放到一起的
                for (HashEntry<K,V> last = next;
                     last != null;
                     last = last.next) {
                    int k = last.hash & sizeMask;
                    if (k != lastIdx) {
                        lastIdx = k;
                        lastRun = last;
                    }
                }
                // 將 lastRun 及其之後的所有節點組成的這個連結串列放到 lastIdx 這個位置
                newTable[lastIdx] = lastRun;
                // 下面的操作是處理 lastRun 之前的節點,
                //    這些節點可能分配在另一個連結串列中,也可能分配到上面的那個連結串列中
                for (HashEntry<K,V> p = e; p != lastRun; p = p.next) {
                    V v = p.value;
                    int h = p.hash;
                    int k = h & sizeMask;
                    HashEntry<K,V> n = newTable[k];
                    newTable[k] = new HashEntry<K,V>(h, p.key, v, n);
                }
            }
        }
    }
    // 將新來的 node 放到新陣列中剛剛的 兩個連結串列之一 的 頭部
    int nodeIndex = node.hash & sizeMask; // add the new node
    node.setNext(newTable[nodeIndex]);
    newTable[nodeIndex] = node;
    table = newTable;
}

擴容: rehash,重複一下,segment 陣列不能擴容,擴容是 segment 陣列某個位置內部的陣列 HashEntry[] 進行擴容,擴容後,容量為原來的 2 倍。首先,我們要回顧一下觸發擴容的地方,put 的時候,如果判斷該值的插入會導致該 segment 的元素個數超過閾值,那麼先進行擴容,再插值,讀者這個時候可以回去 put 方法看一眼。該方法不需要考慮併發,因為到這裡的時候,是持有該 segment 的獨佔鎖的。

這裡的擴容比之前的 HashMap 要複雜一些,程式碼難懂一點。上面有兩個挨著的 for 迴圈,第一個 for 有什麼用呢?仔細一看發現,如果沒有第一個 for 迴圈,也是可以工作的,但是,這個 for 迴圈下來,如果 lastRun 的後面還有比較多的節點,那麼這次就是值得的。因為我們只需要克隆 lastRun 前面的節點,後面的一串節點跟著 lastRun 走就是了,不需要做任何操作。我覺得 Doug Lea 的這個想法也是挺有意思的,不過比較壞的情況就是每次 lastRun 都是連結串列的最後一個元素或者很靠後的元素,那麼這次遍歷就有點浪費了。不過 Doug Lea 也說了,根據統計,如果使用預設的閾值,大約只有 1/6 的節點需要克隆。

get過程:

public V get(Object key) {
    Segment<K,V> s; // manually integrate access methods to reduce overhead
    HashEntry<K,V>[] tab;
    // 1. hash 值
    int h = hash(key);
    long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
    // 2. 根據 hash 找到對應的 segment
    if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null &&
        (tab = s.table) != null) {
        // 3. 找到segment 內部陣列相應位置的連結串列,遍歷
        for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile
                 (tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
             e != null; e = e.next) {
            K k;
            if ((k = e.key) == key || (e.hash == h && key.equals(k)))
                return e.value;
        }
    }
    return null;
}
  1. 計算 hash 值,找到 segment 陣列中的具體位置,或我們前面用的“槽”
  2. 槽中也是一個數組,根據 hash 找到陣列中具體的位置
  3. 到這裡是連結串列了,順著連結串列進行查詢即可

併發問題分析:

現在我們已經說完了 put 過程和 get 過程,我們可以看到 get 過程中是沒有加鎖的,那自然我們就需要去考慮併發問題。

新增節點的操作 put 和刪除節點的操作 remove 都是要加 segment 上的獨佔鎖的,所以它們之間自然不會有問題,我們需要考慮的問題就是 get 的時候在同一個 segment 中發生了 put 或 remove 操作。

  1. put 操作的執行緒安全性。
    • 初始化槽,這個我們之前就說過了,使用了 CAS 來初始化 Segment 中的陣列。
    • 新增節點到連結串列的操作是插入到表頭的,所以,如果這個時候 get 操作在連結串列遍歷的過程已經到了中間,是不會影響的。當然,另一個併發問題就是 get 操作在 put 之後,需要保證剛剛插入表頭的節點被讀取,這個依賴於 setEntryAt 方法中使用的 UNSAFE.putOrderedObject。
    • 擴容。擴容是新建立了陣列,然後進行遷移資料,最後面將 newTable 設定給屬性 table。所以,如果 get 操作此時也在進行,那麼也沒關係,如果 get 先行,那麼就是在舊的 table 上做查詢操作;而 put 先行,那麼 put 操作的可見性保證就是 table 使用了 volatile 關鍵字。
  2. remove 操作的執行緒安全性。

    remove 操作我們沒有分析原始碼,所以這裡說的讀者感興趣的話還是需要到原始碼中去求實一下的。get 操作需要遍歷連結串列,但是 remove 操作會”破壞”連結串列。如果 remove 破壞的節點 get 操作已經過去了,那麼這裡不存在任何問題。如果 remove 先破壞了一個節點,分兩種情況考慮。 1、如果此節點是頭結點,那麼需要將頭結點的 next 設定為陣列該位置的元素,table 雖然使用了 volatile 修飾,但是 volatile 並不能提供陣列內部操作的可見性保證,所以原始碼中使用了 UNSAFE 來運算元組,請看方法 setEntryAt。2、如果要刪除的節點不是頭結點,它會將要刪除節點的後繼節點接到前驅節點中,這裡的併發保證就是 next 屬性是 volatile 的。

JDK1.8中的ConcurrentHashMap資料結構:

Java7 中實現的 ConcurrentHashMap 說實話還是比較複雜的,Java8 對 ConcurrentHashMap 進行了比較大的改動。建議讀者可以參考 Java8 中 HashMap 相對於 Java7 HashMap 的改動,對於 ConcurrentHashMap,Java8 也引入了紅黑樹。其中拋棄了原有的 Segment 分段鎖,而採用了 CAS + synchronized 來保證併發安全性。也將 1.7 中存放資料的 HashEntry 改為 Node,但作用都是相同的。其中的 val next 都用了 volatile 修飾,保證了可見性。

// 這建構函式裡,什麼都不幹
public ConcurrentHashMap() {
}
public ConcurrentHashMap(int initialCapacity) {
    if (initialCapacity < 0)
        throw new IllegalArgumentException();
    int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ?
               MAXIMUM_CAPACITY :
               tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1));
    this.sizeCtl = cap;
}

通過提供初始容量,計算了 sizeCtl,sizeCtl = 【 (1.5 * initialCapacity + 1),然後向上取最近的 2 的 n 次方】。如 initialCapacity 為 10,那麼得到 sizeCtl 為 16,如果 initialCapacity 為 11,得到 sizeCtl 為 32。

put過程:

public V put(K key, V value) {
    return putVal(key, value, false);
}
final V putVal(K key, V value, boolean onlyIfAbsent) {
    if (key == null || value == null) throw new NullPointerException();
    // 得到 hash 值
    int hash = spread(key.hashCode());
    // 用於記錄相應連結串列的長度
    int binCount = 0;
    for (Node<K,V>[] tab = table;;) {
        Node<K,V> f; int n, i, fh;
        // 如果陣列"空",進行陣列初始化
        if (tab == null || (n = tab.length) == 0)
            // 初始化陣列,後面會詳細介紹
            tab = initTable();
 
        // 找該 hash 值對應的陣列下標,得到第一個節點 f
        else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
            // 如果陣列該位置為空,
            //    用一次 CAS 操作將這個新值放入其中即可,這個 put 操作差不多就結束了,可以拉到最後面了
            //          如果 CAS 失敗,那就是有併發操作,進到下一個迴圈就好了
            if (casTabAt(tab, i, null,
                         new Node<K,V>(hash, key, value, null)))
                break;                   // no lock when adding to empty bin
        }
        // hash 居然可以等於 MOVED,這個需要到後面才能看明白,不過從名字上也能猜到,肯定是因為在擴容
        else if ((fh = f.hash) == MOVED)
            // 幫助資料遷移,這個等到看完資料遷移部分的介紹後,再理解這個就很簡單了
            tab = helpTransfer(tab, f);
 
        else { // 到這裡就是說,f 是該位置的頭結點,而且不為空
 
            V oldVal = null;
            // 獲取陣列該位置的頭結點的監視器鎖
            synchronized (f) {
                if (tabAt(tab, i) == f) {
                    if (fh >= 0) { // 頭結點的 hash 值大於 0,說明是連結串列
                        // 用於累加,記錄連結串列的長度
                        binCount = 1;
                        // 遍歷連結串列
                        for (Node<K,V> e = f;; ++binCount) {
                            K ek;
                            // 如果發現了"相等"的 key,判斷是否要進行值覆蓋,然後也就可以 break 了
                            if (e.hash == hash &&
                                ((ek = e.key) == key ||
                                 (ek != null && key.equals(ek)))) {
                                oldVal = e.val;
                                if (!onlyIfAbsent)
                                    e.val = value;
                                break;
                            }
                            // 到了連結串列的最末端,將這個新值放到連結串列的最後面
                            Node<K,V> pred = e;
                            if ((e = e.next) == null) {
                                pred.next = new Node<K,V>(hash, key,
                                                          value, null);
                                break;
                            }
                        }
                    }
                    else if (f instanceof TreeBin) { // 紅黑樹
                        Node<K,V> p;
                        binCount = 2;
                        // 呼叫紅黑樹的插值方法插入新節點
                        if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                       value)) != null) {
                            oldVal = p.val;
                            if (!onlyIfAbsent)
                                p.val = value;
                        }
                    }
                }
            }
            // binCount != 0 說明上面在做連結串列操作
            if (binCount != 0) {
                // 判斷是否要將連結串列轉換為紅黑樹,臨界值和 HashMap 一樣,也是 8
                if (binCount >= TREEIFY_THRESHOLD)
                    // 這個方法和 HashMap 中稍微有一點點不同,那就是它不是一定會進行紅黑樹轉換,
                    // 如果當前陣列的長度小於 64,那麼會選擇進行陣列擴容,而不是轉換為紅黑樹
                    //    具體原始碼我們就不看了,擴容部分後面說
                    treeifyBin(tab, i);
                if (oldVal != null)
                    return oldVal;
                break;
            }
        }
    }
    // 
    addCount(1L, binCount);
    return null;
}

總結:

  • 根據 key 計算出 hashcode 。
  • 判斷是否需要進行初始化。
  • f 即為當前 key 定位出的 Node,如果為空表示當前位置可以寫入資料,利用 CAS 嘗試寫入,失敗則自旋保證成功。
  • 如果當前位置的 hashcode == MOVED == -1,則需要進行擴容。
  • 如果都不滿足,則利用 synchronized 鎖寫入資料。
  • 如果數量大於 TREEIFY_THRESHOLD 則要轉換為紅黑樹。

put過程中遺留的問題:

private final Node<K,V>[] initTable() {
    Node<K,V>[] tab; int sc;
    while ((tab = table) == null || tab.length == 0) {
        // 初始化的"功勞"被其他執行緒"搶去"了
        if ((sc = sizeCtl) < 0)
            Thread.yield(); // lost initialization race; just spin
        // CAS 一下,將 sizeCtl 設定為 -1,代表搶到了鎖
        else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
            try {
                if ((tab = table) == null || tab.length == 0) {
                    // DEFAULT_CAPACITY 預設初始容量是 16
                    int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
                    // 初始化陣列,長度為 16 或初始化時提供的長度
                    Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                    // 將這個陣列賦值給 table,table 是 volatile 的
                    table = tab = nt;
                    // 如果 n 為 16 的話,那麼這裡 sc = 12
                    // 其實就是 0.75 * n
                    sc = n - (n >>> 2);
                }
            } finally {
                // 設定 sizeCtl 為 sc,我們就當是 12 吧
                sizeCtl = sc;
            }
            break;
        }
    }
    return tab;
}

初始化陣列:initTable,這個比較簡單,主要就是初始化一個合適大小的陣列,然後會設定 sizeCtl。初始化方法中的併發問題是通過對 sizeCtl 進行一個 CAS 操作來控制的。

private final void treeifyBin(Node<K,V>[] tab, int index) {
    Node<K,V> b; int n, sc;
    if (tab != null) {
        // MIN_TREEIFY_CAPACITY 為 64
        // 所以,如果陣列長度小於 64 的時候,其實也就是 32 或者 16 或者更小的時候,會進行陣列擴容
        if ((n = tab.length) < MIN_TREEIFY_CAPACITY)
            // 後面我們再詳細分析這個方法
            tryPresize(n << 1);
        // b 是頭結點
        else if ((b = tabAt(tab, index)) != null && b.hash >= 0) {
            // 加鎖
            synchronized (b) {
 
                if (tabAt(tab, index) == b) {
                    // 下面就是遍歷連結串列,建立一顆紅黑樹
                    TreeNode<K,V> hd = null, tl = null;
                    for (Node<K,V> e = b; e != null; e = e.next) {
                        TreeNode<K,V> p =
                            new TreeNode<K,V>(e.hash, e.key, e.val,
                                              null, null);
                        if ((p.prev = tl) == null)
                            hd = p;
                        else
                            tl.next = p;
                        tl = p;
                    }
                    // 將紅黑樹設定到陣列相應位置中
                    setTabAt(tab, index, new TreeBin<K,V>(hd));
                }
            }
        }
    }
}

連結串列轉紅黑樹: treeifyBin,前面我們在 put 原始碼分析也說過,treeifyBin 不一定就會進行紅黑樹轉換,也可能是僅僅做陣列擴容。

// 首先要說明的是,方法引數 size 傳進來的時候就已經翻了倍了
private final void tryPresize(int size) {
    // c:size 的 1.5 倍,再加 1,再往上取最近的 2 的 n 次方。
    int c = (size >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY :
        tableSizeFor(size + (size >>> 1) + 1);
    int sc;
    while ((sc = sizeCtl) >= 0) {
        Node<K,V>[] tab = table; int n;
 
        // 這個 if 分支和之前說的初始化陣列的程式碼基本上是一樣的,在這裡,我們可以不用管這塊程式碼
        if (tab == null || (n = tab.length) == 0) {
            n = (sc > c) ? sc : c;
            if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
                try {
                    if (table == tab) {
                        @SuppressWarnings("unchecked")
                        Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                        table = nt;
                        sc = n - (n >>> 2); // 0.75 * n
                    }
                } finally {
                    sizeCtl = sc;
                }
            }
        }
        else if (c <= sc || n >= MAXIMUM_CAPACITY)
            break;
        else if (tab == table) {
            // 我沒看懂 rs 的真正含義是什麼,不過也關係不大
            int rs = resizeStamp(n);
 
            if (sc < 0) {
                Node<K,V>[] nt;
                if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                    sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
                    transferIndex <= 0)
                    break;
                // 2. 用 CAS 將 sizeCtl 加 1,然後執行 transfer 方法
                //    此時 nextTab 不為 null
                if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
                    transfer(tab, nt);
            }
            // 1. 將 sizeCtl 設定為 (rs << RESIZE_STAMP_SHIFT) + 2)
            //     我是沒看懂這個值真正的意義是什麼?不過可以計算出來的是,結果是一個比較大的負數
            //  呼叫 transfer 方法,此時 nextTab 引數為 null
            else if (U.compareAndSwapInt(this, SIZECTL, sc,
                                         (rs << RESIZE_STAMP_SHIFT) + 2))
                transfer(tab, null);
        }
    }
}

擴容:tryPresize,如果說Java8 ConcurrentHashMap的原始碼不簡單,那麼說的就是擴容操作和遷移操作。這個方法要完完全全看懂還需要看之後的 transfer 方法,讀者應該提前知道這點。這裡的擴容也是做翻倍擴容的,擴容後陣列容量為原來的 2 倍。

這個方法的核心在於 sizeCtl 值的操作,首先將其設定為一個負數,然後執行transfer(tab, null),再下一個迴圈將sizeCtl加 1,並執行transfer(tab, nt),之後可能是繼續sizeCtl加1,並執行 transfer(tab, nt)。所以,可能的操作就是執行 1 次 transfer(tab, null) + 多次 transfer(tab, nt),這裡怎麼結束迴圈的需要看完 transfer 原始碼才清楚。

private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
    int n = tab.length, stride;
 
    // stride 在單核下直接等於 n,多核模式下為 (n>>>3)/NCPU,最小值是 16
    // stride 可以理解為”步長“,有 n 個位置是需要進行遷移的,
    //   將這 n 個任務分為多個任務包,每個任務包有 stride 個任務
    if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
        stride = MIN_TRANSFER_STRIDE; // subdivide range
 
    // 如果 nextTab 為 null,先進行一次初始化
    //    前面我們說了,外圍會保證第一個發起遷移的執行緒呼叫此方法時,引數 nextTab 為 null
    //       之後參與遷移的執行緒呼叫此方法時,nextTab 不會為 null
    if (nextTab == null) {
        try {
            // 容量翻倍
            Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
            nextTab = nt;
        } catch (Throwable ex) {      // try to cope with OOME
            sizeCtl = Integer.MAX_VALUE;
            return;
        }
        // nextTable 是 ConcurrentHashMap 中的屬性
        nextTable = nextTab;
        // transferIndex 也是 ConcurrentHashMap 的屬性,用於控制遷移的位置
        transferIndex = n;
    }
 
    int nextn = nextTab.length;
 
    // ForwardingNode 翻譯過來就是正在被遷移的 Node
    // 這個構造方法會生成一個Node,key、value 和 next 都為 null,關鍵是 hash 為 MOVED
    // 後面我們會看到,原陣列中位置 i 處的節點完成遷移工作後,
    //    就會將位置 i 處設定為這個 ForwardingNode,用來告訴其他執行緒該位置已經處理過了
    //    所以它其實相當於是一個標誌。
    ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
 
 
    // advance 指的是做完了一個位置的遷移工作,可以準備做下一個位置的了
    boolean advance = true;
    boolean finishing = false; // to ensure sweep before committing nextTab
 
    /*
     * 下面這個 for 迴圈,最難理解的在前面,而要看懂它們,應該先看懂後面的,然後再倒回來看
     * 
     */
 
    // i 是位置索引,bound 是邊界,注意是從後往前
    for (int i = 0, bound = 0;;) {
        Node<K,V> f; int fh;
 
        // 下面這個 while 真的是不好理解
        // advance 為 true 表示可以進行下一個位置的遷移了
        //   簡單理解結局:i 指向了 transferIndex,bound 指向了 transferIndex-stride
        while (advance) {
            int nextIndex, nextBound;
            if (--i >= bound || finishing)
                advance = false;
 
            // 將 transferIndex 值賦給 nextIndex
            // 這裡 transferIndex 一旦小於等於 0,說明原陣列的所有位置都有相應的執行緒去處理了
            else if ((nextIndex = transferIndex) <= 0) {
                i = -1;
                advance = false;
            }
            else if (U.compareAndSwapInt
                     (this, TRANSFERINDEX, nextIndex,
                      nextBound = (nextIndex > stride ?
                                   nextIndex - stride : 0))) {
                // 看括號中的程式碼,nextBound 是這次遷移任務的邊界,注意,是從後往前
                bound = nextBound;
                i = nextIndex - 1;
                advance = false;
            }
        }
        if (i < 0 || i >= n || i + n >= nextn) {
            int sc;
            if (finishing) {
                // 所有的遷移操作已經完成
                nextTable = null;
                // 將新的 nextTab 賦值給 table 屬性,完成遷移