1. 程式人生 > >JAVA阻塞佇列BlockingQueue

JAVA阻塞佇列BlockingQueue

位於java.util.concurrent下,宣告:public interface BlockingQueue extends Queue

支援兩個附加操作的 Queue,這兩個操作是:獲取元素時等待佇列變為非空,以及儲存元素時等待空間變得可用。

BlockingQueue 方法以四種形式出現,對於不能立即滿足但可能在將來某一時刻可以滿足的操作,這四種形式的處理方式不同:第一種是丟擲一個異常,第二種是返回一個特殊值(null 或 false,具體取決於操作),第三種是在操作可以成功前,無限期地阻塞當前執行緒,第四種是在放棄前只在給定的最大時間限制內阻塞。

1. add,offer,put三種新增執行緒到佇列的方法只在佇列滿的時候有區別,add為拋異常,offer返回boolean值,put直到新增成功為止。

2.同理remove,poll, take三種移除佇列中執行緒的方法只在佇列為空的時候有區別, remove為拋異常,poll為返回boolean值, take等待直到有執行緒可以被移除。

看看下面這張圖就清楚了:

BlockingQueue 不接受 null 元素。試圖 addput 或 offer 一個 null 元素時,某些實現會丟擲NullPointerExceptionnull 被用作指示 poll 操作失敗的警戒值。

BlockingQueue 可以是限定容量的。它在任意給定時間都可以有一個 remainingCapacity,超出此容量,便無法無阻塞地 put 附加元素。沒有任何內部容量約束的 BlockingQueue

 總是報告Integer.MAX_VALUE 的剩餘容量。

BlockingQueue 實現主要用於生產者-使用者佇列,但它另外還支援 Collection 介面。因此,舉例來說,使用 remove(x) 從佇列中移除任意一個元素是有可能的。然而,這種操作通常 會有效執行,只能有計劃地偶爾使用,比如在取消排隊資訊時。

BlockingQueue 實現是執行緒安全的。所有排隊方法都可以使用內部鎖或其他形式的併發控制來自動達到它們的目的。然而,大量的 Collection 操作(addAllcontainsAllretainAll 和removeAll沒有 必要自動執行,除非在實現中特別說明。因此,舉例來說,在只添加了 c

 中的一些元素後,addAll(c) 有可能失敗(丟擲一個異常)。

BlockingQueue 實質上 支援使用任何一種“close”或“shutdown”操作來指示不再新增任何項。這種功能的需求和使用有依賴於實現的傾向。例如,一種常用的策略是:對於生產者,插入特殊的end-of-stream 或 poison 物件,並根據使用者獲取這些物件的時間來對它們進行解釋。 

以下是基於典型的生產者-使用者場景的一個用例。注意,BlockingQueue 可以安全地與多個生產者和多個使用者一起使用。

class Producer implements Runnable {
   private final BlockingQueue queue;
   Producer(BlockingQueue q) { queue = q; }
   public void run() {
     try {
       while(true) { queue.put(produce()); }
     } catch (InterruptedException ex) { ... handle ...}
   }
   Object produce() { ... }
 }
 
 class Consumer implements Runnable {
   private final BlockingQueue queue;
   Consumer(BlockingQueue q) { queue = q; }
   public void run() {
     try {
       while(true) { consume(queue.take()); }
     } catch (InterruptedException ex) { ... handle ...}
   }
   void consume(Object x) { ... }
 }
 
 class Setup {
   void main() {
     BlockingQueue q = new SomeQueueImplementation();
     Producer p = new Producer(q);
     Consumer c1 = new Consumer(q);
     Consumer c2 = new Consumer(q);
     new Thread(p).start();
     new Thread(c1).start();
     new Thread(c2).start();
   }
 }