阻塞佇列概念及其簡單使用
什麼是阻塞佇列
概念
當佇列滿的時候,插入元素的執行緒被阻塞,直到佇列不滿
佇列為空的時候,獲取元素的執行緒被阻塞,直到佇列不為空
生產者消費者模式也是阻塞佇列的一種體現
常用阻塞佇列
ArrayBlockingQueue:一個由陣列結構組成的有界阻塞佇列
LinkedBlockingQueue:一個由連結串列結構組成的有界阻塞佇列
PriorityBlockingQueue:一個支援優先順序排序的無界阻塞佇列
DelayQueue:一個使用優先順序佇列實現的無界阻塞佇列
SynchronousQueue:一個不儲存元素的阻塞佇列
LinkedTransferQueue:一個由連結串列結構組成的無界阻塞佇列
LinkedBlockingDeque:一個由連結串列結構組成的雙向阻塞佇列
常用方法
方法 | 丟擲異常 | 返回值 | 一直阻塞 | 超時退出 |
插入方法 | add | offer | put | offer(timeout) |
移除方法 | remove | poll | take | poll(timeout) |
檢查方法 | element | peek | N/A | N/A |
介紹:
ArrayBlockingQueue:按照先進先出的原則,初始化需要設定大小
LinkedBlockingQueue:按照先進先出的原則,可以不設定初始大小,不設定,預設就會Integer.MAX_VALUE
區別:
鎖:ArrayBlockingQueue,只使用了一把鎖,而LinkedBlockingQueue使用了2把
實現:ArrayBlockingQueue直接插入元素,LinkedBlockingQueue需要轉換
初始化:ArrayBlockingQueue必須指定初始化大小,LinkedBlockingQueue可以不指定
PriorityBlockingQueue:預設採用自然順序排序,就1<2,A>B... ,如果想自定義順序有要麼實現CompateTo方法,要麼指定構造引數Comparator,如果一致,PriorityBlockingQueue不保證優先順序順序
DelayQueue:支援延時獲取元素的阻塞佇列,內部使用PriorityBlockingQueue,元素必須實現Delayed接口才允許放入
SynchronousQueue:每一個put操作,都要等待一個take操作,類似於等待喚醒機制
LinkedTransferQueue:相比於LinkedBlockingQueue多了兩個方法,transfer和tryTransfer,transfer在往佇列裡面插入元素之前,先看一下有沒有消費者在等著,如果有,直接把元素交給消費者,省去了插入和,取出的步驟,tryTransfer嘗試把元素給消費者,無論消費者是否接收,都會立即返回,transfer必須要消費者消費之後,才會返回
LinkedBlockingDeque:可以從佇列的頭部和尾部都可以插入和移除元素,可以在有競爭的時候從兩側獲取元素,減少一半的時間,在ForkJoin中的工作密取機制就是採用的LinkedBlockingDeque實現的,凡是方法名帶了First的都是從頭去拿,帶了Last都是從尾部拿,不加的話,預設add等於addLast,remove等於removeFirst,take方法等於takeFirst
建議:儘量採用有界阻塞佇列,因為在流量高峰的時候,無界阻塞佇列會不斷的增加佔用資源,可能導致伺服器宕機
案例:
使用DelayQueue實現延時訂單功能
定義元素容器類
package org.dance.day5.bq; import java.util.concurrent.Delayed; import java.util.concurrent.TimeUnit; /** * 存放到佇列的元素 * @author ZYGisComputer */ public class ItemVo<T> implements Delayed { /** * 延時時間 單位為毫秒 */ private long activeTime; private T data; public ItemVo(long activeTime, T data) { // 將時間轉換為納秒 並 + 當前納秒 = 將超時時長轉換為超時時刻 this.activeTime = TimeUnit.NANOSECONDS.convert(activeTime,TimeUnit.MILLISECONDS) + System.nanoTime(); this.data = data; } /** * 返回元素的剩餘時間 * @param unit * @return */ @Override public long getDelay(TimeUnit unit) { return unit.convert(this.activeTime - System.nanoTime(), TimeUnit.NANOSECONDS); } /** * 按照剩餘時間排序 * @param o * @return */ @Override public int compareTo(Delayed o) { long d = getDelay(TimeUnit.NANOSECONDS) - o.getDelay(TimeUnit.NANOSECONDS); return (d==0)?0:(d>0)?1:-1; } public long getActiveTime() { return activeTime; } public T getData() { return data; } }
定義訂單實體類
package org.dance.day5.bq; /** * 訂單實體類 * @author ZYGisComputer */ public class Order { private final String orderNo; private final double orderMoney; public Order(String orderNo, double orderMoney) { this.orderNo = orderNo; this.orderMoney = orderMoney; } public String getOrderNo() { return orderNo; } public double getOrderMoney() { return orderMoney; } }
定義訂單放入執行緒
package org.dance.day5.bq; import java.util.concurrent.DelayQueue; /** * 將訂單放入佇列 * @author ZYGisComputer */ public class PutOrder implements Runnable{ private DelayQueue<ItemVo<Order>> queue; public PutOrder(DelayQueue<ItemVo<Order>> queue) { this.queue = queue; } @Override public void run() { // 5秒到期 Order order = new Order("1",16.00d); // 包裝成ItemVo ItemVo<Order> itemVo = new ItemVo<>(5000,order); queue.offer(itemVo); System.out.println("訂單5秒後到期:"+order.getOrderNo()); // 5秒到期 order = new Order("2",18.00d); // 包裝成ItemVo itemVo = new ItemVo<>(8000,order); queue.offer(itemVo); System.out.println("訂單8秒後到期:"+order.getOrderNo()); } }
定義訂單消費執行緒
package org.dance.day5.bq; import java.util.concurrent.DelayQueue; /** * 獲取訂單 * * @author ZYGisComputer */ public class FetchOrder implements Runnable { private DelayQueue<ItemVo<Order>> queue; public FetchOrder(DelayQueue<ItemVo<Order>> queue) { this.queue = queue; } @Override public void run() { while (true) { ItemVo<Order> poll = null; try { poll = queue.take(); Order data = poll.getData(); String orderNo = data.getOrderNo(); System.out.println(orderNo + "已經消費"); } catch (InterruptedException e) { Thread.currentThread().interrupt(); e.printStackTrace(); } } } }
測試類
package org.dance.day5.bq; import java.util.Queue; import java.util.concurrent.DelayQueue; /** * @author ZYGisComputer */ public class Test { public static void main(String[] args) throws InterruptedException { DelayQueue<ItemVo<Order>> queue = new DelayQueue<>(); new Thread(new PutOrder(queue)).start(); new Thread(new FetchOrder(queue)).start(); for (int i = 0; i < 15; i++) { Thread.sleep(500); System.out.println(i*500); } } }
執行結果
訂單5秒後到期:1 訂單8秒後到期:2 0 500 1000 1500 2000 2500 3000 3500 4000 4500 1已經消費 5000 5500 6000 6500 7000 2已經消費
通過執行結果分析,可以得知,兩個放入阻塞佇列的元素已經成功被消費掉了,當然這種阻塞佇列也可以用於別的場景,比如實現本地延時快取,限時繳費....
作者:彼岸舞
時間:2021\01\11
內容關於:併發程式設計
本文來源於網路,只做技術分享,一概不負任何責任