7、Integrating RabbitMQ with Spring
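The earlier tutorials covered basic queue messaging in RabbitMQ and sending messages through the different exchange types. This installment shares a demo that integrates RabbitMQ with Spring. The integration covers three kinds of messages: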
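- Default exchange (that is, plain queue) messages: a test message is sent to the queue queue.test.queue, and a listener prints the message it receives.
- fanout exchange messages, i.e. the publish/subscribe model: an order-created message is sent to the exchange fanout-exchange, and the risk (risk control), sms (text message) and statistics services each subscribe through their own queue (fanout.queue.risk, fanout.queue.sms, fanout.queue.statistics) and run their own business logic.
- topic exchange messages, the most flexible exchange type: different modules send logs of different levels to the queues bound to the matching topics. The routing key has the form <log level>.<module>. The queues are all.log.queue (all logs from all modules), email.all.queue (all logs from the email module), email.error.queue (error logs from the email module) and all.error.queue (error logs from all modules).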
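Because this is a demo, the provider and consumer roles both live in one project. The overall flow is: on the page, pick a message type (queue message, fanout exchange message or topic exchange message) and send it; the matching method of RabbitMqController receives the message and uses RabbitTemplate to send it to RabbitMQ. The configured MessageListener implementations then consume the messages from their queues.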
1、Overall project structure
The overall structure of the project is shown in the diagram below.
2、Project dependencies
Below is the dependency file (pom.xml) for the whole project.
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion >4.0.0</modelVersion>
<groupId>cn.carlzone.spring.rabbitmq</groupId>
<artifactId>spring-rabbitmq</artifactId>
<version>1.0-SNAPSHOT</version>
<packaging>war</packaging>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
</properties>
<dependencies>
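<dependency>
<groupId>javax.servlet</groupId>
<artifactId>javax.servlet-api</artifactId>
<version>3.0.1</version>
<!-- supplied by the servlet container at runtime, so it is not packaged into the war -->
<scope>provided</scope>
</dependency>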
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-webmvc</artifactId>
<version>4.3.9.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
<version>1.7.9.RELEASE</version>
</dependency>
<!-- logback -->
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>1.0.13</version>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-core</artifactId>
<version>1.0.13</version>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-access</artifactId>
<version>1.0.13</version>
</dependency>
<!-- tools -->
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>19.0</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.11</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>
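3、Integration code
3.1 Default exchange messages
QueueService.java is the listener for plain queue messages; it handles the messages that arrive on the plain queue.
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
import org.springframework.stereotype.Component;

@Component("queueService")
public class QueueService implements MessageListener {

    private static final Logger LOGGER = LoggerFactory.getLogger(QueueService.class);

    @Override
    public void onMessage(Message message) {
        LOGGER.info("QueueService get message : " + new String(message.getBody()));
    }
}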
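To make the default-exchange behaviour concrete, here is a minimal in-memory sketch in plain Java with no RabbitMQ involved. It assumes only the rule that the default exchange delivers a message to the queue whose name equals the routing key. The class and method names (DefaultExchangeSketch, declareQueue, send, receive) are hypothetical and are not part of Spring AMQP.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;

public class DefaultExchangeSketch {

    // Declared queues keyed by name (a stand-in for the broker's queue registry).
    private final Map<String, Queue<String>> queues = new HashMap<>();

    void declareQueue(String name) {
        queues.putIfAbsent(name, new ArrayDeque<>());
    }

    // Default-exchange rule: the routing key is interpreted as the queue name.
    // Returns false when no queue with that name exists (the message is unroutable).
    boolean send(String routingKey, String body) {
        Queue<String> queue = queues.get(routingKey);
        if (queue == null) {
            return false;
        }
        queue.add(body);
        return true;
    }

    // Takes the next message from the queue, or null if it is empty or unknown.
    String receive(String queueName) {
        Queue<String> queue = queues.get(queueName);
        return queue == null ? null : queue.poll();
    }

    public static void main(String[] args) {
        DefaultExchangeSketch broker = new DefaultExchangeSketch();
        broker.declareQueue("queue.test.queue");
        broker.send("queue.test.queue", "hello");
        System.out.println("QueueService get message : " + broker.receive("queue.test.queue"));
    }
}
```

This mirrors why the demo can publish with an empty exchange name and the routing key queue.test.queue: no explicit binding is needed for the default exchange.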
3.2 fanout exchange messages
An order-created message is sent to RabbitMQ, and the risk-control, SMS and statistics services each listen for it and run their own logic.
RiskService.java – risk control
@Component("riskService")
public class RiskService implements MessageListener {

    private static final Logger LOGGER = LoggerFactory.getLogger(RiskService.class);

    @Override
    public void onMessage(Message message) {
        LOGGER.info("RiskService get message : " + new String(message.getBody()));
    }
}
SmsService.java – SMS
@Component("smsService")
public class SmsService implements MessageListener {

    private static final Logger LOGGER = LoggerFactory.getLogger(SmsService.class);

    @Override
    public void onMessage(Message message) {
        LOGGER.info("SmsService get message : " + new String(message.getBody()));
    }
}
StatisticsService.java – statistics service
@Component("statisticsService")
public class StatisticsService implements MessageListener {

    private static final Logger LOGGER = LoggerFactory.getLogger(StatisticsService.class);

    @Override
    public void onMessage(Message message) {
        LOGGER.info("StatisticsService get message : " + new String(message.getBody()));
    }
}
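The publish/subscribe behaviour of a fanout exchange can be illustrated with a small in-memory sketch in plain Java. It assumes only the fanout rule that the routing key is ignored and every bound queue receives its own copy of the message. The class FanoutSketch and its methods are hypothetical names for illustration; the queue names are the ones used in this demo's configuration.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class FanoutSketch {

    // Queues bound to the exchange; each one keeps its own copy of every message.
    private final Map<String, List<String>> boundQueues = new LinkedHashMap<>();

    void bind(String queueName) {
        boundQueues.putIfAbsent(queueName, new ArrayList<>());
    }

    // Fanout rule: the routing key is ignored and every bound queue gets a copy.
    // Returns the number of queues the message was copied to.
    int publish(String routingKey, String body) {
        for (List<String> queue : boundQueues.values()) {
            queue.add(body);
        }
        return boundQueues.size();
    }

    // Messages currently held by the queue (empty for an unbound queue).
    List<String> messagesIn(String queueName) {
        return boundQueues.getOrDefault(queueName, new ArrayList<>());
    }

    public static void main(String[] args) {
        FanoutSketch exchange = new FanoutSketch();
        String[] names = {"fanout.queue.risk", "fanout.queue.sms", "fanout.queue.statistics"};
        for (String name : names) {
            exchange.bind(name);
        }
        exchange.publish("", "order created");
        for (String name : names) {
            System.out.println(name + " -> " + exchange.messagesIn(name));
        }
    }
}
```

Because each service has its own queue, the three listeners above all see the same order-created message independently of one another.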
3.3 topic exchange messages
AllErrorTopicService.java listens for error-level messages from all modules and prints them to the log.
@Component("allErrorTopicService")
public class AllErrorTopicService implements MessageListener {

    private static final Logger LOGGER = LoggerFactory.getLogger(AllErrorTopicService.class);

    @Override
    public void onMessage(Message message) {
        LOGGER.info("AllErrorTopicService get message : " + new String(message.getBody()));
    }
}
AllLogTopicService.java listens for messages of every log level from all modules and prints them to the log.
@Component("allLogTopicService")
public class AllLogTopicService implements MessageListener {

    private static final Logger LOGGER = LoggerFactory.getLogger(AllLogTopicService.class);

    @Override
    public void onMessage(Message message) {
        LOGGER.info("AllLogTopicService get message : " + new String(message.getBody()));
    }
}
EmailAllTopicService.java listens for messages of every log level from the Email module and prints them to the log.
@Component("emailAllTopicService")
public class EmailAllTopicService implements MessageListener {

    private static final Logger LOGGER = LoggerFactory.getLogger(EmailAllTopicService.class);

    @Override
    public void onMessage(Message message) {
        LOGGER.info("EmailAllTopicService Get message : " + new String(message.getBody()));
    }
}
EmailErrorTopicService.java listens for error-level messages from the Email module and prints them to the log.
@Component("emailErrorTopicService")
public class EmailErrorTopicService implements MessageListener {

    private static final Logger LOGGER = LoggerFactory.getLogger(EmailErrorTopicService.class);

    @Override
    public void onMessage(Message message) {
        LOGGER.info("EmailErrorTopicService Get message : " + new String(message.getBody()));
    }
}
3.4 RabbitMqController.java
RabbitMqController.java belongs to the provider side; its main job is to send the different kinds of messages to the RabbitMQ server.
@RestController
@RequestMapping("rabbit")
public class RabbitMqController {

    private static final Logger logger = LoggerFactory.getLogger(RabbitMqController.class);

    @Resource
    private RabbitTemplate rabbitTemplate;

    // Default exchange (""): the routing key is the name of the target queue.
    @RequestMapping("queue")
    public String queue(@RequestParam("message") String message) {
        try {
            String str = "queue message,the message is : " + message;
            logger.info("send queue message, the message is [" + str + "]");
            rabbitTemplate.send("", "queue.test.queue", new Message(str.getBytes(), new MessageProperties()));
            return "success";
        } catch (Exception e) {
            return describe(e);
        }
    }

    // Fanout exchange: the routing key is ignored, so it is left empty.
    @RequestMapping("fanout")
    public String fanout(@RequestParam("message") String message) {
        try {
            String str = "fanout message,the message is : " + message;
            logger.info("send fanout message, the message is [" + str + "]");
            rabbitTemplate.send("fanout-exchange", "", new Message(str.getBytes(), new MessageProperties()));
            return "success";
        } catch (Exception e) {
            return describe(e);
        }
    }

    // Topic exchange: one message is sent for every <severity>.<module> routing key.
    @RequestMapping("topic")
    public String topic(@RequestParam("message") String message) {
        try {
            // log severity levels
            List<String> severities = Lists.newArrayList("error", "info", "warning");
            // project modules
            List<String> modules = Lists.newArrayList("email", "order", "user");
            for (String severity : severities) {
                for (String module : modules) {
                    String routeKey = buildRouteKey(severity, module);
                    String str = "send topic message, the message is [routeKey :" + routeKey + "][ context : " + message + "]";
                    rabbitTemplate.send("topic-exchange", routeKey, new Message(str.getBytes(), new MessageProperties()));
                }
            }
            return "success";
        } catch (Exception e) {
            return describe(e);
        }
    }

    private String buildRouteKey(String severity, String module) {
        return severity + "." + module;
    }

    // getCause() can return null, so fall back to the exception itself.
    private String describe(Exception e) {
        Throwable cause = e.getCause();
        return cause != null ? cause.toString() : e.toString();
    }
}
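As a quick check on what the topic endpoint publishes, the following plain-Java sketch enumerates the routing keys produced by combining the three severities with the three modules. RouteKeySketch and routeKeys are hypothetical names used only for illustration; the key format <severity>.<module> is the one the controller builds.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class RouteKeySketch {

    // Builds one routing key per (severity, module) pair, severities in the outer loop.
    static List<String> routeKeys(List<String> severities, List<String> modules) {
        List<String> keys = new ArrayList<>();
        for (String severity : severities) {
            for (String module : modules) {
                keys.add(severity + "." + module);
            }
        }
        return keys;
    }

    public static void main(String[] args) {
        List<String> keys = routeKeys(
                Arrays.asList("error", "info", "warning"),
                Arrays.asList("email", "order", "user"));
        // One line per routing key, in the order the messages would be sent.
        for (String key : keys) {
            System.out.println(key);
        }
    }
}
```

A single call to the topic endpoint therefore results in nine messages, one per routing key.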
3.5 index.jsp
index.jsp is part of the provider code; it is the page used to send the different kinds of messages.
<%
String path = request.getContextPath();
System.out.println(path);
String basePath = request.getScheme() + "://"
+ request.getServerName() + ":" + request.getServerPort()
+ path + "/";
System.out.println(basePath);
%>
<!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN">
<html>
<head>
<base href="<%=basePath%>">
<title>RabbitMQ Demo</title>
<meta http-equiv="pragma" content="no-cache">
<meta http-equiv="cache-control" content="no-cache">
<meta http-equiv="expires" content="0">
<script type="text/javascript" src="<%--<%=basePath%>--%>js/jquery-1.11.0.min.js"></script>
<style type="text/css">
h1 {
margin: 0 auto;
}
#producer {
width: 48%;
border: 1px solid blue;
height: 80%;
align: center;
margin: 0 auto;
}
body {
text-align: center;
}
div {
text-align: center;
}
textarea {
width: 80%;
height: 100px;
border: 1px solid gray;
}
button {
background-color: rgb(62, 156, 66);
border: none;
font-weight: bold;
color: white;
height: 30px;
}
</style>
<script type="text/javascript">
function send(controller) {
if ($("#message").val() == "") {
$("#message").css("border", "1px solid red");
return;
} else {
$("#message").css("border", "1px solid gray");
}
$.ajax({
type: 'post',
url: '<%=basePath%>rabbit/' + controller,
dataType: 'text',
data: {"message": $("#message").val()},
success: function (data) {
if (data == "success") {
$("#status").html("<font color=green>send success</font>");
setTimeout(clear, 1000);
} else {
$("#status").html("<font color=red>" + data + "</font>");
setTimeout(clear, 5000);
}
},
error: function (data) {
$("#status").html("<font color=red>ERROR:" + data["status"] + "," + data["statusText"] + "</font>");
setTimeout(clear, 5000);
}
});
}
function clear() {
$("#status").html("");
}
</script>
</head>
<body>
<h1>Hello RabbitMQ</h1>
<div id="producer">
<h2>Producer</h2>
<textarea id="message"></textarea>
<br>
<button onclick="send('queue')">Send Queue Message</button>
<button onclick="send('fanout')">Send Fanout Message</button>
<button onclick="send('topic')">Send Topic Message</button>
<br>
<span id="status"></span>
</div>
</body>
</html>
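3.6 root-beans.xml
root-beans.xml holds the RabbitMQ integration configuration for both the provider and the consumer. The only configuration they share is the CachingConnectionFactory, since both need to connect to RabbitMQ. The provider needs a RabbitTemplate to send messages. RabbitAdmin declares the queues, exchanges and bindings defined in this file on the broker over the AMQP connection. On the consumer side, the queues are declared by name and bound to the exchanges with their routing keys; finally, the listener container associates each queue with the listener class that handles it.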
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xmlns:context="http://www.springframework.org/schema/context"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/rabbit
http://www.springframework.org/schema/rabbit/spring-rabbit.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context.xsd">
<context:component-scan base-package="cn.carlzone.spring.rabbitmq.consumer" />
<!-- shared by provider and consumer: start -->
<!-- RabbitMQ connection configuration -->
<bean id="rabbitConnectionFactory" class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory" >
    <property name="host" value="192.168.75.131" />
    <property name="username" value="carl" />
    <property name="password" value="198918" />
    <property name="port" value="5672" />
    <!-- heartbeat interval with the server, in seconds (default 60s) -->
    <property name="requestedHeartBeat" value="60" />
    <!-- RabbitMQ virtual host (default "/") -->
    <property name="virtualHost" value="/" />
</bean>
<!-- shared by provider and consumer: end -->
<!-- rabbit provider start -->
<!-- rabbitmq admin -->
<rabbit:admin connection-factory="rabbitConnectionFactory" />
<!-- RabbitTemplate used to send messages -->
<bean id="rabbitTemplate" class="org.springframework.amqp.rabbit.core.RabbitTemplate" >
    <constructor-arg ref="rabbitConnectionFactory" />
</bean>
<!-- rabbit provider end -->
<!-- rabbitmq consumer start -->
<!-- default exchange -->
<rabbit:queue name="queue.test.queue" durable="false" />
<!-- fanout exchange -->
<rabbit:queue name="fanout.queue.sms" durable="false"/>
<rabbit:queue name="fanout.queue.risk" durable="false"/>
<rabbit:queue name="fanout.queue.statistics" durable="false"/>
<!-- bind the queues that need the data to the exchange -->
<rabbit:fanout-exchange name="fanout-exchange" durable="false">
    <rabbit:bindings>
        <rabbit:binding queue="fanout.queue.sms"></rabbit:binding>
        <rabbit:binding queue="fanout.queue.risk"></rabbit:binding>
        <rabbit:binding queue="fanout.queue.statistics"></rabbit:binding>
    </rabbit:bindings>
</rabbit:fanout-exchange>
<!-- topic exchange -->
<rabbit:queue name="all.log.queue" durable="false"/>
<rabbit:queue name="email.all.queue" durable="false"/>
<rabbit:queue name="email.error.queue" durable="false"/>
<rabbit:queue name="all.error.queue" durable="false"/>
<!-- bind the queues that need the data to the exchange by routing-key pattern -->
<rabbit:topic-exchange name="topic-exchange" durable="false">
    <rabbit:bindings>
        <rabbit:binding queue="all.log.queue" pattern="#"></rabbit:binding>
        <rabbit:binding queue="email.all.queue" pattern="*.email"></rabbit:binding>
        <rabbit:binding queue="email.error.queue" pattern="error.email"></rabbit:binding>
        <rabbit:binding queue="all.error.queue" pattern="error.*"></rabbit:binding>
    </rabbit:bindings>
</rabbit:topic-exchange>
<!-- topic exchange end -->
<!-- listener container -->
<rabbit:listener-container connection-factory="rabbitConnectionFactory">
<!-- queue message -->
<rabbit:listener ref="queueService" queues="queue.test.queue" method="onMessage" />
<!-- fanout exchange message -->
<rabbit:listener ref="riskService" queues="fanout.queue.risk" method="onMessage" />
<rabbit:listener ref="smsService" queues="fanout.queue.sms" method="onMessage" />
<rabbit:listener ref="statisticsService" queues="fanout.queue.statistics" method="onMessage" />
<!-- topic exchange message -->
<rabbit:listener ref="allLogTopicService" queues="all.log.queue" method="onMessage" />
<rabbit:listener ref="emailAllTopicService" queues="email.all.queue" method="onMessage" />
<rabbit:listener ref="emailErrorTopicService" queues="email.error.queue" method="onMessage" />
<rabbit:listener ref="allErrorTopicService" queues="all.error.queue" method="onMessage" />
</rabbit:listener-container>
<!-- rabbit consumer end -->
</beans>
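To see which queues each routing key reaches under the four topic bindings in this file, the matching rule can be sketched in plain Java. The sketch assumes the standard topic-exchange semantics: words are separated by dots, * matches exactly one word and # matches zero or more words. TopicMatchSketch, matches and queuesFor are hypothetical names; this is an illustration of the rule, not the broker's actual implementation.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class TopicMatchSketch {

    // Returns true if the routing key satisfies the binding pattern.
    static boolean matches(String pattern, String routingKey) {
        return matches(pattern.split("\\."), 0, routingKey.split("\\."), 0);
    }

    private static boolean matches(String[] p, int i, String[] k, int j) {
        if (i == p.length) {
            return j == k.length; // pattern exhausted: the key must be exhausted too
        }
        if (p[i].equals("#")) {
            // '#' may absorb zero or more words: try every possible split point.
            for (int next = j; next <= k.length; next++) {
                if (matches(p, i + 1, k, next)) {
                    return true;
                }
            }
            return false;
        }
        if (j == k.length) {
            return false; // pattern still expects a word but the key has none left
        }
        // '*' matches exactly one word; any other pattern word must match literally.
        return (p[i].equals("*") || p[i].equals(k[j])) && matches(p, i + 1, k, j + 1);
    }

    // The four bindings of topic-exchange from the configuration, in declaration order.
    static List<String> queuesFor(String routingKey) {
        Map<String, String> bindings = new LinkedHashMap<>();
        bindings.put("all.log.queue", "#");
        bindings.put("email.all.queue", "*.email");
        bindings.put("email.error.queue", "error.email");
        bindings.put("all.error.queue", "error.*");
        List<String> result = new ArrayList<>();
        for (Map.Entry<String, String> binding : bindings.entrySet()) {
            if (matches(binding.getValue(), routingKey)) {
                result.add(binding.getKey());
            }
        }
        return result;
    }

    public static void main(String[] args) {
        for (String key : new String[] {"error.email", "warning.email", "error.user", "info.order"}) {
            System.out.println(key + " -> " + queuesFor(key));
        }
    }
}
```

Under these bindings, error.email is the only routing key that reaches all four queues, while a key such as info.order reaches all.log.queue only.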
3.7 mvc-beans.xml
mvc-beans.xml belongs to the provider side; it loads the Controller that receives the messages sent from the page.
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:mvc="http://www.springframework.org/schema/mvc"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context-4.0.xsd
http://www.springframework.org/schema/mvc
http://www.springframework.org/schema/mvc/spring-mvc-4.0.xsd">
<mvc:resources mapping="/js/**" location="/js/"/>
<mvc:annotation-driven />
<context:component-scan base-package="cn.carlzone.spring.rabbitmq.provider" />
<bean id="viewResolver" class="org.springframework.web.servlet.view.ContentNegotiatingViewResolver">
<property name="order" value="0" />
<property name="viewResolvers">
<list>
<bean class="org.springframework.web.servlet.view.BeanNameViewResolver" />
<bean
class="org.springframework.web.servlet.view.InternalResourceViewResolver">
<property name="viewClass"
value="org.springframework.web.servlet.view.JstlView" />
<property name="prefix" value="/WEB-INF/pages/" />
<property name="suffix" value=".jsp"></property>
</bean>
</list>
</property>
</bean>
</beans>
3.8 logback.xml
<?xml version="1.0" encoding="UTF-8"?>
<configuration>
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<Pattern>%d{HH:mm:ss.SSS} [%thread] %level %logger{36} - %msg%n</Pattern>
</encoder>
</appender>
<logger name="cn.carlzone.spring.rabbitmq" level="debug" />
<logger name="org.springframework" level="error" />
<root level="debug">
<appender-ref ref="STDOUT" />
</root>
</configuration>
3.9 web.xml
web.xml loads the Spring containers and starts the project.
<!DOCTYPE web-app PUBLIC
"-//Sun Microsystems, Inc.//DTD Web Application 2.3//EN"
"http://java.sun.com/dtd/web-app_2_3.dtd" >
<web-app>
<context-param>
<param-name>contextConfigLocation</param-name>
<param-value>classpath:root-beans.xml</param-value>
</context-param>
<filter>
<filter-name>characterEncoding</filter-name>
<filter-class>org.springframework.web.filter.CharacterEncodingFilter</filter-class>
<init-param>
<param-name>encoding</param-name>
<param-value>UTF-8</param-value>
</init-param>
<init-param>
<param-name>forceEncoding</param-name>
<param-value>true</param-value>
</init-param>
</filter>
<filter-mapping>
<filter-name>characterEncoding</filter-name>
<url-pattern>/*</url-pattern>
</filter-mapping>
<listener>
<listener-class>org.springframework.web.context.ContextLoaderListener</listener-class>
</listener>
<!-- Spring MVC Config Start -->
<servlet>
<servlet-name>SpringMVC</servlet-name>
<servlet-class>org.springframework.web.servlet.DispatcherServlet</servlet-class>
<init-param>
<param-name>contextConfigLocation</param-name>
<param-value>classpath:mvc-beans.xml</param-value>
</init-param>
<load-on-startup>1</load-on-startup>
</servlet>
<servlet-mapping>
<servlet-name>SpringMVC</servlet-name>
<url-pattern>/</url-pattern>
</servlet-mapping>
<welcome-file-list>
<welcome-file>index.jsp</welcome-file>
</welcome-file-list>
</web-app>
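4、Verification
Start the project and open http://localhost:8080/ ; the following page appears.
4.1 Sending a queue message
Send a queue message with the text queue message. The console shows that the Controller sent one message and that one queue message was then consumed.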