四、佇列的使用(基於記憶體 和 基於資料庫)
轉載自:https://blog.csdn.net/yang5726685/article/details/54234569
今天跟大家來看看如何在專案中使用佇列。首先我們要知道使用佇列的目的是什麼?一般情況下,如果是一些及時訊息的處理,並且處理時間很短的情況下是不需要使用佇列的,直接阻塞式的方法呼叫就可以了。但是,如果在訊息處理的時候特別費時間,這個時候如果有新的訊息來了,就只能處於阻塞狀態,造成使用者等待。這個時候在專案中引入佇列是十分有必要的。當我們接受到訊息後,先把訊息放到佇列中,然後再用新的執行緒進行處理,這個時候就不會有訊息的阻塞了。下面就跟大家介紹兩種佇列的使用,一種是基於記憶體的,一種是基於資料庫的。
首先,我們來看看基於記憶體的佇列。在Java的併發包中已經提供了BlockingQueue的實現,比較常用的有ArrayBlockingQueue和LinkedBlockingQueue,前者是以陣列的形式儲存,後者是以Node節點的連結串列形式儲存。至於陣列和連結串列的區別這裡就不多說了。
BlockingQueue 佇列常用的操作方法,前面的文章有介紹這裡不在重複。
下面用一個例子來看看是怎麼使用的。
import java.util.concurrent.BlockingQueue; import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; public class UserTask { //佇列大小 private final int QUEUE_LENGTH = 10000*10; //基於記憶體的阻塞佇列 private BlockingQueue<String> queue = new LinkedBlockingQueue<String>(QUEUE_LENGTH); //建立計劃任務執行器 private ScheduledExecutorService es = Executors.newScheduledThreadPool(1); /** * 建構函式,執行execute方法 */ public UserTask() { execute(); } /** * 新增資訊至佇列中 * @param content */ public void addQueue(String content) { queue.add(content); } /** * 初始化執行 */ public void execute() { //每一分鐘執行一次 es.scheduleWithFixedDelay(new Runnable(){ public void run() { try { String content = queue.take(); //處理佇列中的資訊。。。。。 System.out.println(content); } catch (InterruptedException e) { e.printStackTrace(); } } }, 0, 1, TimeUnit.MINUTES); } }
以上呢,就是基於記憶體的佇列的介紹,基於記憶體的佇列,佇列的大小依賴於JVM記憶體的大小,一般如果是記憶體佔用不大且處理相對較為及時的都可以採用此種方法。如果你在佇列處理的時候需要有失敗重試機制,那麼用此種佇列就不是特別合適了。下面就說說基於資料庫的佇列。
基於資料庫的佇列,很好理解,就是接收到訊息之後,把訊息存入資料庫中,設定消費時間、重試次數等,再用新的執行緒從資料庫中讀取資訊,進行處理。首先來看看資料庫的設計。
程式碼示例如下:
/** * 批量獲取 可以消費的訊息 * 先使用一個時間戳將被消費的訊息鎖定,然後再使用這個時間戳去查詢鎖定的資料。 * @param count * @return */ public List<Queue> findActiveQueueNew(int count) { //先去更新資料 String locker = String.valueOf(System.currentTimeMillis())+random.nextInt(10000); int lockCount = 0; try { //將status為1的更新為3,設定locker,先鎖定訊息 lockCount = queueDAO.updateActiveQueue(PayConstants.QUEUE_STATUS_LOCKED, PayConstants.QUEUE_STATUS_ACTIVE, count, locker); } catch (Exception e) { logger.error( "QueueDomainRepository.findActiveQueueNew error occured!" + e.getMessage(), e); throw new TuanRuntimeException( PayConstants.SERVICE_DATABASE_FALIURE, "QueueDomainRepository.findActiveQueue error occured!", e); } //如果鎖定的數量為0,則無需再去查詢 if(lockCount == 0){ return null; } //休息一會在再詢,防止資料已經被更改 try { Thread.sleep(1); } catch (Exception e) { logger.error("QueueDomainRepository.findActiveQueue error sleep occured!" + e.getMessage(), e); } List<Queue> activeList = null; try { activeList = queueDAO.getByLocker(locker); } catch (Exception e) { logger.error("QueueDomainRepository.findActiveQueue error occured!" + e.getMessage(), e); throw new TuanRuntimeException( PayConstants.SERVICE_DATABASE_FALIURE, "QueueDomainRepository.findActiveQueue error occured!",e); } return activeList; }
獲取到訊息之後,還需要再判斷訊息是否合法,如是否達到最大消費次數,訊息是否已被成功消費,等,判斷程式碼如下:
/**
* 驗證佇列modle 的合法性
*
* @param model
* @return boolean true,訊息還可以消費。false,訊息不允許消費。
*/
public boolean validateQueue(final QueueModel model){
int consumeCount = model.getConsumeCount();
if (consumeCount >= PayConstants.QUEUE_MAX_CONSUME_COUNT) {
//消費次數超過了最大次數
return false;
}
int consumeStatus = model.getConsumeStatus();
if(consumeStatus == PayConstants.QUEUE_STATUS_CONSUMER_SUCCESS){
//訊息已經被成功消費
return false;
}
QueueStatusEnum queueStatusEnum = model.getQueueStatusEnum();
if(queueStatusEnum == null || queueStatusEnum != QueueStatusEnum.LOCKED){
//訊息狀態不正確
return false;
}
String jsonData = model.getJsonData();
if(StringUtils.isEmpty(jsonData)){
//訊息體為空
return false;
}
return true;
}
訊息處理完畢之後,根據消費結果修改資料庫中的狀態。
public void consume(boolean isDelete, Long consumeMinTime,
String tradeNo,int consumeCount) {
QueueDO queueDO = new QueueDO();
if (!isDelete) {
//已經到了做大消費次數,訊息作廢 不再處理
if (consumeCount >= PayConstants.QUEUE_MAX_CONSUME_COUNT) {
//達到最大消費次數的也設定為消費成功
queueDO.setConsumeStatus(PayConstants.QUEUE_STATUS_CONSUMER_SUCCESS);
queueDO.setStatus(PayConstants.QUEUE_STATUS_CANCEL);
} else {
queueDO.setConsumeStatus(PayConstants.QUEUE_STATUS_CONSUMER_FAILED);
//設定為可用狀態等待下次繼續傳送
queueDO.setStatus(PayConstants.QUEUE_STATUS_ACTIVE);
}
} else {
//第三方消費成功
queueDO.setConsumeStatus(PayConstants.QUEUE_STATUS_CONSUMER_SUCCESS);
queueDO.setStatus(PayConstants.QUEUE_STATUS_DELETED);
}
queueDO.setNextConsumeTime(consumeMinTime == null ? QueueRuleUtil
.getNextConsumeTime(consumeCount) : consumeMinTime);
if (StringUtils.isNotBlank(tradeNo)) {
queueDO.setTradeNo(tradeNo);
}
long now = System.currentTimeMillis();
queueDO.setUpdateTime(now);
queueDO.setLastConsumeTime(now);
queueDO.setConsumeCount(consumeCount);
queueDO.setQueueID(id);
setQueueDOUpdate(queueDO);
}
下次消費時間的計算如下:根據消費次數計算,每次消費存在遞增的時間間隔。
/**
* 佇列消費 開始時間 控制
*/
public class QueueRuleUtil {
public static long getNextConsumeTime(int consumeCount) {
return getNextConsumeTime(consumeCount, 0);
}
public static long getNextConsumeSecond(int consumeCount) {
return getNextConsumeTime(consumeCount, 0);
}
public static long getNextConsumeTime(int cousumeCount, int addInteval) {
int secends = getNextConsumeSecond(cousumeCount,addInteval);
return System.currentTimeMillis()+secends*1000;
}
public static int getNextConsumeSecond(int cousumeCount, int addInteval) {
if (cousumeCount == 1) {
return addInteval + 10;
} else if (cousumeCount == 2) {
return addInteval + 60;
} else if (cousumeCount == 3) {
return addInteval + 60 * 5;
} else if (cousumeCount == 4) {
return addInteval + 60 * 15;
} else if (cousumeCount == 5) {
return addInteval + 60 * 60;
} else if (cousumeCount == 6){
return addInteval + 60 * 60 *2;
} else if(cousumeCount == 7){
return addInteval + 60 * 60 *5;
} else {
return addInteval + 60 * 60 * 10;
}
}
除此之外,對於消費完成,等待刪除的訊息,可以將訊息直接刪除或者是進行備份。最好不要在該表中保留太多需要刪除的訊息,以免影響資料庫的查詢效率。
我們在處理訊息的時候,首先對訊息進行了鎖定,設定了locker,如果系統出現異常的時候,也會產生訊息一直處於被鎖定的狀態,此時可能還需要定期去修復被鎖定的訊息。
/**
* 批量獲取 可以消費的訊息
*
* @param count
* @return
*/
public void repairQueueByStatus(int status) {
List<QueueDO> activeList = null;
try {
Map<String,Object> params = new HashMap<String,Object>();
params.put("status", status);
//下次消費時間在當前時間3小時以內的訊息
params.put("next_consume_time", System.currentTimeMillis()+3*60*1000);
activeList = queueDAO.findQueueByParams(params);
} catch (Exception e) {
logger.error("QueueDomainRepository.repairQueueByStatus find error occured!"
+ e.getMessage(), e);
throw new TuanRuntimeException(
PayConstants.SERVICE_DATABASE_FALIURE,
"QueueDomainRepository.findQueueByStatus error occured!",e);
}
if (activeList == null || activeList.size() == 0) {
return ;
}
for (QueueDO temp : activeList) {
try {
//status=1,可被消費
queueDAO.update(temp.getQueueID(), PayConstants.QUEUE_STATUS_ACTIVE);
} catch (Exception e) {
logger.error("QueueDomainRepository.repairQueueByStatus update error occured!"
+ e.getMessage(), e);
throw new TuanRuntimeException(
PayConstants.SERVICE_DATABASE_FALIURE,
"QueueDomainRepository.repairQueueByStatus update error occured!",e);
}
}
}
以上就是對兩種佇列的簡單說明。