1. 程式人生 > >一、BlockingQueue 入門

一、BlockingQueue 入門

1、BlockingQueue繼承關係

java.util.concurrent 包裡的 BlockingQueue是一個介面, 繼承Queue介面,Queue介面繼承 Collection

BlockingQueue----->Queue–>Collection

圖:

佇列的特點是:先進先出(FIFO)

2、BlockingQueue的方法

BlockingQueue 具有 4 組不同的方法用於插入、移除以及對佇列中的元素進行檢查。如果請求的操作不能得到立即執行的話,每個方法的表現也不同。這些方法如下:
丟擲異常特殊值阻塞超時
插入add(e)offer(e)put(e)offer(e, time, unit)
移除remove()poll()take()poll(time, unit)
檢查element()peek()不可用不可用

四組不同的行為方式解釋: 1(異常) 如果試圖的操作無法立即執行,拋一個異常。 2(特定值) 如果試圖的操作無法立即執行,返回一個特定的值(常常是 true / false)。 3(阻塞) 如果試圖的操作無法立即執行,該方法呼叫將會發生阻塞,直到能夠執行。 4(超時) 如果試圖的操作無法立即執行,該方法呼叫將會發生阻塞,直到能夠執行,但等待時間不會超過給定值。返回一個特定值以告知該操作是否成功(典型的是 true / false)。 不能向BlockingQueue插入一個空物件,否則會丟擲NullPointerException,相應的實現類校驗程式碼

private static void checkNotNull(Object v) {
   if (v == null)
        throw new NullPointerException();
}

BlockingQueue :不接受 null 元素。試圖 add、put 或 offer 一個 null 元素時,某些實現會丟擲 NullPointerException。null 被用作指示 poll 操作失敗的警戒值。

BlockingQueue: 可以是限定容量的。它在任意給定時間都可以有一個 remainingCapacity,超出此容量,便無法無阻塞地 put 附加元素。沒有任何內部容量約束的 BlockingQueue 總是報告Integer.MAX_VALUE 的剩餘容量。

BlockingQueue :實現主要用於生產者-使用者佇列,但它另外還支援 Collection 介面。因此,舉例來說,使用 remove(x) 從佇列中移除任意一個元素是有可能的。然而,這種操作通常不 會有效執行,只能有計劃地偶爾使用,比如在取消排隊資訊時。

BlockingQueue :實現是執行緒安全的。所有排隊方法都可以使用內部鎖或其他形式的併發控制來自動達到它們的目的。然而,大量的 Collection 操作(addAll、containsAll、retainAll 和removeAll)沒有 必要自動執行,除非在實現中特別說明。因此,舉例來說,在只添加了 c 中的一些元素後,addAll© 有可能失敗(丟擲一個異常)。

BlockingQueue 實質上不 支援使用任何一種“close”或“shutdown”操作來指示不再新增任何項。這種功能的需求和使用有依賴於實現的傾向。例如,一種常用的策略是:對於生產者,插入特殊的 end-of-stream 或 poison 物件,並根據使用者獲取這些物件的時間來對它們進行解釋。

3、BlockingQueue實現類和繼承介面

ArrayBlockingQueue

DelayQueue

LinkedBlockingQueue

PriorityBlockingQueue

SynchronousQueue

繼承他的介面:

public interface BlockingDeque extends BlockingQueue, Deque 1.6新增

public interface TransferQueue extends BlockingQueue 1.7新增

4、BlockingQueue用法

BlockingQueue 通常用於一個執行緒生產物件,而另外一個執行緒消費這些物件的場景。下圖是對這個原理的闡述:

一個執行緒往裡邊放,另外一個執行緒從裡邊取的一個 BlockingQueue。

一個執行緒將會持續生產新物件並將其插入到佇列之中,直到佇列達到它所能容納的臨界點。也就是說,它是有限的。如果該阻塞佇列到達了其臨界點,負責生產的執行緒將會在往裡邊插入新物件時發生阻塞。它會一直處於阻塞之中,直到負責消費的執行緒從佇列中拿走一個物件。

負責消費的執行緒將會一直從該阻塞佇列中拿出物件。如果消費執行緒嘗試去從一個空的佇列中提取物件的話,這個消費執行緒將會處於阻塞之中,直到一個生產執行緒把一個物件丟進佇列。

5、BlockingQueue Example

生產者

import java.util.concurrent.BlockingQueue;
 
public class Producer implements Runnable {
 
    private BlockingQueue<Message> queue;
    
    public Producer(BlockingQueue<Message> q){
        this.queue=q;
    }
    @Override
    public void run() {
        //produce messages
        for(int i=0; i<100; i++){
            Message msg = new Message(""+i);
            try {
                Thread.sleep(i);
                queue.put(msg);
                System.out.println("Produced "+msg.getMsg());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        //adding exit message
        Message msg = new Message("exit");
        try {
            queue.put(msg);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

消費者

import java.util.concurrent.BlockingQueue;
 
public class Consumer implements Runnable{
 
	private BlockingQueue<Message> queue;
    
    public Consumer(BlockingQueue<Message> q){
        this.queue=q;
    }
 
    @Override
    public void run() {
        try{
            Message msg;
            //consuming messages until exit message is received
            while((msg = queue.take()).getMsg() !="exit"){
            Thread.sleep(10);
            System.out.println("Consumed "+msg.getMsg());
            }
        }catch(InterruptedException e) {
            e.printStackTrace();
        }
    }
}

測試方法

public class BlockingQueueExample {

    public static void main(String[] args) throws Exception {

        BlockingQueue queue = new ArrayBlockingQueue(10);

        Producer producer = new Producer(queue);
        Consumer consumer = new Consumer(queue);

        new Thread(producer).start();
        new Thread(consumer).start();

        Thread.sleep(4000);
    }
}

6、BlockingQueue實現類詳解

1 陣列阻塞佇列 ArrayBlockingQueue

一個由陣列支援的有界阻塞佇列。此佇列按 FIFO(先進先出)原則對元素進行排序。佇列的頭部 是在佇列中存在時間最長的元素。佇列的尾部 是在佇列中存在時間最短的元素。新元素插入到佇列的尾部,佇列獲取操作則是從佇列頭部開始獲得元素。

這是一個典型的“有界快取區”,固定大小的陣列在其中保持生產者插入的元素和使用者提取的元素。一旦建立了這樣的快取區,就不能再增加其容量。試圖向已滿佇列中放入元素會導致操作受阻塞;試圖從空佇列中提取元素將導致類似阻塞。

此類支援對等待的生產者執行緒和使用者執行緒進行排序的可選公平策略。預設情況下,不保證是這種排序。然而,通過將公平性 (fairness) 設定為 true 而構造的佇列允許按照 FIFO 順序訪問執行緒。公平性通常會降低吞吐量,但也減少了可變性和避免了“不平衡性”

BlockingQueue queue = new ArrayBlockingQueue(1024);
queue.put("1");String string = queue.take();

2 延遲佇列DelayQueue

Delayed 元素的一個無界阻塞佇列,只有在延遲期滿時才能從中提取元素。該佇列的頭部 是延遲期滿後儲存時間最長的 Delayed 元素。如果延遲都還沒有期滿,則佇列沒有頭部,並且 poll 將返回 null。當一個元素的 getDelay(TimeUnit.NANOSECONDS) 方法返回一個小於等於 0 的值時,將發生到期。即使無法使用 take 或 poll 移除未到期的元素,也不會將這些元素作為正常元素對待。例如,size 方法同時返回到期和未到期元素的計數。此佇列不允許使用 null 元素

3 鏈阻塞佇列 LinkedBlockingQueue

LinkedBlockingQueue 類實現了 BlockingQueue 介面。

LinkedBlockingQueue 內部以一個鏈式結構(連結節點)對其元素進行儲存。如果需要的話,這一鏈式結構可以選擇一個上限。如果沒有定義上限,將使用 Integer.MAX_VALUE 作為上限。

LinkedBlockingQueue 內部以 FIFO(先進先出)的順序對元素進行儲存。佇列中的頭元素在所有元素之中是放入時間最久的那個,而尾元素則是最短的那個。

BlockingQueue unbounded = new LinkedBlockingQueue();
BlockingQueue bounded   = new LinkedBlockingQueue(1024);bounded.put("Value");
String value = bounded.take();
System.out.println(value);
System.out.println(unbounded.remainingCapacity()==Integer.MAX_VALUE);//true

4 具有優先順序的阻塞佇列 PriorityBlockingQueue

PriorityBlockingQueue 類實現了 BlockingQueue 介面。

一個無界阻塞佇列,它使用與類 PriorityQueue 相同的順序規則,並且提供了阻塞獲取操作。雖然此佇列邏輯上是無界的,但是資源被耗盡時試圖執行 add 操作也將失敗(導致OutOfMemoryError)。此類不允許使用 null 元素。依賴自然順序的優先順序佇列也不允許插入不可比較的物件(這樣做會導致丟擲 ClassCastException)。

此類及其迭代器可以實現 Collection 和 Iterator 介面的所有可選 方法。iterator() 方法中提供的迭代器並不 保證以特定的順序遍歷 PriorityBlockingQueue 的元素。如果需要有序地進行遍歷,則應考慮使用 Arrays.sort(pq.toArray())。此外,可以使用方法 drainTo 按優先順序順序移除 全部或部分元素,並將它們放在另一個 collection 中。

在此類上進行的操作不保證具有同等優先順序的元素的順序。如果需要實施某一排序,那麼可以定義自定義類或者比較器,比較器可使用修改鍵斷開主優先順序值之間的聯絡。例如,以下是應用先進先出 (first-in-first-out) 規則斷開可比較元素之間聯絡的一個類。要使用該類,則需要插入一個新的 FIFOEntry(anEntry) 來替換普通的條目物件。

5 同步佇列 SynchronousQueue

SynchronousQueue 類實現了 BlockingQueue 介面。

SynchronousQueue 是一個特殊的佇列,它的內部同時只能夠容納單個元素。如果該佇列已有一元素的話,試圖向佇列中插入一個新元素的執行緒將會阻塞,直到另一個執行緒將該元素從佇列中抽走。同樣,如果該佇列為空,試圖向佇列中抽取一個元素的執行緒將會阻塞,直到另一個執行緒向佇列中插入了一條新的元素。

據此,把這個類稱作一個佇列顯然是誇大其詞了。它更多像是一個匯合點。

6 阻塞雙端佇列 BlockingDeque

java.util.concurrent 包裡的 BlockingDeque 介面表示一個執行緒安放入和提取例項的雙端佇列。本小節我將給你演示如何使用 BlockingDeque。

BlockingDeque 類是一個雙端佇列,在不能夠插入元素時,它將阻塞住試圖插入元素的執行緒;在不能夠抽取元素時,它將阻塞住試圖抽取的執行緒。

deque(雙端佇列) 是 “Double Ended Queue” 的縮寫。因此,雙端佇列是一個你可以從任意一端插入或者抽取元素的佇列。

7 鏈阻塞雙端佇列 LinkedBlockingDeque

一個基於已連結節點的、任選範圍的阻塞雙端佇列。

可選的容量範圍構造方法引數是一種防止過度膨脹的方式。如果未指定容量,那麼容量將等於 Integer.MAX_VALUE。只要插入元素不會使雙端佇列超出容量,每次插入後都將動態地建立連結節點。

大多數操作都以固定時間執行(不計阻塞消耗的時間)。異常包括 remove、removeFirstOccurrence、removeLastOccurrence、contains、iterator.remove() 以及批量操作,它們均以線性時間執行。