1. 程式人生 > >Java併發程式設計系列-(5) Java併發容器

Java併發程式設計系列-(5) Java併發容器

5 併發容器

5.1 Hashtable、HashMap、TreeMap、HashSet、LinkedHashMap

在介紹併發容器之前,先分析下普通的容器,以及相應的實現,方便後續的對比。

Hashtable、HashMap、TreeMap 都是最常見的一些 Map 實現,是以鍵值對的形式儲存和操作資料的容器型別。

Hashtable 是早期 Java 類庫提供的一個雜湊表實現,本身是同步的,不支援 null 鍵和值,由於同步導致的效能開銷,所以已經很少被推薦使用。

HashMap 是應用更加廣泛的雜湊表實現,行為上大致上與 HashTable 一致,主要區別在於 HashMap 不是同步的,支援 null 鍵和值等。通常情況下,HashMap 進行 put 或者 get 操作,可以達到常數時間的效能,所以它是絕大部分利用鍵值對存取場景的首選,比如,實現一個使用者 ID 和使用者資訊對應的執行時儲存結構。

HashMap 明確宣告不是執行緒安全的資料結構,如果忽略這一點,簡單用在多執行緒場景裡,難免會出現問題,如 HashMap 在併發環境可能出現無限迴圈佔用 CPU、size 不準確等詭異的問題。

TreeMap 則是基於紅黑樹的一種提供順序訪問的 Map,和 HashMap 不同,它的 get、put、remove 之類操作都是 O(log(n))的時間複雜度,具體順序可以由指定的 Comparator 來決定,或者根據鍵的自然順序來判斷。

Hashtable

Hashtable是通過"拉鍊法"實現的雜湊表,結構如下圖所示:

1. 定義

public class Hashtable<K,V>  
    extends Dictionary<K,V>  
    implements Map<K,V>, Cloneable, java.io.Serializable{}

Hashtable 繼承於 Dictionary 類,實現了 Map, Cloneable, java.io.Serializable介面。

2. 構造方法

Hashtable 一共提供了 4 個構造方法:

public Hashtable(int initialCapacity, float loadFactor): 用指定初始容量和指定負載因子構造一個新的空雜湊表。
public Hashtable(int initialCapacity):用指定初始容量和預設的負載因子 (0.75) 構造一個新的空雜湊表。
public Hashtable():預設建構函式,容量為 11,負載因子為 0.75。
- public Hashtable(Map<? extends K, ? extends V> t):構造一個與給定的 Map 具有相同對映關係的新雜湊表。

它包括幾個重要的成員變數:table, count, threshold, loadFactor, modCount。

  • table 是一個 Entry[] 陣列型別,而 Entry實際上就是如上圖所示的一個單向連結串列。Hashtable的鍵值對都是儲存在Entry陣列中的。
  • count 是 Hashtable 的大小,它是 Hashtable 儲存的鍵值對的數量。
  • threshold 是 Hashtable 的閾值,用於判斷是否需要調整 Hashtable 的容量。threshold 的值="容量 x 負載因子"。
  • loadFactor 就是負載因子。
  • modCount 記錄hashTable被修改的次數,在對HashTable的操作中,無論add、remove、clear方法只要是涉及了改變Table陣列元素的個數的方法都會導致modCount的改變。這主要用來實現“快速失敗”也就是fail-fast,它是Java集合的一種錯誤檢測機制。

fail-fast機制舉例:有兩個執行緒(執行緒A,執行緒B),其中執行緒A負責遍歷list、執行緒B修改list。執行緒A在遍歷list過程的某個時候(此時expectedModCount = modCount=N),執行緒啟動,同時執行緒B增加一個元素,這是modCount的值發生改變(modCount + 1 = N + 1)。執行緒A繼續遍歷執行next方法時,通告checkForComodification方法發現expectedModCount  = N  ,而modCount = N + 1,兩者不等,這時就丟擲ConcurrentModificationException 異常,從而產生fail-fast機制。

3. PUT操作

put 方法的整個流程為:

  • 判斷 value 是否為空,為空則丟擲異常;
  • 計算 key 的 hash 值,並根據 hash 值獲得 key 在 table 陣列中的位置 index,如果 table[index] 元素不為空,則進行迭代,如果遇到相同的 key,則直接替換,並返回舊 value;
  • 否則,我們可以將其插入到 table[index] 位置。
public synchronized V put(K key, V value) {
        // Make sure the value is not null確保value不為null
        if (value == null) {
            throw new NullPointerException();
        }

        // Makes sure the key is not already in the hashtable.
        //確保key不在hashtable中
        //首先,通過hash方法計算key的雜湊值,並計算得出index值,確定其在table[]中的位置
        //其次,迭代index索引位置的連結串列,如果該位置處的連結串列存在相同的key,則替換value,返回舊的value
        Entry tab[] = table;
        int hash = hash(key);
        int index = (hash & 0x7FFFFFFF) % tab.length;
        for (Entry<K,V> e = tab[index] ; e != null ; e = e.next) {
            if ((e.hash == hash) && e.key.equals(key)) {
                V old = e.value;
                e.value = value;
                return old;
            }
        }

        modCount++;
        if (count >= threshold) {
            // Rehash the table if the threshold is exceeded
            //如果超過閥值,就進行rehash操作
            rehash();

            tab = table;
            hash = hash(key);
            index = (hash & 0x7FFFFFFF) % tab.length;
        }

        // Creates the new entry.
        //將值插入,返回的為null
        Entry<K,V> e = tab[index];
        // 建立新的Entry節點,並將新的Entry插入Hashtable的index位置,並設定e為新的Entry的下一個元素
        tab[index] = new Entry<>(hash, key, value, e);
        count++;
        return null;
    }

4. Get操作

首先通過 hash()方法求得 key 的雜湊值,然後根據 hash 值得到 index 索引。然後迭代連結串列,返回匹配的 key 的對應的 value;找不到則返回 null。

public synchronized V get(Object key) {
        Entry tab[] = table;
        int hash = hash(key);
        int index = (hash & 0x7FFFFFFF) % tab.length;
        for (Entry<K,V> e = tab[index] ; e != null ; e = e.next) {
            if ((e.hash == hash) && e.key.equals(key)) {
                return e.value;
            }
        }
        return null;
    }

5. rehash擴容

  • 陣列長度增加一倍(如果超過上限,則設定成上限值)。
  • 更新雜湊表的擴容門限值。
  • 遍歷舊錶中的節點,計算在新表中的index,插入到對應位置連結串列的頭節點。
    protected void rehash() {
        int oldCapacity = table.length;
        Entry<?,?>[] oldMap = table;

        // overflow-conscious code
        int newCapacity = (oldCapacity << 1) + 1;
        if (newCapacity - MAX_ARRAY_SIZE > 0) {
            if (oldCapacity == MAX_ARRAY_SIZE)
                // Keep running with MAX_ARRAY_SIZE buckets
                return;
            newCapacity = MAX_ARRAY_SIZE;
        }
        Entry<?,?>[] newMap = new Entry<?,?>[newCapacity];

        modCount++;
        threshold = (int)Math.min(newCapacity * loadFactor, MAX_ARRAY_SIZE + 1);
        table = newMap;

        for (int i = oldCapacity ; i-- > 0 ;) {
            for (Entry<K,V> old = (Entry<K,V>)oldMap[i] ; old != null ; ) {
                Entry<K,V> e = old;
                old = old.next;

                int index = (e.hash & 0x7FFFFFFF) % newCapacity;
                e.next = (Entry<K,V>)newMap[index];
                newMap[index] = e;
            }
        }
    }

6. Remove方法

remove方法主要邏輯如下:

  • 先獲取synchronized鎖。
  • 計算key的雜湊值和index。
  • 遍歷對應位置的連結串列,尋找待刪除節點,如果存在,用e表示待刪除節點,pre表示前驅節點。如果不存在,返回null。
  • 更新前驅節點的next,指向e的next。返回待刪除節點的value值。

Hash值的不同實現:JDK7 Vs JDK8

以上給出的程式碼均為jdk7中的實現,注意到在jdk7和8裡面,關於元素hash值的計算方法是不一樣的。

  • 在JDK7中,hashtable專門實現了hash函式,在以上的例子中都有看到,具體的實現如下:
//利用異或,移位等運算,對key的hashcode進一步進行計算以及二進位制位的調整等來保證最終獲取的儲存位置儘量分佈均勻
final int hash(Object k) {
        int h = hashSeed;
        if (0 != h && k instanceof String) {
            return sun.misc.Hashing.stringHash32((String) k);
        }

        h ^= k.hashCode();

        h ^= (h >>> 20) ^ (h >>> 12);
        return h ^ (h >>> 7) ^ (h >>> 4);
    }

以上hash函式計算出的值,通過indexFor進一步處理來獲取實際的儲存位置

    //返回陣列下標
    static int indexFor(int h, int length) {
        return h & (length-1);
    }
  • 在jdk8裡面,直接呼叫key.hashCode()來獲取key的hash值,接著在保證hash值為正數的前提下,得到相應的下標,
int hash = key.hashCode();
int index = (hash & 0x7FFFFFFF) % tab.length;

注意到都使用到了hashCode,這個方法是在Object方法中定義的,

    @HotSpotIntrinsicCandidate
    public native int hashCode();

可以看到是Object裡沒有給出hashCode的實現,只是宣告為一個native方法,說明Java會去呼叫本地C/C++對hashcode的具體實現。

在JDK8及以後,可以通過如下指令來獲取到所有的hash演算法,

java -XX:+PrintFlagsFinal | grep hashCode

具體大概有如下幾種,第5個演算法是預設使用的,用到了異或操作和一些偏移演算法來生成hash值。

0 == Lehmer random number generator,
1 == "somehow" based on memory address
2 == always 1
3 == increment counter
4 == memory based again ("somehow")
5 == Marsaglia XOR-Shift algorithm, that has nothing to do with memory.

HashTable相對於HashMap的最大特點就是執行緒安全,所有的操作都是被synchronized鎖保護的


參考:

  • https://www.imooc.com/article/23015
  • https://wiki.jikexueyuan.com/project/java-collection/hashtable.html
  • https://stackoverflow.com/questions/49172698/default-hashcode-implementation-for-java-objects

HashMap

HashMap是java中使用最為頻繁的map型別,其讀寫效率較高,但是因為其是非同步的,即讀寫等操作都是沒有鎖保護的,所以在多執行緒場景下是不安全的,容易出現數據不一致的問題。

HashMap的結構和HashTable一致,都是使用是由陣列和連結串列兩種資料結構組合而成的,不同的是在JDK8裡面引入了紅黑樹,當連結串列長度大於8時,會將連結串列轉換為紅黑樹。

HashMap的成員變數和HashTable一樣,在進行初始化的時候,都會設定一個容量值(capacity)和載入因子(loadFactor)。

  • 容量值指的並不是表的真實長度,而是使用者預估的一個值,真實的表長度,是不小於capacity的2的整數次冪。
  • 載入因子是為了計算雜湊表的擴容門限,如果雜湊表儲存的節點數量達到了擴容門限,雜湊表就會進行擴容的操作,擴容的數量為原表數量的2倍。預設情況下,capacity的值為16,loadFactor的值為0.75(綜合考慮效率與空間後的折衷)

HashMap的核心建構函式如下,主要是設定負載因子,以及根據使用者的設定容量,找到一個不小於該容量的閾值。

    public HashMap(int initialCapacity, float loadFactor) {
        if (initialCapacity < 0)
            throw new IllegalArgumentException("Illegal initial capacity: " +
                                               initialCapacity);
        if (initialCapacity > MAXIMUM_CAPACITY)
            initialCapacity = MAXIMUM_CAPACITY;
        if (loadFactor <= 0 || Float.isNaN(loadFactor))
            throw new IllegalArgumentException("Illegal load factor: " +
                                               loadFactor);
        this.loadFactor = loadFactor;
        this.threshold = tableSizeFor(initialCapacity);
    }

由於HashMap和HashTable有實現上有諸多相似之處,這裡會重點介紹hashMap在jdk7和8中的不同實現。

Hash運算

不管增加、刪除、查詢鍵值對,定位到雜湊桶陣列的位置都是很關鍵的第一步。都需要用到hash演算法,jdk7和8中的演算法基本一致,具體實現如下:

static final int hash(Object key) {   //jdk1.8 & jdk1.7
     int h;
     // h = key.hashCode() 為第一步 取hashCode值
     // h ^ (h >>> 16)  為第二步 高位參與運算
     return (key == null) ? 0 : (h = key.hashCode()) ^ (h >>> 16);
}

然後利用得到的hash值與陣列長度取模,得到相應的index。

以下圖示例項,給出了計算過程,

Get操作

    public V get(Object key) {
        Node<K,V> e;
        return (e = getNode(hash(key), key)) == null ? null : e.value;
    }

    /**
     * Implements Map.get and related methods
     *
     * @param hash hash for key
     * @param key the key
     * @return the node, or null if none
     */
    final Node<K,V> getNode(int hash, Object key) {
        Node<K,V>[] tab; Node<K,V> first, e; int n; K k;
        if ((tab = table) != null && (n = tab.length) > 0 &&
            (first = tab[(n - 1) & hash]) != null) {
            if (first.hash == hash && // always check first node
                ((k = first.key) == key || (key != null && key.equals(k))))
                return first;
            if ((e = first.next) != null) {
                if (first instanceof TreeNode)
                    return ((TreeNode<K,V>)first).getTreeNode(hash, key);
                do {
                    if (e.hash == hash &&
                        ((k = e.key) == key || (key != null && key.equals(k))))
                        return e;
                } while ((e = e.next) != null);
            }
        }
        return null;
    }

Get操作比較簡單:

  • 先定位到陣列中index位置,檢查第一個節點是否滿足要求 
  • 遍歷對應該位置的連結串列,找到滿足要求節點進行return

PUT操作

PUT操作的執行過程如下:

①.判斷鍵值對陣列table[i]是否為空或為null,否則執行resize()進行擴容;

②.根據鍵值key計算hash值得到插入的陣列索引i,如果table[i]==null,直接新建節點新增,轉向⑥,如果table[i]不為空,轉向③;

③.判斷table[i]的首個元素是否和key一樣,如果相同直接覆蓋value,否則轉向④,這裡的相同指的是hashCode以及equals;

④.判斷table[i] 是否為treeNode,即table[i] 是否是紅黑樹,如果是紅黑樹,則直接在樹中插入鍵值對,否則轉向⑤;

⑤.遍歷table[i],判斷連結串列長度是否大於8,大於8的話把連結串列轉換為紅黑樹,在紅黑樹中執行插入操作,否則進行連結串列的插入操作;遍歷過程中若發現key已經存在直接覆蓋value即可;

⑥.插入成功後,判斷實際存在的鍵值對數量size是否超多了最大容量threshold,如果超過,進行擴容。

 1 public V put(K key, V value) {
 2     // 對key的hashCode()做hash
 3     return putVal(hash(key), key, value, false, true);
 4 }
 5 
 6 final V putVal(int hash, K key, V value, boolean onlyIfAbsent,
 7                boolean evict) {
 8     Node<K,V>[] tab; Node<K,V> p; int n, i;
 9     // 步驟①:tab為空則建立
10     if ((tab = table) == null || (n = tab.length) == 0)
11         n = (tab = resize()).length;
12     // 步驟②:計算index,並對null做處理 
13     if ((p = tab[i = (n - 1) & hash]) == null) 
14         tab[i] = newNode(hash, key, value, null);
15     else {
16         Node<K,V> e; K k;
17         // 步驟③:節點key存在,直接覆蓋value
18         if (p.hash == hash &&
19             ((k = p.key) == key || (key != null && key.equals(k))))
20             e = p;
21         // 步驟④:判斷該鏈為紅黑樹
22         else if (p instanceof TreeNode)
23             e = ((TreeNode<K,V>)p).putTreeVal(this, tab, hash, key, value);
24         // 步驟⑤:該鏈為連結串列
25         else {
26             for (int binCount = 0; ; ++binCount) {
27                 if ((e = p.next) == null) {
28                     p.next = newNode(hash, key,value,null);
                        //連結串列長度大於8轉換為紅黑樹進行處理
29                     if (binCount >= TREEIFY_THRESHOLD - 1) // -1 for 1st  
30                         treeifyBin(tab, hash);
31                     break;
32                 }
                    // key已經存在直接覆蓋value
33                 if (e.hash == hash &&
34                     ((k = e.key) == key || (key != null && key.equals(k)))) 
35                          break;
36                 p = e;
37             }
38         }
39         
40         if (e != null) { // existing mapping for key
41             V oldValue = e.value;
42             if (!onlyIfAbsent || oldValue == null)
43                 e.value = value;
44             afterNodeAccess(e);
45             return oldValue;
46         }
47     }

48     ++modCount;
49     // 步驟⑥:超過最大容量 就擴容
50     if (++size > threshold)
51         resize();
52     afterNodeInsertion(evict);
53     return null;
54 }

Resize擴容操作

由於JDK8引入了紅黑樹,所以在實現上JDK7和8的resize過程不太一致。

首先是JDK7的實現,

 1 void resize(int newCapacity) {   //傳入新的容量
 2     Entry[] oldTable = table;    //引用擴容前的Entry陣列
 3     int oldCapacity = oldTable.length;         
 4     if (oldCapacity == MAXIMUM_CAPACITY) {  //擴容前的陣列大小如果已經達到最大(2^30)了
 5         threshold = Integer.MAX_VALUE; //修改閾值為int的最大值(2^31-1),這樣以後就不會擴容了
 6         return;
 7     }
 8  
 9     Entry[] newTable = new Entry[newCapacity];  //初始化一個新的Entry陣列
10     transfer(newTable);                         //!!將資料轉移到新的Entry數組裡
11     table = newTable;                           //HashMap的table屬性引用新的Entry陣列
12     threshold = (int)(newCapacity * loadFactor);//修改閾值
13 }

這裡就是使用一個容量更大的陣列來代替已有的容量小的陣列,transfer()方法將原有Entry陣列的元素拷貝到新的Entry數組裡。

 1 void transfer(Entry[] newTable) {
 2     Entry[] src = table;                   //src引用了舊的Entry陣列
 3     int newCapacity = newTable.length;
 4     for (int j = 0; j < src.length; j++) { //遍歷舊的Entry陣列
 5         Entry<K,V> e = src[j];             //取得舊Entry陣列的每個元素
 6         if (e != null) {
 7             src[j] = null;//釋放舊Entry陣列的物件引用(for迴圈後,舊的Entry陣列不再引用任何物件)
 8             do {
 9                 Entry<K,V> next = e.next;
10                 int i = indexFor(e.hash, newCapacity); //!!重新計算每個元素在陣列中的位置
11                 e.next = newTable[i]; //標記[1]
12                 newTable[i] = e;      //將元素放在陣列上
13                 e = next;             //訪問下一個Entry鏈上的元素
14             } while (e != null);
15         }
16     }
17 } 

newTable[i]的引用賦給了e.next,也就是使用了單鏈表的頭插入方式,同一位置上新元素總會被放在連結串列的頭部位置;這樣先放在一個索引上的元素終會被放到Entry鏈的尾部(如果發生了hash衝突的話),這一點和Jdk1.8有區別。

具體舉例如下圖所示:

接下來是JDK8中的實現,

    /**
     * Initializes or doubles table size.  If null, allocates in
     * accord with initial capacity target held in field threshold.
     * Otherwise, because we are using power-of-two expansion, the
     * elements from each bin must either stay at same index, or move
     * with a power of two offset in the new table.
     *
     * @return the table
     */
    final Node<K,V>[] resize() {
        Node<K,V>[] oldTab = table;
        int oldCap = (oldTab == null) ? 0 : oldTab.length;
        int oldThr = threshold;
        int newCap, newThr = 0;
        if (oldCap > 0) {
            if (oldCap >= MAXIMUM_CAPACITY) {
                threshold = Integer.MAX_VALUE;
                return oldTab;
            }
            else if ((newCap = oldCap << 1) < MAXIMUM_CAPACITY &&
                     oldCap >= DEFAULT_INITIAL_CAPACITY)
                newThr = oldThr << 1; // double threshold
        }
        else if (oldThr > 0) // initial capacity was placed in threshold
            newCap = oldThr;
        else {               // zero initial threshold signifies using defaults
            newCap = DEFAULT_INITIAL_CAPACITY;
            newThr = (int)(DEFAULT_LOAD_FACTOR * DEFAULT_INITIAL_CAPACITY);
        }
        if (newThr == 0) {
            float ft = (float)newCap * loadFactor;
            newThr = (newCap < MAXIMUM_CAPACITY && ft < (float)MAXIMUM_CAPACITY ?
                      (int)ft : Integer.MAX_VALUE);
        }
        threshold = newThr;
        @SuppressWarnings({"rawtypes","unchecked"})
            Node<K,V>[] newTab = (Node<K,V>[])new Node[newCap];
        table = newTab;
        if (oldTab != null) {
            for (int j = 0; j < oldCap; ++j) {
                Node<K,V> e;
                if ((e = oldTab[j]) != null) {
                    oldTab[j] = null;
                    if (e.next == null)
                        newTab[e.hash & (newCap - 1)] = e;
                    else if (e instanceof TreeNode)
                        ((TreeNode<K,V>)e).split(this, newTab, j, oldCap);
                    else { // preserve order
                        Node<K,V> loHead = null, loTail = null;
                        Node<K,V> hiHead = null, hiTail = null;
                        Node<K,V> next;
                        do {
                            next = e.next;
                            if ((e.hash & oldCap) == 0) {
                                if (loTail == null)
                                    loHead = e;
                                else
                                    loTail.next = e;
                                loTail = e;
                            }
                            else {
                                if (hiTail == null)
                                    hiHead = e;
                                else
                                    hiTail.next = e;
                                hiTail = e;
                            }
                        } while ((e = next) != null);
                        if (loTail != null) {
                            loTail.next = null;
                            newTab[j] = loHead;
                        }
                        if (hiTail != null) {
                            hiTail.next = null;
                            newTab[j + oldCap] = hiHead;
                        }
                    }
                }
            }
        }
        return newTab;
    }

由於Size會進行2次冪的擴充套件(指長度擴為原來2倍),所以,元素的位置要麼是在原位置,要麼是在原位置再移動2次冪的位置。通過下面的例子,可以清楚的看到,21和5在原來的陣列中都處於相同的位置,但是在新的陣列中,21到了新的位置,位置為原來的位置加上16,也就是舊的Capacity;但是5還在原來的位置。

假定我們在Size變為2倍以後,重新計算hash,因為n變為2倍,相應的n-1的mask範圍在高位多1bit(紅色),也就是與上面示意圖中紅色部分對應的那一位,如果那位是1,則需要移動到新的位置,否則不變。

回到程式碼實現中,直接用舊的hash值與上oldCapacity,因為舊的capacity是2的倍數(二進位制為00000...1000),而且獲取舊index的時候採用hash&(oldCap-1),所以直接e.hash & oldCap就是判斷新增加的高位是否為1,為1則需要移動,否則保持不變。

if ((e.hash & oldCap) == 0)

這種巧妙的方法,同時由於高位的1和0隨機出現,保證了resize之後元素分佈的離散性。

下圖是這一過程的模擬,

JDK8中的紅黑樹

引入紅黑樹主要是為了保證在hash分佈極不均勻的情況下的效能,當一個連結串列太長(大於8)的時候,通過動態的將它替換成一個紅黑樹,這話的話會將時間複雜度從O(n)降為O(logn)。

為什麼HashMap的陣列長度一定保持2的次冪?

  1. 從上面的分析JDK8 resize的過程可以可能到,陣列長度保持2的次冪,當resize的時候,為了通過h&(length-1)計算新的元素位置,可以看到當擴容後只有一位差異,也就是多出了最左位的1,這樣計算 h&(length-1)的時候,只要h對應的最左邊的那一個差異位為0,就能保證得到的新的陣列索引和老陣列索引一致,否則index+OldCap。

  1. 陣列長度保持2的次冪,length-1的低位都為1,會使得獲得的陣列索引index更加均勻。hash函式採用各種位運算也是為了使得低位更加雜湊,如果低位全部為1,那麼對於h低位部分來說,任何一位的變化都會對結果產生影響,可以儘可能的使元素分佈比較均勻。

HashMap Vs HashTable

  • HashMap允許將 null 作為一個 entry 的 key 或者 value,而 Hashtable 不允許。
  • HashTable 繼承自 Dictionary 類,而 HashMap 是 Java1.2 引進的 Map interface 的一個實現。
  • HashTable 的方法是 Synchronized 的,而 HashMap 不是,在多個執行緒訪問 Hashtable 時,不需要自己為它的方法實現同步,而 HashMap 就必須為之提供外同步。

參考:

  • https://tech.meituan.com/2016/06/24/java-hashmap.html
  • https://juejin.im/post/5aa5d8d26fb9a028d2079264
  • https://my.oschina.net/hosee/blog/618953
  • https://www.imooc.com/article/22943
  • https://www.cnblogs.com/chengxiao/p/6059914.html

TreeMap

TreeMap繼承於AbstractMap,實現了Map, Cloneable, NavigableMap, Serializable介面。

TreeMap 是一個有序的key-value集合,它是通過紅黑樹實現的。該對映根據其鍵的自然順序進行排序,或者根據建立對映時提供的Comparator進行排序,具體取決於使用的構造方法。
TreeMap的基本操作 containsKey、get、put 和 remove 的時間複雜度是 log(n) 。

對於SortedMap來說,該類是TreeMap體系中的父介面,也是區別於HashMap體系最關鍵的一個介面。SortedMap介面中定義的第一個方法Comparator<? super K> comparator();該方法決定了TreeMap體系的走向,有了比較器,就可以對插入的元素進行排序了。

TreeMap的查詢、插入、更新元素等操作,主要是對紅黑樹的節點進行相應的更新,和資料結構中類似。

TreeSet

TreeSet基於TreeMap實現,底層也是紅黑樹。只是每次插入元素時,value為一個預設的dummy資料。

HashSet

HashSet的實現很簡單,內部有一個HashMap的成員變數,所有的Set相關的操作都轉換為了對HashMapde操作。

public class HashSet<E>
    extends AbstractSet<E>
    implements Set<E>, Cloneable, java.io.Serializable
{
    static final long serialVersionUID = -5024744406713321676L;

    private transient HashMap<E,Object> map;

    // Dummy value to associate with an Object in the backing Map
    private static final Object PRESENT = new Object();
    
    //其他操作省略
 }

從上面的code可以看到,內部還定義了一個PRESENT的dummy物件,當新增元素時,直接新增一對鍵值對,key為元素值,value為PRESENT。

    /**
     * Adds the specified element to this set if it is not already present.
     * More formally, adds the specified element <tt>e</tt> to this set if
     * this set contains no element <tt>e2</tt> such that
     * <tt>(e==null&nbsp;?&nbsp;e2==null&nbsp;:&nbsp;e.equals(e2))</tt>.
     * If this set already contains the element, the call leaves the set
     * unchanged and returns <tt>false</tt>.
     *
     * @param e element to be added to this set
     * @return <tt>true</tt> if this set did not already contain the specified
     * element
     */
    public boolean add(E e) {
        return map.put(e, PRESENT)==null;
    }

其他的操作類似,就是把PRESENT當做value。

LinkedHashMap

首先是定義,

public class LinkedHashMap<K,V>
    extends HashMap<K,V>
    implements Map<K,V>
{
    ...
}

可以看到,LinkedHashMap是HashMap的子類,但和HashMap的無序性不一樣,LinkedHashMap通過維護一個運行於所有條目的雙向連結串列,保證了元素迭代的順序。該迭代順序可以是插入順序或者是訪問順序,這個可以在初始化的時候確定,預設採用插入順序來維持取出鍵值對的次序。

在成員變數上,與HashMap不同的是,引入了before和after兩個變數來記錄前後的元素。

1、K key

2、V value

3、Entry<K, V> next

4、int hash

5、Entry<K, V> before

6、Entry<K, V> after

1-4是從HashMap.Entry中繼承過來的;5-6是LinkedHashMap獨有的。注意next是用於維護HashMap指定table位置上連線的Entry的順序的,before、After是用於維護Entry插入的先後順序的。

可以把LinkedHashMap的結構看成如下圖所示:

接下來主要介紹LinkedHashMap的排序操作,

在建構函式中,需要指定accessOrder,有兩種情況:

  • false,所有的Entry按照插入的順序排列
  • true,所有的Entry按照訪問的順序排列
public LinkedHashMap(int initialCapacity,
         float loadFactor, boolean accessOrder) {
    super(initialCapacity, loadFactor);
    this.accessOrder = accessOrder;
}

第二種情況,也就是accessOrder為true時,每次通過get/put方法訪問時,都把訪問的那個資料移到雙向佇列的尾部去,也就是說,雙向佇列最頭的那個資料就是最不常訪問的那個資料。具體實現如下,afterNodeAccess這個方法在HashMap中沒有實現,LinkedHashMap進行了實現,將元素進行排序。

    void afterNodeAccess(Node<K,V> e) { // move node to last
        LinkedHashMap.Entry<K,V> last;
        if (accessOrder && (last = tail) != e) {
            LinkedHashMap.Entry<K,V> p =
                (LinkedHashMap.Entry<K,V>)e, b = p.before, a = p.after;
            p.after = null;
            if (b == null)
                head = a;
            else
                b.after = a;
            if (a != null)
                a.before = b;
            else
                last = b;
            if (last == null)
                head = p;
            else {
                p.before = last;
                last.after = p;
            }
            tail = p;
            ++modCount;
        }
    }

利用LinkedHashMap實現LRU快取

LRU即Least Recently Used,最近最少使用,也就是說,當快取滿了,會優先淘汰那些最近最不常訪問的資料。LinkedHashMap正好滿足這個特性,當我們開啟accessOrder為true時,最新訪問(get或者put(更新操作))的資料會被丟到佇列的尾巴處,那麼雙向佇列的頭就是最不經常使用的資料了。

此外,LinkedHashMap還提供了一個方法,這個方法就是為了我們實現LRU快取而提供的,removeEldestEntry(Map.Entry<K,V> eldest) 方法。該方法可以提供在每次新增新條目時移除最舊條目的實現程式,預設返回 false。

下面是一個最簡單的LRU快取的實現,當size超過maxElement時,每次新增一個元素時,就會移除最久遠的元素。

public class LRUCache extends LinkedHashMap
{
    public LRUCache(int maxSize)
    {
        super(maxSize, 0.75F, true);
        maxElements = maxSize;
    }

    protected boolean removeEldestEntry(java.util.Map.Entry eldest)
    {
        //邏輯很簡單,當大小超出了Map的容量,就移除掉雙向佇列頭部的元素,給其他元素騰出點地來。
        return size() > maxElements;
    }

    private static final long serialVersionUID = 1L;
    protected int maxElements;
}

參考:

  • https://juejin.im/post/5a4b433b6fb9a0451705916f
  • https://www.cnblogs.com/xiaoxi/p/6170590.html

5.2 ConcurrentHashMap

這節開始介紹併發容器,首先是ConcurrentHashMap,實現了執行緒安全的HashMap。之前也提到了HashMap在多執行緒環境下的問題,這小節先詳細分析為什麼HashMap多執行緒下不安全。

HashMap多執行緒環境下的問題分析

首先說結論,為什麼HashMap不是執行緒安全的?在多執行緒下,會導致HashMap的Entry連結串列形成環形資料結構,一旦形成環形,Entry的next節點永遠不為空,無論是進行resize還是get/size等操作時,就會產生死迴圈。

首先針對JDK7進行分析:

下面是resize部分的程式碼,這段程式碼將原HashMap中的元素依次移動到擴容後的HashMap中,

1:  // Transfer method in java.util.HashMap -
2:  // called to resize the hashmap
3:  // 依次移動每個bucket中的元素到新的buckets中
4:  for (int j = 0; j < src.length; j++) {
5:    Entry e = src[j];
6:    if (e != null) {
7:      src[j] = null;
8:      do {
            // Next指向下一個需要移動的元素
9:          Entry next = e.next; 
            // 計算新Map中的位置
10:         int i = indexFor(e.hash, newCapacity);
            // 插入到bucket中第一個位置
11:         e.next = newTable[i];
12:         newTable[i] = e;
            // 指向原bucket中下一個位置的元素
13:         e = next;
14:     } while (e != null);
15:   }
16: } 

在正常單執行緒的情況下,如果有如下的HashMap的結構,為了方便這裡只有2個bucket(java.util.HashMap中預設是 16)。

按照上面的resize流程,e和next分別指向A和B,A是第一次迭代將會被移動的元素,B是下一個。

  • 第一次迭代後,A被移動到新的Map中,Map的容量已經增大了一倍。A的位置如下圖所示

  • 第二次迭代後,B被移動到了新的位置,如下圖所示,C為下一個待移動的元素。

  • 第三次迭代之後,C被移動到了新的位置,由於C之後沒有其他元素,因此整個resize過程完成,最後新的Map如下:

在resize完成之後,每個bucket的深度變小了,達到了resize的目的。整個過程在單執行緒下沒有任何問題,但是考慮到多執行緒的情況,就會可能會出現競爭。

現在有兩個執行緒Thread1,Thread2同時進行resize的操作,假設Thread1在執行到第9行後,Thread2獲取了CPU並且也開始執行resize的操作。

1:  // Transfer method in java.util.HashMap -
2:  // called to resize the hashmap
3:  
4:  for (int j = 0; j < src.length; j++) {
5:    Entry e = src[j];
6:    if (e != null) {
7:      src[j] = null;
8:      do {
9:      Entry next = e.next; 
     // Thread1 STOPS RIGHT HERE
10:     int i = indexFor(e.hash, newCapacity);
11:     e.next = newTable[i];
12:     newTable[i] = e;
13:     e = next;
14:   } while (e != null);
15:   }
16: } 

Thread1執行後,對應的e1和next1別指向A和B,但是Thread1並沒有移動元素。

假設Thread2在獲取CPU後完整的運行了整個resize,新的Map結構將會如下圖所示:

注意到e1next1還是指向A和B,但是A和B的位置關係已經變了,按照resize的演算法進行兩輪迭代之後,變成如下的結構,

注意此時enext的指向,在下一次的迭代中,將把A放在第3個bucket的一個位置,但是B仍然是指向A的,所以出現了下面的類似於雙向連結串列的結構,

接著Thread1就會進入到無限迴圈中,此時如果有get操作的話,也會出現無限迴圈的情況。這就是HashMap在多執行緒情況下容易出現的問題。

接著針對JDK8進行分析:

前面已經提到,JDK8和7在Resize的不同之處就是8保留了連結串列中元素的先後位置,這樣基本可以確保在resize過程中不出現迴圈的問題,但是還是可能出現數據丟失的問題。以下是resize的核心實現,

                        Node<K,V> loHead = null, loTail = null;
                        Node<K,V> hiHead = null, hiTail = null;
                        Node<K,V> next;
                        do {
                            next = e.next;
                            if ((e.hash & oldCap) == 0) {
                                if (loTail == null)
                                    loHead = e;
                                else
                                    loTail.next = e;
                                loTail = e;
                            }
                            else {
                                if (hiTail == null)
                                    hiHead = e;
                                else
                                    hiTail.next = e;
                                hiTail = e;
                            }
                        } while ((e = next) != null);
                        if (loTail != null) {
                            loTail.next = null;
                            newTab[j] = loHead;
                        }
                        if (hiTail != null) {
                            hiTail.next = null;
                            newTab[j + oldCap] = hiHead;
                        }
                    }

在實現中會使用兩個臨時連結串列,分別儲存新地址和舊地址的連結串列,最後將這兩個連結串列放到對應的位置。

假定出現如下的情況,有ABC三個元素需要移動,首先執行緒1指向A,next即為B,此後執行緒2同樣進行resize,並把high/low兩個連結串列的更新完成,這時返回執行緒1繼續執行。

但是執行緒1仍然按照正常的流程繼續,A會被放到High連結串列,B會被放到Low連結串列,這之後由於B後面沒有元素,更新完成,因此C就漏掉了。

其實不管是JDK7還是8,由於連結串列的很多操作都沒有加鎖,每個操作也不是原子操作,導致可能出現很多意想不到的結果,也是為什麼需要引入專門的ConcurrentHashMap。

ConcurrentHashMap介紹

為什麼不使用HashTable?

之前介紹的HashTable也能保證執行緒安全,但是HashTable使用synchronized來保證執行緒安全,但線上程競爭激烈的情況下HashTable的效率非常低下。因為當一個執行緒訪問HashTable的同步方法,其他執行緒也訪問HashTable的同步方法時,會進入阻塞或輪詢狀態。如執行緒1使用put進行元素新增,執行緒2不但不能使用put方法新增元素,也不能使用get方法來獲取元素,所以競爭越激烈效率越低。正因為如此,需要引入更加高效的多執行緒解決方案。

ConcurrentHashMap的結構在JDk1.7和1.8中有較大的不同,下面將會分別進行介紹。

JDK1.7中的實現

ConcurrentHashMap是由Segment陣列結構和HashEntry陣列結構組成。Segment實際繼承自可重入鎖(ReentrantLock),在ConcurrentHashMap裡扮演鎖的角色;HashEntry則用於儲存鍵值對資料。一個ConcurrentHashMap裡包含一個Segment陣列,每個Segment裡包含一個HashEntry陣列,我們稱之為table,每個HashEntry是一個連結串列結構的元素。

Segment實際繼承自可重入鎖(ReentrantLock),這是與普通HashMap的最大區別。

面試點:ConcurrentHashMap實現原理是怎麼樣的或者ConcurrentHashMap如何在保證高併發下執行緒安全的同時實現了效能提升?

ConcurrentHashMap允許多個修改操作併發進行,其關鍵在於使用了鎖分離技術。它使用了多個鎖來控制對hash表的不同部分進行的修改。內部使用段(Segment)來表示這些不同的部分,每個段其實就是一個小的hash table,只要多個修改操作發生在不同的段上,它們就可以併發進行。

1.1 初始化過程

初始化有三個引數:

  • initialCapacity:初始容量大小 ,預設16。
  • loadFactor, 擴容因子或者叫負載因子,預設0.75,當一個Segment儲存的元素數量大於initialCapacity* loadFactor時,該Segment會進行一次擴容。
  • concurrencyLevel 併發度:預設16。併發度可以理解為程式執行時能夠同時操作ConccurentHashMap且不產生鎖競爭的最大執行緒數,實際上就是ConcurrentHashMap中的分段鎖個數,即Segment[]的陣列長度。如果併發度設定的過小,會帶來嚴重的鎖競爭問題;如果併發度設定的過大,原本位於同一個Segment內的訪問會擴散到不同的Segment中,CPU cache命中率會下降,從而引起程式效能下降。

以下是對初始化函式的分析:

1.2 Hash值計算

對某個元素進行Put/Get操作之前,都需要定位該元素在哪個segment元素的某個table元素中的,定位的過程,取得key的hashcode值進行一次再雜湊(通過Wang/Jenkins演算法),拿到再雜湊值後,以再雜湊值的高位進行取模得到當前元素在哪個segment上。

具體的Hash實現如下:

1.3 Get方法

定位segment和定位table後,依次掃描這個table元素下的的連結串列,要麼找到元素,要麼返回null。

在高併發下的情況下如何保證取得的元素是最新的?

用於儲存鍵值對資料的HashEntry,在設計上它的成員變數value等都是volatile型別的,這樣就保證別的執行緒對value值的修改,get方法可以馬上看到。

    static final class HashEntry<K,V> {
        final int hash;
        final K key;
        volatile V value;
        volatile HashEntry<K,V> next;
    }

1.4 Put方法

1、首先定位segment,當這個segment在map初始化後,還為null,由ensureSegment方法負責填充這個segment。

2、對Segment加鎖,雖然value是volatile的,只能保證可見性,不能保證原子性。這裡put操作不是原子操作,因此需要加鎖。

3、定位所在的table元素,並掃描table下的連結串列,找到時:

注意到預設onlyIfAbsent為false,也就是如果有相同key的元素,會覆蓋舊的值。無論是否覆蓋,都是返回舊值。

沒有找到時:

1.5 擴容操作

擴容操作不會擴容Segment,只會擴容對應的table陣列,每次都是將陣列翻倍。

之前也提到過,由於陣列長度為2次冪,所以每次擴容之後,元素要麼在原處,要麼在原處加上偏移量為舊的size的新位置。

1.6 Size方法

size的時候進行兩次不加鎖的統計,兩次一致直接返回結果,不一致,重新加鎖再次統計,

ConcurrentHashMap的弱一致性

get方法和containsKey方法都是通過對連結串列遍歷判斷是否存在key相同的節點以及獲得該節點的value。但由於遍歷過程中其他執行緒可能對連結串列結構做了調整,因此get和containsKey返回的可能是過時的資料,這一點是ConcurrentHashMap在弱一致性上的體現。

JDK1.8中的實現

相比JDK1.7的重要變化:

1、取消了segment陣列,引入了Node結構,直接用Node陣列來儲存資料,鎖的粒度更小,減少併發衝突的概率。
2、儲存資料時採用了連結串列+紅黑樹的形式,純連結串列的形式時間複雜度為O(n),紅黑樹則為O(logn),效能提升很大。什麼時候連結串列轉紅黑樹?當key值相等的元素形成的連結串列中元素個數超過8個的時候。

2.1 資料結構

  • Node:存放實際的key和value值。
  • sizeCtl:負數:表示進行初始化或者擴容,-1表示正在初始化,-N,表示有N-1個執行緒正在進行擴容
    正數:0 表示還沒有被初始化,>0的數,初始化或者是下一次進行擴容的閾值。
  • TreeNode:用在紅黑樹,表示樹的節點, TreeBin是實際放在table陣列中的,代表了這個紅黑樹的根。

ConcurrentHashMap在初始化時,只是給成員變數賦值,put時進行實際陣列的填充。

2.2 Hash計算

先計算key的hash值,然後將高位加入計算來進行再雜湊。

2.3 Get方法

首先計算hash值,確定在table中的位置。

  • 是否剛好在table中某個首元素,找到返回;
  • 在樹中查詢
  • 在連結串列中查詢

注意到在初始化TreeBin,也就是設定紅黑樹所在的Node的第一個節點時,會設定對應的hash值,這些hash值定義如下。所以上面的程式碼中,可以通過判斷首節點的hash值<0來確定該節點為樹。

    static final int MOVED     = -1; // hash for forwarding nodes
    static final int TREEBIN   = -2; // hash for roots of trees
    static final int RESERVED  = -3; // hash for transient reservations

2.4 Put方法

PUT方法中會實際初始化陣列,


2.5 擴容操作

執行緒執行put操作,發現容量已經達到擴容閾值,需要進行擴容操作。ConcurrentHashMap支援併發擴容,實現方式是,將表拆分,讓每個執行緒處理自己的區間。如下圖:

遷移完畢的hash桶,會被設定成ForwardingNode節點,以此告知訪問此桶的其他執行緒,此節點已經遷移完畢。此時執行緒2訪問到了ForwardingNode節點,如果執行緒2執行的put或remove等寫操作,那麼就會先幫其擴容。如果執行緒2執行的是get等讀方法,則會呼叫ForwardingNode的find方法,去nextTable裡面查詢相關元素。

2.6 Size

Put操作時,addCount 方法用於 CAS 更新 baseCount,但很有可能在高併發的情況下,更新失敗,那麼這些節點雖然已經被新增到雜湊表中了,但是數量卻沒有被統計。

當更新 baseCount 失敗的時候,會呼叫 fullAddCount 將這些失敗的結點包裝成一個 CounterCell 物件,儲存在 CounterCell 陣列中。

整張表實際的 size 其實是 baseCount 加上 CounterCell 陣列中元素的個數。

public int size() {
    long n = sumCount();
    return ((n < 0L) ? 0 :(n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE :(int)n);
}

具體的計算count方法,

final long sumCount() {
    CounterCell[] as = counterCells; CounterCell a;
    long sum = baseCount;
    if (as != null) {
        for (int i = 0; i < as.length; ++i) {
            if ((a = as[i]) != null)
                sum += a.value;
        }
    }
    return sum;
}

和JDK1.7一樣,這樣得到的size也只是大概數字,也具有弱一致性。

5.3 ConcurrentSkipListMap

ConcurrentSkipListMap是一個併發安全, 基於skiplist實現有序儲存的Map。可以看成是TreeMap的併發版本。

ConcurrentHashMap採用空間換取時間, 但它有著ConcurrentHashMap不能比擬的優點: 有序資料儲存.

SkipList的結構如下圖所示:

從圖中可以得出ConcurrentSkipListMap的幾個特點:

  1. ConcurrentSkipListMap 的節點主要由 Node, Index, HeadIndex 構成;
  2. ConcurrentSkipListMap 的資料結構橫向縱向都是連結串列
  3. 最下面那層連結串列是Node層(資料節點層), 上面幾層都是Index層(索引)
  4. 從縱向連結串列來看, 最左邊的是 HeadIndex 層, 右邊的都是Index 層, 且每層的最底端都是對應Node, 縱向上的索引都是指向最底端的Node。

5.4 ConcurrentSkipListSet

ConcurrentSkipListSet基於ConcurrentSkipListMap實現,類似於TreeSet基於TreeMap實現。

5.5 ConcurrentLinkedQueue

ConcurrentLinkedQueue實現了一個高併發的佇列,底層使用連結串列作為其資料結構。從效能角度看,可以算是高併發環境下效能最好的隊列了。

ConcurrentLinkedQueue類中,核心節點Node的定義如下,item表示目標元素,next表示當前Node的下一個元素。

    private static class Node<E> {
        volatile E item;
        volatile Node<E> next;

add,offer將元素插入到尾部,其中add實現上直接呼叫了offer。peek方法拿頭部的資料,但是不移除和poll拿頭部的資料,但是同時移除。

5.6 CopyOnWriteArrayList

CopyOnWrite(寫時複製)的容器。通俗的理解是當我們往一個容器新增元素的時候,不直接往當前容器新增,而是先將當前容器進行Copy,複製出一個新的容器,然後新的容器裡新增元素,新增完元素之後,再用新的容器替換舊的容器。

好處是我們可以對容器進行併發的讀,而不需要加鎖,因為當前容器不會新增任何元素。所以寫時複製容器也是一種讀寫分離的思想,讀和寫不同的容器。如果讀的時候有多個執行緒正在向容器新增資料,讀還是會讀到舊的資料,因為寫的時候不會鎖住舊的,只能保證最終一致性。

下面介紹一下寫的過程,

    /**
     * Appends the specified element to the end of this list.
     *
     * @param e element to be appended to this list
     * @return {@code true} (as specified by {@link Collection#add})
     */
    public boolean add(E e) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            Object[] elements = getArray();
            int len = elements.length;
            Object[] newElements = Arrays.copyOf(elements, len + 1);
            newElements[len] = e;
            setArray(newElements);
            return true;
        } finally {
            lock.unlock();
        }
    }

首先,寫入操作使用鎖,主要是為了控制寫寫的情況。接著進行新陣列的複製,將新的元素加入newElements,最後使用新的陣列替換老的陣列,修改就完成了。整個過程不會影響讀取,並且修改完成以後,讀取執行緒可以“覺察”到這個修改,因為array是volatile型別,保證了可見性。

    /** The array, accessed only via getArray/setArray. */
    private transient volatile Object[] array;

容器的適用場景:適用讀多寫少的併發場景,常見應用:白名單/黑名單,商品類目的訪問和更新場景。但是由於會複製舊的陣列,所有可能存在記憶體佔用問題。

5.7 CopyOnWriteArraySet

CopyOnWriteArraySet基於CopyOnWriteArrayList實現,為了保證資料的唯一性,在往其中加入資料時,會check當前陣列中是否存在該元素,如果不存在,則加入到當前陣列。

    /**
     * Appends the element, if not present.
     *
     * @param e element to be added to this list, if absent
     * @return {@code true} if the element was added
     */
    public boolean addIfAbsent(E e) {
        Object[] snapshot = getArray();
        return indexOf(e, snapshot, 0, snapshot.length) >= 0 ? false :
            addIfAbsent(e, snapshot);
    }

5.8 阻塞佇列

定義與常用操作

阻塞佇列(BlockingQueue)是一個支援兩個附加操作的佇列。這兩個附加的操作是:

  • 在佇列為空時,獲取元素的執行緒會等待佇列變為非空。
  • 當佇列滿時,儲存元素的執行緒會等待佇列可用。

阻塞佇列常用於生產者和消費者的場景,生產者是往佇列裡新增元素的執行緒,消費者是從佇列裡拿元素的執行緒。阻塞佇列就是生產者存放元素的容器,而消費者也只從容器裡拿元素。

阻塞佇列提供了四種處理方法:

方法\處理方式 丟擲異常 返回特殊值 一直阻塞 超時退出
插入方法 add(e) offer(e) put(e) offer(e,time,unit)
移除方法 remove() poll() take() poll(time,unit)
檢查方法 element() peek() 不可用 不可用
  • 丟擲異常:是指當阻塞佇列滿時候,再往佇列裡插入元素,會丟擲 IllegalStateException("Queue full") 異常。當佇列為空時,從佇列裡獲取元素時會丟擲 NoSuchElementException 異常 。
  • 返回特殊值:插入方法會返回是否成功,成功則返回 true。移除方法,則是從佇列裡拿出一個元素,如果沒有則返回 null
  • 一直阻塞:當阻塞佇列滿時,如果生產者執行緒往佇列裡 put 元素,佇列會一直阻塞生產者執行緒,直到拿到資料,或者響應中斷退出。當佇列空時,消費者執行緒試圖從佇列裡 take 元素,佇列也會阻塞消費者執行緒,直到佇列可用。
  • 超時退出:當阻塞佇列滿時,佇列會阻塞生產者執行緒一段時間,如果超過一定的時間,生產者執行緒就會退出。

Java裡的阻塞佇列

JDK7 提供了 7 個阻塞佇列。分別是

  • ArrayBlockingQueue :一個由陣列結構組成的有界阻塞佇列。
  • LinkedBlockingQueue :一個由連結串列結構組成的有界阻塞佇列。
  • PriorityBlockingQueue :一個支援優先順序排序的無界阻塞佇列。
  • DelayQueue:一個使用優先順序佇列實現的無界阻塞佇列。
  • SynchronousQueue:一個不儲存元素的阻塞佇列。
  • LinkedTransferQueue:一個由連結串列結構組成的無界阻塞佇列。
  • LinkedBlockingDeque:一個由連結串列結構組成的雙向阻塞佇列。

1. ArrayBlockingQueue

ArrayBlockingQueue 是一個用陣列實現的有界阻塞佇列。此佇列按照先進先出(FIFO)的原則對元素進行排序。預設情況下不保證訪問者公平的訪問佇列,所謂公平訪問佇列是指阻塞的所有生產者執行緒或消費者執行緒,當佇列可用時,可以按照阻塞的先後順序訪問佇列,即先阻塞的生產者執行緒,可以先往佇列裡插入元素,先阻塞的消費者執行緒,可以先從佇列裡獲取元素。通常情況下為了保證公平性會降低吞吐量。我們可以使用以下程式碼建立一個公平的阻塞佇列:

2. LinkedBlockingQueue

一個用連結串列實現的有界阻塞佇列。此佇列的預設和最大長度為 Integer.MAX_VALUE。此佇列按照先進先出的原則對元素進行排序。

3. PriorityBlockingQueue

一個支援優先順序的無界佇列。預設情況下元素採取自然順序排列,也可以通過比較器 comparator 來指定元素的排序規則。元素按照升序排列。

4. DelayQueue

一個支援延時獲取元素的無界阻塞佇列。佇列使用 PriorityQueue 來實現。佇列中的元素必須實現 Delayed 介面,在建立元素時可以指定多久才能從佇列中獲取當前元素。只有在延遲期滿時才能從佇列中提取元素。我們可以將 DelayQueue 運用在以下應用場景:

  • 快取系統的設計:可以用 DelayQueue 儲存快取元素的有效期,使用一個執行緒迴圈查詢 DelayQueue,一旦能從 DelayQueue 中獲取元素時,表示快取有效期到了。

  • 定時任務排程。使用 DelayQueue 儲存當天將會執行的任務和執行時間,一旦從 DelayQueue 中獲取到任務就開始執行,從比如 TimerQueue 就是使用 DelayQueue 實現的。

佇列中的 Delayed 必須實現 compareTo 來指定元素的順序。比如讓延時時間最長的放在佇列的末尾。

5. SynchronousQueue

SynchronousQueue 是一個不儲存元素的阻塞佇列。每一個 put 操作必須等待一個 take 操作,否則不能繼續新增元素。SynchronousQueue 可以看成是一個傳球手,負責把生產者執行緒處理的資料直接傳遞給消費者執行緒。佇列本身並不儲存任何元素,非常適合於傳遞性場景, 比如在一個執行緒中使用的資料,傳遞給另外一個執行緒使用,SynchronousQueue 的吞吐量高於 LinkedBlockingQueue 和 ArrayBlockingQueue。

6. LinkedTransferQueue

是一個由連結串列結構組成的無界阻塞 TransferQueue 佇列。相對於其他阻塞佇列,LinkedTransferQueue 多了 tryTransfer 和 transfer 方法。

  • transfer 方法。如果當前有消費者正在等待接收元素(消費者使用 take() 方法或帶時間限制的 poll() 方法時),transfer 方法可以把生產者傳入的元素立刻 transfer(傳輸)給消費者。如果沒有消費者在等待接收元素,transfer 方法會將元素存放在佇列的 tail 節點,並等到該元素被消費者消費了才返回。transfer 方法的關鍵程式碼如下:
Node pred = tryAppend(s, haveData);
return awaitMatch(s, pred, e, (how == TIMED), nanos);

第一行程式碼是試圖把存放當前元素的 s 節點作為 tail 節點。第二行程式碼是讓 CPU 自旋等待消費者消費元素。因為自旋會消耗 CPU,所以自旋一定的次數後使用 Thread.yield() 方法來暫停當前正在執行的執行緒,並執行其他執行緒。

  • tryTransfer 方法。則是用來試探下生產者傳入的元素是否能直接傳給消費者。如果沒有消費者等待接收元素,則返回 false。和 transfer 方法的區別是 tryTransfer 方法無論消費者是否接收,方法立即返回。而 transfer 方法是必須等到消費者消費了才返回。

對於帶有時間限制的 tryTransfer(E e, long timeout, TimeUnit unit) 方法,則是試圖把生產者傳入的元素直接傳給消費者,但是如果沒有消費者消費該元素則等待指定的時間再返回,如果超時還沒消費元素,則返回 false,如果在超時時間內消費了元素,則返回 true。

7. LinkedBlockingDeque

一個由連結串列結構組成的雙向阻塞佇列。所謂雙向佇列指的你可以從佇列的兩端插入和移出元素。雙端佇列因為多了一個操作佇列的入口,在多執行緒同時入隊時,也就減少了一半的競爭。相比其他的阻塞佇列,LinkedBlockingDeque 多了 addFirst,addLast,offerFirst,offerLast,peekFirst,peekLast 等方法,以 First 單詞結