阻塞佇列BlockingQueue及其子類的使用
BlockingQueues在java.util.concurrent包下,提供了執行緒安全的佇列訪問方式,當阻塞佇列插入資料時,如果佇列已經滿了,執行緒則會阻塞等待佇列中元素被取出後在插入,當從阻塞佇列中取資料時,如果佇列是空的,則執行緒會阻塞等待佇列中有新元素。本文詳細介紹了BlockingQueue家庭中的所有成員,包括他們各自的功能以及常見使用場景。
認識BlockingQueue:
阻塞佇列是一個具有阻塞新增和阻塞刪除功能的佇列,資料結構如下
阻塞新增
所謂的阻塞新增是指當阻塞佇列元素已滿時,佇列會阻塞加入元素的執行緒,直佇列元素不滿時才重新喚醒執行緒執行元素加入操作。阻塞刪除
阻塞刪除是指在佇列元素為空時,刪除佇列元素的執行緒將被阻塞,直到佇列不為空再執行刪除操作(一般都會返回被刪除的元素)資料結構
上圖可以多執行緒環境下,通過佇列很容易實現資料共享, 比如經典的“生產者”和“消費者”模型中,通過佇列可以很便利地實現兩者之間的資料共享。假設我們有若干生產者執行緒,另外又有若干個消費者執行緒。如果生產者執行緒需要把準備好的資料共享給消費者執行緒,利用佇列的方式來傳遞資料,就可以很方便地解決他們之間的資料共享問題。但如果生產者和消費者在某個時間段內,萬一發生資料處理速度不匹配的情況呢?理想情況下,如果生產者產出資料的速度大於消費者消費的速度,並且當生產出來的資料累積到一定程度的時候,那麼生產者必須暫停等待一下(阻塞生產者執行緒),以便等待消費者執行緒把累積的資料處理完畢,反之亦然。在多執行緒領域:所謂阻塞,在某些情況下會掛起執行緒(即阻塞),一旦條件滿足,被掛起的執行緒又會自動被喚醒
這也是我們在多執行緒環境下,為什麼需要BlockingQueue的原因。作為BlockingQueue的使用者,我們再也不需要關心什麼時候需要阻塞執行緒,什麼時候需要喚醒執行緒,因為這一切BlockingQueue都給你一手包辦了,讓我們一起來見識下它的常用方法
BlockingQueue的使用:
阻塞佇列主要用在生產者/消費者的場景,下面這幅圖展示了一個執行緒生產、一個執行緒消費的場景:
負責生產的執行緒不斷的製造新物件並插入到阻塞佇列中,直到達到這個佇列的上限值。佇列達到上限值之後生產執行緒將會被阻塞,直到消費的執行緒對這個佇列進行消費。同理,負責消費的執行緒不斷的從佇列中消費物件,直到這個佇列為空,當佇列為空時,消費執行緒將會被阻塞,除非佇列中有新的物件被插入。
BlockingQueue的核心方法:
-方法\行為 | 拋異常 | 特定的值 | 阻塞 | 超時 |
---|---|---|---|---|
插入方法 | add(o) | offer(o) | put(o) | offer(o, timeout, timeunit) |
移除方法 | poll(),remove(o) | take() | poll(timeout, timeunit) | |
檢查方法 | element() | peek() |
行為解釋:
1.拋異常:如果操作不能馬上進行,則丟擲異常
2. 特定的值:如果操作不能馬上進行,將會返回一個特殊的值,一般是true或者false
3. 阻塞:如果操作不能馬上進行,操作會被阻塞
4. 超時:如果操作不能馬上進行,操作會被阻塞指定的時間,如果指定時間沒執行,則返回一個特殊值,一般是true或者false
插入方法:
- add(E e) : 新增成功返回true,失敗拋IllegalStateException異常
- offer(E e) : 成功返回 true,如果此佇列已滿,則返回 false。
- put(E e) :將元素插入此佇列的尾部,如果該佇列已滿,則一直阻塞
刪除方法:
- remove(Object o) :移除指定元素,成功返回true,失敗返回false
- poll() : 獲取並移除此佇列的頭元素,若佇列為空,則返回 null
- take():獲取並移除此佇列頭元素,若沒有元素則一直阻塞。
檢查方法
- element() :獲取但不移除此佇列的頭元素,沒有元素則拋異常
- peek() :獲取但不移除此佇列的頭;若佇列為空,則返回 null。
BlockingQueue的資料結構:
實現類ArrayBlockingQueue的基本使用:
ArrayBlockingQueue:是一個有邊界的阻塞佇列,它的內部實現是一個數組。有邊界的意思是它的容量是有限的,我們必須在其初始化的時候指定它的容量大小,容量大小一旦指定就不可改變。另外它以FIFO先進先出的方式儲存資料,最新插入的物件是尾部,最新移出的物件是頭部
BlockingQueue queue = new ArrayBlockingQueue(1024);
queue.put("1");
Object object = queue.take();
有點需要注意的是ArrayBlockingQueue內部的阻塞佇列是通過重入鎖ReenterLock和Condition條件佇列實現的,所以ArrayBlockingQueue中的元素存在公平訪問與非公平訪問的區別,對於公平訪問佇列,被阻塞的執行緒可以按照阻塞的先後順序訪問佇列,即先阻塞的執行緒先訪問佇列。而非公平佇列,當佇列可用時,阻塞的執行緒將進入爭奪訪問資源的競爭中,也就是說誰先搶到誰就執行,沒有固定的先後順序。建立公平與非公平阻塞佇列程式碼如下:
//預設非公平阻塞佇列
ArrayBlockingQueue queue = new ArrayBlockingQueue(2);
//公平阻塞佇列
ArrayBlockingQueue queue1 = new ArrayBlockingQueue(2,true);
//構造方法原始碼
public ArrayBlockingQueue(int capacity) {
this(capacity, false);
}
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = new Object[capacity];
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}
實現類LinkedBlockingQueue的基本使用:
LinkedBlockingQueue:是一個由連結串列實現的有界佇列阻塞佇列,但大小預設值為Integer.MAX_VALUE,如果需要的話,這一鏈式結構可以自定義一個上限。如果沒有定義上限,將使用 Integer.MAX_VALUE 作為上限。建議指定佇列大小,預設大小在新增速度大於刪除速度情況下可能造成記憶體溢位,LinkedBlockingQueue佇列也是按 FIFO(先進先出)排序元素
構造方法原始碼:
//預設大小為Integer.MAX_VALUE
public LinkedBlockingQueue() {
this(Integer.MAX_VALUE);
}
//建立指定大小為capacity的阻塞佇列
public LinkedBlockingQueue(int capacity) {
if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
last = head = new Node<E>(null);
}
//建立大小預設值為Integer.MAX_VALUE的阻塞佇列並新增c中的元素到阻塞佇列
public LinkedBlockingQueue(Collection<? extends E> c) {
this(Integer.MAX_VALUE);
final ReentrantLock putLock = this.putLock;
putLock.lock(); // Never contended, but necessary for visibility
try {
int n = 0;
for (E e : c) {
if (e == null)
throw new NullPointerException();
if (n == capacity)
throw new IllegalStateException("Queue full");
enqueue(new Node<E>(e));
++n;
}
count.set(n);
} finally {
putLock.unlock();
}
}
ArrayBlockingQueue和LinkedBlockingQueue的區別:
1.佇列大小有所不同,ArrayBlockingQueue是有界的初始化必須指定大小,而LinkedBlockingQueue可以是有界的也可以是無界的(Integer.MAX_VALUE),對於後者而言,當新增速度大於移除速度時,在無界的情況下,可能會造成記憶體溢位等問題。
2.資料儲存容器不同,ArrayBlockingQueue採用的是陣列作為資料儲存容器,而LinkedBlockingQueue採用的則是以Node節點作為連線物件的連結串列。
3.由於ArrayBlockingQueue採用的是陣列的儲存容器,因此在插入或刪除元素時不會產生或銷燬任何額外的物件例項,而LinkedBlockingQueue則會生成一個額外的Node物件。這可能在長時間內需要高效併發地處理大批量資料的時,對於GC可能存在較大影響。
4.兩者的實現佇列新增或移除的鎖不一樣,ArrayBlockingQueue實現的佇列中的鎖是沒有分離的,即新增操作和移除操作採用的同一個ReenterLock鎖,而LinkedBlockingQueue實現的佇列中的鎖是分離的,其新增採用的是putLock,移除採用的則是takeLock,這樣能大大提高佇列的吞吐量,也意味著在高併發的情況下生產者和消費者可以並行地操作佇列中的資料,以此來提高整個佇列的併發效能。
DelayQueue
DelayQueue中的元素只有當其指定的延遲時間到了,才能夠從佇列中獲取到該元素。DelayQueue是一個沒有大小限制的佇列,因此往佇列中插入資料的操作(生產者)永遠不會被阻塞,而只有獲取資料的操作(消費者)才會被阻塞。DelayQueue中的元素必須實現 java.util.concurrent.Delayed
介面,這個介面的定義非常簡單:
public interface Delayed extends Comparable<Delayed> {
long getDelay(TimeUnit unit);
}
getDelay()
方法的返回值就是佇列元素被釋放前的保持時間,如果返回0
或者一個負值
,就意味著該元素已經到期需要被釋放,此時DelayedQueue會通過其take()
方法釋放此物件。
從上面Delayed 介面定義可以看到,它還繼承了Comparable
介面,這是因為DelayedQueue中的元素需要進行排序,一般情況,我們都是按元素過期時間的優先順序進行排序
public class DelayedElement implements Delayed {
private long expired;
private long delay;
private String name;
DelayedElement(String elementName, long delay) {
this. name = elementName;
this. delay= delay;
expired = ( delay + System. currentTimeMillis());
}
@Override
public int compareTo(Delayed o) {
DelayedElement cached=(DelayedElement) o;
return cached.getExpired()> expired?1:-1;
}
@Override
public long getDelay(TimeUnit unit) {
return ( expired - System. currentTimeMillis());
}
@Override
public String toString() {
return "DelayedElement [delay=" + delay + ", name=" + name + "]";
}
public long getExpired() {
return expired;
}
}
設定這個元素的過期時間為3s
public class DelayQueueExample {
public static void main(String[] args) throws InterruptedException {
DelayQueue<DelayedElement> queue= new DelayQueue<>();
DelayedElement ele= new DelayedElement( "cache 3 seconds",3000);
queue.put( ele);
System. out.println( queue.take());
}
PriorityBlockingQueue佇列
PriorityBlockingQueue是一個沒有邊界的佇列,所以不會阻塞生產者,它的排序規則和 java.util.PriorityQueue
一樣。需要注意,PriorityBlockingQueue中不允許插入null物件。所有插入到 PriorityBlockingQueue 的元素必須實現 java.lang.Comparable 介面。因此該佇列中元素的排序就取決於你自己的 Comparable 實現。
SynchronousQueue佇列
SynchronousQueue:SynchronousQueue 是一個特殊的佇列,它的內部同時只能夠容納單個元素。如果該佇列已有一元素的話,試圖向佇列中插入一個新元素的執行緒將會阻塞,直到另一個執行緒將該元素從佇列中抽走。同樣,如果該佇列為空,試圖向佇列中抽取一個元素的執行緒將會阻塞,直到另一個執行緒向佇列中插入了一條新的元素。據此,把這個類稱作一個佇列顯然是誇大其詞了。它更多像是一個匯合點。
拓展:
先進先出(FIFO):先插入的佇列的元素也最先出佇列,類似於排隊的功能。從某種程度上來說這種佇列也體現了一種公平性。
後進先出(LIFO):後插入佇列的元素最先出佇列,這種佇列優先處理最近發生的事件
在最近的RocketMQ的專案中用到阻塞佇列,程式碼如下:實時監聽MQ訊息,收到訊息,先做簡單 的處理(存入佇列),在開啟執行緒消費訊息(監聽和消費訊息解耦合)
@Component("MQCP_CID_PH_SSP_DEFAULT")
public class MqcpCosumerServiceImpl implements IMqcpCosumerService,InitializingBean,DisposableBean {
private Logger LOGGER =LoggerFactory.getLogger(getClass());
private static MQCPConsumer pushConsumer = null;
private static LinkedBlockingQueue<MQCPMessage> msgQueue;
private static final int QUEUE_MAX_SIZE = 20000;
@Autowired
private ISystemConfigService systemConfigService;
@Autowired
private IMqcpMessageService mqcpMessageService;
public static void setPushConsumer(MQCPConsumer pushConsumer) {
MqcpCosumerServiceImpl.pushConsumer = pushConsumer;
}
public static void setMsgQueue(LinkedBlockingQueue<MQCPMessage> msgQueue) {
MqcpCosumerServiceImpl.msgQueue = msgQueue;
}
public void initMsgQueue(){
setMsgQueue(new LinkedBlockingQueue<MQCPMessage>(QUEUE_MAX_SIZE));
}
public MqcpCosumerServiceImpl() {
initMsgQueue();
}
@Override
public void initMQCPCosumer(){
try {
Properties p = SystemResourceUtil
.getPropertisByName(ResourceFileNameConstants.PROPERTIES_MQCP_CLIENT);
p.setProperty(MQCPConstant.INSTANCE_NAME, systemConfigService.getSysGUID());
setPushConsumer(MQCPFactory.createConsumer(p));
MQCPMessageFilter mqcpFilter = new MQCPMessageFilter();
List<String> list = new ArrayList<String>();
//根據tag 過濾訊息,MQCP只取出傳送訊息時設定了該tag值的訊息
list.add("T_APS_APPL_INFO");
list.add("T_APS_LOAN_AGREEMENT");
list.add("T_APS_EXPENSE_APPL");
list.add("T_APS_LOAN_LOG");
list.add("T_ICORE_CGI_POLICY_INFO");
mqcpFilter.setTags(list);
// 實時監聽訊息
pushConsumer.subscribe(SystemResourceUtil.getPropertiesValueByKey(
ResourceFileNameConstants.PROPERTIES_MQCP_CLIENT, "TOPIC_ID_ILOAN"), mqcpFilter, new MQCPMessageListener() {
@Override
public MQCPConsumeStatus pushMessage(List<MQCPMessage> messageList) {
try {
for (MQCPMessage msg : messageList) {
LOGGER.info(String.format("存證MQCPCosumer監聽到Topic_id:[%s]的訊息key:[%s],準備存入佇列",msg.getTopic(),msg.getKey()));
msgQueue.put(msg);
}
return MQCPConsumeStatus.CONSUME_OK;
} catch (Exception e) {
LOGGER.error("存證MQCPCosumer訊息存入佇列異常", e);
return MQCPConsumeStatus.CONSUME_FAIL;
}
}
});
pushConsumer.start();
LOGGER.info("初始化存證MQCPConsumer訂閱服務:success");
} catch (MQCPException e) {
LOGGER.error("初始化存證MQCPCosumer訂閱服務異常", e);
}
}
@Override
public void shutDownMQCPCosumer() {
try {
if(pushConsumer !=null){
pushConsumer.shutdown();
}
} catch (Exception e) {
LOGGER.error("存證MQCPCosumer關閉異常", e);
}
}
@Override
public void afterPropertiesSet() throws Exception {
initMQCPCosumer();//開啟消費者
new Thread(new InsertTable()).start();
}
@Override
public void destroy() throws Exception {
shutDownMQCPCosumer();
}
class InsertTable implements Runnable{
@Override
public void run() {
while(!Thread.interrupted()){
try {
MQCPMessage msg = msgQueue.take();
LOGGER.info(String.format("存證MQCP訊息入庫執行緒開啟[%s],訊息TAG:[%s]", Thread.currentThread().getName(),msg.getTag()));
mqcpMessageService.insertTable(msg);
} catch (Exception e) {
LOGGER.error("存證MQCP訊息入庫異常",e);
}
}
LOGGER.warn(String.format("存證執行緒已停止[%s]", Thread.currentThread().getName()));
}
}
}