1. 程式人生 > >20、併發包中的ConcurrentLinkedQueue和LinkedBlockingQueue有什麼區別?(高併發程式設計----6)

20、併發包中的ConcurrentLinkedQueue和LinkedBlockingQueue有什麼區別?(高併發程式設計----6)

目錄

今天我要問你的問題是,併發包中的 ConcurrentLinkedQueue 和 LinkedBlockingQueue 有什麼區別?

典型回答

考點分析

知識擴充套件

執行緒安全佇列一覽

佇列使用場景與典型用例

前面介紹了各種佇列實現,在日常的應用開發中,如何進行選擇呢?

一課一練


今天我來介紹一下執行緒安全佇列。Java 標準庫提供了非常多的執行緒安全佇列,很容易混淆。

今天我要問你的問題是,併發包中的 ConcurrentLinkedQueue 和 LinkedBlockingQueue 有什麼區別?

典型回答

有時候我們把併發包下面的所有容器都習慣叫作併發容器,但是嚴格來講,類似 ConcurrentLinkedQueue 這種“Concurrent”容器,才是真正代表併發

關於問題中它們的區別:

  •   Concurrent 型別基於 lock-free,在常見的多執行緒訪問場景,一般可以提供較高吞吐量。
  •   而 LinkedBlockingQueue 內部則是基於鎖,並提供了 BlockingQueue 的等待性方法。

不知道你有沒有注意到,java.util.concurrent 包提供的容器(Queue、List、Set)、Map,從命名上可以大概區分為 
Concurrent、CopyOnWrite和 Blocking等三類,同樣是執行緒安全容器,可以簡單認為:

  •   Concurrent 型別沒有類似 CopyOnWrite 之類容器相對較重的修改開銷。
  •   但是,凡事都是有代價的,Concurrent 往往提供了較低的遍歷一致性。你可以這樣理解所謂的弱一致性,例如,當利用迭代器遍歷時,如果容器發生修改,迭代器仍然可以繼續進行遍歷。
  •   與弱一致性對應的,就是我介紹過的同步容器常見的行為“fail-fast”,也就是檢測到容器在遍歷過程中發生了修改,則丟擲 ConcurrentModificationException,不再繼續遍歷。
  •   弱一致性的另外一個體現是,size 等操作準確性是有限的,未必是 100% 準確。
  •   與此同時,讀取的效能具有一定的不確定性。

 

考點分析

今天的問題是又是一個引子,考察你是否瞭解併發包內部不同容器實現的設計目的和實現區別。

佇列是非常重要的資料結構,我們日常開發中很多執行緒間資料傳遞都要依賴於它,Executor 框架提供的各種執行緒池,同樣無法離開佇列。面試官可以從不同角度考察,比如:

  •   哪些佇列是有界的,哪些是無界的?(很多同學反饋了這個問題)
  •   針對特定場景需求,如何選擇合適的佇列實現?
  •   從原始碼的角度,常見的執行緒安全佇列是如何實現的,並進行了哪些改進以提高效能表現?

為了能更好地理解這一講,需要你掌握一些基本的佇列本身和資料結構方面知識,如果這方面知識比較薄弱,《資料結構與演算法分析》是一本比較全面的參考書,專欄還是儘量專注於 Java 領域的特性。

 

知識擴充套件

執行緒安全佇列一覽

我在專欄第 8 講中介紹過,常見的集合中如 LinkedList 是個 Deque,只不過不是執行緒安全的。下面這張圖是 Java 
併發類庫提供的各種各樣的執行緒安全佇列實現,注意,圖中並未將非執行緒安全部分包含進來。

我們可以從不同的角度進行分類,從基本的資料結構的角度分析,有兩個特別的Deque實現,ConcurrentLinkedDeque 和 LinkedBlockingDeque。Deque 的側重點是支援對佇列頭尾都進行插入和刪除,所以提供了特定的方法,如:

  •   尾部插入時需要的addLast(e)、offerLast(e)。
  •   尾部刪除所需要的removeLast()、pollLast()。

從上面這些角度,能夠理解 ConcurrentLinkedDeque 和 LinkedBlockingQueue 的主要功能區別,也就足夠日常開發的需要了。但是如果我們深入一些,通常會更加關注下面這些方面。從行為特徵來看,絕大部分 Queue 都是實現了 BlockingQueue 介面。在常規佇列操作基礎上,Blocking 意味著其提供了特定的等待性操作,獲取時(take)等待元素進隊,或者插入時(put)等待隊列出現空位。

 /**
 * 獲取並移除佇列頭結點,如果必要,其會等待直到隊列出現元素
…
 */
E take() throws InterruptedException;

/**
 * 插入元素,如果佇列已滿,則等待直到隊列出現空閒空間
   …
 */
void put(E e) throws InterruptedException;  

另一個 BlockingQueue 經常被考察的點,就是是否有界(Bounded、Unbounded),這一點也往往會影響我們在應用開發中的選擇,我這裡簡單總結一下。

  •   ArrayBlockingQueue 是最典型的的有界佇列,其內部以 final 的陣列儲存資料,陣列的大小就決定了佇列的邊界,所以我們在建立 ArrayBlockingQueue 時,都要指定容量,如
public ArrayBlockingQueue(int capacity, boolean fair)

  LinkedBlockingQueue,容易被誤解為無邊界,但其實其行為和內部程式碼都是基於有界的邏輯實現的,只不過如果我們沒有在建立佇列時就指定容量,那麼其容量限制就自動被設定為Integer.MAX_VALUE,成為了無界佇列。

  •   SynchronousQueue,這是一個非常奇葩的佇列實現,每個刪除操作都要等待插入操作,反之每個插入操作也都要等待刪除動作。那麼這個佇列的容量是多少呢?是1 嗎?其實不是的,其內部容量是 0。
  •   PriorityBlockingQueue 是無邊界的優先佇列,雖然嚴格意義上來講,其大小總歸是要受系統資源影響。
  •   DelayedQueue 和 LinkedTransferQueue 同樣是無邊界的佇列。對於無邊界的佇列,有一個自然的結果,就是 put 操作永遠也不會發生其他 BlockingQueue 的那種等待情況。


如果我們分析不同佇列的底層實現,BlockingQueue 基本都是基於鎖實現,一起來看看典型的 LinkedBlockingQueue。

/** Lock held by take, poll, etc */
private final ReentrantLock takeLock = new ReentrantLock();

/** Wait queue for waiting takes */
private final Condition notEmpty = takeLock.newCondition();

/** Lock held by put, offer, etc */
private final ReentrantLock putLock = new ReentrantLock();

/** Wait queue for waiting puts */
private final Condition notFull = putLock.newCondition();

 

我在介紹 ReentrantLock 的條件變數用法的時候分析過 ArrayBlockingQueue,不知道你有沒有注意到,其條件變數與 
LinkedBlockingQueue 版本的實現是有區別的。notEmpty、notFull 都是同一個再入鎖的條件變數,而 LinkedBlockingQueue 則改進了鎖操作的粒度,頭、尾操作使用不同的鎖,所以在通用場景下,它的吞吐量相對要更好一些。

下面的 take 方法與 ArrayBlockingQueue 中的實現,也是有不同的,由於其內部結構是連結串列,需要自己維護元素數量值,請參考下面的程式碼。

public E take() throws InterruptedException {
    final E x;
    final int c;
    final AtomicInteger count = this.count;
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lockInterruptibly();
    try {
        while (count.get() == 0) {
            notEmpty.await();
        }
        x = dequeue();
        c = count.getAndDecrement();
        if (c > 1)
            notEmpty.signal();
    } finally {
        takeLock.unlock();
    }
    if (c == capacity)
        signalNotFull();
    return x;
}

類似 ConcurrentLinkedQueue 等,則是基於 CAS 的無鎖技術,不需要在每個操作時使用鎖,所以擴充套件性表現要更加優異。

相對比較另類的 SynchronousQueue,在 Java 6 中,其實現發生了非常大的變化,利用 CAS 替換掉了原本基於鎖的邏輯,同步開銷比較小。它是 Executors.newCachedThreadPool() 的預設佇列。

 

佇列使用場景與典型用例

在實際開發中,我提到過 Queue 被廣泛使用在生產者 - 消費者場景,比如利用 BlockingQueue 來實現,由於其提供的等待機制,我們可以少操心很多協調工作,你可以參考下面樣例程式碼:

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class ConsumerProducer {
    public static final String EXIT_MSG  = "Good bye!";
    public static void main(String[] args) {
// 使用較小的佇列,以更好地在輸出中展示其影響
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(3);
        Producer producer = new Producer(queue);
        Consumer consumer = new Consumer(queue);
        new Thread(producer).start();
        new Thread(consumer).start();
    }


    static class Producer implements Runnable {
        private BlockingQueue<String> queue;
        public Producer(BlockingQueue<String> q) {
            this.queue = q;
        }

        @Override
        public void run() {
            for (int i = 0; i < 20; i++) {
                try{
                    Thread.sleep(5L);
                    String msg = "Message" + i;
                    System.out.println("Produced new item: " + msg);
                    queue.put(msg);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }

            try {
                System.out.println("Time to say good bye!");
                queue.put(EXIT_MSG);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    static class Consumer implements Runnable{
        private BlockingQueue<String> queue;
        public Consumer(BlockingQueue<String> q){
            this.queue=q;
        }

        @Override
        public void run() {
            try{
                String msg;
                while(!EXIT_MSG.equalsIgnoreCase( (msg = queue.take()))){
                    System.out.println("Consumed item: " + msg);
                    Thread.sleep(10L);
                }
                System.out.println("Got exit message, bye!");
            }catch(InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

上面是一個典型的生產者 - 消費者樣例,如果使用非 Blocking 的佇列,那麼我們就要自己去實現輪詢、條件判斷(如檢查 poll 返回值是否 null)等邏輯,如果沒有特別的場景要求,Blocking 實現起來程式碼更加簡單、直觀。

 

前面介紹了各種佇列實現,在日常的應用開發中,如何進行選擇呢?

以 LinkedBlockingQueue、ArrayBlockingQueue 和 SynchronousQueue 為例,我們一起來分析一下,根據需求可以從很多方面考量:

  •   考慮應用場景中對佇列邊界的要求。ArrayBlockingQueue 是有明確的容量限制的,而 LinkedBlockingQueue 則取決於我們是否在建立時指定,SynchronousQueue 則乾脆不能快取任何元素。
  •   從空間利用角度,陣列結構的 ArrayBlockingQueue 要比 LinkedBlockingQueue 緊湊,因為其不需要建立所謂節點,但是其初始分配階段就需要一段連續的空間,所以初始記憶體需求更大。
  •   通用場景中,LinkedBlockingQueue 的吞吐量一般優於 ArrayBlockingQueue,因為它實現了更加細粒度的鎖操作。
  •   ArrayBlockingQueue 實現比較簡單,效能更好預測,屬於表現穩定的“選手”。
  •   如果我們需要實現的是兩個執行緒之間接力性(handoff)的場景,按照專欄上一講的例子,你可能會選擇 CountDownLatch,但是SynchronousQueue也是完美符合這種場景的,而且執行緒間協調和資料傳輸統一起來,程式碼更加規範。
  •   可能令人意外的是,很多時候 SynchronousQueue 的效能表現,往往大大超過其他實現,尤其是在佇列元素較小的場景。

今天我分析了 Java 中讓人眼花繚亂的各種執行緒安全佇列,試圖從幾個角度,讓每個佇列的特點更加明確,進而希望減少你在日常工作中使用時的困擾。

 

一課一練

關於今天我們討論的題目你做到心中有數了嗎? 今天的內容側重於 Java 自身的角度,面試官也可能從演算法的角度來考察,所以今天留給你的思考題是,指定某種結構,比如棧,用它實現一個 BlockingQueue,實現思路是怎樣的呢?