rabbitmq 整合 spring mvc
阿新 • • 發佈:2018-12-10
引入包
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
<version>1.4.5.RELEASE</version>
</dependency>
<dependency>
<groupId>com.rabbitmq</groupId >
<artifactId>amqp-client</artifactId>
<version>4.1.0</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-test</artifactId>
<version >4.1.6.RELEASE</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>3.8.1</version>
<scope>test</scope>
</dependency >
rabbitmq.properties
## rabbitmq 基礎引數配置 ##
username=guest
password=guest
host=192.168.74.167
port=5672
virtual_host=/
rabbitmq.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-4.1.xsd
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-4.1.xsd
http://www.springframework.org/schema/rabbit
http://www.springframework.org/schema/rabbit/spring-rabbit-1.4.xsd
">
<!-- 執行緒池配置 -->
<bean id="taskExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
<!-- 核心執行緒數,預設為1 -->
<property name="corePoolSize" value="10" />
<!-- 最大執行緒數,預設為Integer.MAX_VALUE -->
<property name="maxPoolSize" value="50" />
<!-- 佇列最大長度,一般需要設定值>=notifyScheduledMainExecutor.maxNum;預設為Integer.MAX_VALUE -->
<property name="queueCapacity" value="3000" />
<!-- 執行緒池維護執行緒所允許的空閒時間,預設為60s -->
<property name="keepAliveSeconds" value="300" />
<!-- 執行緒池對拒絕任務(無執行緒可用)的處理策略,目前只支援AbortPolicy、CallerRunsPolicy;預設為後者 -->
<property name="rejectedExecutionHandler">
<bean class="java.util.concurrent.ThreadPoolExecutor$CallerRunsPolicy" />
</property>
</bean>
<!-- 建立connectionFactory -->
<rabbit:connection-factory id="connectionFactory" host="${host}" username="${username}"
password="${password}" port="${port}" virtual-host="${virtual_host}"/>
<!-- 通過指定下面的admin資訊,當前productor中的exchange和queue會在rabbitmq伺服器上自動生成 -->
<rabbit:admin connection-factory="connectionFactory"/>
<!-- 定義rabbit template 用於資料的接收和傳送 -->
<rabbit:template id="rabbitTemplate" connection-factory="connectionFactory" />
<!-- 廣播 -->
<rabbit:fanout-exchange name="logs" durable="false" auto-delete="false">
<rabbit:bindings>
<rabbit:binding queue="logs_1"></rabbit:binding>
<rabbit:binding queue="logs_2"></rabbit:binding>
</rabbit:bindings>
</rabbit:fanout-exchange>
<!-- 列隊 -->
<rabbit:queue name="logs_1" durable="false" auto-delete="false" exclusive="false">
</rabbit:queue>
<rabbit:queue name="logs_2" durable="false" auto-delete="false" exclusive="false">
</rabbit:queue>
<rabbit:listener-container connection-factory="connectionFactory" acknowledge="auto" task-executor="taskExecutor">
<rabbit:listener queues="task_queue" ref="taskListenter"/>
<rabbit:listener queues="logs_1" ref="fanoutListenter" response-exchange="logs"/>
<rabbit:listener queues="logs_2" ref="fanoutListenter" response-exchange="logs"/>
</rabbit:listener-container>
</beans>
applicationContext.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:aop="http://www.springframework.org/schema/aop"
xmlns:tx="http://www.springframework.org/schema/tx"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-4.1.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context-4.1.xsd
http://www.springframework.org/schema/aop
http://www.springframework.org/schema/aop/spring-aop-4.1.xsd
http://www.springframework.org/schema/tx
http://www.springframework.org/schema/tx/spring-tx-4.1.xsd">
<context:component-scan base-package="com.hhly.*" />
<!-- 載入rabbitmq -->
<bean id="propertyPlaceholderConfigurer" class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
<property name="locations">
<list>
<value>classpath*:rabbitmq.properties</value>
</list>
</property>
</bean>
<!-- 引入配置檔案 -->
<import resource="rabbitmq.xml"/>
</beans>
生產
package com.hhly.rabbitmq.spring.produce;
public interface MQProducer {
/**
* 傳送訊息到指定佇列
* @param queueKey
* @param object
*/
public void sendDataToQueue(String queueKey, String message);
/**
* 傳送廣播資訊
* @author jiangwei
* @Version 1.0
* @CreatDate 2017年3月27日 下午2:17:38
* @param exchange
* @param object
*/
public void sendDataToFanout(String exchange,String message);
}
package com.hhly.rabbitmq.spring.produce;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@Service
public class MQProducerImpl implements MQProducer {
@Autowired
private AmqpTemplate amqpTemplate;
@Override
public void sendDataToQueue(String queueKey, String message) {
byte [] body= message.getBytes();
MessageProperties properties = new MessageProperties();
properties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
//properties.setPriority(5);
Message message2 = new Message(body,properties );
amqpTemplate.send(queueKey,message2);
}
@Override
public void sendDataToFanout(String exchange, String message) {
amqpTemplate.convertAndSend(exchange, "", message);
}
}
消費
package com.hhly.rabbitmq.spring.consumer;
import java.io.UnsupportedEncodingException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
import org.springframework.stereotype.Component;
@Component
public class FanoutListenter implements MessageListener{
@Override
public void onMessage(Message message) {
try {
System.out.println(new String(message.getBody(),"UTF-8"));
} catch (UnsupportedEncodingException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
測試類
package com.hhly.rabbitmq;
import java.util.UUID;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import com.hhly.rabbitmq.spring.produce.MQProducer;
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = {"classpath:applicationContext.xml"})
public class TestQueue {
@Autowired
MQProducer mqProducer;
@Autowired
private AmqpTemplate amqpTemplate;
@Test
public void sendFanout() {
int i = 0;
for(;;){
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
String message = "hello,rabbmitmq!"+ i++;
mqProducer.sendDataToFanout("logs", message);
}
}
}