使用Redis實現輕量級延時佇列
阿新 • • 發佈:2018-12-30
A:需求說明:
- 如果系統中需要用到定時執行計劃的,又不想用到中介軟體,如果輪詢資料庫的話,會導致大量資源消耗,這樣我們就可以使用Redis來實現類似功(需要使用rabbitMQ的請看這裡:https://blog.csdn.net/u010096717/article/details/82148681)
- 業務型別,如訂單一些評論,如果48h使用者未對商家評論,系統會自動產生一條預設評論,還有排隊到時提醒等
B:實現思路:
- 將整個Redis當做訊息池,以kv形式儲存訊息,key為id,value為具體的訊息body
- 使用ZSET做優先佇列,按照score維持優先順序(用當前時間+需要延時的時間作為score)
- 輪詢ZSET,拿出score比當前時間戳大的資料(已過期的)
- 根據id拿到訊息池的具體訊息進行消費
- 消費成功,刪除改佇列和訊息
- 消費失敗,讓該訊息重新回到佇列
C:程式碼實現
-
Message訊息封裝類
@Data public class Message { /** * 訊息id */ private String id; /** * 訊息延遲/毫秒 */ private long delay; /** * 訊息存活時間 */ private int ttl; /** * 訊息體,對應業務內容 */ private String body; /** * 建立時間,如果只有優先順序沒有延遲,可以設定建立時間為0 * 用來消除時間的影響 */ private long createTime; }
2.基於redis的訊息佇列
@Component public class RedisMQ { /** * 訊息池字首,以此字首加上傳遞的訊息id作為key,以訊息{@link Message} * 的訊息體body作為值儲存 */ public static final String MSG_POOL = "Message:Pool:"; /** * zset佇列 名稱 queue */ public static final String QUEUE_NAME = "Message:Queue:"; private static final int SEMIH = 30*60; @Autowired private RedisService redisService; /** * 存入訊息池 * @param message * @return */ public boolean addMsgPool(Message message) { if (null != message) { return redisService.setExp(MSG_POOL + message.getId(), message.getBody(), Long.valueOf(message.getTtl() + SEMIH)); } return false; } /** * 從訊息池中刪除訊息 * @param id * @return */ public void deMsgPool(String id) { redisService.remove(MSG_POOL + id); } /** * 向佇列中新增訊息 * @param key * @param score 優先順序 * @param val * @return 返回訊息id */ public void enMessage(String key, long score, String val) { redisService.zsset(key,val,score); } /** * 從佇列刪除訊息 * @param id * @return */ public boolean deMessage(String key, String id) { return redisService.zdel(key, id); } }
4.編寫訊息傳送(生產者)
@Component
public class MessageProvider {
static Logger logger = LoggerFactory.getLogger(MessageProvider.class);
private static int delay = 30;//30秒,可自己動態傳入
@Resource
private RedisMQ redisMQ;
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
//改造成redis
public void sendMessage(String messageContent) {
try {
if (messageContent != null){
String seqId = UUID.randomUUID().toString();
// 將有效資訊放入訊息佇列和訊息池中
Message message = new Message();
// 可以新增延遲配置
message.setDelay(delay*1000);
message.setCreateTime(System.currentTimeMillis());
message.setBody(messageContent);
message.setId(seqId);
// 設定訊息池ttl,防止長期佔用
message.setTtl(delay + 360);
redisMQ.addMsgPool(message);
//當前時間加上延時的時間,作為score
Long delayTime = message.getCreateTime() + message.getDelay();
String d = sdf.format(message.getCreateTime());
System.out.println("當前時間:" + d+",消費的時間:" + sdf.format(delayTime));
redisMQ.enMessage(RedisMQ.QUEUE_NAME,delayTime, message.getId());
}else {
logger.warn("訊息內容為空!!!!!");
}
}catch (Exception e){
e.printStackTrace();
}
}
}
5.訊息消費者
@Component
public class RedisMQConsumer {
@Resource
private RedisMQ redisMQ;
@Autowired
private RedisService redisService;
@Autowired
private MessageProvider provider;
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
/**
* 訊息佇列監聽器<br>
*
*/
@Scheduled(cron = "*/1 * * * * *")
public void monitor() {
Set<String> set = redisService.rangeByScore(RedisMQ.QUEUE_NAME, 0, System.currentTimeMillis());
if (null != set) {
long current = System.currentTimeMillis();
for (String id : set) {
long score = redisService.getScore(RedisMQ.QUEUE_NAME, id).longValue();
if (current >= score) {
// 已超時的訊息拿出來消費
String str = "";
try {
str = redisService.get(RedisMQ.MSG_POOL + id);
System.out.println("消費了:" + str+ ",消費的時間:" + sdf.format(System.currentTimeMillis()));
} catch (Exception e) {
e.printStackTrace();
//如果出了異常,則重新放回佇列
System.out.println("消費異常,重新回到佇列");
provider.sendMessage(str);
} finally {
redisMQ.deMessage(RedisMQ.QUEUE_NAME, id);
redisMQ.deMsgPool(id);
}
}
}
}
}
}
6.配置資訊
<!--1依賴引入-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
2yml配置
spring:
redis:
database: 1
host: 127.0.0.1
port: 6379
以上程式碼已經實現了延遲消費功能,現在來測試一下,呼叫MessageProvider的sendMessage方法,我設定了30秒
可以看到結果
因為我們是用定時器去輪詢的,會出現誤差