1. 程式人生 > 實用技巧 >第十三章、阻塞佇列

第十三章、阻塞佇列

一、阻塞佇列的由來

  我們假設一種場景,生產者一直生產資源,消費者一直消費資源,資源儲存在一個緩衝池中,生產者將生產的資源存進緩衝池中,消費者從緩衝池中拿到資源進行消費,這就是大名鼎鼎的生產者-消費者模式

  該模式能夠簡化開發過程,一方面消除了生產者類與消費者類之間的程式碼依賴性,另一方面將生產資料的過程與使用資料的過程解耦簡化負載。

  我們自己coding實現這個模式的時候,因為需要讓多個執行緒操作共享變數(即資源),所以很容易引發執行緒安全問題,造成重複消費死鎖,尤其是生產者和消費者存在多個的情況。另外,當緩衝池空了,我們需要阻塞消費者,喚醒生產者;當緩衝池滿了,我們需要阻塞生產者,喚醒消費者,這些個等待-喚醒

邏輯都需要自己實現。

  這麼容易出錯的事情,JDK當然幫我們做啦,這就是阻塞佇列(BlockingQueue),你只管往裡面存、取就行,而不用擔心多執行緒環境下存、取共享變數的執行緒安全問題。

BlockingQueue是Java util.concurrent包下重要的資料結構,區別於普通的佇列,BlockingQueue提供了執行緒安全的佇列訪問方式,併發包下很多高階同步類的實現都是基於BlockingQueue實現的。

BlockingQueue一般用於生產者-消費者模式,生產者是往佇列裡新增元素的執行緒,消費者是從佇列裡拿元素的執行緒。BlockingQueue就是存放元素的容器

二、BlockingQueue的操作方法

  阻塞佇列提供了四組不同的方法用於插入、移除、檢查元素:

方法\處理方式丟擲異常返回特殊值一直阻塞超時退出
插入方法 add(e) offer(e) put(e) offer(e,time,unit)
移除方法 remove() poll() take() poll(time,unit)
檢查方法 element() peek() - -
  • 丟擲異常:如果試圖的操作無法立即執行,拋異常。當阻塞佇列滿時候,再往佇列裡插入元素,會丟擲IllegalStateException(“Queue full”)異常。當佇列為空時,從佇列裡獲取元素時會丟擲NoSuchElementException異常 。

  • 返回特殊值:如果試圖的操作無法立即執行,返回一個特殊值,通常是true / false。

  • 一直阻塞:如果試圖的操作無法立即執行,則一直阻塞或者響應中斷。

  • 超時退出:如果試圖的操作無法立即執行,該方法呼叫將會發生阻塞,直到能夠執行,但等待時間不會超過給定值。返回一個特定值以告知該操作是否成功,通常是 true / false。

注意之處

  • 不能往阻塞佇列中插入null,會丟擲空指標異常。

  • 可以訪問阻塞佇列中的任意元素,呼叫remove(o)可以將佇列之中的特定物件移除,但並不高效,儘量避免使用。

三、BlockingQueue的實現類

3.1、ArrayBlockingQueue

  由陣列結構組成的有界阻塞佇列。內部結構是陣列,故具有陣列的特性。

public ArrayBlockingQueue(int capacity, boolean fair){
    //..省略程式碼
}

可以初始化佇列大小, 且一旦初始化不能改變。構造方法中的fair表示控制物件的內部鎖是否採用公平鎖,預設是非公平鎖

3.2、LinkedBlockingQueue

  由連結串列結構組成的有界阻塞佇列。內部結構是連結串列,具有連結串列的特性。預設佇列的大小是Integer.MAX_VALUE,也可以指定大小。此佇列按照先進先出的原則對元素進行排序。

3.3、DelayQueue

  該佇列中的元素只有當其指定的延遲時間到了,才能夠從佇列中獲取到該元素 。注入其中的元素必須實現 java.util.concurrent.Delayed 介面。

  DelayQueue是一個沒有大小限制的佇列,因此往佇列中插入資料的操作(生產者)永遠不會被阻塞,而只有獲取資料的操作(消費者)才會被阻塞。

3.4、PriorityBlockingQueue

基於優先順序的無界阻塞佇列(優先順序的判斷通過建構函式傳入的Compator物件來決定),內部控制執行緒同步的鎖採用的是非公平鎖。

public PriorityBlockingQueue(int initialCapacity,
                              Comparator<? super E> comparator) {
     this.lock = new ReentrantLock(); //預設構造方法-非公平鎖
     ...//其餘程式碼略
 }

3.5、SynchronousQueue

  這個佇列比較特殊,沒有任何內部容量,甚至連一個佇列的容量都沒有。並且每個 put 必須等待一個 take,反之亦然。

  需要區別容量為1的ArrayBlockingQueue、LinkedBlockingQueue。

以下方法的返回值,可以幫助理解這個佇列:

  • iterator() 永遠返回空,因為裡面沒有東西

  • peek() 永遠返回null

  • put() 往queue放進去一個element以後就一直wait直到有其他thread進來把這個element取走。

  • offer() 往queue裡放一個element後立即返回,如果碰巧這個element被另一個thread取走了,offer方法返回true,認為offer成功;否則返回false。

  • take() 取出並且remove掉queue裡的element,取不到東西他會一直等。

  • poll() 取出並且remove掉queue裡的element,只有到碰巧另外一個執行緒正在往queue裡offer資料或者put資料的時候,該方法才會取到東西。否則立即返回null。

  • isEmpty() 永遠返回true

  • remove()&removeAll() 永遠返回false

注意

  PriorityBlockingQueue不會阻塞資料生產者(因為佇列是無界的),而只會在沒有可消費的資料時,阻塞資料的消費者。因此使用的時候要特別注意,生產者生產資料的速度絕對不能快於消費者消費資料的速度,否則時間一長,會最終耗盡所有的可用堆記憶體空間。對於使用預設大小的LinkedBlockingQueue也是一樣的。

五、阻塞佇列的原理

  阻塞佇列的原理很簡單,利用了Lock鎖的多條件(Condition)阻塞控制。接下來我們分析ArrayBlockingQueue JDK 1.8 的原始碼。

  首先是構造器,除了初始化佇列的大小和是否是公平鎖之外,還對同一個鎖(lock)初始化了兩個監視器,分別是notEmpty和notFull。這兩個監視器的作用目前可以簡單理解為標記分組,當該執行緒是put操作時,給他加上監視器notFull,標記這個執行緒是一個生產者;當執行緒是take操作時,給他加上監視器notEmpty,標記這個執行緒是消費者。

//資料元素陣列
final Object[] items;
//下一個待取出元素索引
int takeIndex;
//下一個待新增元素索引
int putIndex;
//元素個數
int count;
//內部鎖
final ReentrantLock lock;
//消費者監視器
private final Condition notEmpty;
//生產者監視器
private final Condition notFull;  

public ArrayBlockingQueue(int capacity, boolean fair) {
    //..省略其他程式碼
    lock = new ReentrantLock(fair);
    notEmpty = lock.newCondition();
    notFull =  lock.newCondition();
}

  put操作的原始碼

public void put(E e) throws InterruptedException {
    checkNotNull(e);
    final ReentrantLock lock = this.lock;
    // 1.自旋拿鎖
    lock.lockInterruptibly();
    try {
        // 2.判斷佇列是否滿了
        while (count == items.length)
            // 2.1如果滿了,阻塞該執行緒,並標記為notFull執行緒,
            // 等待notFull的喚醒,喚醒之後繼續執行while迴圈。
            notFull.await();
        // 3.如果沒有滿,則進入佇列
        enqueue(e);
    } finally {
        lock.unlock();
    }
}
private void enqueue(E x) {
    // assert lock.getHoldCount() == 1;
    // assert items[putIndex] == null;
    final Object[] items = this.items;
    items[putIndex] = x;
    if (++putIndex == items.length)
        putIndex = 0;
    count++;
    // 4 喚醒一個等待的執行緒
    notEmpty.signal();
}

  總結put的流程:

    1. 所有執行put操作的執行緒競爭lock鎖,拿到了lock鎖的執行緒進入下一步,沒有拿到lock鎖的執行緒自旋競爭鎖。

    2. 判斷阻塞佇列是否滿了,如果滿了,則呼叫await方法阻塞這個執行緒,並標記為notFull(生產者)執行緒,同時釋放lock鎖,等待被消費者執行緒喚醒。

    3. 如果沒有滿,則呼叫enqueue方法將元素put進阻塞佇列。注意這一步的執行緒還有一種情況是第二步中阻塞的執行緒被喚醒且又拿到了lock鎖的執行緒。

    4. 喚醒一個標記為notEmpty(消費者)的執行緒。

  take操作的原始碼

public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        while (count == 0)
            notEmpty.await();
        return dequeue();
    } finally {
        lock.unlock();
    }
}
private E dequeue() {
    // assert lock.getHoldCount() == 1;
    // assert items[takeIndex] != null;
    final Object[] items = this.items;
    @SuppressWarnings("unchecked")
    E x = (E) items[takeIndex];
    items[takeIndex] = null;
    if (++takeIndex == items.length)
        takeIndex = 0;
    count--;
    if (itrs != null)
        itrs.elementDequeued();
    notFull.signal();
    return x;
}

  take操作和put操作的流程是類似的,總結一下take操作的流程:

    1. 所有執行take操作的執行緒競爭lock鎖,拿到了lock鎖的執行緒進入下一步,沒有拿到lock鎖的執行緒自旋競爭鎖。

    2. 判斷阻塞佇列是否為空,如果是空,則呼叫await方法阻塞這個執行緒,並標記為notEmpty(消費者)執行緒,同時釋放lock鎖,等待被生產者執行緒喚醒。

    3. 如果沒有空,則呼叫dequeue方法。注意這一步的執行緒還有一種情況是第二步中阻塞的執行緒被喚醒且又拿到了lock鎖的執行緒。

    4. 喚醒一個標記為notFull(生產者)的執行緒。

  注意

    1. put和take操作都需要先獲取鎖,沒有獲取到鎖的執行緒會被擋在第一道大門之外自旋拿鎖,直到獲取到鎖。

    2. 就算拿到鎖了之後,也不一定會順利進行put/take操作,需要判斷佇列是否可用(是否滿/空),如果不可用,則會被阻塞,並釋放鎖

    3. 在第2點被阻塞的執行緒會被喚醒,但是在喚醒之後,依然需要拿到鎖才能繼續往下執行,否則,自旋拿鎖,拿到鎖了再while判斷佇列是否可用(這也是為什麼不用if判斷,而使用while判斷的原因)。

六、示例和使用場景

6.1、生產者-消費者模型

public class Test {
    private int queueSize = 10;
    private ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue<Integer>(queueSize);

    public static void main(String[] args)  {
        Test test = new Test();
        Producer producer = test.new Producer();
        Consumer consumer = test.new Consumer();

        producer.start();
        consumer.start();
    }

    class Consumer extends Thread{

        @Override
        public void run() {
            consume();
        }

        private void consume() {
            while(true){
                try {
                    queue.take();
                    System.out.println("從佇列取走一個元素,佇列剩餘"+queue.size()+"個元素");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    class Producer extends Thread{

        @Override
        public void run() {
            produce();
        }

        private void produce() {
            while(true){
                try {
                    queue.put(1);
                    System.out.println("向佇列取中插入一個元素,佇列剩餘空間:"+(queueSize-queue.size()));
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

  下面是這個例子的輸出片段:

從佇列取走一個元素,佇列剩餘0個元素
從佇列取走一個元素,佇列剩餘0個元素
向佇列取中插入一個元素,佇列剩餘空間:9
向佇列取中插入一個元素,佇列剩餘空間:9
向佇列取中插入一個元素,佇列剩餘空間:9
向佇列取中插入一個元素,佇列剩餘空間:8
向佇列取中插入一個元素,佇列剩餘空間:7
向佇列取中插入一個元素,佇列剩餘空間:6
向佇列取中插入一個元素,佇列剩餘空間:5
向佇列取中插入一個元素,佇列剩餘空間:4
向佇列取中插入一個元素,佇列剩餘空間:3
向佇列取中插入一個元素,佇列剩餘空間:2
向佇列取中插入一個元素,佇列剩餘空間:1
向佇列取中插入一個元素,佇列剩餘空間:0
從佇列取走一個元素,佇列剩餘1個元素
從佇列取走一個元素,佇列剩餘9個元素

  注意,這個例子中的輸出結果看起來可能有問題,比如有幾行在插入一個元素之後,佇列的剩餘空間不變。這是由於System.out.println語句沒有鎖。考慮到這樣的情況:執行緒1在執行完put/take操作後立即失去CPU時間片,然後切換到執行緒2執行put/take操作,執行完畢後回到執行緒1的System.out.println語句並輸出,發現這個時候阻塞佇列的size已經被執行緒2改變了,所以這個時候輸出的size並不是當時執行緒1執行完put/take操作之後阻塞佇列的size,但可以確保的是size不會超過10個。實際上使用阻塞佇列是沒有問題的。

6.2、執行緒池中使用阻塞佇列

 public ThreadPoolExecutor(int corePoolSize,
                           int maximumPoolSize,
                           long keepAliveTime,
                           TimeUnit unit,
                           BlockingQueue<Runnable> workQueue) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), defaultHandler);
}

  Java中的執行緒池就是使用阻塞佇列實現的,我們在瞭解阻塞佇列之後,無論是使用Executors類中已經提供的執行緒池,還是自己通過ThreadPoolExecutor實現執行緒池,都會更加得心應手,想要了解執行緒池的同學,可以看執行緒池原理