1. 程式人生 > >redis 訊息釋出訂閱與訊息佇列

redis 訊息釋出訂閱與訊息佇列

redis可以實現訊息的釋出訂閱,可以用作java中的訂閱釋出模式

純粹redis的釋出訂閱 
redis客戶端1中使用命令 SUBSCRIBE talk 可以訂閱通道 talk上的訊息 
redis客戶端2中也同樣執行這個命令一起訂閱通道 talk 
redis客戶端3使用命令 PUBLISH talk 'test' 可以發現客戶端1和2同時受到訊息

java實現 
可以通過spring-redis中的redisTemplate工具輔助實現 
1.釋出訊息

/**
 * redis釋出訊息
 *
 * @param channel
 * @param message
 */
public void sendMessage(String channel, String message) {
    redisTemplate.convertAndSend(channel, message);
}

直接使用convertAndSend方法即可向指定的通道釋出訊息

2.監聽訊息 
監聽訊息需要兩步,訊息監聽類並在xml中註冊這個類 
監聽類有兩種實現方式一種是實現org.springframework.data.redis.connection.MessageListener介面,實現onMessage方法示例程式碼如下:
 

@Component
public class RedisMessageListener implements MessageListener {

    @Autowired
    private RedisTemplate<String, String> redisTemplate;

    private static Logger logger = Logger.getLogger(RedisMessageListener.class);

    @Override
    public void onMessage(Message message, byte[] pattern) {
        byte[] body = message.getBody();// 請使用valueSerializer
        byte[] channel = message.getChannel();
        // 請參考配置檔案,本例中key,value的序列化方式均為string。
        // 其中key必須為stringSerializer。和redisTemplate.convertAndSend對應
        String msgContent = (String) redisTemplate.getValueSerializer().deserialize(body);
        String topic = (String) redisTemplate.getStringSerializer().deserialize(channel);
        logger.info("redis--topic:" + topic + "  body:" + msgContent);
    }
}

也可以使用自己定義的類,方法名稱自己定義,示例如下:

@Component
public class EventListener {

    private static Logger logger = Logger.getLogger(EventListener.class);

    public void getMessage(String message, String channel) {
        logger.info(message);
    }
} 

這兩中方式實現的不同在於註冊監聽器時的配置略有不同 
redis配置檔案:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:p="http://www.springframework.org/schema/p" xmlns:redis="http://www.springframework.org/schema/redis"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.1.xsd                           http://www.springframework.org/schema/redis http://www.springframework.org/schema/redis/spring-redis-1.0.xsd">

<bean id="poolConfig" class="redis.clients.jedis.JedisPoolConfig">
    <property name="maxIdle" value="${redis.maxIdle}" />
    <!-- <property name="maxActive" value="${redis.maxActive}" /> -->
    <property name="maxWaitMillis" value="${redis.maxWaitMillis}" />
    <property name="testOnBorrow" value="${redis.testOnBorrow}" />
</bean>
<!--注意使用訂閱釋出時,此bean必須命名為redisCOnnectionFactory,否則需要在listener中指明連線工廠-->
<bean id="redisConnectionFactory" class="org.springframework.data.redis.connection.jedis.JedisConnectionFactory"
      p:host-name="${redis.host}" p:port="${redis.port}" p:password="${redis.pass}"
      p:pool-config-ref="poolConfig"/>

<bean id="redisTemplate" class="org.springframework.data.redis.core.StringRedisTemplate">
    <property name="connectionFactory" ref="redisConnectionFactory"/>
</bean>

<bean id="stringRedisSerializer" class="org.springframework.data.redis.serializer.StringRedisSerializer"/>
<!--此處註冊監聽器,需要指定通道名稱(topic)(可以使用正則表示式*_等等),第一種為實現MessageListener介面的監聽器的註冊,第二種為自己定義的類的註冊需要制定處理方法名稱(不制定的預設方法為handleMessage,如果你的方法是這個名稱可以不指定)與序列化的方式,推薦使用第一種方式-->
<redis:listener-container>
    <redis:listener ref="redisMessageListener" topic="talk"/>
    <redis:listener ref="eventListener" topic="talk*" method="getMessage"
                    serializer="stringRedisSerializer"></redis:listener>
</redis:listener-container>
</beans>

redis訊息佇列 
redis訊息佇列使用 redis中的list資料結構實現(左進右出)

/**
 * 向指定的列表左邊插入資料
 *
 * @param key
 * @param value
 * @return
 */
public void leftPush(String key, String value) {
    redisTemplate.opsForList().leftPush(key, value);
}

這個程式碼即可向指定的list中的左邊插入值

/**
 * 彈出指定列表右邊的資料(如果沒有資料,在指定的時間內等待)
 *
 * @param key
 * @param timeout
 * @param unit
 * @return
 */
public String rightPop(String key, long timeout, TimeUnit unit) {
    return redisTemplate.opsForList().rightPop(key, timeout, unit);
}

以上程式碼可以從指定列表的右邊彈出一個數據(如果沒有,會等待指定時間返回空),只需要在工程中啟動一個執行緒不停的使用這個方法即可實現訊息佇列的監聽

@PostConstruct
public void messageListener() {
    new Thread(new Runnable() {
        @Override
        public void run() {
            while (true) {
                rightPop(....)
                ....
            }
        }
    }, "訊息監聽任務執行緒").start();
}

對於彈出方法,可以使用

/**
 * 彈出指定列表右邊,並向指定列表的左邊插入(彈出列表如果沒有元素,等待指定的時間)
 *
 * @param sourceKey
 * @param destinationKey
 * @param timeout
 * @param unit
 * @return
 */
public String rightPopAndLeftPush(String sourceKey, String destinationKey, long timeout, TimeUnit unit) {
    return redisTemplate.opsForList().rightPopAndLeftPush(sourceKey, destinationKey, timeout, unit);
    }

這個方法優化程式碼,實現彈出的同時插入一個處理佇列(事務)

 

--------------------- 
作者:玩家六 
來源:CSDN 
原文:https://blog.csdn.net/jslcylcy/article/details/78201812 
版權宣告:本文為博主原創文章,轉載請附上博文連結!