1. 程式人生 > >redis之mq實現釋出訂閱模式

redis之mq實現釋出訂閱模式

  • 示例程式碼-github

概述

Redis不僅可作為快取伺服器,還可用作訊息佇列,本示例演示如何使用redis實現釋出/訂閱訊息佇列。

  • 在Redis中,釋出者沒有將訊息傳送給特定訂閱者的程式。相反,釋出的訊息被描述為通道,而不知道(如果有的話)可能有哪些訂閱者。
  • 訂閱者表示對一個或多個主題感興趣,只接收感興趣的訊息,而不知道(如果有的話)釋出者是什麼。
  • 釋出者和訂閱者的這種解耦可以實現更大的可伸縮性和更動態的網路拓撲。

程式碼實現

redis實現mq的儲存方式很多,可以使用list,zset及stream,這些資料的儲存結構決定了怎麼消費問題(訊息是一次使用、允許多次使用、允許多端訊息等),比如使用list,我們可以使用leftPush插入訊息,使用rightPop消費訊息,實現一條訊息一次訊息,可以參考與以示例程式碼:

    @Test
    public void testMq() {
        for (int i = 0; i < 10; i++) {
            redisTemplate.opsForList().leftPush("task-queue", "data" + i);
            log.info("插入了一個新的任務==>{}", "data" + i);
        }
        String taskId = redisTemplate.opsForList().rightPop("task-queue").toString();
        log.info("處理成功,清除任務==>{}", taskId);
    }

1.配置程式碼RedisConfig.java

package demo.data.mqRedis.config;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cache.annotation.CachingConfigurerSupport;
import org.springframework.cache.annotation.EnableCaching;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.listener.ChannelTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
import org.springframework.data.redis.serializer.GenericJackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;

@Configuration
@EnableCaching
public class RedisConfig extends CachingConfigurerSupport {

    @Autowired
    private RedisTemplate redisTemplate;

    /**
     * redisTemplate 序列化使用的jdkSerializeable, 儲存二進位制位元組碼, 所以自定義序列化類,方便除錯redis
     *
     * @param redisConnectionFactory
     * @return
     */
    @Bean
    public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) {

        RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();

        //使用Jackson2JsonRedisSerializer來序列化和反序列化redis的value值
        redisTemplate.setValueSerializer(new GenericJackson2JsonRedisSerializer());
        redisTemplate.setHashValueSerializer(new GenericJackson2JsonRedisSerializer());

        //使用StringRedisSerializer來序列化和反序列化redis的ke
        redisTemplate.setKeySerializer(new StringRedisSerializer());
        redisTemplate.setHashKeySerializer(new StringRedisSerializer());

        //開啟事務
        redisTemplate.setEnableTransactionSupport(true);

        redisTemplate.setConnectionFactory(redisConnectionFactory);

        return redisTemplate;
    }

    @Bean
    MessageListenerAdapter messageListener() {
        return new MessageListenerAdapter(new RedisMessageSubscriber());
    }

    @Bean
    RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory,
                                            MessageListenerAdapter listenerAdapter) {

        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.addMessageListener(listenerAdapter, topic());

        return container;
    }

    @Bean
    MessagePublisher redisPublisher() {
        return new RedisMessagePublisher(redisTemplate, topic());
    }

    @Bean
    ChannelTopic topic() {
        return new ChannelTopic("messageQueue");
    }
}

2.定義訊息釋出介面MessagePublisher.java

package demo.data.mqRedis.config;

public interface MessagePublisher {
    void publish(String message);
}

3.釋出方實現RedisMessagePublisher.java

package demo.data.mqRedis.config;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.listener.ChannelTopic;

/**
 * 訊息釋出方
 */
public class RedisMessagePublisher implements MessagePublisher {

    @Autowired
    private RedisTemplate<String, Object> redisTemplate;

    @Autowired
    private ChannelTopic topic;

    public RedisMessagePublisher(
            RedisTemplate<String, Object> redisTemplate, ChannelTopic topic) {
        this.redisTemplate = redisTemplate;
        this.topic = topic;
    }

    public void publish(String message) {
        redisTemplate.convertAndSend(topic.getTopic(), message);
    }
}

4.訊息接收方RedisMessageSubscriber.java

package demo.data.mqRedis.config;

import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.stereotype.Service;

import java.util.ArrayList;
import java.util.List;

/**
 * 訊息訂閱方
 */
@Service
@Slf4j
public class RedisMessageSubscriber implements MessageListener {

    public static List<String> messageList = new ArrayList<>();

    public void onMessage(Message message, byte[] pattern) {
        messageList.add(message.toString());
        log.info("訂閱方接收到了訊息==>{}", message.toString());
    }
}

5.最後貼上application.yml配置

spring:
  redis:
    host: 127.0.0.1
    port: 6379
    password:

檢視執行結果

1.編寫測試用例試釋出訊息TestRedisMQ.java

package demo.data.mqRedis;

import demo.data.mqRedis.config.RedisMessagePublisher;
import lombok.extern.slf4j.Slf4j;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

import java.util.UUID;

@RunWith(SpringRunner.class)
@SpringBootTest
@Slf4j
public class TestRedisMQ {

    @Autowired
    RedisMessagePublisher redisMessagePublisher;

    @Test
    public void testMq() {
        String message = "Message " + UUID.randomUUID();
        redisMessagePublisher.publish(message);
    }
}

2.執行結果

2019-09-05 15:51:33.931  INFO 10772 --- [    container-2] d.d.m.config.RedisMessageSubscriber      : 訂閱方接收到了訊息==>"Message c95959bf-6c30-4801-bc80-0e1e3c9f81bc"

訂閱方成功接收到訊息了

資料

  • 示例程式碼-github
  • redis實現mq的方案以及stream的應用
  • 釋出-訂閱模式