1. 程式人生 > 其它 >Redis Stream實現訊息佇列

Redis Stream實現訊息佇列

Redis Stream實現訊息佇列

一、stream簡介

Redis Stream 是 Redis 5.0 版本新增加的資料結構。

Redis Stream 主要用於訊息佇列(MQ,Message Queue),Redis 本身是有一個 Redis 釋出訂閱 (pub/sub) 來實現訊息佇列的功能,但它有個缺點就是訊息無法持久化,如果出現網路斷開、Redis 宕機等,訊息就會被丟棄。

簡單來說釋出訂閱 (pub/sub) 可以分發訊息,但無法記錄歷史訊息。

而 Redis Stream 提供了訊息的持久化和主備複製功能,可以讓任何客戶端訪問任何時刻的資料,並且能記住每一個客戶端的訪問位置,還能保證訊息不丟失。

Redis Stream 的結構如下所示,它有一個訊息連結串列,將所有加入的訊息都串起來,每個訊息都有一個唯一的 ID 和對應的內容:

每個 Stream 都有唯一的名稱,它就是 Redis 的 key,在我們首次使用 xadd 指令追加訊息時自動建立。

上圖解析:

  • Consumer Group :消費組,使用 XGROUP CREATE 命令建立,一個消費組有多個消費者(Consumer)。
  • last_delivered_id :遊標,每個消費組會有個遊標 last_delivered_id,任意一個消費者讀取了訊息都會使遊標last_delivered_id 往前移動。
  • pending_ids :消費者(Consumer)的狀態變數,作用是維護消費者的未確認的 id。 pending_ids 記錄了當前已經被客戶端讀取的訊息,但是還沒有 ack (Acknowledge character:確認字元)。

訊息佇列相關命令:

XADD - 新增訊息到末尾
XTRIM - 對流進行修剪,限制長度
XDEL - 刪除訊息
XLEN - 獲取流包含的元素數量,即訊息長度
XRANGE - 獲取訊息列表,會自動過濾已經刪除的訊息
XREVRANGE - 反向獲取訊息列表,ID 從大到小
XREAD - 以阻塞或非阻塞方式獲取訊息列表

消費者組相關命令:

XGROUP CREATE - 建立消費者組
XREADGROUP GROUP - 讀取消費者組中的訊息
XACK - 將訊息標記為"已處理"
XGROUP SETID - 為消費者組設定新的最後遞送訊息ID
XGROUP DELCONSUMER - 刪除消費者
XGROUP DESTROY - 刪除消費者組
XPENDING - 顯示待處理訊息的相關資訊
XCLAIM - 轉移訊息的歸屬權
XINFO - 檢視流和消費者組的相關資訊;
XINFO GROUPS - 列印消費者組的資訊;
XINFO STREAM - 列印流資訊

二、問題

1.怎麼避免訊息丟失?

為了解決組內訊息讀取但處理期間消費者崩潰帶來的訊息丟失問題,Stream 設計了 Pending 列表,用於記錄讀取但並未處理完畢的訊息。命令 XPENDIING 用來獲消費組或消費內消費者的未處理完畢的訊息

2.Streams 訊息太多了怎麼辦?

  • stream本身有做自動清理的操作,當訊息接近100*100條時,stream會將之前的訊息清除

  • 設定stream的上限,超過這個上限的時候會清除多餘的

  • 設定定時任務,定時清理stream中的資料,XTRIM命令

3.死信問題

每個Pending的訊息有4個屬性:

  • 訊息ID

  • 所屬消費者

  • IDLE,已讀取時長

  • delivery counter,訊息被讀取次數

寫一個定時任務,每5秒讀取一次pending,獲取沒有被ACK的資料,

此時合一獲取到此條訊息的”已讀取時間””訊息被讀取次數”

如果訊息超過60 秒還沒有被消費(可自定義)且訊息被讀取次數為1 ,我們就可以考慮轉組,

如果訊息被讀數為2或者超過2,說明已經轉過組,還沒有被消費,我們就預設有問題,

另傳送訊息通知管理員,把改訊息ACK或者從pending刪除

三、訊息傳送流程

訊息傳送邏輯圖

訊息傳送流程圖

1.pms將原始資料傳送到msg_parse_stream佇列

2.pms監聽msg_parse_stream佇列,解析生成訊息

3.pms將完整的訊息傳送到msg_data_stream佇列

4.notice監聽msg_parse_stream佇列,將訊息傳送

5.notice將傳送完成的資訊放到msg_record_stream佇列

6.pms監聽msg_record_stream佇列,記錄訊息傳送記錄

四、實現

1、基礎配置

1.1、配置Redis

密碼

配置檔案中的引數:requirepass ,就是配置redis訪問密碼的引數;

#預設情況下,是註釋的

requirepass xxxx;

遠端連線

可利用搜索功能 找到 bind 127.0.0.1 -::1 ,把這一行註釋掉

找到 protected-mode yes 把 yes 改為 no

監聽

notify-keyspace-events AKEx ,設定監聽全部

1.2、匯入jar包

注意:spring boot和fastjson的版本,spring boot版本要2.3.0以上,fastjson版本要1.2.79


    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.3.12.RELEASE</version>
    </parent>

    <groupId>com.zm</groupId>
    <artifactId>redis-stream</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.79</version>
        </dependency>
    </dependencies>

1.3、yaml檔案
spring:
  redis:
    host: 127.0.0.1
    port: 6379
    password:
    database: 0
    lettuce:
      pool:
        max-active: 8
        max-idle: 8
        max-wait: -1ms
        min-idle: 0
    timeout: 5000ms
    
 redisstream:
  parse_stream: msg_parse_stream
  parse_group_one: msg_parse_group_one
  parse_consumer_one: msg_parse_consumer_one
  parse_consumer_two: msg_parse_consumer_two
  data_stream: msg_data_stream
  data_group_one: msg_data_group_one
  data_consumer_one: msg_data_consumer_one
  data_consumer_two: msg_data_consumer_two
  record_stream: msg_record_stream
  record_group_one: msg_record_group_one
  record_consumer_one: msg_record_consumer_one
  record_consumer_two: msg_record_consumer_two   

1.4、設定Redis序列化與反序列化
@Configuration
@AutoConfigureAfter(RedisAutoConfiguration.class)
public class RedisConfig extends CachingConfigurerSupport {

    /**
     * 操作模板類
     */
    @Bean("redisTemplate")
    public <T> RedisTemplate<String, T> getRedisTemplate(RedisConnectionFactory redisConnectionFactory) {
        RedisTemplate<String, T> template = new RedisTemplate<>();

        StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();
        GenericFastJsonRedisSerializer genericFastJsonRedisSerializer = new GenericFastJsonRedisSerializer();
        //序列化
        template.setKeySerializer(stringRedisSerializer);
        template.setHashKeySerializer(stringRedisSerializer);
        template.setValueSerializer(genericFastJsonRedisSerializer);
        template.setHashValueSerializer(genericFastJsonRedisSerializer);
        template.setDefaultSerializer(genericFastJsonRedisSerializer);
        template.setConnectionFactory(redisConnectionFactory);
        template.afterPropertiesSet();
        return template;
    }

}
1.5、獲取配置檔案中的資料
@Data
@Component
@ConfigurationProperties(prefix = "redisstream")
public class RedisStreamConfig {
    /**
     * 解析訊息流
     */
    private String parseStream;
    private String parseGroupOne;
    private String parseConsumerOne;
    private String parseConsumerTwo;
    /**
     * 訊息資料流
     */
    private String dataStream;
    private String dataGroupOne;
    private String dataConsumerOne;
    private String dataConsumerTwo;
    /**
     * 訊息記錄流
     */
    private String recordStream;
    private String recordGroupOne;
    private String recordConsumerOne;
    private String recordConsumerTwo;
    

}

2、stream配置

下面以訊息解析佇列的建立為例:

2.1、建立相關的監聽類

msg_parse_stream訊息解析佇列主要是用於訊息原始資料的解析,生成訊息

監聽類是有幾個要監聽的stream流,就建立幾個,一般生產者需要兩個,一個監聽訊息解析佇列,一個監聽訊息記錄佇列

例:ListenerMsgParseStream類

這裡類需要實現StreamListener介面,該介面下只有一個要實現的方法——onMessage方法,程式碼:

/**
 * @ClassName ListenerMsgParseStream
 * @Description 監聽訊息類--監聽 msg_parse_stream流
 * @Author wk
 * @DATE 2022/4/21 10:35
 * @Company 杭州震墨科技有限公司
 **/
@Slf4j
@Component
public class ListenerMsgParseStream implements StreamListener<String, MapRecord<String, String, String>> {

    @Autowired
    private RedisStreamService<String> redisStreamService;
    @Autowired
    private RedisStreamConfig redisStreamConfig;

    @Autowired
    private MsgParseQueueService msgParseQueueService;


    @SneakyThrows
    @Override
    public void onMessage(MapRecord<String, String, String> entries) {
        log.info("接受到來自redis的訊息,message_id = {},stream = {},body = {}",entries.getId(),entries.getStream(),entries.getValue());
        //解析資料,推送到訊息資料佇列
        Boolean parseStatus = msgParseQueueService.parseMsgData(entries.getValue());
        if (parseStatus){
            // 消費完成後手動確認消費ACK
            redisStreamService.ack(entries.getStream(), redisStreamConfig.getParseGroupOne(),entries.getId().getValue());
        }

    }
}
2.2、將消費者監聽類繫結到相應的stream流上
/**
 * @ClassName RedisStreamConfig
 * @Description 將消費者監聽類繫結到相應的stream流上
 * 生產者繫結 msg_parse_stream流--未解析的訊息
 * msg_record_stream流--傳送後的訊息
 * @Author wk
 * @DATE 2022/4/15 14:27
 * @Company 杭州震墨科技有限公司
 **/
@Configuration
public class ProducerParseConfig {

    @Autowired
    private RedisStreamConfig redisStreamConfig;

    @Autowired
    private RedisStreamService<String> redisStreamService;

    @Autowired
    private ListenerMsgParseStream listenerMsgParseStream;
    @Autowired
    private ListenerMsgParseStream2 listenerMsgParseStream2;



    /**
     * 描述: 構建流讀取請求
     *
     * @param
     * @return org.springframework.data.redis.stream.Subscription
     * @author wangke
     * @date 2022/4/15 22:27
     */
    private StreamMessageListenerContainer.ConsumerStreamReadRequest<String> Construct(String key, String group, String consumerName) {
        //初始化stream和group
        redisStreamService.initStream(key, group);
        //指定消費最新訊息
        StreamOffset<String> offset = StreamOffset.create(key, ReadOffset.lastConsumed());
        //建立消費者
        Consumer consumer = Consumer.from(group, consumerName);

        return StreamMessageListenerContainer.StreamReadRequest
                .builder(offset)
                .errorHandler((error) -> {})
                .cancelOnError(e -> false)
                .consumer(consumer)
                .autoAcknowledge(false) //不自動ACK確認
                .build();
    }

    /**
     * 描述: 解析訊息佇列 的訂閱者1
     *
     * @param
     * @return org.springframework.data.redis.stream.Subscription
     * @author wangke
     * @date   2022/4/15 22:27
     */
    @Bean
    public Subscription subscriptionWithParseMsg(RedisConnectionFactory factory){

        //建立容器
        StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, MapRecord<String, String, String>> options = StreamMessageListenerContainer
                .StreamMessageListenerContainerOptions
                .builder()
                .pollTimeout(Duration.ofSeconds(5))
                .build();

        StreamMessageListenerContainer<String, MapRecord<String, String, String>> listenerContainer = StreamMessageListenerContainer.create(factory, options);
        //構建流讀取請求
        StreamMessageListenerContainer.ConsumerStreamReadRequest<String> build = this.Construct(redisStreamConfig.getParseStream(),redisStreamConfig.getParseGroupOne(), redisStreamConfig.getParseConsumerOne());
        //將監聽類繫結到相應的stream流上
        Subscription subscription = listenerContainer.register(build, listenerMsgParseStream);
        //啟動監聽
        listenerContainer.start();

        return subscription;
    }


    /**
     * 描述: 解析訊息佇列 的訂閱者2
     *
     * @param
     * @return org.springframework.data.redis.stream.Subscription
     * @author wangke
     * @date   2022/4/15 22:27
     */
    @Bean
    public Subscription subscriptionWithParseMsg2(RedisConnectionFactory factory){
        //建立容器
        StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, MapRecord<String, String, String>> options = StreamMessageListenerContainer
                .StreamMessageListenerContainerOptions
                .builder()
                .pollTimeout(Duration.ofSeconds(1))
                .build();

        StreamMessageListenerContainer<String, MapRecord<String, String, String>> listenerContainer = StreamMessageListenerContainer.create(factory, options);
        //構建流讀取請求
        StreamMessageListenerContainer.ConsumerStreamReadRequest<String> build = this.Construct(redisStreamConfig.getParseStream(),redisStreamConfig.getParseGroupOne(), redisStreamConfig.getParseConsumerTwo());
        //將監聽類繫結到相應的stream流上
        Subscription subscription = listenerContainer.register(build, listenerMsgParseStream2);
        //啟動監聽
        listenerContainer.start();

        return subscription;
    }

}
2.3、死信問題
2.3.1、判斷

思路:

每5秒獲取一次某個消費者組的沒有ACK的訊息,若訊息已讀時長超過60 秒的且被讀取次數==1,則進行轉組操作,否則通知管理員並手動ACK或者刪除(看需求自己選擇)

    @Autowired
    private RedisStreamService<String> redisStreamService;
    @Autowired
    private RedisStreamConfig redisStreamConfig;
    @Autowired
    private HandleDeadLetter handleDeadLetter;

/**
 * 描述: 定時任務
 * 每5秒獲取一次msg_data_stream中msg_data_group_one組 pending中沒有ACK的訊息
 * 若訊息已讀時長超過60 秒的且被讀取次數==1,則進行轉組操作
 * 否則手動ACK並通知管理員
 *
 * @param
 * @return void
 * @author wangke
 * @date 2022/4/19 16:08
 */
@Scheduled(cron = "0/5 * * * * ?")
public void scanPendingMsg() {
    //獲取group中pending訊息,本質上就是執行XPENDING命令
    PendingMessagesSummary pendingMessagesSummary = redisStreamService
            .readWithPending(redisStreamConfig.getParseStream(),
            redisStreamConfig.getParseGroupOne());
    //所有pending訊息數量
    long totalPendingMessages = pendingMessagesSummary.getTotalPendingMessages();
    if (totalPendingMessages == 0) {
        return;
    }
    //生成等待轉組的資料
    Map<String, List<RecordId>> consumerRecordIdMap = deadLetter
            .waitChangeConsumerMap(pendingMessagesSummary,
            redisStreamConfig.getParseStream());
    //最後將待轉組的訊息進行轉組
    if (!consumerRecordIdMap.isEmpty()) {
        deadLetter.changeConsumer(consumerRecordIdMap,redisStreamConfig.getParseStream(), redisStreamConfig.getParseGroupOne());
    }

}

DeadLetter類

/**
 * @ClassName DeadLetter
 * @Description 處理死信問題
 * @Author wk
 * @DATE 2022/4/19 11:41
 * @Company 杭州震墨科技有限公司
 **/
@Slf4j
@Component
public class DeadLetter {

    private DeadLetter() {
    }

    private static DeadLetter deadLetter;

    static {
        deadLetter = new DeadLetter();
    }

    public static DeadLetter getInstance() {
        return deadLetter;
    }

    @Autowired
    private RedisStreamService<String> redisStreamService;

    @Autowired
    private MsgRecordQueueService msgRecordQueueService;

    /**
     * 描述: 生成等待轉組的資料
     *
     * @param pendingMessagesSummary
     * @return java.util.Map<java.lang.String, java.util.List < org.springframework.data.redis.connection.stream.RecordId>>
     * @author wangke
     * @date 2022/4/19 16:37
     */
    public Map<String, List<RecordId>> waitChangeConsumerMap(PendingMessagesSummary pendingMessagesSummary, String key) {

        //消費者組名稱
        String groupName = pendingMessagesSummary.getGroupName();
        //pending佇列中最小id
        String minMessageId = pendingMessagesSummary.minMessageId();
        //pending佇列中最大id
        String maxMessageId = pendingMessagesSummary.maxMessageId();
        //獲取每個消費者的pending訊息數量
        Map<String, Long> pendingMessagesPerConsumer = pendingMessagesSummary.getPendingMessagesPerConsumer();
        //待轉組的訊息
        Map<String, List<RecordId>> consumerRecordIdMap = new HashMap<>();
        //遍歷每個消費者pending訊息
        pendingMessagesPerConsumer.entrySet().forEach(entry -> {
            //帶轉組的recordId
            List<RecordId> list = new ArrayList<>();
            //消費者
            String consumer = entry.getKey();
            Long consumerPendingMessages = entry.getValue();
            log.info("消費者:{},一共有{}條pending訊息", consumer, consumerPendingMessages);
            if (consumerPendingMessages > 0) {
                //讀取消費者pending佇列前10 條記錄,從id = 0的記錄開始,一直到最大值
                PendingMessages pendingMessages = redisStreamService.readWithPending(key, Consumer.from(groupName, consumer));
                //遍歷pending詳情
                pendingMessages.forEach(message -> {
                    //訊息的id
                    RecordId recordId = message.getId();
                    //訊息已讀時長(訊息從消費組中獲取,到此刻的時間)
                    Duration elapsedTimeSinceLastDelivery = message.getElapsedTimeSinceLastDelivery();
                    //訊息被讀取次數(訊息被獲取的次數)
                    long deliveryCount = message.getTotalDeliveryCount();
                    //判斷是否是超過60 秒沒有消費
                    if (elapsedTimeSinceLastDelivery.getSeconds() > 60) {
                        //判斷訊息被讀取次數是否為 1次
                        if (1 == deliveryCount) {
                            //進行轉組
                            list.add(recordId);
                        } else {
                            //手動確認並記錄異常
                            log.info("手動ACK訊息,並記錄異常,id={},elapsedTimeSinceLastDelivery={},deliveryCount{}", recordId, elapsedTimeSinceLastDelivery, deliveryCount);
                            msgRecordQueueService.saveErrorMsgRecord(key,recordId);
                            redisStreamService.ack(key,groupName,recordId.getValue());
                        }
                    }
                });
                if (list.size() > 0) {
                    consumerRecordIdMap.put(consumer, list);
                }
            }
        });

        return consumerRecordIdMap;

    }

    /**
     * 描述: 對訊息進行轉組
     *
     * @param consumerRecordIdMap
     * @return void
     * @author wangke
     * @date 2022/4/19 16:12
     */
    public void changeConsumer(Map<String, List<RecordId>> consumerRecordIdMap, String key, String group) {
        consumerRecordIdMap.entrySet().forEach(entry -> {
            //根據當前consumer獲取新的consumer  命令 XINFO CONSUMERS mystream mygroup
            String oldConsumer = entry.getKey();
            StreamInfo.XInfoConsumers consumers = redisStreamService.getConsumers(key, group);
            if (consumers.size()<0){
                log.info("轉組失敗:{}組沒有消費者",group);
                handleFailureMsg(key,group,entry.getValue());
                return;
            }
            String[] newConsumer = {""};

            for (int i = 0; i <consumers.size(); i++) {
                if (!oldConsumer.equals(consumers.get(i).consumerName())){
                    newConsumer[0] = consumers.get(i).consumerName();
                    break;
                }
            }
            if (newConsumer[0].equals("")){
                log.info("轉組失敗:{}組沒有其他消費者",group);
                handleFailureMsg(key,group,entry.getValue());
                return;
            }
            List<RecordId> recordIds = entry.getValue();
            //轉組
            List<ByteRecord> retVal = (List<ByteRecord>) redisStreamService.getStringRedisTemplate().execute(new RedisCallback<List<ByteRecord>>() {
                @Override
                public List<ByteRecord> doInRedis(RedisConnection redisConnection) throws DataAccessException {
                    // 相當於執行XCLAIM操作,批量將某一個consumer中的訊息轉到另外一個consumer中
                    return redisConnection.streamCommands().xClaim(key.getBytes(),
                            group, newConsumer[0], minIdle(Duration.ofSeconds(10)).ids(recordIds));
                }

            });
            if (retVal.size()>0){
                for (ByteRecord byteRecord : retVal) {
                    log.info("改了訊息的消費者:id={}, value={},newConsumer={}", byteRecord.getId(), byteRecord.getValue(), newConsumer[0]);
                }
            }

        });

    }

    /**
     * 描述:處理轉組失敗的訊息,手動ack
     *
     * @param
     * @return
     * @author wangke
     * @date   2022/4/22 17:36
     */
    private void handleFailureMsg(String key, String group, List<RecordId> recordIds){

        for (RecordId recordId : recordIds) {
            //記錄並ACK
            msgRecordQueueService.saveErrorMsgRecord(key,recordId);
            redisStreamService.ack(key,group,recordId.getValue());
        }
    }

}

2.3.2、處理

思路:

每5秒獲取一次某個消費者組的沒有ACK的訊息,主要消費那些轉組過來的訊息,如果轉組次數大於1,則進行嘗試消費

    @Autowired
    private RedisStreamService<String> redisStreamService;
    @Autowired
    private RedisStreamConfig redisStreamConfig;
    @Autowired
    private HandleDeadLetter handleDeadLetter;

/**
 * 描述:  每隔5秒鐘,掃描一下有沒有等待自己消費的
 *       主要消費那些轉組過來的訊息,如果轉組次數大於1,則進行嘗試消費
 *
 * @param
 * @return void
 * @author wangke
 * @date   2022/4/20 14:07
 */
@Scheduled(cron = "0/5 * * * * ?")
public void handleMsg() {
    /*從消費者的pending佇列中讀取訊息,能夠進到這裡面的,一定是非業務異常,例如介面超時、伺服器宕機等。
    對於業務異常,例如欄位解析失敗等,丟進異常表或者redis*/
    PendingMessages pendingMessages = redisStreamService.readWithPending(redisStreamConfig.getParseStream(),
            Consumer.from(redisStreamConfig.getParseGroupOne(),
                    redisStreamConfig.getParseConsumerOne()));
    //消費訊息
    handleDeadLetter.consumptionMsg(pendingMessages,redisStreamConfig.getParseStream());
}

HandleDeadLetter類

/**
 * @ClassName hanld
 * @Description 處理
 * @Author wk
 * @DATE 2022/4/20 13:56
 * @Company 杭州震墨科技有限公司
 **/
@Slf4j
@Component
public class HandleDeadLetter {

    private HandleDeadLetter() {}

    private static HandleDeadLetter handleDeadLetter;

    static {
        handleDeadLetter = new HandleDeadLetter();
    }

    public static HandleDeadLetter getInstance() {
        return handleDeadLetter;
    }

    @Autowired
    private RedisStreamService<String> redisStreamService;
    @Autowired
    private MsgParseQueueService msgParseQueueService;
    @Autowired
    private MsgDataQueueService msgDataQueueService;
    @Autowired
    private MsgRecordQueueService msgRecordQueueService;


    /**
     * 描述:  消費訊息
     *    主要消費那些轉組過來的訊息,如果轉組次數大於1,則進行嘗試消費
     *
     * @param pendingMessages
     * @return void
     * @author wangke
     * @date 2022/4/20 14:06
     */
    public void consumptionMsg(PendingMessages pendingMessages, String key) {
        if (pendingMessages.size() > 0) {

            pendingMessages.forEach(pendingMessage -> {
                // 最後一次消費到現在的間隔
                Duration elapsedTimeSinceLastDelivery = pendingMessage.getElapsedTimeSinceLastDelivery();

                String groupName = pendingMessage.getGroupName();
                String consumerName = pendingMessage.getConsumerName();
                // 轉組次數
                long totalDeliveryCount = pendingMessage.getTotalDeliveryCount();
                // 只消費轉組次數大於1次的
                if (totalDeliveryCount > 1) {
                    try {
                        RecordId id = pendingMessage.getId();
                        //獲取訊息列表,會自動過濾已經刪除的訊息
                        List<MapRecord<String, String, String>> result = redisStreamService.getMsgList(key, Range.rightOpen(id.toString(), id.toString()));
                        MapRecord<String, String, String> entries = result.get(0);
                        // 消費訊息
                        log.info("獲取到轉組的訊息,消費了該訊息id={}, 訊息value={}, 消費者={}", entries.getId(), entries.getValue(),consumerName);
                        //處理業務
                        this.handleBusiness(key,entries.getValue());
                        // 手動ack訊息
                        redisStreamService.ack(groupName, entries);
                    } catch (Exception e) {
                        // 異常處理
                        e.printStackTrace();
                    }
                }
            });
        }
    }

    /**
     * 描述: 處理業務
     *
     * @param key
     * @param value
     * @return void
     * @author wangke
     * @date   2022/4/20 17:35
     */
    private void handleBusiness(String key, Map<String, String> value) {
        //根據key的不同選擇不同的業務進行處理,同監聽類中的業務處理方法
        switch (key){
            case RedisStreamConstants.MSG_PARSE_STREAM:
                msgParseQueueService.saveMsgData(value);
                redisStreamService.insertStreamAll(RedisStreamConstants.MSG_DATA_STREAM, value);
                break;
            case RedisStreamConstants.MSG_DATA_STREAM:
                msgDataQueueService.sendMsg(value);
                redisStreamService.insertStreamAll(RedisStreamConstants.MSG_RECORD_STREAM, value);
                break;
            case RedisStreamConstants.MSG_RECORD_STREAM:
                msgRecordQueueService.saveMsgRecord(value);
                break;
            default:
                break;

        }

    }

}

3、相關服務

3.1、BaseQueueService
/**
 * @InterfaceName BaseQueueService
 * @Description 佇列基礎服務類
 * @Author wk
 * @DATE 2022/4/21 14:35
 * @Company 杭州震墨科技有限公司
 **/
public interface BaseQueueService {

    /**
     * 描述: 從Redis獲取訊息配置資訊
     *
     * @param key
     * @return com.zm.msg.model.MsgConfig
     * @author wangke
     * @date 2022/4/21 14:38
     */
    MsgConfig getMsgConfigByKey(String key);

    /**
     * 描述: 替換模板中的動態欄位,返回組裝好的訊息內容
     *
     * @param msg 訊息模板
     * @param obj 模板訊息資料實體
     * @return
     * @throws Exception 反射獲取get,set方法失敗
     * @author wangke
     * @date 2022/4/21 14:55
     */
    String buildContent(String msg, Object obj) throws Exception;

}
3.2、MsgParseQueueService
/**
 * @InterfaceName MsgParseQueueService
 * @Description 訊息解析佇列服務類--待解析的訊息
 * @Author wk
 * @DATE 2022/4/21 10:15
 * @Company 杭州震墨科技有限公司
 **/
public interface MsgParseQueueService {

    /**
     * 描述: 新增訊息到parse_stream
     *
     * @param configKey config的key
     * @return void
     * @author wangke
     * @date   2022/4/21 16:19
     */
    void saveMsgData(String configKey);

    /**
     * 描述: 新增訊息到parse_stream
     *
     * @param value
     * @return void
     * @author wangke
     * @date   2022/4/21 16:19
     */
    void saveMsgData(Map<String, String> value);
    
    /**
     * 描述: 解析資料,根據模板生成訊息
     * 
     * @param value
     * @return 解析成功返回true
     * @throws Exception 反射獲取方法失敗
     * @author wangke
     * @date   2022/4/21 13:53
     */
    Boolean parseMsgData(Map<String, String> value) throws Exception;
}
3.3、MsgDataQueueService
/**
 * @InterfaceName MsgDataQueueService
 * @Description 訊息資料佇列服務類--解析後待發送的訊息
 * @Author wk
 * @DATE 2022/4/21 10:13
 * @Company 杭州震墨科技有限公司
 **/
public interface MsgDataQueueService {

    /**
     * 描述: 新增訊息到date_stream
     *
     * @param value
     * @return void
     * @author wangke
     * @date   2022/4/21 16:19
     */
    void sendMsg(Map<String, String> value);

    /**
     * 描述: 新增訊息到date_stream
     *
     * @param model msgConfigKey,msgContent,msgCreateUser,msgSendUser,sendDingDing
     * @return void
     * @throws Exception 建立訊息引數缺失,未配置訊息模板
     * @author wangke
     * @date   2022/4/21 16:19
     */
    void sendMsg(ContentVO model);
    
}
3.4、MsgRecordQueueService
/**
 * @InterfaceName MsgRecordQueueService
 * @Description 訊息記錄佇列服務類--傳送後的訊息
 * @Author wk
 * @DATE 2022/4/21 10:15
 * @Company 杭州震墨科技有限公司
 **/
public interface MsgRecordQueueService {

    /**
     * 描述: 儲存訊息記錄
     *
     * @param value 訊息
     * @return 記錄成功返回true
     * @throws Exception 分離使用者id出錯
     * @author wangke
     * @date 2022/4/22 8:37
     */
    Boolean saveMsgRecord(Map<String, String> value);

    /**
     * 描述: 記錄傳送失敗的訊息
     *
     * @param key stream_key
     * @param recordId stream_id
     * @return void
     * @author wangke
     * @date 2022/4/24 14:38
     */
    void saveErrorMsgRecord(String key, RecordId recordId);
}