Java知識總結----佇列的使用(八)
今天跟大家來看看如何在專案中使用佇列。首先我們要知道使用佇列的目的是什麼?一般情況下,如果是一些及時訊息的處理,並且處理時間很短的情況下是不需要使用佇列的,直接阻塞式的方法呼叫就可以了。但是,如果在訊息處理的時候特別費時間,這個時候如果有新的訊息來了,就只能處於阻塞狀態,造成使用者等待。這個時候在專案中引入佇列是十分有必要的。當我們接受到訊息後,先把訊息放到佇列中,然後再用新的執行緒進行處理,這個時候就不會有訊息的阻塞了。下面就跟大家介紹兩種佇列的使用,一種是基於記憶體的,一種是基於資料庫的。
首先,我們來看看基於記憶體的佇列。在Java的併發包中已經提供了BlockingQueue的實現,比較常用的有ArrayBlockingQueue和LinkedBlockingQueue,前者是以陣列的形式儲存,後者是以Node節點的連結串列形式儲存。至於陣列和連結串列的區別這裡就不多說了。
BlockingQueue 佇列常用的操作方法:
1.往佇列中新增元素: add(), put(), offer()
2.從佇列中取出或者刪除元素: remove() element() peek() pool() take()
每個方法的說明如下:
offer()方法往佇列新增元素如果佇列已滿直接返回false,佇列未滿則直接插入並返回true;
add()方法是對offer()方法的簡單封裝.如果佇列已滿,丟擲異常new IllegalStateException("Queue full");
put()方法往佇列裡插入元素,如果佇列已經滿,則會一直等待直到佇列為空插入新元素,或者執行緒被中斷丟擲異常.
remove()方法直接刪除隊頭的元素:
peek()方法直接取出隊頭的元素,並不刪除.
element()方法對peek方法進行簡單封裝,如果隊頭元素存在則取出並不刪除,如果不存在丟擲異常NoSuchElementException()
pool()方法取出並刪除隊頭的元素,當佇列為空,返回null;
take()方法取出並刪除隊頭的元素,當佇列為空,則會一直等待直到佇列有新元素可以取出,或者執行緒被中斷丟擲異常
offer()方法一般跟pool()方法相對應, put()方法一般跟take()方法相對應.日常開發過程中offer()與pool()方法用的相對比較頻繁.
下面用一個例子來看看是怎麼使用的。
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
public class UserTask {
//佇列大小
private final int QUEUE_LENGTH = 10000*10;
//基於記憶體的阻塞佇列
private BlockingQueue<String> queue = new LinkedBlockingQueue<String>(QUEUE_LENGTH);
//建立計劃任務執行器
private ScheduledExecutorService es = Executors.newScheduledThreadPool(1);
/**
* 建構函式,執行execute方法
*/
public UserTask() {
execute();
}
/**
* 新增資訊至佇列中
* @param content
*/
public void addQueue(String content) {
queue.add(content);
}
/**
* 初始化執行
*/
public void execute() {
//每一分鐘執行一次
es.scheduleWithFixedDelay(new Runnable(){
public void run() {
try {
String content = queue.take();
//處理佇列中的資訊。。。。。
System.out.println(content);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, 0, 1, TimeUnit.MINUTES);
}
}
以上呢,就是基於記憶體的佇列的介紹,基於記憶體的佇列,佇列的大小依賴於JVM記憶體的大小,一般如果是記憶體佔用不大且處理相對較為及時的都可以採用此種方法。如果你在佇列處理的時候需要有失敗重試機制,那麼用此種佇列就不是特別合適了。下面就說說基於資料庫的佇列。
基於資料庫的佇列,很好理解,就是接收到訊息之後,把訊息存入資料庫中,設定消費時間、重試次數等,再用新的執行緒從資料庫中讀取資訊,進行處理。首先來看看資料庫的設計。
欄位 | 型別 | 說明 |
queue_id | bigint | 佇列ID,唯一標識 |
create_time | bigint | 建立時間 |
type | int | 業務型別 |
status | int | 處理狀態位 : 1:有效可處理(active) 3:臨時被佔用 (locked) 5:處理完畢 標記刪除(deleted) |
consume_status | int | 消費狀態:1:未消費 2:消費成功 3:消費失敗,等待下次消費 4:作廢 |
update_time | bigint | 更新時間 |
locker | varchar | 佔用標籤 |
last_consume_time | bigint | 最後一次消費時間 |
next_consume_time | bigint | 可消費開始時間 |
consume_count | int | 消費次數 |
json_data | text | 資料資訊 json格式 |
程式碼示例如下:
/**
* 批量獲取 可以消費的訊息
* 先使用一個時間戳將被消費的訊息鎖定,然後再使用這個時間戳去查詢鎖定的資料。
* @param count
* @return
*/
public List<Queue> findActiveQueueNew(int count) {
//先去更新資料
String locker = String.valueOf(System.currentTimeMillis())+random.nextInt(10000);
int lockCount = 0;
try {
//將status為1的更新為3,設定locker,先鎖定訊息
lockCount = queueDAO.updateActiveQueue(PayConstants.QUEUE_STATUS_LOCKED,
PayConstants.QUEUE_STATUS_ACTIVE, count, locker);
} catch (Exception e) {
logger.error(
"QueueDomainRepository.findActiveQueueNew error occured!"
+ e.getMessage(), e);
throw new TuanRuntimeException(
PayConstants.SERVICE_DATABASE_FALIURE,
"QueueDomainRepository.findActiveQueue error occured!", e);
}
//如果鎖定的數量為0,則無需再去查詢
if(lockCount == 0){
return null;
}
//休息一會在再詢,防止資料已經被更改
try {
Thread.sleep(1);
} catch (Exception e) {
logger.error("QueueDomainRepository.findActiveQueue error sleep occured!"
+ e.getMessage(), e);
}
List<Queue> activeList = null;
try {
activeList = queueDAO.getByLocker(locker);
} catch (Exception e) {
logger.error("QueueDomainRepository.findActiveQueue error occured!"
+ e.getMessage(), e);
throw new TuanRuntimeException(
PayConstants.SERVICE_DATABASE_FALIURE,
"QueueDomainRepository.findActiveQueue error occured!",e);
}
return activeList;
}
獲取到訊息之後,還需要再判斷訊息是否合法,如是否達到最大消費次數,訊息是否已被成功消費,等,判斷程式碼如下:
/**
* 驗證佇列modle 的合法性
*
* @param model
* @return boolean true,訊息還可以消費。false,訊息不允許消費。
*/
public boolean validateQueue(final QueueModel model){
int consumeCount = model.getConsumeCount();
if (consumeCount >= PayConstants.QUEUE_MAX_CONSUME_COUNT) {
//消費次數超過了最大次數
return false;
}
int consumeStatus = model.getConsumeStatus();
if(consumeStatus == PayConstants.QUEUE_STATUS_CONSUMER_SUCCESS){
//訊息已經被成功消費
return false;
}
QueueStatusEnum queueStatusEnum = model.getQueueStatusEnum();
if(queueStatusEnum == null || queueStatusEnum != QueueStatusEnum.LOCKED){
//訊息狀態不正確
return false;
}
String jsonData = model.getJsonData();
if(StringUtils.isEmpty(jsonData)){
//訊息體為空
return false;
}
return true;
}
訊息處理完畢之後,根據消費結果修改資料庫中的狀態。
public void consume(boolean isDelete, Long consumeMinTime,
String tradeNo,int consumeCount) {
QueueDO queueDO = new QueueDO();
if (!isDelete) {
//已經到了做大消費次數,訊息作廢 不再處理
if (consumeCount >= PayConstants.QUEUE_MAX_CONSUME_COUNT) {
//達到最大消費次數的也設定為消費成功
queueDO.setConsumeStatus(PayConstants.QUEUE_STATUS_CONSUMER_SUCCESS);
queueDO.setStatus(PayConstants.QUEUE_STATUS_CANCEL);
} else {
queueDO.setConsumeStatus(PayConstants.QUEUE_STATUS_CONSUMER_FAILED);
//設定為可用狀態等待下次繼續傳送
queueDO.setStatus(PayConstants.QUEUE_STATUS_ACTIVE);
}
} else {
//第三方消費成功
queueDO.setConsumeStatus(PayConstants.QUEUE_STATUS_CONSUMER_SUCCESS);
queueDO.setStatus(PayConstants.QUEUE_STATUS_DELETED);
}
queueDO.setNextConsumeTime(consumeMinTime == null ? QueueRuleUtil
.getNextConsumeTime(consumeCount) : consumeMinTime);
if (StringUtils.isNotBlank(tradeNo)) {
queueDO.setTradeNo(tradeNo);
}
long now = System.currentTimeMillis();
queueDO.setUpdateTime(now);
queueDO.setLastConsumeTime(now);
queueDO.setConsumeCount(consumeCount);
queueDO.setQueueID(id);
setQueueDOUpdate(queueDO);
}
下次消費時間的計算如下:根據消費次數計算,每次消費存在遞增的時間間隔。
/**
* 佇列消費 開始時間 控制
*/
public class QueueRuleUtil {
public static long getNextConsumeTime(int consumeCount) {
return getNextConsumeTime(consumeCount, 0);
}
public static long getNextConsumeSecond(int consumeCount) {
return getNextConsumeTime(consumeCount, 0);
}
public static long getNextConsumeTime(int cousumeCount, int addInteval) {
int secends = getNextConsumeSecond(cousumeCount,addInteval);
return System.currentTimeMillis()+secends*1000;
}
public static int getNextConsumeSecond(int cousumeCount, int addInteval) {
if (cousumeCount == 1) {
return addInteval + 10;
} else if (cousumeCount == 2) {
return addInteval + 60;
} else if (cousumeCount == 3) {
return addInteval + 60 * 5;
} else if (cousumeCount == 4) {
return addInteval + 60 * 15;
} else if (cousumeCount == 5) {
return addInteval + 60 * 60;
} else if (cousumeCount == 6){
return addInteval + 60 * 60 *2;
} else if(cousumeCount == 7){
return addInteval + 60 * 60 *5;
} else {
return addInteval + 60 * 60 * 10;
}
}
除此之外,對於消費完成,等待刪除的訊息,可以將訊息直接刪除或者是進行備份。最好不要在該表中保留太多需要刪除的訊息,以免影響資料庫的查詢效率。
我們在處理訊息的時候,首先對訊息進行了鎖定,設定了locker,如果系統出現異常的時候,也會產生訊息一直處於被鎖定的狀態,此時可能還需要定期去修復被鎖定的訊息。
/**
* 批量獲取 可以消費的訊息
*
* @param count
* @return
*/
public void repairQueueByStatus(int status) {
List<QueueDO> activeList = null;
try {
Map<String,Object> params = new HashMap<String,Object>();
params.put("status", status);
//下次消費時間在當前時間3小時以內的訊息
params.put("next_consume_time", System.currentTimeMillis()+3*60*1000);
activeList = queueDAO.findQueueByParams(params);
} catch (Exception e) {
logger.error("QueueDomainRepository.repairQueueByStatus find error occured!"
+ e.getMessage(), e);
throw new TuanRuntimeException(
PayConstants.SERVICE_DATABASE_FALIURE,
"QueueDomainRepository.findQueueByStatus error occured!",e);
}
if (activeList == null || activeList.size() == 0) {
return ;
}
for (QueueDO temp : activeList) {
try {
//status=1,可被消費
queueDAO.update(temp.getQueueID(), PayConstants.QUEUE_STATUS_ACTIVE);
} catch (Exception e) {
logger.error("QueueDomainRepository.repairQueueByStatus update error occured!"
+ e.getMessage(), e);
throw new TuanRuntimeException(
PayConstants.SERVICE_DATABASE_FALIURE,
"QueueDomainRepository.repairQueueByStatus update error occured!",e);
}
}
}
以上就是對兩種佇列的簡單說明。在使用基於資料庫的佇列的時候,其中還使用到了事件處理機制,這部分的內容,就下次的時候再去介紹。