1. 程式人生 > >原始碼分析-ConcurrentHashMap

原始碼分析-ConcurrentHashMap

文件

doc文件

一個支援併發的提取和修改的散列表。這個類和hashtable準守相同的規範,並且每個版本都對應相同的功能規範。雖然這是一個執行緒安全的類,但是他並不依賴於一個整體的鎖,沒有一個鎖可以鎖住整個元素而靜止所有訪問。這個類和hashtable可以互操作。

提取操作如get。不會阻塞,所以有可能被更新方法鎖覆蓋,提取反映了最近完全操作的結果。對於聚合操作如果putAll和clear,併發的提取可能只能反映一部分也結果(聚合操作不是原子性的)。相似的迭代器和列舉返回散列表基於某個點的狀態,或者迭代器和列舉建立時的狀態。他們不會丟擲
ConcurrentModificationException.然後迭代器被設計成沒成只能被一個執行緒訪問。

允許的併發數通過可選的concurrencyLevel來指定,預設值為16,內部單元的大小。散列表在內部分割槽使得執行指示數量的無爭用的更新操作。由於表內元素的放置是隨機的,因此併發的情況不可預測。理想情況下,你應該指定一個推薦的值允許多少個執行緒併發的訪問表。使用大的值會浪費空間和時間,小的值會導致執行緒爭用。但在一個數量級內的低估或者高估不會有非常明顯的差異。如果只有一個執行緒允許修改,而其他執行緒將只讀時,1是適當的值。調整散列表是一個非常慢的操作,所以儘可能預先估計給一個合適的大小傳遞給構造器。

這個表和他的檢視和它的迭代器實現了Map和Iterator的全部功能。

和HashTable一樣,不同於HashMap,這個類允許null作為鍵或者元素。

原始碼文件

基本的策略是將表分為幾個小的分割槽。每個分割槽是一個併發可讀的散列表。為了減小佔用空間,只構建一個分割槽,而其他的分割槽在需要的時候在構建。為了保持懶惰構造的可見性,所有的分割槽表必須是volatile訪問的。通過unsafe方法的segmnetAt實現。他提供了AtomicReferenceArrays的功能但是減少了中間的呼叫環節。此外,volatile寫元素和鍵值對的next域的所操作使用了更快捷的懶惰集合的寫模式。通過(putOrderedObject)因為寫總是需要釋放鎖的。這樣做維持了表序列的一致性更新。

歷史記錄:早起的實現嚴重依賴於final域,他避免了volatile的讀操作但是存在大量的空間佔用問題。一些遺留的設計用於相容序列化的可用性(比如強制構造分割槽0);

概述

總的來說ConcurrentHashMap就是有一系列的Segment

靜態域

    static final int DEFAULT_INITIAL_CAPACITY = 16;//預設初始大小
    static final float DEFAULT_LOAD_FACTOR = 0.75f;//預設裝填因子
    static final int DEFAULT_CONCURRENCY_LEVEL = 16;//預設Segment數量
    static final int MAXIMUM_CAPACITY = 1 << 30;//預設表的最大容量
    static final int MIN_SEGMENT_TABLE_CAPACITY = 2;//Segment的最小數量
    static final int MAX_SEGMENTS = 1 << 16;//Segment的最大容量
    static final int RETRIES_BEFORE_LOCK = 2;//上鎖前Entry獲取次數,用於size和containsValue。

    private static final sun.misc.Unsafe UNSAFE;//以下的類都用於UNSAFE。這幾個寫的內容有幾個部分是出於序列化的時候的相容性需要。
    private static final long SBASE;
    private static final int SSHIFT;
    private static final long TBASE;
    private static final int TSHIFT;
    private static final long HASHSEED_OFFSET;
    private static final long SEGSHIFT_OFFSET;
    private static final long SEGMASK_OFFSET;
    private static final long SEGMENTS_OFFSET;

非靜態域

    final int segmentMask;//hashCode的高位來選擇應該雜湊到哪個Segment中。
    final int segmentShift;//hashCode的移位大小。
    final Segment<K,V>[] segments;//Segments 陣列

    transient Set<K> keySet;//以下為檢視項。
    transient Set<Map.Entry<K,V>> entrySet;
    transient Collection<V> values;

方法

構造器

    public ConcurrentHashMap(int initialCapacity,
                             float loadFactor, int concurrencyLevel) {
        if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
            throw new IllegalArgumentException();
        if (concurrencyLevel > MAX_SEGMENTS)
            concurrencyLevel = MAX_SEGMENTS;
        // Find power-of-two sizes best matching arguments
        int sshift = 0;//這個兩個引數使用用於計算當前的hashCode應該屬於哪一個segment。
        int ssize = 1;
        while (ssize < concurrencyLevel) {//ssize取當不小於設定值的2的階乘數。
            ++sshift;
            ssize <<= 1;
        }
        this.segmentShift = 32 - sshift;
        this.segmentMask = ssize - 1;
        if (initialCapacity > MAXIMUM_CAPACITY)
            initialCapacity = MAXIMUM_CAPACITY;
        int c = initialCapacity / ssize;//c用來計算segment中Entry[]的大小。
        if (c * ssize < initialCapacity)
            ++c;
        int cap = MIN_SEGMENT_TABLE_CAPACITY;
        while (cap < c)//同樣cap也是Segment中Entry[]的大小,也需要是2的階乘數。
            cap <<= 1;
        // create segments and segments[0]僅僅為了相容性討論。UNSAFE.putOrderedObject。也是出於相容性需要。
        Segment<K,V> s0 =
            new Segment<K,V>(loadFactor, (int)(cap * loadFactor),
                             (HashEntry<K,V>[])new HashEntry[cap]);
        Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];
        UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0]
        this.segments = ss;
    }

HashEntry

    static final class HashEntry<K,V> {
        final int hash;
        final K key;
        volatile V value;
        volatile HashEntry<K,V> next;

        HashEntry(int hash, K key, V value, HashEntry<K,V> next) {
            this.hash = hash;
            this.key = key;
            this.value = value;
            this.next = next;
        }

        final void setNext(HashEntry<K,V> n) {
            UNSAFE.putOrderedObject(this, nextOffset, n);//putOrderedObject可以儲存檔案到指定位置,但是不提供可見性,如果要提供可見性需要對域用修飾符volatile修飾。
        }

        // Unsafe mechanics
        static final sun.misc.Unsafe UNSAFE;
        static final long nextOffset;
        static {
            try {
                UNSAFE = sun.misc.Unsafe.getUnsafe();
                Class k = HashEntry.class;
                nextOffset = UNSAFE.objectFieldOffset
                    (k.getDeclaredField("next"));
            } catch (Exception e) {
                throw new Error(e);
            }
        }
    }

HashEntry整體上比較簡單,但是有兩點需要說明一下。

首先,HashEntry和HashMap中的Entry是不同的,HashEntry並不是Map中的Entry。這裡並沒有實現任何介面。因為Map中的entry功能是比較多的。而這裡的Entry只是提供了一個連結串列容器的功能。而且這樣的設計也比較好,Map中雖然有Entry的介面但是未必要去實現。

其次,域的可見性的問題。這裡這四個域都可以保持可見性,當然volatile的可見性是顯而易見的。final本身也是提供一定可見性的。這是final一個比較靈活運用的地方。比如同樣的set或者map中容器內的元素,如果元素的域是final的,則可以保證這個容器在存放這樣的元素時是執行緒安全的,即使容器本身沒有同步。因為同樣的final不可以被設定兩變。這個在之前《併發程式設計實戰》上說過。當然從語義上說這裡key和hashcode也不應該是可變的,而且對於Entry的訪問操作也進行了同步,因為不是所有的域都是final的

Segment

Segment原始碼文件

Segment會儲存表的Entry列表,並且會始終保持一致的狀態。所以通過對table域加上volatile的修飾符,read操作無鎖的進行讀取。在膨脹過程中,需要儲存重複的節點,以保證這些舊的列表可以被讀取者進行讀取。

這個類僅僅對於修改操作定義了鎖。除非特意註明,這個class方法執行per-segment 版本的的currentHashMap方法。其他方法被整合進ConcurrentHashMap方法。這些修改方法通過scanAndLock和scanAndLockForPut方法實現受控的自旋鎖。這個主要是為了減少快取未命中的情況(這在hashTable中是很常見的問題)。我們並不真的使用這樣的方式去找到node。node因為他們必須在有鎖的情況下獲得以確保更新的一致性。但是這樣做通常比進行重新定位要快的多。scanAndLockForput 建立了一個新的節點如果需要放置的節點沒有被找到的情況下。

        static final int MAX_SCAN_RETRIES =
            Runtime.getRuntime().availableProcessors() > 1 ? 64 : 1;

這是標誌了在嘗試獲取鎖之前最小的後期數量。在scanAndLock和scanAndLockForput 中有用。具體在方法中說明。

        transient volatile HashEntry<K,V>[] table;
        transient int count;
        transient int modCount;
        transient int threshold;//table的的大小如果超過這個值就需要進行膨脹了。
        final float loadFactor;

put方法

首先來看put方法

        final V put(K key, int hash, V value, boolean onlyIfAbsent) {
            HashEntry<K,V> node = tryLock() ? null :
                scanAndLockForPut(key, hash, value);
            V oldValue;
            try {
                HashEntry<K,V>[] tab = table;
                int index = (tab.length - 1) & hash;//雜湊的方式下面統一說。這裡只要知道是為了計算下標就好。
                HashEntry<K,V> first = entryAt(tab, index);//找到第一個Entry
                for (HashEntry<K,V> e = first;;) {//只要遍歷去找所有的Entry看是否有符合條件的。
                    if (e != null) {
                        K k;
                        if ((k = e.key) == key ||//判斷是否符合當前的條件
                            (e.hash == hash && key.equals(k))) {
                            oldValue = e.value;
                            if (!onlyIfAbsent) {//onlyIfAbsent表示只有在不存在的時候修改,為false就是可以修改已存在的值。
                                e.value = value;
                                ++modCount;
                            }
                            break;//退出迴圈
                        }
                        e = e.next;
                    }
                    else {//當遍歷之後沒有發現,則需要進行插入Entry的操作。
                        if (node != null)//node是在加鎖之前遍歷了節點沒有發現節點而新建了節點返回的,因此只要把這個檔期節點設定為首節點就好。注意這裡僅僅是設定了node的next,因為還沒有進行size的檢查。
                            node.setNext(first);
                        else//node為空說明直接獲得了鎖,沒有構造新的節點。所以需要先構造新的節點。
                            node = new HashEntry<K,V>(hash, key, value, first);
                        int c = count + 1;
                        if (c > threshold && tab.length < MAXIMUM_CAPACITY)//判斷當前的table的大小是否需要進行膨脹。
                            rehash(node);
                        else
                            setEntryAt(tab, index, node);設定table的首節點為noe
                        ++modCount;
                        count = c;
                        oldValue = null;
                        break;
                    }
                }
            } finally {
                unlock();
            }
            return oldValue;
        }

這裡還包括兩個小的方法:

        private HashEntry<K,V> scanAndLockForPut(K key, int hash, V value) {
            HashEntry<K,V> first = entryForHash(this, hash);//table中首節點的位置。
            HashEntry<K,V> e = first;//e為迭代器。
            HashEntry<K,V> node = null;//node為當沒有找到節點的時候需要返回的新構造的Entry。
            int retries = -1; // negative while locating node//當前重試的次數,當為-1的時候說明正在經歷定位元素的節點。當這個值增長到一定數值的時候,MAX_SCAN_RETRIES則說明當前列表比較穩定了,開始上鎖。
            while (!tryLock()) {//只要沒有獲得鎖就一直進行迴圈
                HashEntry<K,V> f; // to recheck first below//用來標記f節點是否是首節點。如果不是則說明出現不一致情況需要重新獲取f並重新進行掃描。
                if (retries < 0) {//retries為-1,表示當前還沒有實現過重試。
                    if (e == null) {//e為null說明則說明null已經迭代到尾部了。
                        if (node == null) // speculatively create node //需要建立新的元素。
                            node = new HashEntry<K,V>(hash, key, value, null);
                        retries = 0;//
                    }
                    else if (key.equals(e.key))//找到一個已存在的元素。也將重試次數置為0,開始等待當前列表是否為0。等待上鎖。
                        retries = 0;
                    else//找下一個元素
                        e = e.next;
                }
                else if (++retries > MAX_SCAN_RETRIES) {//在兩次重試的過程中連結串列沒有發生變化,說明比較穩定,開始上鎖。上鎖成功後退出迴圈。
                    lock();
                    break;
                }
                else if ((retries & 1) == 0 &&// 再次查詢當前hash值對應的talbe的位置是否還是先前儲存的first。如果不是則說明列表被修改,重置引數。
                         (f = entryForHash(this, hash)) != first) {
                    e = first = f; // re-traverse if entry changed
                    retries = -1;
                }
            }
            return node;
        }

rehash的文件
在一新的table中重分類節點,因為我們使用2的階層的拓展,所以每一個元素的位置要麼在原先的位置,要麼需要後移2的階乘倍。通過找到就節點可以重用的情況來避免clone的發生。統計意義上來說,在預設的的域值下,只有大約六分之一的節點需要克隆。替換的節點只要沒有任何讀取者的引用指向題目,則是可以被垃圾回收器回收的。(可能由於併發的遍歷操作),Entry的訪問使用陣列下標因為題目被volatile修飾。

        private void rehash(HashEntry<K,V> node) {

            HashEntry<K,V>[] oldTable = table;
            int oldCapacity = oldTable.length;
            int newCapacity = oldCapacity << 1;
            threshold = (int)(newCapacity * loadFactor);
            HashEntry<K,V>[] newTable =
                (HashEntry<K,V>[]) new HashEntry[newCapacity];//這裡有一個疑問問什麼要在這裡加上一個強制型別轉換。
            int sizeMask = newCapacity - 1;
            for (int i = 0; i < oldCapacity ; i++) {
                HashEntry<K,V> e = oldTable[i];//e代表了舊錶中的散列表table下標i位置的頭節點,當然如果是空的,則代表table的該位置沒有雜湊項。
                if (e != null) {
                    HashEntry<K,V> next = e.next;
                    int idx = e.hash & sizeMask;
                    if (next == null)   //  Single node on list//對於只有一個雜湊項的元素只要移動單獨的節點就好了。
                        newTable[idx] = e;
                    else { // Reuse consecutive sequence at same slot
                        HashEntry<K,V> lastRun = e;//從這裡開始的11行。目的是為了找到連結串列結尾雜湊到相同新下標的節點。對於這部分元素進行整體移動,就免去了重新構造新節點的代價。
                        int lastIdx = idx;
                        for (HashEntry<K,V> last = next;
                             last != null;
                             last = last.next) {
                            int k = last.hash & sizeMask;
                            if (k != lastIdx) {
                                lastIdx = k;
                                lastRun = last;
                            }
                        }
                        newTable[lastIdx] = lastRun;
                        // Clone remaining nodes
                        for (HashEntry<K,V> p = e; p != lastRun; p = p.next) {//這部分就是講除去最後一小段已經集體移動的節點之外,對於之前的所有節點進行復制放入新table的位置中。
                            V v = p.value;
                            int h = p.hash;
                            int k = h & sizeMask;
                            HashEntry<K,V> n = newTable[k];
                            newTable[k] = new HashEntry<K,V>(h, p.key, v, n);
                        }
                    }
                }
            }
            int nodeIndex = node.hash & sizeMask; // add the new node//把需要新增的新元素新增到原表
            node.setNext(newTable[nodeIndex]);
            newTable[nodeIndex] = node;
            table = newTable;//對table賦新的表;
        }

首先segment再雜湊或者說膨脹只是對單個Segment進行的。所以如果一個表進行多次變換後Segment中table的大小很可能是不一樣的。當然如果雜湊函式做的比較好應該各個Segment的大小應該是比較接近的。
其次對於原segment的中的原實際上是進行了複製,或者移動的,不過由於只有在put操作中才會進行再雜湊,而且put又是上了鎖的,所以可以保證程式被正確同步,但是對於讀操作得到的引用未必是當前的引用了。這一點需要注意。
最後,這中間找連結串列的最後一段進行復制的操作。這個是從統計意義上說更效率。

remove

final V remove(Object key, int hash, Object value) {
            if (!tryLock())
                scanAndLock(key, hash);//如果上鎖識別,則掃描鏈table中的hash表以保持快取命中。
            V oldValue = null;
            try {
                HashEntry<K,V>[] tab = table;
                int index = (tab.length - 1) & hash;//計算下標
                HashEntry<K,V> e = entryAt(tab, index);//e為tab[index]的連結串列首節點
                HashEntry<K,V> pred = null;//為單項鍊表的某節點的前驅節點
                while (e != null) {
                    K k;
                    HashEntry<K,V> next = e.next;
                    if ((k = e.key) == key ||//如果如果找到對映的節點則準備進行設定。
                        (e.hash == hash && key.equals(k))) {
                        V v = e.value;
                        if (value == null || value == v || value.equals(v)) {//這裡不太明白為什麼會有value為空的情況Concurrent並不執行value為null。然後進行刪除,需要考慮e是頭結點的情況
                            if (pred == null)
                                setEntryAt(tab, index, next);
                            else
                                pred.setNext(next);
                            ++modCount;
                            --count;
                            oldValue = v;
                        }
                        break;
                    }
                    pred = e;
                    e = next;
                }
            } finally {
                unlock();
            }
            return oldValue;
        }

這裡比較常見的判斷方法是先用key==號判斷,也就是兩者是同一個節點,或者使用key.equals和hashcode來判斷。這樣比較全面,效率兼顧安全。大部分的hash方面的問題都是這樣做的。

此外我其實還有一點不是很明確。就是關於volatile的修飾後記憶體可見性的問題。當然volatile原理上說是記憶體快取禁止標誌位。但是從這裡來看似乎並不完全是這樣,或者我原先的理解有一定問題。我本來理解就是既然沒有快取,就每次讀取直接從記憶體中讀取資料了。
但是從這裡來看scanAndLock沒有任何影響,主要的目的就是在於使快取重新整理,也就是說不停的從記憶體中讀取。但是上鎖之後實際上還是進行了讀操作的,那上鎖之後的讀操作到底是從快取中讀取的還是從記憶體中讀取的呢?我理解應該是記憶體。如果是記憶體那這樣掃描還有什麼意義呢。。到目前為止我還是不是很理解。But this code come from Doug Lea。所以大概還是我哪裡理解的有問題。。

        private void scanAndLock(Object key, int hash) {
            // similar to but simpler than scanAndLockForPut
            HashEntry<K,V> first = entryForHash(this, hash);
            HashEntry<K,V> e = first;
            int retries = -1;
            while (!tryLock()) {
                HashEntry<K,V> f;
                if (retries < 0) {
                    if (e == null || key.equals(e.key))
                        retries = 0;
                    else
                        e = e.next;
                }
                else if (++retries > MAX_SCAN_RETRIES) {
                    lock();
                    break;
                }
                else if ((retries & 1) == 0 &&
                         (f = entryForHash(this, hash)) != first) {
                    e = first = f;
                    retries = -1;
                }
            }
        }

雜湊過程

  1. hashSeed的產生過程:

hashSeed的產生過程基本上和HashMap中的是類似的是需要根據sun.misc.Hashing.randomHashSeed(instance)產生的,這需要虛擬機器產生啟動後才能使用,所以使用一個靜態類來初始化。此外這裡還有一個是可以通過jdk.map.althashing.threshold來控制的域值需要使用一個靜態的Holder類來進行處理。之前在分析HashMap的時候沒有詳細說,這裡盡我所能說明白怎麼去控制這些引數以及他們發揮作用的原理。對於String需要單獨產生雜湊種子。

我們從後往前說,首先產生雜湊種子需要的是一個隨機數。這個隨機數是由randomHashSeed針對每個ConcurrentHashMap的例項產生的。這裡分兩種情況,預設情況下返回0,如果設定了ALTERNATIVE_HASHING(並且虛擬機器已經啟動,則會使用sun.misc.Hashing.randomHashSeed來產生一個隨機數)。這裡為什麼不直接產生,是為了給使用者提供一些自定義的功能,在虛擬機器啟動的時候可以通過設定一些引數來控制雜湊種子的產生。這是java的慣用手法凡是根hash產生有關的部分都使用了類似的方式。

    private transient final int hashSeed = randomHashSeed(this);

    private static int randomHashSeed(ConcurrentHashMap instance) {
        if (sun.misc.VM.isBooted() && Holder.ALTERNATIVE_HASHING) {
            return sun.misc.Hashing.randomHashSeed(instance);
        }

        return 0;
    }

其次這裡的ALTERNATIVE_HASHING是根據什麼產生的。Holder靜態內部了用於產生這個ALTERNATIVE_HASHING。

不同的HashMap中使用的ALTERNATIVE_HASHING的方法都有些不同,這裡主要是控制設定的如果閾值,如果閾值過小則需要隨機產生雜湊種子。所以通常情況下都會對String 做單獨處理。如果在啟動jvm的時候設定了jdk.map.althashing.threshold,Holder會比較jdk.map.althashing.threshold與MAXIMUM_CAPACITY的大小。如果他小於ConcurrentHashMap的最大值MAXIMUM_CAPACITY,則為ALTERNATIVE_HASHINGtrue。至於為什麼這樣做我現在還沒法回答。如果不設定則預設為Integer.MAX_VALUE,也就是不需要設定。換句話說,如果設定的域值小於java的ConcurrentHashMap的最大值MAXIMUM_CAPACITY則需要重新計算雜湊種子。

    private static class Holder {

        /**
        * Enable alternative hashing of String keys?
        *
        * <p>Unlike the other hash map implementations we do not implement a
        * threshold for regulating whether alternative hashing is used for
        * String keys. Alternative hashing is either enabled for all instances
        * or disabled for all instances.
        */
        static final boolean ALTERNATIVE_HASHING;

        static {
            // Use the "threshold" system property even though our threshold
            // behaviour is "ON" or "OFF".
            String altThreshold = java.security.AccessController.doPrivileged(
                new sun.security.action.GetPropertyAction(
                    "jdk.map.althashing.threshold"));

            int threshold;
            try {
                threshold = (null != altThreshold)
                        ? Integer.parseInt(altThreshold)
                        : Integer.MAX_VALUE;

                // disable alternative hashing if -1
                if (threshold == -1) {
                    threshold = Integer.MAX_VALUE;
                }

                if (threshold < 0) {
                    throw new IllegalArgumentException("value must be positive integer.");
                }
            } catch(IllegalArgumentException failed) {
                throw new Error("Illegal value for 'jdk.map.althashing.threshold'", failed);
            }
            ALTERNATIVE_HASHING = threshold <= MAXIMUM_CAPACITY;
        }
    }
  1. hashCode的產生過程:

ConcurrentHashMap的產生過程和HashMap中是有所不同的,因為ConcurrentHashM實際上是需要根據hashCode的位來計算其屬於哪個Segment的。所以對每一位的雜湊都需要進行精細的控制。中間這一系列的位移操作主要目的就是使得雜湊效果被平均的分配到各個位上。因為如果是直接用雜湊的話也許低位不一樣,但是高位是相似的,這樣使用高位去定位Segment的時候回把大量的Entry定位到一個Segment中,併發效能降低。

此外對於String需要單獨的產生隨機數種子,這是String的特性決定的。

    private int hash(Object k) {
        int h = hashSeed;

        if ((0 != h) && (k instanceof String)) {
            return sun.misc.Hashing.stringHash32((String) k);
        }

        h ^= k.hashCode();

        // Spread bits to regularize both segment and index locations,
        // using variant of single-word Wang/Jenkins hash.
        h += (h <<  15) ^ 0xffffcd7d;
        h ^= (h >>> 10);
        h += (h <<   3);
        h ^= (h >>>  6);
        h += (h <<   2) + (h << 14);
        return h ^ (h >>> 16);
    }
  1. 定位到Segment

Segment是不同的預設情況下Segment通過下面的式子計算出來的,預設情況下segmentShift為28,segmentMask為15,也就是說僅僅使用高四位來進行雜湊到16個Segment中。

int j = (hash >>> segmentShift) & segmentMask;

方法

put

    public V put(K key, V value) {
        Segment<K,V> s;
        if (value == null)
            throw new NullPointerException();
        int hash = hash(key);
        int j = (hash >>> segmentShift) & segmentMask;// 計算雜湊到那個Segment。
        if ((s = (Segment<K,V>)UNSAFE.getObject          // nonvolatile; recheck//獲得Segment,但是這裡不太明白工作原理。為何要多出這三行,因為是final類的程式所以這裡應該是沒有可見性問題才對。而且很奇怪的是在在remove中就沒有這部分,而是直接使用SegmentForHash
             (segments, (j << SSHIFT) + SBASE)) == null) //  in ensureSegment
            s = ensureSegment(j);
        return s.put(key, hash, value, false);//確定了segment之後會把任務委託給Segment的put來做。這部分之前說過這裡不詳細說明了。
    }

不過這裡UNSAFE的問題還是有一些需要說明。UNSAFE理解的不太透徹嘗試解釋一下,有問題以後再修改。
首先,UNSAFE獲得的偏移量、首地址、或者增量地址都是針對一個具體的*.class來說的。這些變數都是固定的,可能對於不同的虛擬機器會有所不同,但是一旦獲得之後就是常量了。但是如果在使用這些常量進行獲得、交換或者替換等原子性操作的時的時候需要繫結具體的例項。

首先看下這幾個UNSAFE獲得常量:

    static {
        int ss, ts;
        try {
            UNSAFE = sun.misc.Unsafe.getUnsafe();
            Class tc = HashEntry[].class;
            Class sc = Segment[].class;
            TBASE = UNSAFE.arrayBaseOffset(tc);//獲得是該類的首地址
            SBASE = UNSAFE.arrayBaseOffset(sc);
            ts = UNSAFE.arrayIndexScale(tc);//獲得是該類的增量地址
            ss = UNSAFE.arrayIndexScale(sc);
            HASHSEED_OFFSET = UNSAFE.objectFieldOffset(//獲得的是域的偏移地址
                ConcurrentHashMap.class.getDeclaredField("hashSeed"));
            SEGSHIFT_OFFSET = UNSAFE.objectFieldOffset(
                ConcurrentHashMap.class.getDeclaredField("segmentShift"));
            SEGMASK_OFFSET = UNSAFE.objectFieldOffset(
                ConcurrentHashMap.class.getDeclaredField("segmentMask"));
            SEGMENTS_OFFSET = UNSAFE.objectFieldOffset(
                ConcurrentHashMap.class.getDeclaredField("segments"));
        } catch (Exception e) {
            throw new Error(e);
        }
        if ((ss & (ss-1)) != 0 || (ts & (ts-1)) != 0)//這裡主要是判斷Segments和table是不是2的指數。
            throw new Error("data type scale not a power of two");
        SSHIFT = 31 - Integer.numberOfLeadingZeros(ss);//這個計算的是偏移量。實際上在後續的計算中使用SSHIFT而不使用ss,因為位操作速度更快。這樣就需要保證但是這裡我不明的一點是既然是根據單獨的類生成了那應該是保證是保證是固定的值為何還要判斷是否是2的階乘。此外這裡為何一定要求是2的階乘。總之問題多多。主要還是java記憶體怎麼設定的不太理解。ConcurrentHashMap中大量運用UNSAFE來設定和獲取,不太明白這樣做和直接讀取有什麼區別。
        TSHIFT = 31 - Integer.numberOfLeadingZeros(ts);
    }

remove、replace

兩者方法類似,就是定位Segment然後委託給Segment來操作就好。

    public V remove(Object key) {
        int hash = hash(key);
        Segment<K,V> s = segmentForHash(hash);
        return s == null ? null : s.remove(key, hash, null);
    }

迭代器

迭代器的內容都差不多。因為是map所以有很多衍生的類,這裡只看下最主要的,因為實現相對比較簡單,我也沒有看到非常規的問題。

    abstract class HashIterator {
        int nextSegmentIndex;
        int nextTableIndex;
        HashEntry<K,V>[] currentTable;
        HashEntry<K, V> nextEntry;
        HashEntry<K, V> lastReturned;

        HashIterator() {
            nextSegmentIndex = segments.length - 1;
            nextTableIndex = -1;
            advance();
        }

        /**
         * Set nextEntry to first node of next non-empty table
         * (in backwards order, to simplify checks).
         */
        final void advance() {//advance的任務是將節點設定到下一個連結串列的頭結點,這裡有兩種情況一種是當前table[]中的某個連結串列遍歷完,也有可能是Segment中的所有連結串列遍歷完畢,所以這裡要兩種情況討論。Map總的來說是從後往前遍歷的。每次切換Segment都要切換nextTableIndex 為下一個talbe的長度。當所有Segment遍歷完畢也就結束遍歷了。
            for (;;) {
                if (nextTableIndex >= 0) {
                    if ((nextEntry = entryAt(currentTable,
                                             nextTableIndex--)) != null)
                        break;
                }
                else if (nextSegmentIndex >= 0) {
                    Segment<K,V> seg = segmentAt(segments, nextSegmentIndex--);
                    if (seg != null && (currentTable = seg.table) != null)
                        nextTableIndex = currentTable.length - 1;
                }
                else
                    break;
            }
        }

        final HashEntry<K,V> nextEntry() {//nextEntry會依照當前節點向後移動,直到移動到null說明當前的連結串列遍歷完成需要移動到下一個連結串列的頭結點這個任務由advance完成。
            HashEntry<K,V> e = nextEntry;
            if (e == null)
                throw new NoSuchElementException();
            lastReturned = e; // cannot assign until after null check
            if ((nextEntry = e.next) == null)
                advance();
            return e;
        }

其他

檢視比較麻煩也沒有特別要說的這裡不詳細說了。
序列化和之前的相容性有關也比較麻煩。以後有機會再說吧。