DelayQueue延遲佇列和Redis快取實現訂單自動取消功能
首先這是一個操作頻繁的自動化定時功能,對比於定時器有著更大的使用空間和效能優化,無論是前端的setTimeout與setInterval 定時器還是後端的TimerTask定時器,在面對短期內的頻繁操作都會有著效能和多執行緒之間的問題,所以這時的佇列就起到很重要的作用了,尤其是在於一些訊息的推送。下面我使用的是DelayQueue延遲佇列和Redis的快取來實現:
1:
(1) 這裡我使用的是maven來管理庫,所以第一步我們是匯入所要實現功能的jar包,DelayQueue包由於jdk的Util包來提供以及我們所需要的Redis的pom依賴,這裡我使用的是StringRedisTemplate來操作redis。
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
(2)所有功能都是從底層開始,這裡底層已經由佇列和快取封裝好了,我們只需要編寫業務層和控制層(Rest),需要三個業務層來進行處理所有的邏輯,首先是佇列和快取的業務層,這兩個主要實現的是對訂單儲存和獲取以及刪除。
DelayServer:佇列的業務層,主要用來獲取新增到佇列中的元素以及對佇列的增刪。在其中主要有三個變數,啟動佇列的state,內部介面listener以及佇列集合delatQueue。
@Slf4j @Service @Getter @Setter public class DelayService { private boolean start ; private OnDelayedListener listener; private DelayQueue<DshOrder> delayQueue = new DelayQueue<DshOrder>(); public static interface OnDelayedListener{ public void onDelayedArrived(DshOrder order); } public void start(OnDelayedListener listener){ if(start){ return; } log.error("DelayService 啟動"); start = true; this.listener = listener; new Thread(new Runnable(){ public void run(){ try{ while(true){ DshOrder order = delayQueue.take(); if(DelayService.this.listener != null){ DelayService.this.listener.onDelayedArrived(order); } } }catch(Exception e){ e.printStackTrace(); } } }).start(); } public void add(DshOrder order){ delayQueue.put(order); } public void remove(String orderId){ DshOrder[] array = delayQueue.toArray(new DshOrder[]{}); if(array == null || array.length <= 0){ return; } DshOrder target = null; for(DshOrder order : array){ if(order.getOrderId() == orderId){ target = order; break; } } if(target != null){ delayQueue.remove(target); } } }
DshOrder:訂單佇列中的物件,這裡用來儲存訂單的主鍵以及根據你業務邏輯需要多久取消時間的規格來,我這裡使用的是秒。
@Setter
@Getter
@ApiModel(description = "訂單佇列物件")
public class DshOrder implements Delayed {
@ApiModelProperty(value = "訂單id")
private String orderId;
@ApiModelProperty(value = "超時時間")
private long startTime;
/**
* orderId:訂單id
* timeout:自動取消訂單的超時時間,秒
* */
public DshOrder(String orderId, int timeout){
this.orderId = orderId;
this.startTime = System.currentTimeMillis() + timeout*1000L;
}
@Override
public int compareTo(Delayed other) {
if (other == this){
return 0;
}
if(other instanceof DshOrder){
DshOrder otherRequest = (DshOrder)other;
long otherStartTime = otherRequest.getStartTime();
return (int)(this.startTime - otherStartTime);
}
return 0;
}
@Override
public long getDelay(TimeUnit unit) {
return unit.convert(startTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
}
}
OrderRedisService:Redis業務層,主要用來將訂單存入快取和便利快取物件到佇列中。
public interface OrderRedisService {
/**
* 訂單物件加入快取
*
* @param orderId 訂單id
* @param orderObject 訂單物件
*/
void saveOrder(String orderId, OrderObject orderObject);
/**
* 獲得快取訂單物件
*
* @param orderId 訂單id
* @return
*/
String getOrder(String orderId);
/**
* 刪除快取訂單物件
*
* @param orderId 訂單id
*/
void deleteOrder(String orderId);
/**
* 查詢所有需要快取的訂單物件
*
* @return
*/
Set<String> sacn();
/**
* 獲得redis鍵的剩餘時間
*
* @param key redis鍵
* @return 剩餘時間
*/
Long getSurplusTime(String key);
}
OrderRedisServiceImpl:Redis業務層實現類,提供訂單在快取中的所有實現。首先儲存訂單時需要將訂單存入快取中並設定過期時間(這是判斷訂單是否超時的關鍵),然後通過id來獲取訂單在快取中是否存在,以及刪除訂單在快取中存在的主鍵,其次是一個遍歷快取中所有關於訂單的主鍵方法,這是一個match匹配,前提是在存入訂單主鍵時給定特定的格式才能匹配到所有存在的主鍵(切記match內的匹配字母,這是你訂單主鍵新增入快取的開頭)。最後,由於快取存在髒資料或者服務掛了的情況使的當訂單主鍵在Redis中過期但是並沒有刪除(Redis的主鍵過期刪除存在自動刪除和手動刪除,所以容易產生髒資料),Redis中提供了TTL命令中的獲取主鍵剩餘過期時間的方法,也就是getExpire。
@Slf4j
@Service
public class OrderRedisServiceImpl implements OrderRedisService {
@Autowired
private StringRedisTemplate redisTemplate;
/**
* 儲存訂單並設定過期時間
* @param outTradeId
* @param redisDo
*/
@Override
public void saveOrder(String outTradeId, OrderObject redisDo) {
String key = outTradeId;
//key過期時間為30分鐘
redisTemplate.opsForValue().set(key, JsonUtils.obj2Json(redisDo), 600, TimeUnit.SECONDS);
}
/**
* 獲取訂單
* @param outTradeNo
* @return
*/
@Override
public String getOrder(String outTradeNo) {
String key = outTradeNo;
String message = redisTemplate.opsForValue().get(key);
if(message != null){
return key;
}
return "";
}
/**
* 刪除訂單
* @param outTradeNo
*/
@Override
public void deleteOrder(String outTradeNo) {
String key = outTradeNo;
redisTemplate.delete(key);
}
/**
* 獲取訂單中所有的key
* @return
*/
@Override
public Set<String> sacn(){
Set<String> execute = redisTemplate.execute(new RedisCallback<Set<String>>() {
@Override
public Set<String> doInRedis(RedisConnection connection) throws DataAccessException {
Set<String> binaryKeys = new HashSet<>();
Cursor<byte[]> cursor = connection.scan( new ScanOptions.ScanOptionsBuilder().match("order*").count(100).build());
while (cursor.hasNext()) {
binaryKeys.add(new String(cursor.next()));
}
return binaryKeys;
}
});
return execute;
}
@Override
public Long getSurplusTime(String key) {
return redisTemplate.getExpire(key,TimeUnit.SECONDS);
}
}
(3) 佇列和快取的業務層已經寫好了,但是考慮到系統佇列可能會在系統奔潰後刪除了本地的資料,使得服務重啟後資料消失,下面我用了監聽來在系統啟動時,將Redis中的訂單加入到佇列中,但這些的前提都是在Redis的資料沒有被一併清除。
@Slf4j
@Service
public class StartupListener implements ApplicationListener<ContextRefreshedEvent> {
@Autowired
DelayService delayService;
@Autowired
OrderRedisService redisService;
@Autowired
StringRedisTemplate stringRedisTemplate;
@Override
public void onApplicationEvent(ContextRefreshedEvent evt) {
log.error(">>>>>>>>>>>>系統啟動完成,onApplicationEvent()");
if (evt.getApplicationContext().getParent() == null) {
return;
}
//自動取消訂單
delayService.start(new DelayService.OnDelayedListener(){
@Override
public void onDelayedArrived(final DshOrder order) {
//非同步來做
ThreadPoolUtils.execute(new Runnable(){
public void run(){
String orderId = order.getOrderId();
//查庫判斷是否需要自動取消訂單
int surpsTime = redisService.getSurplusTime(orderId).intValue();
log.error("redis鍵:" + orderId + ";剩餘過期時間:"+surpsTime);
if(surpsTime > 0){
log.error("沒有需要取消的訂單!");
}else{
log.error("自動取消訂單,刪除佇列:"+orderId);
//從佇列中刪除
delayService.remove(orderId);
//從redis刪除
redisService.deleteOrder(orderId);
log.error("自動取消訂單,刪除redis:"+orderId);
//todo 對訂單進行取消訂單操作
}
}
});
}
});
//查詢需要入隊的訂單
ThreadPoolUtils.execute(new Runnable(){
public void run() {
log.error("查詢需要入隊的訂單");
Set<String> keys = redisService.sacn();
if(keys == null || keys.size() <= 0){
return;
}
log.error("需要入隊的訂單keys:"+keys);
log.error("寫到DelayQueue");
for(String key : keys){
String orderKey = redisService.getOrder(key);
int surpsTime = redisService.getSurplusTime(key).intValue();
log.error("讀redis,key:"+key);
log.error("redis鍵:" + key + ";剩餘過期時間:"+surpsTime);
if(orderKey != null){
DshOrder dshOrder = new DshOrder(orderKey,surpsTime);
delayService.add(dshOrder);
log.error("訂單自動入隊:"+dshOrder);
}
}
}
});
}
}
(4) 接下來便是對訂單的業務層進行最後的操作,也就是在你生成訂單的時候給訂單加入到佇列和快取中去並設定過期時間,這裡我使用的執行緒池來進行操作。
public class ThreadPoolUtils {
private final ExecutorService executor;
private static ThreadPoolUtils instance = new ThreadPoolUtils();
private ThreadPoolUtils() {
this.executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() * 2);
}
public static ThreadPoolUtils getInstance() {
return instance;
}
public static <T> Future<T> execute(final Callable<T> runnable) {
return getInstance().executor.submit(runnable);
}
public static Future<?> execute(final Runnable runnable) {
return getInstance().executor.submit(runnable);
}
}
最後便是將訂單加入到快取和佇列。
//把訂單插入到待取消的佇列和redis
ThreadPoolUtils.execute(new Runnable() {
@Override
public void run() {
String itrOrderId = "order" + orderId;
//1 插入到待收貨佇列
DshOrder dshOrder = new DshOrder(itrOrderId, 600);
delayService.add(dshOrder);
log.error("訂單order" + orderId + "入佇列");
//2插入到redis
orderRedisService.saveOrder(itrOrderId, orderObject);
log.error("訂單order" + orderId + "入redis快取");
}
});
還有一點便是在你設定完後,但不需要自動去實現,要去清理快取和佇列中的資料
String delOrderId = "order" + orderId;
int surpsTime = orderRedisService.getSurplusTime(delOrderId).intValue();
log.error("redis鍵:" + delOrderId + ";剩餘過期時間:"+surpsTime);
if (surpsTime <= 0) {
delayService.remove(delOrderId);
log.error("訂單手動出隊:" + delOrderId);
orderRedisService.deleteOrder(delOrderId);
log.error("訂單手動出redis:" + delOrderId);
}
總結:其實在訊息這方面有著很多中介軟體,例如rabbitMQ,activitiMQ,kafka
RabbitMQ,遵循AMQP協議,由內在高併發的erlanng語言開發,用在實時的對可靠性要求比較高的訊息傳遞上。
kafka是Linkedin於2010年12月份開源的訊息釋出訂閱系統,它主要用於處理活躍的流式資料,大資料量的資料處理上,但大多數用於的是日誌。
這讓我想到springcloud和dubbo,前者是spring的產物,後者是阿里巴巴的產物。都在微服務中起到決定性的作用。
所以在對實現功能時的選擇很重要,如果你的系統所處理的資料量不是很大,我覺得佇列和快取很適合你,這樣你可以對訊息的傳遞更加了解,但你使用MQ,kafka的中介軟體時,你會發現使用起來更加輕鬆,但對於資料量大的系統來說,中介軟體是最好的選擇,在這個大資料的時代,高併發,多執行緒,分散式會越來越重要,也是你技術上升的一個很大轉折點。