1. 程式人生 > 實用技巧 >3.5.3 資料排序;重複數值、缺失值處理

3.5.3 資料排序;重複數值、缺失值處理

1、它實現了ConcurrentMap介面,該介面定義了一些原子操作約定
2、執行緒安全

  • 完全的併發讀和高併發寫
  • 讀操作完全無鎖,犧牲了一致性;寫操作部分有鎖
  • 它與HashTableCollections.synchronizedMap
  • HashMap支援nullConcurrentHashMapHashTable不支援null

3、java7

  • 分段鎖
  • 雜湊表/連結串列

4、java8

  • CAS + Unsafe
  • 雜湊表/連結串列 + 紅黑樹

java7的實現

一、相關概念

1、分段鎖

ConcurrentHashMap底層採用多個分段Segment,每段下面都是一個雜湊表,這就是分段。每當需要對每段資料上鎖操作時,只需要對Segment

上鎖即可,這就是分段鎖。通常稱Segment的數量叫做併發度concurrency
優點:

  • 在上鎖的情況下,提高了併發度;

2、併發度concurrency

    /**
     * The default concurrency level for this table, used when not
     * otherwise specified in a constructor.
     */
    static final int DEFAULT_CONCURRENCY_LEVEL = 16;

這表示預設情況下,會有16個段Segment

3、每個Segment的雜湊表長度都是2的冪次方

ConcurrentHashMap構造方法中

二、原始碼分析

1、get方法

  • 計算segment的位置
  • 找到這個段下面的雜湊表
  • 遍歷連結串列,看是否存在
    public V get(Object key) {
        Segment<K,V> s; // manually integrate access methods to reduce overhead
        HashEntry<K,V>[] tab;
        int h = hash(key);
        // 獲取到key所在Segment陣列的下標
        long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
        // 判斷這個下標是否存在,以及Segment下面的雜湊表是否存在
        if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null &&
            (tab = s.table) != null) {
            // 熟悉的:(tab.length - 1) & h操作
            for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile
                     (tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
                 e != null; e = e.next) {
                K k;
                if ((k = e.key) == key || (e.hash == h && key.equals(k)))
                    return e.value;
            }
        }
        return null;
    }

(1)為什麼要使用UNSAFE.getObjectVolatile(segments, u)這種方式來讀取陣列下標的某個元素?
提高效能。使用常用segments[i]這種語法,在編譯位元組碼的時候,是會檢查陣列是否越界;而使用上面的程式碼,會節省這一步。

(2)如何保證執行緒安全性?
即如何保證在多執行緒環境下,當執行緒在做更新操作時,如果其他執行緒在同步讀的話,是可能出現髒資料、空指標情況。那麼ConcurrentHashMap是如何保證的?
ConcurrentHashMap為了提高高併發,而犧牲了一致性,但這種一致性是弱一致性,不會對程式造成大的過錯。所以髒資料是無法避免的,因此在java8的類註釋寫到不建議使用sizeisEmptycontainsValue來進行判斷語句。

 * Bear in mind that the results of aggregate status methods including
 * {@code size}, {@code isEmpty}, and {@code containsValue} are typically
 * useful only when a map is not undergoing concurrent updates in other threads.
 * Otherwise the results of these methods reflect transient states
 * that may be adequate for monitoring or estimation purposes, but not
 * for program control.

2、put方法

  • 找到Segment,必要時新建;
  • Segment執行put操作,必要時擴容;
    public V put(K key, V value) {
        Segment<K,V> s;
        if (value == null)
            throw new NullPointerException();
        int hash = hash(key);
        int j = (hash >>> segmentShift) & segmentMask;
        if ((s = (Segment<K,V>)UNSAFE.getObject          // nonvolatile; recheck
             (segments, (j << SSHIFT) + SBASE)) == null) //  in ensureSegment
            s = ensureSegment(j);
        return s.put(key, hash, value, false);
    }

(1)擴容時如何保證執行緒安全性?

  • 在建立Segment時,採用CAS保證執行緒安全性;
  • 在建立Entry時,因為Segment本身就是ReentrantLock,在其Segment.put()方法是一定保證在獲取到鎖的情況下才執行操作的;

(2)Unsafe.getObject()的作用?

java8的實現

一、與java7的改進

使用雜湊表 + 連結串列/紅黑樹 的資料結構

放棄使用分段鎖,改用CASvolatileUnsafe

java7的分段鎖很好,但鎖畢竟還是很慢的,所以java8實現了儘可能地無鎖環境。

這裡所說地無鎖也僅僅大多數情況下,在某些特殊場景還是需要鎖地。

鎖的粒度更細

java7鎖地粒度是Segment,而在java8中鎖地粒度是每個Entry

二、原始碼分析

1、get方法

    public V get(Object key) {
        Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
        // 重新hash
        int h = spread(key.hashCode());
        if ((tab = table) != null && (n = tab.length) > 0 &&
            (e = tabAt(tab, (n - 1) & h)) != null) {
            // 如果第一個就找到,直接返回
            if ((eh = e.hash) == h) {
                if ((ek = e.key) == key || (ek != null && key.equals(ek)))
                    return e.val;
            }
            // 如果元素地hash值小於0,就往紅黑樹查詢
            else if (eh < 0)
                return (p = e.find(h, key)) != null ? p.val : null;
            // 連結串列下地查詢
            while ((e = e.next) != null) {
                if (e.hash == h &&
                    ((ek = e.key) == key || (ek != null && key.equals(ek))))
                    return e.val;
            }
        }
        return null;
    }

(1)查詢沒有鎖,如何有人在寫入怎麼辦?

  • 在紅黑樹狀態下,查詢是有讀寫鎖;
  • 在連結串列狀態下,跟java7相似,犧牲了弱一致性;

(2)紅黑樹是怎麼找的?

2、put方法

    final V putVal(K key, V value, boolean onlyIfAbsent) {
        if (key == null || value == null) throw new NullPointerException();
        // 重新hash
        int hash = spread(key.hashCode());
        int binCount = 0;
        // 自旋操作:樂觀鎖
        for (Node<K,V>[] tab = table;;) {
            Node<K,V> f; int n, i, fh;
            // 如果雜湊表為空,就新建
            if (tab == null || (n = tab.length) == 0)
                tab = initTable();
            // 找到對應下標Entry,如果為空,就新建
            else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
                if (casTabAt(tab, i, null,
                             new Node<K,V>(hash, key, value, null)))
                    break;                   // no lock when adding to empty bin
            }
            // 如果當前節點處於轉發節點,即正處於擴容轉移狀態,就幫忙一起轉移
            else if ((fh = f.hash) == MOVED)
                tab = helpTransfer(tab, f);
            // 在對應Entry下,進行put操作
            else {
                V oldVal = null;
                // synchronized鎖定entry,進行put
                synchronized (f) {
                    if (tabAt(tab, i) == f) {
                        // 連結串列地put操作
                        if (fh >= 0) {
                            binCount = 1;
                            for (Node<K,V> e = f;; ++binCount) {
                                K ek;
                                if (e.hash == hash &&
                                    ((ek = e.key) == key ||
                                     (ek != null && key.equals(ek)))) {
                                    oldVal = e.val;
                                    if (!onlyIfAbsent)
                                        e.val = value;
                                    break;
                                }
                                Node<K,V> pred = e;
                                if ((e = e.next) == null) {
                                    pred.next = new Node<K,V>(hash, key,
                                                              value, null);
                                    break;
                                }
                            }
                        }
                        // 紅黑樹地put操作
                        else if (f instanceof TreeBin) {
                            Node<K,V> p;
                            binCount = 2;
                            if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                           value)) != null) {
                                oldVal = p.val;
                                if (!onlyIfAbsent)
                                    p.val = value;
                            }
                        }
                    }
                }
                // 檢查是否需要將連結串列轉換成紅黑樹
                if (binCount != 0) {
                    if (binCount >= TREEIFY_THRESHOLD)
                        treeifyBin(tab, i);
                    if (oldVal != null)
                        return oldVal;
                    break;
                }
            }
        }
        // 記錄數量,必要地時候進行擴容
        addCount(1L, binCount);
        return null;
    }

(1)在哪裡擴容的?

(2)擴容是如何進行地?