1. 程式人生 > >基於spring-redis釋出訂閱模式的實現

基於spring-redis釋出訂閱模式的實現

redis配置: 
XML程式碼 
  1. <?xml version="1.0" encoding="UTF-8"?>  
  2. <beans xmlns="http://www.springframework.org/schema/beans"  
  3.     xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"  
  4.     xmlns:redis="http://www.springframework.org/schema/redis" xmlns:p="http://www.springframework.org/schema/p"
      
  5.     xsi:schemaLocation="http://www.springframework.org/schema/beans  
  6.        http://www.springframework.org/schema/beans/spring-beans-3.0.xsd  
  7.        http://www.springframework.org/schema/context  
  8.        http://www.springframework.org/schema/context/spring-context-3.0.xsd  
  9.        http://www.springframework.org/schema/redis
      
  10.         http://www.springframework.org/schema/redis/spring-redis-1.0.xsd"  
  11.     default-autowire="byName">  
      <!-- Spring context for the Redis publish/subscribe setup: connection pool, template,
           message listeners, the listener container and the cache DAO beans. -->
      <!-- Resolves the ${redis.*} placeholders used below from classpath:redis.properties. -->
  12.     <context:property-placeholder location="classpath:redis.properties" />  
      <!-- Jedis connection pool settings; every value comes from redis.properties. -->
  13.     <bean id="jedisPoolConfig" class="redis.clients.jedis.JedisPoolConfig">  
  14.         <property name="maxIdle"
     value="${redis.maxIdle}" />  
  15.         <property name="maxTotal" value="${redis.maxTotal}" />  
  16.         <property name="maxWaitMillis" value="${redis.maxWaitMillis}" />  
  17.         <property name="testOnBorrow" value="${redis.testOnBorrow}" />  
  18.     </bean>  
      <!-- Connection factory built on jedisPoolConfig; host, port and password come from
           redis.properties, and destroy() is registered as the bean's destroy method. -->
  19.     <bean id="jedisConnectionFactory"  
  20.         class="org.springframework.data.redis.connection.jedis.JedisConnectionFactory"  
  21.         destroy-method="destroy">  
  22.         <property name="poolConfig" ref="jedisPoolConfig"></property>  
  23.         <property name="hostName" value="${redis.host}"></property>  
  24.         <property name="port" value="${redis.port}"></property>  
  25.         <property name="password" value="${redis.pass}"></property>  
  26.     </bean>  
      <!-- Shared RedisTemplate; its default serializer is StringRedisSerializer. -->
  27.     <bean id="redisTemplate" class="org.springframework.data.redis.core.RedisTemplate">  
  28.         <property name="connectionFactory" ref="jedisConnectionFactory"></property>  
  29.         <property name="defaultSerializer">  
  30.             <bean  
  31.                 class="org.springframework.data.redis.serializer.StringRedisSerializer" />  
  32.         </property>  
  33.     </bean>  
      <!-- Message listener beans; each one is given the shared redisTemplate. -->
  34.     <bean id="registerMessageListener" class="com.gc.biz.cache.listener.RegisterMessageListener">  
  35.         <property name="redisTemplate" ref="redisTemplate"></property>  
  36.     </bean>  
  37.     <bean id="priDocMessageListener" class="com.gc.biz.cache.listener.PriDocRegActMsgListener">  
  38.         <property name="redisTemplate" ref="redisTemplate"></property>  
  39.     </bean>  
      <!-- Message sender bean (MessageDaoImpl), registered under the id "redisDAO". -->
  40.     <bean id="redisDAO" class="com.gc.biz.cache.impl.MessageDaoImpl">  
  41.         <property name="redisTemplate" ref="redisTemplate" />  
  42.     </bean>  
      <!-- Listener container: maps each listener bean (map key) to the topic it listens on
           (map value) and runs with a ThreadPoolTaskScheduler of pool size 3.
           NOTE(review): ChannelTopic normally names one literal Redis channel. Confirm whether
           "coupon|redenvelop|notify|points" and "YZM|BG" are meant as single channel names or
           as several channels; in the latter case a list of ChannelTopic beans is probably
           required. -->
  43.     <bean id="topicContainer"  
  44.         class="org.springframework.data.redis.listener.RedisMessageListenerContainer"  
  45.         destroy-method="destroy">  
  46.         <property name="connectionFactory" ref="jedisConnectionFactory" />  
  47.         <property name="taskExecutor">  
  48.             <bean  
  49.                 class="org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler">  
  50.                 <property name="poolSize" value="3"></property>  
  51.             </bean>  
  52.         </property>  
  53.         <property name="messageListeners">  
  54.             <map>  
  55.                 <entry key-ref="registerMessageListener">  
  56.                     <bean class="org.springframework.data.redis.listener.ChannelTopic">  
  57.                         <constructor-arg value="coupon|redenvelop|notify|points" />  
  58.                     </bean>  
  59.                 </entry>  
  60.                 <entry key-ref="priDocMessageListener">  
  61.                     <bean class="org.springframework.data.redis.listener.ChannelTopic">  
  62.                         <constructor-arg value="YZM|BG" />  
  63.                     </bean>  
  64.                 </entry>  
  65.             </map>  
  66.         </property>  
  67.     </bean>  
      <!-- Application-context holder and the remaining cache DAO beans. Only doctorDAO has
           redisTemplate set explicitly; the file declares default-autowire="byName". -->
  68.     <bean id="springContext" class="com.gc.biz.cache.util.SpringContextHolder" />  
  69.     <bean id="doctorDAO" class="com.gc.biz.cache.impl.DoctorDAOImpl" >  
  70.         <property name="redisTemplate" ref="redisTemplate" />  
  71.     </bean>  
  72.     <bean id="remindDAO" class="com.gc.biz.cache.impl.RemindDAOImpl" />  
  73.     <bean id="userDAO" class="com.gc.biz.cache.impl.UserDAOImpl" />  
  74.     <bean id="userDataDAO" class="com.gc.biz.cache.impl.UserDataDAOImpl" />  
  75. </beans>    


監聽器的實現: 
Java程式碼 
  1. package com.gc.biz.cache.listener;  
  2. import java.io.Serializable;  
  3. import java.util.HashMap;  
  4. import java.util.Map;  
  5. import org.apache.log4j.Logger;  
  6. import org.springframework.data.redis.connection.Message;  
  7. import org.springframework.data.redis.connection.MessageListener;  
  8. import org.springframework.data.redis.core.RedisTemplate;  
  9. import com.gc.apps.jsk.coupon.service.CouponService;  
  10. import com.gc.apps.jsk.coupon.service.impl.CouponServiceImpl;  
  11. import com.gc.apps.jsk.invitationcode.service.InvitationService;  
  12. import com.gc.apps.jsk.invitationcode.service.impl.InvitationServiceImpl;  
  13. import com.gc.apps.jsk.login.service.RegisterService;  
  14. import com.gc.apps.jsk.login.service.impl.RegisterServiceImpl;  
  15. import com.gc.apps.jsk.membership.service.MemberShipService;  
  16. import com.gc.apps.jsk.membership.service.impl.MemberShipServiceImpl;  
  17. import com.gc.biz.member.dbobj.MemberInfo;  
  18. import com.gc.common.util.StrUtil;  
  19. import com.gc.frame.core.db.DBTransaction;  
  20. import com.gc.frame.core.misc.StringUtil;  
  21. import com.google.gson.Gson;  
  22. public class RegisterMessageListener implements MessageListener {  
  23.     private RedisTemplate<Serializable, Serializable> redisTemplate;  
  24.     private static Logger logger = Logger.getLogger(RegisterMessageListener.class);  
  25.     public void setRedisTemplate(RedisTemplate<Serializable, Serializable> redisTemplate) {  // setter injection point; the XML config sets the "redisTemplate" property on this listener bean
  26.         this.redisTemplate = redisTemplate;  
  27.     }  
  28.     @Override  
  29.     public void onMessage(Message message, byte[] pattern) {  
  30.         byte[] body = message.getBody();// 請使用valueSerializer  
  31.         byte[] channel = message.getChannel();  
  32.         // 請參考配置檔案,本例中key,value的序列化方式均為string。  
  33.         // 其中key必須為stringSerializer。和redisTemplate.convertAndSend對應  
  34.         String msgContent = (String) redisTemplate.getValueSerializer().deserialize(body);  
  35.         String topic = (String) redisTemplate.getStringSerializer().deserialize(channel);  
  36.         System.out.println(topic + ":" + msgContent);  
  37.         Map<String, String> map = new Gson().fromJson(msgContent, Map.class);  
  38.         String from = map.get("from");  
  39.         if ("wx".equals(from)) {  
  40.             doRegisterMsg_wx(topic, msgContent);  
  41.         } else if ("app".equals(from)) {  
  42.             doRegisterMsg(topic, msgContent);  
  43.         }  
  44.     }  


訊息傳送介面的實現: 
Java程式碼 
  1. package com.gc.biz.cache.impl;  
  2. import java.io.Serializable;  
  3. import org.springframework.data.redis.core.RedisTemplate;  
  4. import com.gc.biz.cache.dao.MessageDao;  
  5. public class MessageDaoImpl implements MessageDao{  
  6.     private RedisTemplate<String , Object> redisTemplate = null;  
  7.     public MessageDaoImpl() {  
  8.     }  
  9.     @Override  
  10.     public void sendMessage(String channel, Serializable message) {  
  11.         redisTemplate.convertAndSend(channel, message);  
  12.     }  
  13.     public RedisTemplate<String, Object> getRedisTemplate() {  
  14.         return redisTemplate;  
  15.     }  
  16.     public void setRedisTemplate(RedisTemplate<String, Object> redisTemplate) {  
  17.         this.redisTemplate = redisTemplate;  
  18.     }  
  19. }  


測試呼叫的方法: 
Java程式碼 
  1. // Example caller: looks up the sender bean registered as "redisDAO" and publishes
  2. // JSON-encoded maps on several channels.
  3. MessageDao dao = SpringContextHolder.getBean("redisDAO");  
  4. Gson gson = new Gson();  // one instance reused for every payload
  5. Map<String, String> map = new HashMap<String, String>();  
  6. map.put("1", "11111");  
  7. map.put("2", "22222");  
  8. dao.sendMessage("coupon", gson.toJson(map));  
  9. dao.sendMessage("redenvelop", gson.toJson(map));  
  10. dao.sendMessage("notify", gson.toJson(map));  
  11. map.put("UserBagID", "1");  
  12. map.put("CreateDate", "2016-06-01 16:51:35");  
  13. // NOTE(review): the XML listing in this article maps no listener to a topic named
  14. // "iphone|xiaomi"; confirm a subscriber exists for this channel.
  15. dao.sendMessage("iphone|xiaomi", gson.toJson(map));  


注意:1、如果有多個專案同時使用此配置,只需要在其中一個專案的配置檔案中保留訊息訂閱(監聽)相關的配置;2、此配置沒有考慮分散式部署的環境,如需支援分散式部署,可以從redis list和分散式鎖的方向考慮。