RabbitMQ實踐--與Spring的簡單整合操作
阿新 • • 發佈:2019-01-04
瞭解RabbitMQ
與Spring整合
Spring 透過 Spring AMQP(spring-rabbit)支援與 RabbitMQ 的快速整合。
pom.xml
需要的依賴(jar 包)如下:
<?xml version="1.0" encoding="UTF-8"?>
<!-- Maven build descriptor for the "rabbitmq" module (child of com.leo.test:alltest). -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <parent>
        <groupId>com.leo.test</groupId>
        <artifactId>alltest</artifactId>
        <version>1.0-SNAPSHOT</version>
    </parent>

    <artifactId>rabbitmq</artifactId>

    <dependencies>
        <!-- Unit testing -->
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.10</version>
        </dependency>

        <!-- RabbitMQ Java client -->
        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>4.1.0</version>
        </dependency>

        <!-- Logging API and its simple binding -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.25</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-simple</artifactId>
            <version>1.7.25</version>
        </dependency>

        <!-- Spring integration for RabbitMQ (rabbit: XML namespace, templates, listeners) -->
        <dependency>
            <groupId>org.springframework.amqp</groupId>
            <artifactId>spring-rabbit</artifactId>
            <version>1.4.5.RELEASE</version>
        </dependency>

        <!-- JSON (de)serialization, needed by the JSON message converter -->
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
            <version>2.7.5</version>
        </dependency>

        <!-- Spring Boot core starter -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
            <version>1.5.2.RELEASE</version>
        </dependency>

        <!-- Used for Spring-based tests -->
        <!-- NOTE(review): 3.0.5.RELEASE looks much older than the Spring version that
             spring-boot-starter 1.5.2.RELEASE presumably brings in; confirm compatibility
             or align the versions. -->
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-test</artifactId>
            <version>3.0.5.RELEASE</version>
        </dependency>
    </dependencies>
</project>
application-mq.xml
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xmlns:util="http://www.springframework.org/schema/util"
       xmlns:aop="http://www.springframework.org/schema/aop"
       xmlns:context="http://www.springframework.org/schema/context"
       xsi:schemaLocation="
           http://www.springframework.org/schema/beans
           http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
           http://www.springframework.org/schema/rabbit
           http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd
           http://www.springframework.org/schema/util
           http://www.springframework.org/schema/util/spring-util.xsd
           http://www.springframework.org/schema/aop
           http://www.springframework.org/schema/aop/spring-aop.xsd
           http://www.springframework.org/schema/context
           http://www.springframework.org/schema/context/spring-context.xsd">

    <description>rabbitmq 連線服務配置</description>

    <!-- Enable automatic bean discovery and annotation-based wiring -->
    <context:component-scan base-package="com.leo.spring"/>
    <context:annotation-config/>

    <!-- Resolves the ${mq.*} placeholders used below -->
    <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
        <!-- "locations" accepts an array, so several properties files can be listed here -->
        <property name="locations">
            <array>
                <value>classpath:rabbitmq-config.properties</value>
            </array>
        </property>
    </bean>

    <!-- Connection settings -->
    <rabbit:connection-factory id="connectionFactory"
                               host="${mq.host}"
                               username="${mq.username}"
                               password="${mq.password}"
                               port="${mq.port}"
                               virtual-host="${mq.vhost}"/>
    <rabbit:admin connection-factory="connectionFactory"/>

    <!-- Declare a message queue.
         durable:     whether the queue is persisted
         exclusive:   private queue usable only by its creator; deleted automatically on disconnect
         auto-delete: whether the queue is deleted automatically once all consumer connections are closed -->
    <rabbit:queue id="test_queue_key"
                  name="test_queue_key"
                  durable="true"
                  auto-delete="false"
                  exclusive="false"/>

    <!-- Declare the exchange; rabbit:direct-exchange sets the exchange type to "direct" -->
    <rabbit:direct-exchange id="test-mq-exchange"
                            name="test-mq-exchange"
                            durable="true"
                            auto-delete="false">
        <rabbit:bindings>
            <rabbit:binding queue="test_queue_key" key="test_queue_key"/>
        </rabbit:bindings>
    </rabbit:direct-exchange>

    <!-- Spring template declaration -->
    <rabbit:template id="amqpTemplate"
                     exchange="test-mq-exchange"
                     connection-factory="connectionFactory"
                     message-converter="jsonMessageConverter"/>

    <!-- Listener container: delivers messages from test_queue_key to the queueListenter bean -->
    <rabbit:listener-container connection-factory="connectionFactory" acknowledge="auto">
        <rabbit:listener queues="test_queue_key" ref="queueListenter"/>
    </rabbit:listener-container>

    <!-- JSON converter for message objects -->
    <bean id="jsonMessageConverter"
          class="org.springframework.amqp.support.converter.Jackson2JsonMessageConverter"/>
</beans>
rabbitmq-config.properties
# RabbitMQ connection settings. Each key is referenced as a ${...} placeholder
# by the <rabbit:connection-factory> element in application-mq.xml.
# Broker host
mq.host=127.0.0.1
# Login user name and password
mq.username=guest
mq.password=guest
# Broker port
mq.port=5672
# Virtual host to connect to
mq.vhost=/
MQProducer及MQProducerImpl
/**
*
* @author xuexiaolei
* @version 2017年08月22日
*/
public interface MQProducer {
public void sendDataToQueue(String queueKey, Object object);
}
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
/**
*
* @author xuexiaolei
* @version 2017年08月22日
*/
@Service
public class MQProducerImpl implements MQProducer{
@Autowired
private AmqpTemplate amqpTemplate;
public void sendDataToQueue(String queueKey, Object object) {
amqpTemplate.convertAndSend(queueKey, object);
}
}
QueueListenter
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
import org.springframework.stereotype.Component;
/**
 * Message listener that prints every received message to standard output.
 *
 * @author xuexiaolei
 * @version 2017-08-22
 */
@Component
public class QueueListenter implements MessageListener {

    /**
     * Prints the received message to {@code System.out}.
     *
     * @param message the message that was received
     */
    @Override
    public void onMessage(final Message message) {
        System.out.println(message);
    }
}
執行測試
/**
 * Spring-driven test that sends one message through {@link MQProducer}.
 *
 * <p>The application context is loaded from {@code classpath:/application-mq.xml}.
 * The test makes no assertions; it only checks that sending completes without
 * throwing.
 *
 * @author xuexiaolei
 * @version 2017-08-22
 */
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = {"classpath:/application-mq.xml"})
public class MQProducerImplTest {

    /** Producer under test, injected from the Spring context. */
    @Autowired
    MQProducer mqProducer;

    /** Key passed to the producer; matches the queue/binding key declared in application-mq.xml. */
    final String queue_key = "test_queue_key";

    /**
     * Sends a single map payload using {@link #queue_key}.
     *
     * @throws Exception if sending fails
     */
    @Test
    public void sendDataToQueue() throws Exception {
        // Explicit type arguments instead of the raw HashMap avoid an unchecked-conversion warning.
        Map<String, Object> msg = new HashMap<String, Object>();
        msg.put("data", "hello,rabbmitmq!");
        mqProducer.sendDataToQueue(queue_key, msg);
    }
}
問題小記
預設的 guest 使用者只能存取「/」這個 virtual host;若將 mq.vhost 設為其他 virtual host(例如 testmq),連線時會出現下面的錯誤:
Caused by: com.rabbitmq.client.ShutdownSignalException: connection error; protocol method: #method<connection.close>(reply-code=530, reply-text=NOT_ALLOWED - access to vhost 'testmq' refused for user 'guest', class-id=10, method-id=40)
at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:66)
at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:36)
at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:398)
at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:244)
at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:128)
... 46 more