Redis Stream實現訊息佇列
Redis Stream實現訊息佇列
一、stream簡介
Redis Stream 是 Redis 5.0 版本新增加的資料結構。
Redis Stream 主要用於訊息佇列(MQ,Message Queue),Redis 本身是有一個 Redis 釋出訂閱 (pub/sub) 來實現訊息佇列的功能,但它有個缺點就是訊息無法持久化,如果出現網路斷開、Redis 宕機等,訊息就會被丟棄。
簡單來說釋出訂閱 (pub/sub) 可以分發訊息,但無法記錄歷史訊息。
而 Redis Stream 提供了訊息的持久化和主備複製功能,可以讓任何客戶端訪問任何時刻的資料,並且能記住每一個客戶端的訪問位置,還能保證訊息不丟失。
Redis Stream 的結構如下所示,它有一個訊息連結串列,將所有加入的訊息都串起來,每個訊息都有一個唯一的 ID 和對應的內容:
每個 Stream 都有唯一的名稱,它就是 Redis 的 key,在我們首次使用 xadd 指令追加訊息時自動建立。
上圖解析:
- Consumer Group :消費組,使用 XGROUP CREATE 命令建立,一個消費組有多個消費者(Consumer)。
- last_delivered_id :遊標,每個消費組會有個遊標 last_delivered_id,任意一個消費者讀取了訊息都會使遊標last_delivered_id 往前移動。
- pending_ids :消費者(Consumer)的狀態變數,作用是維護消費者的未確認的 id。 pending_ids 記錄了當前已經被客戶端讀取的訊息,但是還沒有 ack (Acknowledge character:確認字元)。
訊息佇列相關命令:
XADD - 新增訊息到末尾
XTRIM - 對流進行修剪,限制長度
XDEL - 刪除訊息
XLEN - 獲取流包含的元素數量,即訊息長度
XRANGE - 獲取訊息列表,會自動過濾已經刪除的訊息
XREVRANGE - 反向獲取訊息列表,ID 從大到小
XREAD - 以阻塞或非阻塞方式獲取訊息列表
消費者組相關命令:
XGROUP CREATE - 建立消費者組
XREADGROUP GROUP - 讀取消費者組中的訊息
XACK - 將訊息標記為"已處理"
XGROUP SETID - 為消費者組設定新的最後遞送訊息ID
XGROUP DELCONSUMER - 刪除消費者
XGROUP DESTROY - 刪除消費者組
XPENDING - 顯示待處理訊息的相關資訊
XCLAIM - 轉移訊息的歸屬權
XINFO - 檢視流和消費者組的相關資訊;
XINFO GROUPS - 列印消費者組的資訊;
XINFO STREAM - 列印流資訊
二、問題
1.怎麼避免訊息丟失?
為了解決組內訊息讀取但處理期間消費者崩潰帶來的訊息丟失問題,Stream 設計了 Pending 列表,用於記錄讀取但並未處理完畢的訊息。命令 XPENDIING 用來獲消費組或消費內消費者的未處理完畢的訊息
2.Streams 訊息太多了怎麼辦?
-
stream本身有做自動清理的操作,當訊息接近100*100條時,stream會將之前的訊息清除
-
設定stream的上限,超過這個上限的時候會清除多餘的
-
設定定時任務,定時清理stream中的資料,XTRIM命令
3.死信問題
每個Pending的訊息有4個屬性:
訊息ID
所屬消費者
IDLE,已讀取時長
delivery counter,訊息被讀取次數
寫一個定時任務,每5秒讀取一次pending,獲取沒有被ACK的資料,
此時合一獲取到此條訊息的”已讀取時間””訊息被讀取次數”
如果訊息超過60 秒還沒有被消費(可自定義)且訊息被讀取次數為1 ,我們就可以考慮轉組,
如果訊息被讀數為2或者超過2,說明已經轉過組,還沒有被消費,我們就預設有問題,
另傳送訊息通知管理員,把改訊息ACK或者從pending刪除
三、訊息傳送流程
訊息傳送邏輯圖
訊息傳送流程圖
1.pms將原始資料傳送到msg_parse_stream
佇列
2.pms監聽msg_parse_stream
佇列,解析生成訊息
3.pms將完整的訊息傳送到msg_data_stream
佇列
4.notice監聽msg_parse_stream
佇列,將訊息傳送
5.notice將傳送完成的資訊放到msg_record_stream
佇列
6.pms監聽msg_record_stream
佇列,記錄訊息傳送記錄
四、實現
1、基礎配置
1.1、配置Redis
密碼
配置檔案中的引數:requirepass ,就是配置redis訪問密碼的引數;
#預設情況下,是註釋的
requirepass xxxx;
遠端連線
可利用搜索功能 找到
bind 127.0.0.1 -::1
,把這一行註釋掉找到
protected-mode yes
把 yes 改為 no
監聽
notify-keyspace-events AKEx ,設定監聽全部
1.2、匯入jar包
注意:spring boot和fastjson的版本,spring boot版本要2.3.0以上,fastjson版本要1.2.79
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.3.12.RELEASE</version>
</parent>
<groupId>com.zm</groupId>
<artifactId>redis-stream</artifactId>
<version>1.0-SNAPSHOT</version>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.79</version>
</dependency>
</dependencies>
1.3、yaml檔案
spring:
redis:
host: 127.0.0.1
port: 6379
password:
database: 0
lettuce:
pool:
max-active: 8
max-idle: 8
max-wait: -1ms
min-idle: 0
timeout: 5000ms
redisstream:
parse_stream: msg_parse_stream
parse_group_one: msg_parse_group_one
parse_consumer_one: msg_parse_consumer_one
parse_consumer_two: msg_parse_consumer_two
data_stream: msg_data_stream
data_group_one: msg_data_group_one
data_consumer_one: msg_data_consumer_one
data_consumer_two: msg_data_consumer_two
record_stream: msg_record_stream
record_group_one: msg_record_group_one
record_consumer_one: msg_record_consumer_one
record_consumer_two: msg_record_consumer_two
1.4、設定Redis序列化與反序列化
@Configuration
@AutoConfigureAfter(RedisAutoConfiguration.class)
public class RedisConfig extends CachingConfigurerSupport {
/**
* 操作模板類
*/
@Bean("redisTemplate")
public <T> RedisTemplate<String, T> getRedisTemplate(RedisConnectionFactory redisConnectionFactory) {
RedisTemplate<String, T> template = new RedisTemplate<>();
StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();
GenericFastJsonRedisSerializer genericFastJsonRedisSerializer = new GenericFastJsonRedisSerializer();
//序列化
template.setKeySerializer(stringRedisSerializer);
template.setHashKeySerializer(stringRedisSerializer);
template.setValueSerializer(genericFastJsonRedisSerializer);
template.setHashValueSerializer(genericFastJsonRedisSerializer);
template.setDefaultSerializer(genericFastJsonRedisSerializer);
template.setConnectionFactory(redisConnectionFactory);
template.afterPropertiesSet();
return template;
}
}
1.5、獲取配置檔案中的資料
@Data
@Component
@ConfigurationProperties(prefix = "redisstream")
public class RedisStreamConfig {
/**
* 解析訊息流
*/
private String parseStream;
private String parseGroupOne;
private String parseConsumerOne;
private String parseConsumerTwo;
/**
* 訊息資料流
*/
private String dataStream;
private String dataGroupOne;
private String dataConsumerOne;
private String dataConsumerTwo;
/**
* 訊息記錄流
*/
private String recordStream;
private String recordGroupOne;
private String recordConsumerOne;
private String recordConsumerTwo;
}
2、stream配置
下面以訊息解析佇列的建立為例:
2.1、建立相關的監聽類
msg_parse_stream訊息解析佇列主要是用於訊息原始資料的解析,生成訊息
監聽類是有幾個要監聽的stream流,就建立幾個,一般生產者需要兩個,一個監聽訊息解析佇列,一個監聽訊息記錄佇列
例:ListenerMsgParseStream類
這裡類需要實現StreamListener介面,該介面下只有一個要實現的方法——onMessage方法,程式碼:
/**
* @ClassName ListenerMsgParseStream
* @Description 監聽訊息類--監聽 msg_parse_stream流
* @Author wk
* @DATE 2022/4/21 10:35
* @Company 杭州震墨科技有限公司
**/
@Slf4j
@Component
public class ListenerMsgParseStream implements StreamListener<String, MapRecord<String, String, String>> {
@Autowired
private RedisStreamService<String> redisStreamService;
@Autowired
private RedisStreamConfig redisStreamConfig;
@Autowired
private MsgParseQueueService msgParseQueueService;
@SneakyThrows
@Override
public void onMessage(MapRecord<String, String, String> entries) {
log.info("接受到來自redis的訊息,message_id = {},stream = {},body = {}",entries.getId(),entries.getStream(),entries.getValue());
//解析資料,推送到訊息資料佇列
Boolean parseStatus = msgParseQueueService.parseMsgData(entries.getValue());
if (parseStatus){
// 消費完成後手動確認消費ACK
redisStreamService.ack(entries.getStream(), redisStreamConfig.getParseGroupOne(),entries.getId().getValue());
}
}
}
2.2、將消費者監聽類繫結到相應的stream流上
/**
* @ClassName RedisStreamConfig
* @Description 將消費者監聽類繫結到相應的stream流上
* 生產者繫結 msg_parse_stream流--未解析的訊息
* msg_record_stream流--傳送後的訊息
* @Author wk
* @DATE 2022/4/15 14:27
* @Company 杭州震墨科技有限公司
**/
@Configuration
public class ProducerParseConfig {
@Autowired
private RedisStreamConfig redisStreamConfig;
@Autowired
private RedisStreamService<String> redisStreamService;
@Autowired
private ListenerMsgParseStream listenerMsgParseStream;
@Autowired
private ListenerMsgParseStream2 listenerMsgParseStream2;
/**
* 描述: 構建流讀取請求
*
* @param
* @return org.springframework.data.redis.stream.Subscription
* @author wangke
* @date 2022/4/15 22:27
*/
private StreamMessageListenerContainer.ConsumerStreamReadRequest<String> Construct(String key, String group, String consumerName) {
//初始化stream和group
redisStreamService.initStream(key, group);
//指定消費最新訊息
StreamOffset<String> offset = StreamOffset.create(key, ReadOffset.lastConsumed());
//建立消費者
Consumer consumer = Consumer.from(group, consumerName);
return StreamMessageListenerContainer.StreamReadRequest
.builder(offset)
.errorHandler((error) -> {})
.cancelOnError(e -> false)
.consumer(consumer)
.autoAcknowledge(false) //不自動ACK確認
.build();
}
/**
* 描述: 解析訊息佇列 的訂閱者1
*
* @param
* @return org.springframework.data.redis.stream.Subscription
* @author wangke
* @date 2022/4/15 22:27
*/
@Bean
public Subscription subscriptionWithParseMsg(RedisConnectionFactory factory){
//建立容器
StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, MapRecord<String, String, String>> options = StreamMessageListenerContainer
.StreamMessageListenerContainerOptions
.builder()
.pollTimeout(Duration.ofSeconds(5))
.build();
StreamMessageListenerContainer<String, MapRecord<String, String, String>> listenerContainer = StreamMessageListenerContainer.create(factory, options);
//構建流讀取請求
StreamMessageListenerContainer.ConsumerStreamReadRequest<String> build = this.Construct(redisStreamConfig.getParseStream(),redisStreamConfig.getParseGroupOne(), redisStreamConfig.getParseConsumerOne());
//將監聽類繫結到相應的stream流上
Subscription subscription = listenerContainer.register(build, listenerMsgParseStream);
//啟動監聽
listenerContainer.start();
return subscription;
}
/**
* 描述: 解析訊息佇列 的訂閱者2
*
* @param
* @return org.springframework.data.redis.stream.Subscription
* @author wangke
* @date 2022/4/15 22:27
*/
@Bean
public Subscription subscriptionWithParseMsg2(RedisConnectionFactory factory){
//建立容器
StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, MapRecord<String, String, String>> options = StreamMessageListenerContainer
.StreamMessageListenerContainerOptions
.builder()
.pollTimeout(Duration.ofSeconds(1))
.build();
StreamMessageListenerContainer<String, MapRecord<String, String, String>> listenerContainer = StreamMessageListenerContainer.create(factory, options);
//構建流讀取請求
StreamMessageListenerContainer.ConsumerStreamReadRequest<String> build = this.Construct(redisStreamConfig.getParseStream(),redisStreamConfig.getParseGroupOne(), redisStreamConfig.getParseConsumerTwo());
//將監聽類繫結到相應的stream流上
Subscription subscription = listenerContainer.register(build, listenerMsgParseStream2);
//啟動監聽
listenerContainer.start();
return subscription;
}
}
2.3、死信問題
2.3.1、判斷
思路:
每5秒獲取一次某個消費者組的沒有ACK的訊息,若訊息已讀時長超過60 秒的且被讀取次數==1,則進行轉組操作,否則通知管理員並手動ACK或者刪除(看需求自己選擇)
@Autowired
private RedisStreamService<String> redisStreamService;
@Autowired
private RedisStreamConfig redisStreamConfig;
@Autowired
private HandleDeadLetter handleDeadLetter;
/**
* 描述: 定時任務
* 每5秒獲取一次msg_data_stream中msg_data_group_one組 pending中沒有ACK的訊息
* 若訊息已讀時長超過60 秒的且被讀取次數==1,則進行轉組操作
* 否則手動ACK並通知管理員
*
* @param
* @return void
* @author wangke
* @date 2022/4/19 16:08
*/
@Scheduled(cron = "0/5 * * * * ?")
public void scanPendingMsg() {
//獲取group中pending訊息,本質上就是執行XPENDING命令
PendingMessagesSummary pendingMessagesSummary = redisStreamService
.readWithPending(redisStreamConfig.getParseStream(),
redisStreamConfig.getParseGroupOne());
//所有pending訊息數量
long totalPendingMessages = pendingMessagesSummary.getTotalPendingMessages();
if (totalPendingMessages == 0) {
return;
}
//生成等待轉組的資料
Map<String, List<RecordId>> consumerRecordIdMap = deadLetter
.waitChangeConsumerMap(pendingMessagesSummary,
redisStreamConfig.getParseStream());
//最後將待轉組的訊息進行轉組
if (!consumerRecordIdMap.isEmpty()) {
deadLetter.changeConsumer(consumerRecordIdMap,redisStreamConfig.getParseStream(), redisStreamConfig.getParseGroupOne());
}
}
DeadLetter類
/**
* @ClassName DeadLetter
* @Description 處理死信問題
* @Author wk
* @DATE 2022/4/19 11:41
* @Company 杭州震墨科技有限公司
**/
@Slf4j
@Component
public class DeadLetter {
private DeadLetter() {
}
private static DeadLetter deadLetter;
static {
deadLetter = new DeadLetter();
}
public static DeadLetter getInstance() {
return deadLetter;
}
@Autowired
private RedisStreamService<String> redisStreamService;
@Autowired
private MsgRecordQueueService msgRecordQueueService;
/**
* 描述: 生成等待轉組的資料
*
* @param pendingMessagesSummary
* @return java.util.Map<java.lang.String, java.util.List < org.springframework.data.redis.connection.stream.RecordId>>
* @author wangke
* @date 2022/4/19 16:37
*/
public Map<String, List<RecordId>> waitChangeConsumerMap(PendingMessagesSummary pendingMessagesSummary, String key) {
//消費者組名稱
String groupName = pendingMessagesSummary.getGroupName();
//pending佇列中最小id
String minMessageId = pendingMessagesSummary.minMessageId();
//pending佇列中最大id
String maxMessageId = pendingMessagesSummary.maxMessageId();
//獲取每個消費者的pending訊息數量
Map<String, Long> pendingMessagesPerConsumer = pendingMessagesSummary.getPendingMessagesPerConsumer();
//待轉組的訊息
Map<String, List<RecordId>> consumerRecordIdMap = new HashMap<>();
//遍歷每個消費者pending訊息
pendingMessagesPerConsumer.entrySet().forEach(entry -> {
//帶轉組的recordId
List<RecordId> list = new ArrayList<>();
//消費者
String consumer = entry.getKey();
Long consumerPendingMessages = entry.getValue();
log.info("消費者:{},一共有{}條pending訊息", consumer, consumerPendingMessages);
if (consumerPendingMessages > 0) {
//讀取消費者pending佇列前10 條記錄,從id = 0的記錄開始,一直到最大值
PendingMessages pendingMessages = redisStreamService.readWithPending(key, Consumer.from(groupName, consumer));
//遍歷pending詳情
pendingMessages.forEach(message -> {
//訊息的id
RecordId recordId = message.getId();
//訊息已讀時長(訊息從消費組中獲取,到此刻的時間)
Duration elapsedTimeSinceLastDelivery = message.getElapsedTimeSinceLastDelivery();
//訊息被讀取次數(訊息被獲取的次數)
long deliveryCount = message.getTotalDeliveryCount();
//判斷是否是超過60 秒沒有消費
if (elapsedTimeSinceLastDelivery.getSeconds() > 60) {
//判斷訊息被讀取次數是否為 1次
if (1 == deliveryCount) {
//進行轉組
list.add(recordId);
} else {
//手動確認並記錄異常
log.info("手動ACK訊息,並記錄異常,id={},elapsedTimeSinceLastDelivery={},deliveryCount{}", recordId, elapsedTimeSinceLastDelivery, deliveryCount);
msgRecordQueueService.saveErrorMsgRecord(key,recordId);
redisStreamService.ack(key,groupName,recordId.getValue());
}
}
});
if (list.size() > 0) {
consumerRecordIdMap.put(consumer, list);
}
}
});
return consumerRecordIdMap;
}
/**
* 描述: 對訊息進行轉組
*
* @param consumerRecordIdMap
* @return void
* @author wangke
* @date 2022/4/19 16:12
*/
public void changeConsumer(Map<String, List<RecordId>> consumerRecordIdMap, String key, String group) {
consumerRecordIdMap.entrySet().forEach(entry -> {
//根據當前consumer獲取新的consumer 命令 XINFO CONSUMERS mystream mygroup
String oldConsumer = entry.getKey();
StreamInfo.XInfoConsumers consumers = redisStreamService.getConsumers(key, group);
if (consumers.size()<0){
log.info("轉組失敗:{}組沒有消費者",group);
handleFailureMsg(key,group,entry.getValue());
return;
}
String[] newConsumer = {""};
for (int i = 0; i <consumers.size(); i++) {
if (!oldConsumer.equals(consumers.get(i).consumerName())){
newConsumer[0] = consumers.get(i).consumerName();
break;
}
}
if (newConsumer[0].equals("")){
log.info("轉組失敗:{}組沒有其他消費者",group);
handleFailureMsg(key,group,entry.getValue());
return;
}
List<RecordId> recordIds = entry.getValue();
//轉組
List<ByteRecord> retVal = (List<ByteRecord>) redisStreamService.getStringRedisTemplate().execute(new RedisCallback<List<ByteRecord>>() {
@Override
public List<ByteRecord> doInRedis(RedisConnection redisConnection) throws DataAccessException {
// 相當於執行XCLAIM操作,批量將某一個consumer中的訊息轉到另外一個consumer中
return redisConnection.streamCommands().xClaim(key.getBytes(),
group, newConsumer[0], minIdle(Duration.ofSeconds(10)).ids(recordIds));
}
});
if (retVal.size()>0){
for (ByteRecord byteRecord : retVal) {
log.info("改了訊息的消費者:id={}, value={},newConsumer={}", byteRecord.getId(), byteRecord.getValue(), newConsumer[0]);
}
}
});
}
/**
* 描述:處理轉組失敗的訊息,手動ack
*
* @param
* @return
* @author wangke
* @date 2022/4/22 17:36
*/
private void handleFailureMsg(String key, String group, List<RecordId> recordIds){
for (RecordId recordId : recordIds) {
//記錄並ACK
msgRecordQueueService.saveErrorMsgRecord(key,recordId);
redisStreamService.ack(key,group,recordId.getValue());
}
}
}
2.3.2、處理
思路:
每5秒獲取一次某個消費者組的沒有ACK的訊息,主要消費那些轉組過來的訊息,如果轉組次數大於1,則進行嘗試消費
@Autowired
private RedisStreamService<String> redisStreamService;
@Autowired
private RedisStreamConfig redisStreamConfig;
@Autowired
private HandleDeadLetter handleDeadLetter;
/**
* 描述: 每隔5秒鐘,掃描一下有沒有等待自己消費的
* 主要消費那些轉組過來的訊息,如果轉組次數大於1,則進行嘗試消費
*
* @param
* @return void
* @author wangke
* @date 2022/4/20 14:07
*/
@Scheduled(cron = "0/5 * * * * ?")
public void handleMsg() {
/*從消費者的pending佇列中讀取訊息,能夠進到這裡面的,一定是非業務異常,例如介面超時、伺服器宕機等。
對於業務異常,例如欄位解析失敗等,丟進異常表或者redis*/
PendingMessages pendingMessages = redisStreamService.readWithPending(redisStreamConfig.getParseStream(),
Consumer.from(redisStreamConfig.getParseGroupOne(),
redisStreamConfig.getParseConsumerOne()));
//消費訊息
handleDeadLetter.consumptionMsg(pendingMessages,redisStreamConfig.getParseStream());
}
HandleDeadLetter類
/**
* @ClassName hanld
* @Description 處理
* @Author wk
* @DATE 2022/4/20 13:56
* @Company 杭州震墨科技有限公司
**/
@Slf4j
@Component
public class HandleDeadLetter {
private HandleDeadLetter() {}
private static HandleDeadLetter handleDeadLetter;
static {
handleDeadLetter = new HandleDeadLetter();
}
public static HandleDeadLetter getInstance() {
return handleDeadLetter;
}
@Autowired
private RedisStreamService<String> redisStreamService;
@Autowired
private MsgParseQueueService msgParseQueueService;
@Autowired
private MsgDataQueueService msgDataQueueService;
@Autowired
private MsgRecordQueueService msgRecordQueueService;
/**
* 描述: 消費訊息
* 主要消費那些轉組過來的訊息,如果轉組次數大於1,則進行嘗試消費
*
* @param pendingMessages
* @return void
* @author wangke
* @date 2022/4/20 14:06
*/
public void consumptionMsg(PendingMessages pendingMessages, String key) {
if (pendingMessages.size() > 0) {
pendingMessages.forEach(pendingMessage -> {
// 最後一次消費到現在的間隔
Duration elapsedTimeSinceLastDelivery = pendingMessage.getElapsedTimeSinceLastDelivery();
String groupName = pendingMessage.getGroupName();
String consumerName = pendingMessage.getConsumerName();
// 轉組次數
long totalDeliveryCount = pendingMessage.getTotalDeliveryCount();
// 只消費轉組次數大於1次的
if (totalDeliveryCount > 1) {
try {
RecordId id = pendingMessage.getId();
//獲取訊息列表,會自動過濾已經刪除的訊息
List<MapRecord<String, String, String>> result = redisStreamService.getMsgList(key, Range.rightOpen(id.toString(), id.toString()));
MapRecord<String, String, String> entries = result.get(0);
// 消費訊息
log.info("獲取到轉組的訊息,消費了該訊息id={}, 訊息value={}, 消費者={}", entries.getId(), entries.getValue(),consumerName);
//處理業務
this.handleBusiness(key,entries.getValue());
// 手動ack訊息
redisStreamService.ack(groupName, entries);
} catch (Exception e) {
// 異常處理
e.printStackTrace();
}
}
});
}
}
/**
* 描述: 處理業務
*
* @param key
* @param value
* @return void
* @author wangke
* @date 2022/4/20 17:35
*/
private void handleBusiness(String key, Map<String, String> value) {
//根據key的不同選擇不同的業務進行處理,同監聽類中的業務處理方法
switch (key){
case RedisStreamConstants.MSG_PARSE_STREAM:
msgParseQueueService.saveMsgData(value);
redisStreamService.insertStreamAll(RedisStreamConstants.MSG_DATA_STREAM, value);
break;
case RedisStreamConstants.MSG_DATA_STREAM:
msgDataQueueService.sendMsg(value);
redisStreamService.insertStreamAll(RedisStreamConstants.MSG_RECORD_STREAM, value);
break;
case RedisStreamConstants.MSG_RECORD_STREAM:
msgRecordQueueService.saveMsgRecord(value);
break;
default:
break;
}
}
}
3、相關服務
3.1、BaseQueueService
/**
* @InterfaceName BaseQueueService
* @Description 佇列基礎服務類
* @Author wk
* @DATE 2022/4/21 14:35
* @Company 杭州震墨科技有限公司
**/
public interface BaseQueueService {
/**
* 描述: 從Redis獲取訊息配置資訊
*
* @param key
* @return com.zm.msg.model.MsgConfig
* @author wangke
* @date 2022/4/21 14:38
*/
MsgConfig getMsgConfigByKey(String key);
/**
* 描述: 替換模板中的動態欄位,返回組裝好的訊息內容
*
* @param msg 訊息模板
* @param obj 模板訊息資料實體
* @return
* @throws Exception 反射獲取get,set方法失敗
* @author wangke
* @date 2022/4/21 14:55
*/
String buildContent(String msg, Object obj) throws Exception;
}
3.2、MsgParseQueueService
/**
* @InterfaceName MsgParseQueueService
* @Description 訊息解析佇列服務類--待解析的訊息
* @Author wk
* @DATE 2022/4/21 10:15
* @Company 杭州震墨科技有限公司
**/
public interface MsgParseQueueService {
/**
* 描述: 新增訊息到parse_stream
*
* @param configKey config的key
* @return void
* @author wangke
* @date 2022/4/21 16:19
*/
void saveMsgData(String configKey);
/**
* 描述: 新增訊息到parse_stream
*
* @param value
* @return void
* @author wangke
* @date 2022/4/21 16:19
*/
void saveMsgData(Map<String, String> value);
/**
* 描述: 解析資料,根據模板生成訊息
*
* @param value
* @return 解析成功返回true
* @throws Exception 反射獲取方法失敗
* @author wangke
* @date 2022/4/21 13:53
*/
Boolean parseMsgData(Map<String, String> value) throws Exception;
}
3.3、MsgDataQueueService
/**
* @InterfaceName MsgDataQueueService
* @Description 訊息資料佇列服務類--解析後待發送的訊息
* @Author wk
* @DATE 2022/4/21 10:13
* @Company 杭州震墨科技有限公司
**/
public interface MsgDataQueueService {
/**
* 描述: 新增訊息到date_stream
*
* @param value
* @return void
* @author wangke
* @date 2022/4/21 16:19
*/
void sendMsg(Map<String, String> value);
/**
* 描述: 新增訊息到date_stream
*
* @param model msgConfigKey,msgContent,msgCreateUser,msgSendUser,sendDingDing
* @return void
* @throws Exception 建立訊息引數缺失,未配置訊息模板
* @author wangke
* @date 2022/4/21 16:19
*/
void sendMsg(ContentVO model);
}
3.4、MsgRecordQueueService
/**
* @InterfaceName MsgRecordQueueService
* @Description 訊息記錄佇列服務類--傳送後的訊息
* @Author wk
* @DATE 2022/4/21 10:15
* @Company 杭州震墨科技有限公司
**/
public interface MsgRecordQueueService {
/**
* 描述: 儲存訊息記錄
*
* @param value 訊息
* @return 記錄成功返回true
* @throws Exception 分離使用者id出錯
* @author wangke
* @date 2022/4/22 8:37
*/
Boolean saveMsgRecord(Map<String, String> value);
/**
* 描述: 記錄傳送失敗的訊息
*
* @param key stream_key
* @param recordId stream_id
* @return void
* @author wangke
* @date 2022/4/24 14:38
*/
void saveErrorMsgRecord(String key, RecordId recordId);
}