1. 程式人生 > >Spring MVC + redis 訊息的訂閱釋出

Spring MVC + redis 訊息的訂閱釋出

釋出訂閱(pub/sub)是一種訊息通訊模式,主要的目的是解除訊息釋出者和訊息訂閱者之間的耦合,這點和設計模式中的觀察者模式比較相似。pub/sub不僅僅解決釋出者和訂閱者之間程式碼級別的耦合,也解決兩者在物理部署上的耦合。redis作為一個pub/sub server,在訂閱者和釋出者之間起到了訊息路由的功能。訂閱者可以通過subscribe和psubscribe命令向redis server訂閱自己感興趣的訊息型別,redis將訊息型別稱為通道(channel)。當釋出者通過publish命令向redis server傳送特定型別的訊息時,訂閱該訊息型別的全部client都會收到此訊息。這裡訊息的傳遞是多對多的:一個client可以訂閱多個channel,也可以向多個channel傳送訊息。

初次使用redis的訊息訂閱釋出,在網上搜了一圈發現都是比較笨的在配置檔案配置訊息的形式,自己折騰著換了種方法實現,貼出來自己記錄一下。

先在maven引入對應的jar,注意版本問題,版本不對應會出很多奇奇怪怪的問題。

<!-- Spring Data Redis: supplies the RedisTemplate, StringRedisTemplate, JedisConnectionFactory
     and StringRedisSerializer classes referenced by redis-context.xml. -->
<dependency>
	<groupId>org.springframework.data</groupId>
	<artifactId>spring-data-redis</artifactId>
	<version>1.6.2.RELEASE</version>
</dependency>
<!-- Jedis client: supplies JedisPoolConfig, Jedis and JedisPubSub used by the configuration and
     by the Java classes. The article warns that this version must be compatible with the
     spring-data-redis version above. -->
<dependency>
	<groupId>redis.clients</groupId>
	<artifactId>jedis</artifactId>
	<version>2.8.2</version>
</dependency>
然後是redis的配置檔案redis-context.xml。
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns="http://www.springframework.org/schema/beans" xmlns:context="http://www.springframework.org/schema/context"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans-3.1.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd">

    <!-- Register annotated components found under com.zavfwei.redis (e.g. RedisCacheDao). -->
    <context:component-scan base-package="com.zavfwei.redis" />

    <!-- Load the cache.redis.* settings from config/redis.properties on the classpath.
         ignore-unresolvable="true" means placeholders missing from this file are left
         for another placeholder configurer instead of failing here. -->
    <context:property-placeholder location="classpath:config/redis.properties" file-encoding="utf-8" ignore-unresolvable="true"/>

       <!-- Jedis connection pool limits, all taken from the properties file. -->
       <bean id="jedisPoolConfig" class="redis.clients.jedis.JedisPoolConfig">
              <property name="maxIdle" value="${cache.redis.maxIdle}" />
              <property name="maxTotal" value="${cache.redis.maxTotal}" />
              <property name="testOnBorrow" value="${cache.redis.testOnBorrow}" />
       </bean>

       <!-- Pooled Jedis connection factory; host, port, password, timeout and database index
            come from the properties file, and the pool settings are injected via the
            constructor argument below. -->
       <bean id="redisConnectionFactory"
             class="org.springframework.data.redis.connection.jedis.JedisConnectionFactory">
              <property name="usePool" value="true"></property>
              <property name="hostName" value="${cache.redis.host}" />
              <property name="port" value="${cache.redis.port}" />
              <property name="password" value="${cache.redis.password}" />
              <property name="timeout" value="${cache.redis.timeout}" />
              <property name="database" value="${cache.redis.db}"></property>
              <constructor-arg index="0" ref="jedisPoolConfig" />
       </bean>

       <!-- General-purpose template (injected into RedisCacheDao by name). Keys, values,
            hash keys and hash values all use the shared string serializer. -->
       <bean id="redisTemplate" class="org.springframework.data.redis.core.RedisTemplate">
              <property name="connectionFactory" ref="redisConnectionFactory" />
              <property name="keySerializer" ref="stringRedisSerializer" />
              <property name="valueSerializer" ref="stringRedisSerializer" />
              <property name="hashKeySerializer" ref="stringRedisSerializer" />
              <property name="hashValueSerializer" ref="stringRedisSerializer" />
       </bean>

    <!-- String-typed template on the same connection factory and serializer.
         NOTE(review): StringRedisTemplate presumably already defaults to string serializers,
         so the four serializer properties look redundant; confirm before removing. -->
    <bean id="stringRedisTemplate" class="org.springframework.data.redis.core.StringRedisTemplate">
        <property name="connectionFactory" ref="redisConnectionFactory" />
        <property name="keySerializer" ref="stringRedisSerializer" />
        <property name="valueSerializer" ref="stringRedisSerializer" />
        <property name="hashKeySerializer" ref="stringRedisSerializer" />
        <property name="hashValueSerializer" ref="stringRedisSerializer" />
    </bean>

       <!-- Single serializer instance shared by both templates above. -->
       <bean id="stringRedisSerializer"
             class="org.springframework.data.redis.serializer.StringRedisSerializer" >
       </bean>
</beans>

訊息訂閱釋出的一個工具類RedisCacheDao。

package com.zavfwei.redis.core;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.dao.DataAccessException;
import org.springframework.data.redis.connection.RedisConnection;
import org.springframework.data.redis.core.*;
import org.springframework.stereotype.Service;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPubSub;

import javax.annotation.Resource;

/**
 * Small publish/subscribe helper that runs Jedis pub/sub commands on a connection obtained
 * through the {@code redisTemplate} bean.
 *
 * <p>Both operations cast the template's native connection to {@link Jedis}, so this class
 * only works when the template is backed by a Jedis connection factory.
 *
 * <p>Created by winton on 2017/3/21 0021.
 */
@Service
public class RedisCacheDao<T> {

    @Resource(name = "redisTemplate")
    public RedisTemplate redisTemplate;

    /**
     * Publishes a message to the given channel.
     *
     * @param channel name of the channel to publish on
     * @param message payload delivered to the channel's subscribers
     */
    public void publish(final String channel, final String message) {
        final RedisCallback<Object> publishAction = new RedisCallback<Object>() {
            @Override
            public Object doInRedis(final RedisConnection connection) throws DataAccessException {
                nativeJedis(connection).publish(channel, message);
                return null;
            }
        };
        redisTemplate.execute(publishAction);
    }

    /**
     * Subscribes the given listener to one or more channels.
     *
     * <p>Jedis' {@code subscribe} call blocks the calling thread while the subscription is
     * active, so callers normally invoke this from a dedicated thread.
     *
     * @param jedisPubSub listener that receives the messages
     * @param channels    channels to subscribe to
     */
    public void subscribe(final JedisPubSub jedisPubSub, final String... channels) {
        final RedisCallback<Object> subscribeAction = new RedisCallback<Object>() {
            @Override
            public Object doInRedis(final RedisConnection connection) throws DataAccessException {
                nativeJedis(connection).subscribe(jedisPubSub, channels);
                return null;
            }
        };
        redisTemplate.execute(subscribeAction);
    }

    /** Returns the Jedis client underlying the given Spring connection. */
    private static Jedis nativeJedis(final RedisConnection connection) {
        return (Jedis) connection.getNativeConnection();
    }
}

測試訊息的訂閱釋出類。
package com.zavfwei.service.other;

import com.zavfwei.RedisKey;
import com.zavfwei.redis.core.RedisCacheDao;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import redis.clients.jedis.JedisPubSub;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;

/**
 * Example service that subscribes to {@code RedisKey.TXAPPLY_CHANNEL} when the bean is
 * initialised and can publish a test message to the same channel.
 *
 * <p>The subscription runs on its own background thread because the subscribe call does not
 * return while the subscription is active. The listener is unsubscribed again when the bean
 * is destroyed so that the thread and its Redis connection are released.
 *
 * <p>Created by winton on 2017/3/15 0015.
 */
@Service
public class WithdrawService {

    private static final Logger log = LoggerFactory.getLogger(WithdrawService.class);

    @Autowired
    private RedisCacheDao redisCacheDao;

    /** Listener for the channel; kept as a field so {@link #destroy()} can cancel it. */
    private final JedisPubSub subscriber = new JedisPubSub() {
        @Override
        public void onMessage(String channel, String message) {
            log.debug("redis通知,channel={},message={}", channel, message);
            if (channel.equals(RedisKey.TXAPPLY_CHANNEL)) {
                // Placeholder: handling of withdraw-apply notifications is not implemented yet.
            }
        }
    };

    /**
     * Starts the background subscription to {@code RedisKey.TXAPPLY_CHANNEL}.
     *
     * <p>The worker is a named daemon thread, so it is identifiable in thread dumps and cannot
     * keep the JVM alive on its own. A failure of the subscription is logged instead of
     * silently terminating the thread.
     */
    @PostConstruct
    public void init() {
        Thread worker = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    redisCacheDao.subscribe(subscriber, RedisKey.TXAPPLY_CHANNEL);
                } catch (RuntimeException e) {
                    log.error("redis subscription on channel {} terminated unexpectedly",
                            RedisKey.TXAPPLY_CHANNEL, e);
                }
            }
        }, "redis-subscriber-" + RedisKey.TXAPPLY_CHANNEL);
        worker.setDaemon(true);
        worker.start();
    }

    /**
     * Cancels the subscription on bean destruction so the subscribe call returns and the
     * worker thread can finish. Does nothing if the listener is not currently subscribed.
     */
    @PreDestroy
    public void destroy() {
        try {
            if (subscriber.isSubscribed()) {
                subscriber.unsubscribe();
            }
        } catch (RuntimeException e) {
            // Shutdown should proceed even if the connection is already broken.
            log.warn("failed to unsubscribe from channel {}", RedisKey.TXAPPLY_CHANNEL, e);
        }
    }

    /** Publishes a fixed test payload to {@code RedisKey.TXAPPLY_CHANNEL}. */
    public void publish() {
        redisCacheDao.publish(RedisKey.TXAPPLY_CHANNEL, "massage");
    }
}