1. 程式人生 > rabbitmq 整合 spring mvc

rabbitmq 整合 spring mvc

引入包

<dependency>
    <groupId>org.springframework.amqp</groupId>
    <artifactId>spring-rabbit</artifactId>
    <version>1.4.5.RELEASE</version>
</dependency>
<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>4.1.0</version>
</dependency>
<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-test</artifactId>
    <version>4.1.6.RELEASE</version>
</dependency>
<!-- JUnit 4 is required: the test class uses org.junit.Test, @RunWith and
     SpringJUnit4ClassRunner, none of which exist in JUnit 3.8.1. -->
<dependency>
    <groupId>junit</groupId>
    <artifactId>junit</artifactId>
    <version>4.12</version>
    <scope>test</scope>
</dependency>

rabbitmq.properties

## RabbitMQ basic connection settings ##
# Broker credentials
username=guest
password=guest
# Broker address and port
host=192.168.74.167
port=5672
# Virtual host to connect to
virtual_host=/

rabbitmq.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
     http://www.springframework.org/schema/beans/spring-beans-4.1.xsd
     http://www.springframework.org/schema/rabbit
     http://www.springframework.org/schema/rabbit/spring-rabbit-1.4.xsd">

    <!-- Thread pool handed to the listener container below via task-executor. -->
    <bean id="taskExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
        <!-- Core pool size (ThreadPoolTaskExecutor default: 1). -->
        <property name="corePoolSize" value="10" />
        <!-- Maximum pool size (default: Integer.MAX_VALUE). -->
        <property name="maxPoolSize" value="50" />
        <!-- Capacity of the pending-task queue (default: Integer.MAX_VALUE). -->
        <property name="queueCapacity" value="3000" />
        <!-- Seconds an idle thread is kept alive before being released (default: 60). -->
        <property name="keepAliveSeconds" value="300" />
        <!-- Policy applied when a task is rejected. Any RejectedExecutionHandler can be
             configured; the default is AbortPolicy. CallerRunsPolicy runs the rejected
             task on the submitting thread instead of throwing. -->
        <property name="rejectedExecutionHandler">
            <bean class="java.util.concurrent.ThreadPoolExecutor$CallerRunsPolicy" />
        </property>
    </bean>

    <!-- Connection factory; the placeholders use the keys defined in rabbitmq.properties. -->
    <rabbit:connection-factory id="connectionFactory" host="${host}" username="${username}"
        password="${password}" port="${port}" virtual-host="${virtual_host}"/>

    <!-- With the admin declared, the exchanges and queues defined in this file are
         created on the RabbitMQ server automatically. -->
    <rabbit:admin connection-factory="connectionFactory"/>

    <!-- Rabbit template used for sending and receiving messages. -->
    <rabbit:template id="rabbitTemplate" connection-factory="connectionFactory" />

    <!-- Broadcast: fanout exchange bound to both log queues. -->
    <rabbit:fanout-exchange name="logs" durable="false" auto-delete="false">
        <rabbit:bindings>
            <rabbit:binding queue="logs_1"/>
            <rabbit:binding queue="logs_2"/>
        </rabbit:bindings>
    </rabbit:fanout-exchange>

    <!-- Queues -->
    <rabbit:queue name="logs_1" durable="false" auto-delete="false" exclusive="false"/>
    <rabbit:queue name="logs_2" durable="false" auto-delete="false" exclusive="false"/>

    <!-- Listener container.
         NOTE(review): the "queues" attribute takes queue bean references, but no queue
         bean named task_queue and no bean named taskListenter is declared in this file.
         Confirm both are defined elsewhere, otherwise the context will not start.
         The fanout listeners deliberately have no response-exchange: they only consume,
         and a reply published to "logs" would be broadcast straight back into
         logs_1 and logs_2. -->
    <rabbit:listener-container connection-factory="connectionFactory" acknowledge="auto" task-executor="taskExecutor">
        <rabbit:listener queues="task_queue" ref="taskListenter"/>
        <rabbit:listener queues="logs_1" ref="fanoutListenter"/>
        <rabbit:listener queues="logs_2" ref="fanoutListenter"/>
    </rabbit:listener-container>
</beans>


applicationContext.xml

<?xml version="1.0" encoding="UTF-8"?>
<!-- Root Spring context: scans for annotated components, loads the RabbitMQ
     connection properties and imports the RabbitMQ bean definitions.
     The aop and tx namespaces are declared but not used in this file. -->
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:aop="http://www.springframework.org/schema/aop"
       xmlns:tx="http://www.springframework.org/schema/tx"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
          http://www.springframework.org/schema/beans/spring-beans-4.1.xsd   
          http://www.springframework.org/schema/context   
          http://www.springframework.org/schema/context/spring-context-4.1.xsd   
          http://www.springframework.org/schema/aop   
          http://www.springframework.org/schema/aop/spring-aop-4.1.xsd   
          http://www.springframework.org/schema/tx    
          http://www.springframework.org/schema/tx/spring-tx-4.1.xsd">


    <!-- Register annotated classes (for example @Service and @Component) found
         under the com.hhly.* package pattern. -->
    <context:component-scan base-package="com.hhly.*" />

    <!-- Load rabbitmq.properties so that the ${...} placeholders used in
         rabbitmq.xml can be resolved. -->
    <bean id="propertyPlaceholderConfigurer" class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
        <property name="locations">
            <list>
                <value>classpath*:rabbitmq.properties</value>
            </list>
        </property>
    </bean>

    <!-- Import the RabbitMQ configuration file. -->
    <import resource="rabbitmq.xml"/>

 </beans>

生產

package com.hhly.rabbitmq.spring.produce;

/**
 * Contract for publishing text messages, either to a single queue or as a broadcast.
 */
public interface MQProducer {

    /**
     * Sends a message to the specified queue.
     *
     * @param queueKey key identifying the destination queue
     * @param message  text payload to send
     */
    void sendDataToQueue(String queueKey, String message);

    /**
     * Sends a broadcast message.
     *
     * <p>Originally written by jiangwei (version 1.0, created 2017-03-27 14:17:38).
     *
     * @param exchange name of the exchange to publish to
     * @param message  text payload to broadcast
     */
    void sendDataToFanout(String exchange, String message);
}
package com.hhly.rabbitmq.spring.produce;

import java.nio.charset.StandardCharsets;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

/**
 * {@link MQProducer} implementation that publishes through the injected {@link AmqpTemplate}.
 */
@Service
public class MQProducerImpl implements MQProducer {
    @Autowired
    private AmqpTemplate amqpTemplate;

    /**
     * Sends {@code message} as a persistent message, using {@code queueKey} as the routing key.
     *
     * @param queueKey routing key passed to {@link AmqpTemplate#send(String, Message)}
     * @param message  text payload; encoded as UTF-8 bytes
     */
    @Override
    public void sendDataToQueue(String queueKey, String message) {
        // Encode explicitly as UTF-8. The no-arg getBytes() uses the platform default
        // charset, while consumers such as FanoutListenter decode the body as UTF-8.
        byte[] body = message.getBytes(StandardCharsets.UTF_8);
        MessageProperties properties = new MessageProperties();
        properties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
        amqpTemplate.send(queueKey, new Message(body, properties));
    }

    /**
     * Publishes {@code message} to {@code exchange} with an empty routing key.
     *
     * @param exchange name of the exchange to publish to
     * @param message  text payload, converted by the template's message converter
     */
    @Override
    public void sendDataToFanout(String exchange, String message) {
        amqpTemplate.convertAndSend(exchange, "", message);
    }

}

消費

package com.hhly.rabbitmq.spring.consumer;

import java.io.UnsupportedEncodingException;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
import org.springframework.stereotype.Component;

/**
 * Message listener that prints the body of every received message, decoded as UTF-8,
 * to standard output.
 */
@Component
public class FanoutListenter implements MessageListener {

    /** Name of the charset used to decode incoming message bodies. */
    private static final String BODY_CHARSET = "UTF-8";

    /**
     * Decodes the message body and writes it to standard output.
     *
     * @param message the received message
     */
    @Override
    public void onMessage(Message message) {
        final byte[] payload = message.getBody();
        final String text;
        try {
            text = new String(payload, BODY_CHARSET);
        } catch (UnsupportedEncodingException e) {
            // Same handling as before: report the failure and skip printing.
            e.printStackTrace();
            return;
        }
        System.out.println(text);
    }

}

測試類

package com.hhly.rabbitmq;

import java.util.UUID;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

import com.hhly.rabbitmq.spring.produce.MQProducer;
/**
 * Manual integration driver: loads {@code applicationContext.xml} and keeps publishing
 * broadcast messages through {@link MQProducer}.
 */
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = {"classpath:applicationContext.xml"})
public class TestQueue {
    @Autowired
    MQProducer mqProducer;

    /**
     * Publishes one numbered message per second to the {@code logs} exchange.
     *
     * <p>The loop is intentionally open-ended; it runs until the executing thread is
     * interrupted, at which point the interrupt flag is restored and the method returns.
     */
    @Test
    public void sendFanout() {
        int i = 0;
        while (!Thread.currentThread().isInterrupted()) {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                // Restore the flag and stop instead of swallowing the interrupt
                // and looping forever.
                Thread.currentThread().interrupt();
                break;
            }
            String message = "hello,rabbmitmq!" + i++;
            mqProducer.sendDataToFanout("logs", message);
        }
    }

}