1. 程式人生 > >RabbitMQ整合spring

RabbitMQ整合spring

username cep virtual containe 1.7 dconf system xmlns war

  1 <?xml version="1.0" encoding="UTF-8"?>
  <beans xmlns="http://www.springframework.org/schema/beans"
      xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"
      xmlns:mvc="http://www.springframework.org/schema/mvc" xmlns:jdbc="http://www.springframework.org/schema/jdbc"
      xmlns:jee="http://www.springframework.org/schema/jee" xmlns:aop="http://www.springframework.org/schema/aop"
      xmlns:tx="http://www.springframework.org/schema/tx" xmlns:task="http://www.springframework.org/schema/task"
      xmlns:rabbit="http://www.springframework.org/schema/rabbit"
      xsi:schemaLocation="http://www.springframework.org/schema/beans
      http://www.springframework.org/schema/beans/spring-beans.xsd
      http://www.springframework.org/schema/context
      http://www.springframework.org/schema/context/spring-context-4.3.xsd
      http://www.springframework.org/schema/mvc
      http://www.springframework.org/schema/mvc/spring-mvc.xsd
      http://www.springframework.org/schema/tx
      http://www.springframework.org/schema/tx/spring-tx.xsd
      http://www.springframework.org/schema/aop
      http://www.springframework.org/schema/aop/spring-aop.xsd
      http://www.springframework.org/schema/task
      http://www.springframework.org/schema/task/spring-task.xsd
      http://www.springframework.org/schema/rabbit
      http://www.springframework.org/schema/rabbit/spring-rabbit-1.7.xsd">

      <description>rabbitmq 連接服務配置</description>

      <!-- Connection factory with publisher confirms and publisher returns ENABLED
           (publisher-confirms="true", publisher-returns="true").
           Every other bean in this file refers to it by the id "rabbitConnectionFactory". -->
      <rabbit:connection-factory id="rabbitConnectionFactory"
          host="172.18.112.102" username="woms" password="woms" port="5672"
          virtual-host="lingyi" channel-cache-size="25" cache-mode="CHANNEL" publisher-confirms="true" publisher-returns="true" connection-timeout="200"/>

      <!-- Retry template used by the rabbit template below: exponential back-off
           from 200 up to 30000 (interval values as configured), at most 5 attempts. -->
      <bean id="retryTemplate" class="org.springframework.retry.support.RetryTemplate">
          <property name="backOffPolicy">
              <bean class="org.springframework.retry.backoff.ExponentialBackOffPolicy">
                  <property name="initialInterval" value="200" />
                  <property name="maxInterval" value="30000" />
              </bean>
          </property>
          <property name="retryPolicy">
              <bean class="org.springframework.retry.policy.SimpleRetryPolicy">
                  <property name="maxAttempts" value="5"/>
              </bean>
          </property>
      </bean>

      <!-- With the admin declared here, the exchanges and queues defined in this file are
           created on the RabbitMQ server automatically. When several exchanges are used,
           declared-by="connectAdmin" must be set on them. -->
      <rabbit:admin id="connectAdmin" connection-factory="rabbitConnectionFactory"/>

      <!-- Publishing template: default exchange and routing key, JSON conversion,
           mandatory publishing with confirm and return callbacks, and retry. -->
      <rabbit:template id="ampqTemplate" connection-factory="rabbitConnectionFactory"
          exchange="test-mq-exchange" return-callback="sendReturnCallback"
          message-converter="jsonMessageConverter" routing-key="test_queue_key"
          mandatory="true" confirm-callback="confirmCallback" retry-template="retryTemplate"/>

      <bean id="confirmCallback" class="ly.net.rabbitmq.MsgSendConfirmCallBack" />
      <bean id="sendReturnCallback" class="ly.net.rabbitmq.MsgSendReturnCallback" />
      <!-- Converter between message objects and JSON -->
      <bean id="jsonMessageConverter" class="org.springframework.amqp.support.converter.Jackson2JsonMessageConverter" />

      <!-- Queue configuration -->
      <!-- durable: whether the queue is persistent -->
      <!-- exclusive: private queue usable only by its creator; deleted automatically after disconnect -->
      <!-- auto-delete: whether the queue is deleted automatically once all consumers have disconnected -->
      <!-- declared-by must name the admin bean defined above ("connectAdmin"). -->
      <rabbit:queue id="test_queue_key" name="test_queue_key" durable="true" auto-delete="false" exclusive="false" declared-by="connectAdmin" />

      <!-- Exchange configuration -->
      <!-- rabbit:direct-exchange: a direct exchange forwards a message only when its routing key matches a binding key exactly. -->
      <!-- rabbit:binding: the key under which the queue is bound -->
      <rabbit:direct-exchange name="test-mq-exchange"
          durable="true" auto-delete="false" id="test-mq-exchange" declared-by="connectAdmin">
          <rabbit:bindings>
              <rabbit:binding queue="test_queue_key" key="test_queue_key" />
          </rabbit:bindings>
      </rabbit:direct-exchange>

      <!-- <rabbit:topic-exchange name="${mq.queue}_exchange" durable="true" auto-delete="false"> -->
      <!-- <rabbit:bindings> -->
      <!-- Pattern the queue is bound with (a direct exchange uses key instead) -->
      <!-- <rabbit:binding queue="test_queue" pattern="${mq.queue}_patt"/> -->
      <!-- </rabbit:bindings> -->
      <!-- </rabbit:topic-exchange> -->

      <bean id="mqConsumer" class="ly.net.rabbitmq.MQConsumer" />
      <bean id="mqConsumer1" class="ly.net.rabbitmq.MQConsumerManual" />

      <!-- Listener configuration: consumer with automatic acknowledgement -->
      <!-- queues: the queues to listen on, comma-separated when more than one; ref: the listener bean -->
      <rabbit:listener-container
          connection-factory="rabbitConnectionFactory" acknowledge="auto"
          message-converter="jsonMessageConverter">
          <rabbit:listener queues="test_queue_key" ref="mqConsumer" />
      </rabbit:listener-container>
      <!-- Consumer with manual acknowledgement -->
      <rabbit:listener-container
          connection-factory="rabbitConnectionFactory" acknowledge="manual">
          <rabbit:listener queues="test_queue_key" ref="mqConsumer1" />
      </rabbit:listener-container>

  </beans>

 1 package ly.net.rabbitmq;
 2 
 3 
 4 import org.springframework.amqp.rabbit.core.RabbitTemplate;
 5 import org.springframework.amqp.rabbit.support.CorrelationData;
 6 public class MsgSendConfirmCallBack implements RabbitTemplate.ConfirmCallback {
 7 
 8     @Override
 9     public void confirm(CorrelationData correlationData, boolean ack, String cause) {
10         // TODO Auto-generated method stub
11         if (ack) {  
12             System.out.println("消息確認成功");  
13         } else {  
14             //處理丟失的消息  
15             System.out.println("消息確認失敗,"+cause);  
16         } 
17     }  
18     
19 } 
package ly.net.rabbitmq;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.retry.RepublishMessageRecoverer;
import org.springframework.beans.factory.annotation.Autowired;

/**
 * Return callback that prints the body of a message handed back to the publisher.
 *
 * <p>The body is decoded as UTF-8 text and written to standard output. A republish
 * path is sketched in the disabled code inside {@link #returnedMessage} but is not active.
 */
public class MsgSendReturnCallback implements RabbitTemplate.ReturnCallback{
    /** Template referenced only by the disabled republish code below. */
    @Autowired
    private RabbitTemplate errorTemplate;

    /**
     * Prints the returned message body.
     *
     * @param message    the returned message; its body is decoded as UTF-8
     * @param replyCode  reply code supplied with the return; not used here
     * @param replyText  reply text supplied with the return; not used here
     * @param exchange   exchange the message was published to; not used here
     * @param routingKey routing key the message was published with; not used here
     */
    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        // Decode with an explicit charset: the no-charset String constructor uses the
        // platform default, which garbles non-ASCII payloads on hosts that are not UTF-8.
        String msgJson = new String(message.getBody(), java.nio.charset.StandardCharsets.UTF_8);
        System.out.println("Returned Message:" + msgJson);

        // Republish (disabled):
//        RepublishMessageRecoverer recoverer = new RepublishMessageRecoverer(errorTemplate,"errorExchange", "errorRoutingKey");
//        Throwable cause = new Exception(new Exception("route_fail_and_republish"));
//        recoverer.recover(message,cause);
//        System.out.println("Returned Message:"+replyText);
//
    }

}
 1 package ly.net.rabbitmq;
 2 
 3 import org.springframework.amqp.core.Message;
 4 import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
 5 
 6 import com.rabbitmq.client.Channel;
 7 
 8 public class MQConsumerManual implements ChannelAwareMessageListener {
 9 
10     @Override
11     public void onMessage(Message message, Channel channel) throws Exception {
12         // TODO Auto-generated method stub
13         //手動確認
14         channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);
15     }
16 
17 }
/**
 * {@link MQProducer} implementation that publishes through the injected {@link AmqpTemplate}.
 *
 * <p>Objects are handed to {@code convertAndSend}, so the template's configured message
 * converter decides the wire format. Send failures are logged and not propagated.
 */
@Service
public class MQProducerImpl implements MQProducer {
    @Autowired
    private AmqpTemplate amqpTemplate;

    private final static Logger LOGGER = Logger.getLogger(MQProducerImpl.class);

    /**
     * Converts {@code object} to a message and sends it to the template's default
     * exchange, using {@code queueKey} as the routing key.
     *
     * <p>When {@code queueKey} is {@code null} or empty the template's default routing
     * key is used instead, which is what this method did for every call before the
     * key was honoured.
     *
     * @param queueKey routing key for the message; {@code null} or empty selects the
     *                 template's default routing key
     * @param object   payload to convert and send
     */
    @Override
    public void sendDataToQueue(String queueKey, Object object) {
        try {
            if (queueKey == null || queueKey.isEmpty()) {
                amqpTemplate.convertAndSend(object);
            } else {
                // Honour the caller's key; the single-argument overload ignores it.
                amqpTemplate.convertAndSend(queueKey, object);
            }
        } catch (Exception e) {
            // Best-effort send: keep the caller alive, but log the throwable itself so
            // the stack trace is not lost.
            LOGGER.error("Failed to send message with routing key '" + queueKey + "'", e);
        }

    }
}
public interface MQProducer {
    /**
     * 發送消息到指定隊列
     * @param queueKey
     * @param object
     */
    public void sendDataToQueue(String queueKey, Object object);
}

RabbitMQ整合spring