1. 程式人生 > 其它 >RedisTemplate實現釋出訂閱

RedisTemplate實現釋出訂閱

技術標籤:NoSql

1 背景

一個專案有三個模組,閘道器,使用者管理,業務主體;客戶的環境安裝中介軟體很麻煩,而且專案很小(沒必要部署),實現的需要是閘道器介面攔截token失效插入一個審計日誌到業務主體專案中,由於考慮到閘道器的純潔性,所以使用feign呼叫的方式直接pass掉了,使用訊息中介軟體也是一個好辦法(但是專案安裝比較麻煩因為你是政府專案),所以就想到了用redis的釋出訂閱的模式。

2.1訂閱配置

頻道常量

public class MessageConstant {

    /**
     * 訂閱的頻道
     */
    public static final String AUDIT_LOG_CHANNEL = "AUDIT_LOG_CHANNEL";
}

redis配置類

import net.cc.support.MessageReceiverSupport;
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;

/**
 * @author yueyang
 * @since 2021/02/05 14:47
 **/
@Configuration
@AutoConfigureAfter({MessageReceiverSupport.class})
public class RedisPubSubConfig {


   /**
     * Redis訊息監聽器容器
     * 這個容器載入了RedisConnectionFactory和訊息監聽器
     * 可以新增多個監聽不同話題的redis監聽器,只需要把訊息監聽器和相應的訊息訂閱處理器繫結,該訊息監聽器
     * 通過反射技術呼叫訊息訂閱處理器的相關方法進行一些業務處理
     * @param connectionFactory 連結工廠
    * @param adapter 介面卡
     * @return redis訊息監聽容器
     */
    @Bean
    public RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory,
        MessageListenerAdapter adapter) {
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        //可以新增多個 messageListener
        container.addMessageListener(adapter, new PatternTopic(MessageConstant.AUDIT_LOG_CHANNEL));
        return container;
    }


    /**
     * 訊息監聽器介面卡,繫結訊息處理器,利用反射技術呼叫訊息處理器的業務方法
     * 將MessageReceiver註冊為一個訊息監聽器,可以自定義訊息接收的方法(handleMessage)
     * 如果不指定訊息接收的方法,訊息監聽器會預設的尋找MessageReceiver中的onMessage這個方法作為訊息接收的方法
     * @param messageReceiver 訊息接受
     * @return 介面卡
     */
    @Bean
    public MessageListenerAdapter adapter(MessageReceiverSupport messageReceiver) {
        return new MessageListenerAdapter(messageReceiver, "onMessage");
    }

}

訊息監聽程式碼

/**
 * @author yueyang
 * @since 2021/02/05 14:48
 **/
@Component
@Slf4j
public class MessageReceiverSupport implements MessageListener {

    @Autowired
    private RedisTemplate redisTemplate;
    @Override
    public void onMessage(Message message, byte[] pattern) {
        RedisSerializer<String> redisSerializer = redisTemplate.getStringSerializer();
        String msg= redisSerializer.deserialize(message.getBody());
        System.out.println("接收到的訊息是:"+ msg);
        log.info("Received <" + msg + ">");

    }

}

2.2釋出程式碼

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;

/**
 * @author yueyang
 * @since 2021/02/05 15:46
 **/
@Service
public class RedisService {

    @Autowired
    RedisTemplate<String,String> redisTemplate;

    public String sendMessage (String msg) {
        try {
            redisTemplate.convertAndSend(MessageConstant.AUDIT_LOG_CHANNEL, msg);
            System.out.println(msg);
            return "訊息傳送成功";
        }catch (Exception e) {
            e.printStackTrace();
            return "訊息傳送失敗";
        }
    }
}

2.3 nacos配置

spring:
  redis:
    host: 1.11.1.1
    port: 6379
    password: xxxx
    lettuce:
      pool:
        max-active: 1000 #最大連線數(使用負值表示沒有限制)
        max-idle: 10 #最大空閒連線
        min-idle: 5 #最新空閒連線
        max-wait: -1 # 最大阻塞等待時間(使用負值表示沒有限制)

3 問題

第一個原因是和redis系統的穩定性有關。對於舊版的redis來說,如果一個客戶端訂閱了某個或者某些頻道,但是它讀取訊息的速度不夠快,那麼不斷的積壓的訊息就會使得redis輸出緩衝區的體積越來越大,這可能會導致redis的速度變慢,甚至直接崩潰。也可能會導致redis被作業系統強制殺死,甚至導致作業系統本身不可用。新版的redis不會出現這種問題,因為它會自動斷開不符合client-output-buffer-limit pubsub配置選項要求的訂閱客戶端

第二個原因是和資料傳輸的可靠性有關。任何網路系統在執行操作時都可能會遇到斷網的情況。而斷線產生的連線錯誤通常會使得網路連線兩端中的一端進行重新連線。如果客戶端在執行訂閱操作的過程中斷線,那麼客戶端將會丟失在斷線期間的訊息,這在很多業務場景下是不可忍受的。

4 原理

釋出訂閱模式:就是一個釋出者釋出訊息,多個訂閱者進行訊息的訂閱,目的是為了訊息的傳送。

4.1 釋出訂閱模式的結構

主要包含三個部分:釋出者,訂閱者和Channel。


這裡寫圖片描述

結合上圖和訊息中介軟體,可以將channel和訊息中介軟體中的topic主題對應起來

釋出訂閱中使用到的命令就只有三個:PUBLISH,SUBSCRIBE,PSUBSCRIBE

  • PUBLISH 用於釋出訊息
  • SUBSCRIBE 也叫頻道訂閱,用於訂閱某一特定的頻道
  • PSUBSCRIBE 也叫模式訂閱,用於訂閱某一組頻道,使用glob的方式,比如xxx-*可以匹配xxx-a,和xxx-b,xxx-ddd等等

4.2 原理

Redis 訂閱與釋出原理

client->pubsub_channels 是客戶端維護的一個以dict結構的維護的訂閱頻道雜湊表,VAL是NULL,不需要值。

server->pubsub_channels 是服務端維護的一個以dict結構的維護的訂閱頻道雜湊表,VAL是以client維護的雙向連結串列adlist。

一、訂閱

訂閱流程:SUBSCRIBE命令 SUBSCRIBE channel [channel ...]

1.首先是將當前訂閱的頻道channel新增進客戶端的pubsub_channels雜湊表裡面。

2.然後在將當前訂閱的頻道channel和對應的client以鍵值對新增服務端的pubsub_channels的雜湊表裡。

3.最後將返回的資訊返回給客戶端。

二、釋出

訂閱流程:PUBLISH命令 PUBLISH channel message

1.首先通過頻道在服務端的pubsub_channels雜湊表裡面找到對應的客戶端連結串列。

2.然後遞迴迴圈連結串列,逐個將訊息message傳送對應訂閱的客戶端。

3.正則匹配的頻道 逐個傳送對應訂閱的客戶端。

參考連結:https://blog.csdn.net/luolaifa000/article/details/84110633