執行緒安全佇列Queue
使用非阻塞佇列的時候有一個很大問題就是:它不會對當前執行緒產生阻塞,那麼在面對類似消費者-生產者的模型時,就必須額外地實現同步策略以及執行緒間喚醒策略,這個實現起來就非常麻煩。但是有了阻塞佇列就不一樣了,它會對當前執行緒產生阻塞,比如一個執行緒從一個空的阻塞佇列中取元素,此時執行緒會被阻塞直到阻塞佇列中有了元素。當佇列中有元素後,被阻塞的執行緒會自動被喚醒(不需要我們編寫程式碼去喚醒)。這樣提供了極大的方便性。
具體說明:
java阻塞佇列應用於生產者消費者模式、訊息傳遞、並行任務執行和相關併發設計的大多數常見使用上下文。
BlockingQueue在Queue介面基礎上提供了額外的兩種型別的操作,分別是獲取元素時等待佇列變為非空和新增元素時等待空間變為可用。
BlockingQueue定義的常用方法如下:
丟擲異常 特殊值 阻塞 超時
插入 add(e) offer(e) put(e) offer(e,time, unit)
移除 remove() poll() take() poll(time,unit)
檢查 element() peek() 不可用 不可用
插入操作是指向佇列中新增一個元素,至於元素存放的位置與具體佇列的實現有關。移除操作將會移除佇列的頭部元素,並將這個移除的元素作為返回值反饋給呼叫者。檢查操作是指返回佇列的頭元素給呼叫者,佇列不對這個頭元素進行刪除處理。
丟擲異常形式的操作,在佇列已滿的情況下,呼叫add方法將會丟擲IllegalStateException異常。如果呼叫remove方法時,佇列已經為空,則丟擲一個NoSuchElementException異常。(實際上,remove方法還可以附帶一個引數,用來刪除佇列中的指定元素,如果這個元素不存在,也會丟擲NoSuchElementException異常)。如果呼叫element檢查頭元素,佇列為空時,將會丟擲NoSuchElementException異常。
特殊值操作與丟擲異常不同,在出錯的時候,返回一個空指標,而不會丟擲異常。
阻塞形式的操作,呼叫put方法時,如果佇列已滿,則呼叫執行緒阻塞等待其它執行緒從佇列中取出元素。呼叫take方法時,如果阻塞佇列已經為空,則呼叫執行緒阻塞等待其它執行緒向佇列新增新元素。
LinkedBlockingQueue:
基於連結串列的阻塞佇列,同ArrayListBlockingQueue類似,其內部也維持著一個數據緩衝佇列(該佇列由一個連結串列構成),當生產者往佇列中放入一個數據時,佇列會從生產者手中獲取資料,並快取在佇列內部,而生產者立即返回;只有當佇列緩衝區達到最大值快取容量時(LinkedBlockingQueue可以通過建構函式指定該值),才會阻塞生產者佇列,直到消費者從佇列中消費掉一份資料,生產者執行緒會被喚醒,反之對於消費者這端的處理也基於同樣的原理。而LinkedBlockingQueue之所以能夠高效的處理併發資料,還因為其對於生產者端和消費者端分別採用了獨立的鎖來控制資料同步,這也意味著在高併發的情況下生產者和消費者可以並行地操作佇列中的資料,以此來提高整個佇列的併發效能。
作為開發者,我們需要注意的是,如果構造一個LinkedBlockingQueue物件,而沒有指定其容量大小,LinkedBlockingQueue會預設一個類似無限大小的容量(Integer.MAX_VALUE),這樣的話,如果生產者的速度一旦大於消費者的速度,也許還沒有等到佇列滿阻塞產生,系統記憶體就有可能已被消耗殆盡了。
阻塞佇列:執行緒安全
按FIFO(先進先出)排序元素。佇列的頭部是在佇列中時間最長的元素。佇列的尾部是在佇列中時間最短的元素。新元素插入到佇列的尾部,並且佇列檢索操作會獲得位於佇列頭部的元素。連結佇列的吞吐量通常要高於基於陣列的佇列,但是在大多數併發應用程式中,其可預知的效能要低。
注意:
1、必須要使用take()方法在獲取的時候達成阻塞結果。
2、使用poll()方法將產生非阻塞效果。
然後看看兩個關鍵方法的實現:put()和take():
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == items.length)
notFull.await();
enqueue(e);
} finally {
lock.unlock();
}
}
從put方法的實現可以看出,它先獲取了鎖,並且獲取的是可中斷鎖,然後判斷當前元素個數是否等於陣列的長度,如果相等,則呼叫notFull.await()進行等待。
當被其他執行緒喚醒時,通過enqueue (e)方法插入元素,最後解鎖。
private void enqueue(E x) {
// assert lock.getHoldCount() == 1;
// assert items[putIndex] == null;
final Object[] items = this.items;
items[putIndex] = x;
if (++putIndex == items.length)
putIndex = 0;
count++;
notEmpty.signal();
}
它是一個private方法,插入成功後,通過notEmpty喚醒正在等待取元素的執行緒。
下面是take()方法的實現:
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0)
notEmpty.await();
return dequeue();
} finally {
lock.unlock();
}
}
跟
put
方法實現很類似,只不過
put
方法等待的是
notFull
訊號,而
take
方法等待的是
notEmpty
訊號。在
take
方法中,如果可以取元素,則通過
dequeue
方法取得元素,下面是
dequeue
方法的實現:
private E dequeue() {
// assert lock.getHoldCount() == 1;
// assert items[takeIndex] != null;
final Object[] items = this.items;
@SuppressWarnings("unchecked")
E x = (E) items[takeIndex];
items[takeIndex] = null;
if (++takeIndex == items.length)
takeIndex = 0;
count--;
if (itrs != null)
itrs.elementDequeued();
notFull.signal();
return x;
}
跟enqueue方法也很類似。
其實從這裡應該明白了阻塞佇列的實現原理,事實它和我們用Object.wait()、Object.notify()和非阻塞佇列實現生產者-消費者的思路類似,
只不過它把這些工作一起整合到了阻塞佇列中實現。
在專案中應用:
第一步:在專案啟動時(呼叫init方法)建立一個單執行緒,用來處理任務的下發
package com.hikvision.energy.worker;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.alibaba.fastjson.JSONObject;
import com.hikvision.energy.constant.InspectionConst;
import com.hikvision.energy.core.util.obj.ObjectUtils;
import com.hikvision.energy.util.CfgMgr;
import com.hikvision.energy.util.RESTTemplateUtil;
import com.hikvision.energy.vo.InspectionJobVO;
/**
* 巡檢任務下發worker
*
* @author wanjiadong
* @date 建立時間:2017年11月22日 下午7:02:53
* @version 1.0
* @parameter
* @since
* @return
*/
public class InspectionJobWorker {
//日誌
private static Logger log = LoggerFactory.getLogger(InspectionJobWorker.class);
//易變的
private static volatile boolean stop = false;
//連線時間
private static final int connetionTimeOut = 5000;
//傳遞資料時間
private static final int socketTimeOut = 10000;
//預設size為Integer.MAX_VALUE 2*31 -1,也可指定大小
//這樣的話,如果生產者的速度一旦大於消費者的速度,也許還沒有等到佇列滿阻塞產生,系統記憶體就有可能已被消耗殆盡了
private static BlockingQueue<InspectionJobUnit> queue = new LinkedBlockingQueue<InspectionJobUnit>();
public void init(){
new InspectionJobThread().start();
}
static class InspectionJobThread extends Thread {
public void run(){
log.info("InspectionJobWorker start!");
InspectionJobUnit inspectionJobUnit = null;
OperationEnum methodType = null;
while(!stop){
try {
inspectionJobUnit = queue.take();
methodType = inspectionJobUnit.getMethodType();
//下發巡檢任務url
String addUrl = "http://" +CfgMgr.getInspectionComponentsIp()+ ":" +CfgMgr.getInspectionComponentsPort()+ InspectionConst.INSPECTION_JOB_ADD_URL;
//刪除url
String delUrl = "http://" +CfgMgr.getInspectionComponentsIp()+ ":" +CfgMgr.getInspectionComponentsPort()+ InspectionConst.INSPECTION_JOB_DELETE_URL;
switch(methodType){
case ADD:
InspectionJobVO inspectionJobVO = inspectionJobUnit.getInspectionJobVO();
//下發給巡檢元件巡檢任務
JSONObject resp = RESTTemplateUtil.doPOSTHttpByJSON(addUrl, JSONObject.toJSONString(inspectionJobVO), JSONObject.class, connetionTimeOut, socketTimeOut);
//resultCode為0代表下發成功
if(ObjectUtils.isObjectNull(resp) || ObjectUtils.isObjectNull(resp.get("resultCode"))
|| !resp.get("resultCode").equals(InspectionConst.CONSTANT_ZERO)){
log.error("巡檢任務下發失敗,任務id為:"+inspectionJobVO.getJobId());
}
break;
case MODIFY:
/**
* 先刪除,再新增
*/
String delJsonString = "{\"jobId\": "+inspectionJobUnit.getJobId()+"}";
//下發給巡檢元件巡檢任務
JSONObject delResp = RESTTemplateUtil.doPOSTHttpByJSON(delUrl, delJsonString, JSONObject.class, connetionTimeOut, socketTimeOut);
//resultCode為0代表下發成功
if(ObjectUtils.isObjectNotNull(delResp) && ObjectUtils.isObjectNotNull(delResp.get("resultCode"))
&& delResp.get("resultCode").equals(InspectionConst.CONSTANT_ZERO)){
InspectionJobVO jobVO = inspectionJobUnit.getInspectionJobVO();
//下發給巡檢元件巡檢任務
JSONObject addResp = RESTTemplateUtil.doPOSTHttpByJSON(addUrl, JSONObject.toJSONString(jobVO), JSONObject.class, connetionTimeOut, socketTimeOut);
//resultCode為0代表下發成功
if(ObjectUtils.isObjectNull(addResp) || ObjectUtils.isObjectNull(addResp.get("resultCode"))
|| !addResp.get("resultCode").equals(InspectionConst.CONSTANT_ZERO)){
log.error("更新巡檢任務失敗。巡檢任務下發失敗,任務id為:"+jobVO.getJobId());
}
} else {
log.error("更新巡檢任務失敗。刪除jobId:"+inspectionJobUnit.getJobId());
}
break;
case DELETE:
String delString = "";
if(ObjectUtils.isObjectNull(inspectionJobUnit.getJobItemId())){
delString = "{\"jobId\": "+inspectionJobUnit.getJobId()+"}";
} else {
delString = "{\"jobId\": "+inspectionJobUnit.getJobId()+",\"jobItemId\":"+inspectionJobUnit.getJobItemId()+"}";
}
//下發給巡檢元件巡檢任務
JSONObject delResponse = RESTTemplateUtil.doPOSTHttpByJSON(delUrl, delString, JSONObject.class, connetionTimeOut, socketTimeOut);
if(ObjectUtils.isObjectNotNull(delResponse) || ObjectUtils.isObjectNotNull(delResponse.get("resultCode"))
|| !delResponse.get("resultCode").equals(InspectionConst.CONSTANT_ZERO)){
log.error("刪除巡檢任務失敗。刪除jobId:"+inspectionJobUnit.getJobId());
}
break;
default:break;
}
} catch (InterruptedException e) {
log.error("InspectionJobThread error" + e.getMessage());
} catch (Exception ex){
log.error("InspectionJobThread doPOSTHttpByJSON error" + ex.getMessage());
}
}
}
}
/**
* 新增需要下發的巡檢任務
*
* @author wanjiadong
* @date 建立時間:2017年11月22日 下午7:59:57
* @param inspectionJobVO
* @throws InterruptedException
*/
public static void buildAdd(final InspectionJobVO inspectionJobVO) throws InterruptedException{
InspectionJobUnit inspectionJobUnit = new InspectionJobUnit(inspectionJobVO, OperationEnum.ADD);
if(ObjectUtils.isObjectNotNull(inspectionJobUnit)){
queue.put(inspectionJobUnit);
}
}
/**
* 新增需要更新的巡檢任務
*
* @author wanjiadong
* @date 建立時間:2017年11月22日 下午8:04:37
* @param inspectionJobVO
* @param jobId
* @throws InterruptedException
*/
public static void buildModify(final InspectionJobVO inspectionJobVO, final Long jobId) throws InterruptedException{
InspectionJobUnit inspectionJobUnit = new InspectionJobUnit(inspectionJobVO, OperationEnum.MODIFY, jobId);
if(ObjectUtils.isObjectNotNull(inspectionJobUnit)){
queue.put(inspectionJobUnit);
}
}
/**
* 新增需要刪除的巡檢任務id
* jobItemId不為空,則刪除巡檢
*
* @author wanjiadong
* @date 建立時間:2017年11月30日 下午6:46:12
* @param jobId
* @throws InterruptedException
*/
public static void buildDelete(final Long jobId, final Long jobItemId) throws InterruptedException{
InspectionJobUnit inspectionJobUnit = new InspectionJobUnit(null, OperationEnum.DELETE, jobId, jobItemId);
if(ObjectUtils.isObjectNotNull(inspectionJobUnit)){
queue.put(inspectionJobUnit);
}
}
/**
* 清空佇列
*
* @author wanjiadong
* @date 建立時間:2017年11月23日 下午4:42:50
*/
public static void clearQueue(){
queue.clear();
}
}