1. 程式人生 > >常用Java併發容器

常用Java併發容器

併發容器介紹

  1. ConcurrentHashMap:併發版HashMap
  2. CopyOnWriteArrayList:併發版ArrayList
  3. CopyOnWriteArraySet:併發Set
  4. ConcurrentLinkedQueue:併發佇列(基於連結串列)
  5. ConcurrentLinkedDeque:併發佇列(基於雙向連結串列)
  6. ConcurrentSkipListMap:基於跳錶的併發Map
  7. ConcurrentSkipListSet:基於跳錶的併發Set
  8. ArrayBlockingQueue:阻塞佇列(基於陣列)
  9. LinkedBlockingQueue:阻塞佇列(基於連結串列)
  10. LinkedBlockingDeque:阻塞佇列(基於雙向連結串列)
  11. PriorityBlockingQueue:執行緒安全的優先佇列
  12. SynchronousQueue:讀寫成對的佇列
  13. LinkedTransferQueue:基於連結串列的資料交換佇列
  14. DelayQueue:延時佇列

1.ConcurrentHashMap 併發版HashMap

最常見的併發容器之一,可以用作併發場景下的快取。底層依然是雜湊表,但在JAVA 8中有了不小的改變,而JAVA 7和JAVA 8都是用的比較多的版本,因此經常會將這兩個版本的實現方式做一些比較(比如面試中)。 一個比較大的差異就是,JAVA 7中採用分段鎖來減少鎖的競爭,JAVA 8中放棄了分段鎖,採用CAS(一種樂觀鎖),同時為了防止雜湊衝突嚴重時退化成連結串列(衝突時會在該位置生成一個連結串列,雜湊值相同的物件就鏈在一起),會在連結串列長度達到閾值(8)後轉換成紅黑樹(比起連結串列,樹的查詢效率更穩定)。

2.CopyOnWriteArrayList 併發版ArrayList

併發版ArrayList,底層結構也是陣列,和ArrayList不同之處在於:當新增和刪除元素時會建立一個新的陣列,在新的陣列中增加或者排除指定物件,最後用新增陣列替換原來的陣列。 適用場景:由於讀操作不加鎖,寫(增、刪、改)操作加鎖,因此適用於讀多寫少的場景。 侷限:由於讀的時候不會加鎖(讀的效率高,就和普通ArrayList一樣),讀取的當前副本,因此可能讀取到髒資料。如果介意,建議不用。 看看原始碼感受下:
public class CopyOnWriteArrayList<E>
    implements List<E>, RandomAccess, Cloneable, java.io.Serializable {
    final transient ReentrantLock lock = new ReentrantLock();
    private transient volatile Object[] array;
    // 新增元素,有鎖
    public boolean add(E e) {
        final ReentrantLock lock = this.lock;
        lock.lock(); // 修改時加鎖,保證併發安全
        try {
            Object[] elements = getArray(); // 當前陣列
            int len = elements.length;
            Object[] newElements = Arrays.copyOf(elements, len + 1); // 建立一個新陣列,比老的大一個空間
            newElements[len] = e; // 要新增的元素放進新陣列
            setArray(newElements); // 用新陣列替換原來的陣列
            return true;
        } finally {
            lock.unlock(); // 解鎖
        }
    }
    // 讀元素,不加鎖,因此可能讀取到舊資料
    public E get(int index) {
        return get(getArray(), index);
    }
}

 

3.CopyOnWriteArraySet 併發Set

基於CopyOnWriteArrayList實現(內含一個CopyOnWriteArrayList成員變數),也就是說底層是一個數組,意味著每次add都要遍歷整個集合才能知道是否存在,不存在時需要插入(加鎖)。 適用場景:在CopyOnWriteArrayList適用場景下加一個,集合別太大(全部遍歷傷不起)。  

4.ConcurrentLinkedQueue 併發佇列(基於連結串列)

基於連結串列實現的併發佇列,使用樂觀鎖(CAS)保證執行緒安全。因為資料結構是連結串列,所以理論上是沒有佇列大小限制的,也就是說新增資料一定能成功。  

5.ConcurrentLinkedDeque 併發佇列(基於雙向連結串列)

基於雙向連結串列實現的併發佇列,可以分別對頭尾進行操作,因此除了先進先出(FIFO),也可以先進後出(FILO),當然先進後出的話應該叫它棧了。  

6.ConcurrentSkipListMap 基於跳錶的併發Map

SkipList即跳錶,跳錶是一種空間換時間的資料結構,通過冗餘資料,將連結串列一層一層索引,達到類似二分查詢的效果  

7.ConcurrentSkipListSet 基於跳錶的併發Set

類似HashSet和HashMap的關係,ConcurrentSkipListSet裡面就是一個ConcurrentSkipListMap,就不細說了。  

8.ArrayBlockingQueue 阻塞佇列(基於陣列)

基於陣列實現的可阻塞佇列,構造時必須制定陣列大小,往裡面放東西時如果陣列滿了便會阻塞直到有位置(也支援直接返回和超時等待),通過一個鎖ReentrantLock保證執行緒安全。 用offer操作舉個例子:
public class ArrayBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {
    /**
     * 讀寫共用此鎖,執行緒間通過下面兩個Condition通訊
     * 這兩個Condition和lock有緊密聯絡(就是lock的方法生成的)
     * 類似Object的wait/notify
     */
    final ReentrantLock lock;
    /** 佇列不為空的訊號,取資料的執行緒需要關注 */
    private final Condition notEmpty;
    /** 佇列沒滿的訊號,寫資料的執行緒需要關注 */
    private final Condition notFull;
    // 一直阻塞直到有東西可以拿出來
    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == 0)
                notEmpty.await();
            return dequeue();
        } finally {
            lock.unlock();
        }
    }
    // 在尾部插入一個元素,佇列已滿時等待指定時間,如果還是不能插入則返回
    public boolean offer(E e, long timeout, TimeUnit unit)
        throws InterruptedException {
        checkNotNull(e);
        long nanos = unit.toNanos(timeout);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly(); // 鎖住
        try {
            // 迴圈等待直到佇列有空閒
            while (count == items.length) {
                if (nanos <= 0)
                    return false;// 等待超時,返回
                // 暫時放出鎖,等待一段時間(可能被提前喚醒並搶到鎖,所以需要迴圈判斷條件)
                // 這段時間可能其他執行緒取走了元素,這樣就有機會插入了
                nanos = notFull.awaitNanos(nanos);
            }
            enqueue(e);//插入一個元素
            return true;
        } finally {
            lock.unlock(); //解鎖
        }
    }
乍一看會有點疑惑,讀和寫都是同一個鎖,那要是空的時候正好一個讀執行緒來了不會一直阻塞嗎? 答案就在notEmpty、notFull裡,這兩個出自lock的小東西讓鎖有了類似synchronized + wait + notify的功能。

9.LinkedBlockingQueue 阻塞佇列(基於連結串列)

基於連結串列實現的阻塞佇列,想比與不阻塞的ConcurrentLinkedQueue,它多了一個容量限制,如果不設定預設為int最大值。  

10.LinkedBlockingDeque 阻塞佇列(基於雙向連結串列)

類似LinkedBlockingQueue,但提供了雙向連結串列特有的操作。  

11.PriorityBlockingQueue 執行緒安全的優先佇列

構造時可以傳入一個比較器,可以看做放進去的元素會被排序,然後讀取的時候按順序消費。某些低優先順序的元素可能長期無法被消費,因為不斷有更高優先順序的元素進來。  

12.SynchronousQueue 資料同步交換的佇列

一個虛假的佇列,因為它實際上沒有真正用於儲存元素的空間,每個插入操作都必須有對應的取出操作,沒取出時無法繼續放入。 一個簡單的例子感受一下:
import java.util.concurrent.*;
public class Main {
    public static void main(String[] args) {
        SynchronousQueue<Integer> queue = new SynchronousQueue<>();
        new Thread(() -> {
            try {
                // 沒有休息,瘋狂寫入
                for (int i = 0; ; i++) {
                    System.out.println("放入: " + i);
                    queue.put(i);
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();
        new Thread(() -> {
            try {
                // 鹹魚模式取資料
                while (true) {
                    System.out.println("取出: " + queue.take());
                    Thread.sleep((long) (Math.random() * 2000));
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();
    }
}
/* 輸出:
放入: 0
取出: 0
放入: 1
取出: 1
放入: 2
取出: 2
放入: 3
取出: 3
*/

 

可以看到,寫入的執行緒沒有任何sleep,可以說是全力往佇列放東西,而讀取的執行緒又很不積極,讀一個又sleep一會。輸出的結果卻是讀寫操作成對出現。 JAVA中一個使用場景就是Executors.newCachedThreadPool(),建立一個快取執行緒池。
public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(
        0, // 核心執行緒為0,沒用的執行緒都被無情拋棄
        Integer.MAX_VALUE, // 最大執行緒數理論上是無限了,還沒到這個值機器資源就被掏空了
        60L, TimeUnit.SECONDS, // 閒置執行緒60秒後銷燬
        new SynchronousQueue<Runnable>()); // offer時如果沒有空閒執行緒取出任務,則會失敗,執行緒池就會新建一個執行緒
}

 

13.LinkedTransferQueue 基於連結串列的資料交換佇列

實現了介面TransferQueue,通過transfer方法放入元素時,如果發現有執行緒在阻塞在取元素,會直接把這個元素給等待執行緒。如果沒有人等著消費,那麼會把這個元素放到佇列尾部,並且此方法阻塞直到有人讀取這個元素。和SynchronousQueue有點像,但比它更強大。  

14.DelayQueue 延時佇列

可以使放入佇列的元素在指定的延時後才被消費者取出,元素需要實現Delayed介面。