阻塞隊列和生產者-消費者模式
何為阻塞隊列,其與普通隊列有何差別?
總的來說,就是能夠在適當的時候阻塞"存"和"取"兩個操作,以達到控制任務流程的效果。阻塞隊列提供了可阻塞的put和take方法。如果隊列已經滿了,那麽put方法將阻塞直到有空間可用;如果隊列為空,那麽take方法將會阻塞直到有元素可用。
阻塞隊列接口及實現來自於Java並發包(java.util.concurrent),常見的實現有LinkedBlockingQueue、ArrayBlockingQueue、PriorityBlockingQueue
生產者-消費者模式
生產者-消費者模式是非常常見的設計模式。該模式將"找出需要完成的工作"與"執行工作"這兩個過程分離開來,並把工作項放入一個"待完成"列表中以便在隨後處理,而不是找出後立即處理。生產者-消費者模式能簡化開發過程,因為它消除了生產者類與消費者類之間的代碼依賴性,此外,該模式還將生產數據的過程與使用數據的過程解耦開來以簡化工作負載的管理,因為這兩個過程在處理數據的速率上有所不同。
阻塞隊列對於生產者-消費者模式有何裨益?
生產者-消費者模式都是基於隊列的。就說說普通的有界隊列存在的問題吧,隊列存在"滿"和"空"的問題,如果隊列已滿,那生產者繼續往隊列裏存數據就會出問題,存不進去要如何處理,生產者代碼中就要有相應的處理代碼。同樣的,如果隊列為空,消費者取不到數據又要如何反應。而阻塞隊列,就可以在"存不進"和"取不出"的時候,直接阻塞操作,生產者和消費者代碼直接阻塞在存取操作上。當然這種阻塞並不是永久的,就拿生產者來說吧,如果因為"存不進"而阻塞的話,只要消費者取出數據,便會"通知"生產者就能繼續生產並存儲數據。這樣就能極大地簡化生產者-消費者的編碼。
值得一提的是,阻塞隊列還能提供更靈活的選項:offer(對應put)和 poll(對應take)
boolean offer(E e); boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException;
如果數據項不能被添加到隊列中,將返回一個失敗狀態。而不必一直阻塞下去。這樣你就可以選擇讓生產者做點其他的事。但是一般情況下,如果隊列充滿,很有可能是因為
V生>V消,以至於數據項囤積,如果任其阻塞,則生產者可能被長時間擱置,浪費資源,利用率降低。這時候就要使用一些靈活的策略進行調控,例如減去負載,將多余的工作項序列化並寫入磁盤,減少生產者線程的數量,或者通過某種方式來抑制生產者線程。
使用示例
示例說明:本示例模擬一個生產-消費環境,工廠生產可樂,肥宅消費。這裏對於生產者的調控比較粗暴,直接新建或中斷一個生產者任務。
public class BlockingQueueDemo { public static void main(String[] args) throws InterruptedException { BlockingQueue<CocaCola> queue = new ArrayBlockingQueue<>(100); //容量100的隊列 ExecutorService exec = Executors.newCachedThreadPool(); for (int i = 0; i < 5; i++) { exec.execute(new Producer(queue, exec)); } TimeUnit.SECONDS.sleep(3); //先生產一點庫存 for (int i = 0; i < 5; i++) { exec.execute(new FatIndoorsman(queue, exec)); } } } class CocaCola { //可口可樂 } class Producer implements Runnable { private static int counter = 0; private final int id = counter++; private static List<Producer> producers = new ArrayList<>(); //類管理其實例列表 private Executor exec; private BlockingQueue queue; public Producer(BlockingQueue queue, Executor exec) { this.queue = queue; this.exec = exec; producers.add(this); } public synchronized static void adjust(int flag, BlockingQueue queue, Executor exec) { // 1 添加 -1減少 if (flag == 1) { Producer producer = new Producer(queue, exec); //添加的生產者共享同一個隊列 exec.execute(producer); } else if (flag == -1) { Producer producer = producers.remove(0); producer.cancel(); } } private void cancel() { //利用中斷取消生產任務 Thread.currentThread().interrupt(); } @Override public void run() { while (!Thread.interrupted()) { try { TimeUnit.SECONDS.sleep(1); //模擬生產需耗時1秒 boolean success = queue.offer(new CocaCola()); //通過offer嘗試添加 if (!success) { //如果隊列已滿,則移除1個生產者 System.out.println("remove a producer"); adjust(-1, queue, exec); } System.out.println(this + " produced a coca-cola!"); } catch (InterruptedException e) { e.printStackTrace(); } } System.out.println(this + " is stoped!"); } @Override public String toString() { return "Producer[" + id + "]"; } } class FatIndoorsman implements Runnable { private static int counter = 0; private final int id = counter++; private BlockingQueue queue; private Executor exec; public FatIndoorsman(BlockingQueue queue, Executor exec) { this.queue = queue; this.exec = exec; } @Override public void run() { while (!Thread.interrupted()) { CocaCola cocaCola = (CocaCola) queue.poll(); if (cocaCola != null) { try { TimeUnit.SECONDS.sleep(10); //模擬肥宅每隔10秒要喝一瓶 System.out.println(this + " drink a coca-cola"); } catch (InterruptedException e) { e.printStackTrace(); } } else { Producer.adjust(1, queue, exec); //添加生產者 } } } @Override public String toString() { return "FatIndoorsman[" + id + "]"; } }
阻塞隊列是如何實現的,即如何進行阻塞?
那ArrayBlockingQueue來說,其他實現類實現阻塞的方式應該類似。ArrayBlockingQueue用兩個Condition對象來控制take和put操作的訪問。如果take和put某一方失敗,則調用對應Condition的await方法,進行阻塞。如果某一方成功(即成功調用dequeue或enqueue方法),則釋放信號及時通知對方。
以下是ArrayBlockingQueue.java的部分源碼截圖
阻塞隊列和生產者-消費者模式