ssm+rabbit 簡單搭建,消費者+生產者
名詞理解 Broker:簡單來說就是訊息佇列伺服器實體。
Exchange:訊息交換機,它指定訊息按什麼規則,路由到哪個佇列。
Queue:訊息佇列載體,每個訊息都會被投入到一個或多個佇列。
Binding:繫結,它的作用就是把exchange和queue按照路由規則繫結起 來。
Routing Key:路由關鍵字,exchange根據這個關鍵字進行訊息投遞。
vhost:虛擬主機,一個broker裡可以開設多個vhost,用作不同使用者的權 限分離。
producer:訊息生產者,就是投遞訊息的程式。
consumer:訊息消費者,就是接受訊息的程式。
訊息佇列的使用過程大概如下:
(1)客戶端連線到訊息佇列伺服器,開啟一個channel。
(2)客戶端宣告一個exchange,並設定相關屬性。
(3)客戶端宣告一個queue,並設定相關屬性。
(4)客戶端使用routing key,在exchange和queue之間建立好繫結關 系。
(5)客戶端投遞訊息到exchange
融入專案:先搭建普通的ssm框架
1.更改web.xml
<?xml version="1.0" encoding="UTF-8"?> <web-app xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://java.sun.com/xml/ns/javaee" xsi:schemaLocation="http://java.sun.com/xml/ns/javaee http://java.sun.com/xml/ns/javaee/web-app_2_5.xsd" version="2.5"> <!-- spring 監聽 --> <listener> <listener-class>org.springframework.web.context.ContextLoaderListener</listener-class> </listener> <context-param> <param-name>contextConfigLocation</param-name> <param-value> classpath:spring-base.xml, classpath:spring-producer.xml, classpath:spring-common.xml </param-value> </context-param> <!-- <servlet> <servlet-name>CXFServlet</servlet-name> <servlet-class>org.apache.cxf.transport.servlet.CXFServlet</servlet-class> <load-on-startup>1</load-on-startup> </servlet> <servlet-mapping> <servlet-name>CXFServlet</servlet-name> <url-pattern>/cxf/*</url-pattern> </servlet-mapping> --> <servlet> <servlet-name>springMVC</servlet-name> <servlet-class>org.springframework.web.servlet.DispatcherServlet</servlet-class> <init-param> <param-name>contextConfigLocation</param-name> <param-value>classpath:spring-mvc.xml</param-value> </init-param> <load-on-startup>1</load-on-startup> </servlet> <servlet-mapping> <servlet-name>springMVC</servlet-name> <url-pattern>*.do</url-pattern> </servlet-mapping> <filter> <filter-name>encodingFilter</filter-name> <filter-class>org.springframework.web.filter.CharacterEncodingFilter</filter-class> <init-param> <param-name>encoding</param-name> <param-value>UTF-8</param-value> </init-param> <init-param> <param-name>forceEncoding</param-name> <param-value>true</param-value> </init-param> </filter> <filter-mapping> <filter-name>encodingFilter</filter-name> <url-pattern>*.do</url-pattern> </filter-mapping> <welcome-file-list> <welcome-file>index.jsp</welcome-file> </welcome-file-list> </web-app>
2.jar包座標
<dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>4.0.0</version> </dependency> <dependency> <groupId>org.springframework.amqp</groupId> <artifactId>spring-rabbit</artifactId> <version>1.7.2.RELEASE</version> </dependency>
3. 建立生產者配置檔案:spring-producer.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/rabbit
http://www.springframework.org/schema/rabbit/spring-rabbit-1.4.xsd">
<bean id="jsonMessageConverter" class="org.springframework.amqp.support.converter.Jackson2JsonMessageConverter" />
<!-- 建立連線工廠 轉入 ip 密碼埠等 -->
<rabbit:connection-factory id="mqconnectionFactory" host="127.0.0.1" port="5672"
username="admin" password="admin"/>
<!-- 預設使用這個工廠 -->
<rabbit:admin id="connectAdmin" connection-factory="mqconnectionFactory" />
<!-- 給模板指定轉換器 --><!-- mandatory必須設定true,return callback才生效 -->
<rabbit:template id="amqpTemplate" connection-factory="mqconnectionFactory"
exchange="exchangeTest"
/>
<!-- 建立一個佇列 -->
<!--
durable 是否持久化
exclusive 僅建立者可以使用的私有佇列,斷開後自動刪除
auto-delete 當所有消費端連線斷開後,是否自動刪除佇列 -->
<rabbit:queue id="buyCar" name="buyCar" durable="true" auto-delete="false"
exclusive="false" declared-by="connectAdmin" />
<rabbit:queue id="buyHouse" name="buyHouse" durable="true" auto-delete="false"
exclusive="false" declared-by="connectAdmin" />
<!--
交換機:一個交換機可以繫結多個佇列,一個佇列也可以繫結到多個交換機上。
如果沒有佇列繫結到交換機上,則傳送到該交換機上的資訊則會丟失。
direct模式:訊息與一個特定的路由器完全匹配,才會轉發
topic模式:按規則轉發訊息,最靈活
-->
<rabbit:direct-exchange name="exchangeTest"
durable="true" auto-delete="false" declared-by="connectAdmin">
<!-- bind:繫結 -->
<rabbit:bindings>
<rabbit:binding queue="buyCar" key="buyCarKey"></rabbit:binding>
<rabbit:binding queue="buyHouse" key="buyHouseKey"></rabbit:binding>
</rabbit:bindings>
</rabbit:direct-exchange>
</beans>
4.在controller層寫程式碼(發訊息的程式碼 )
package com.mr.controller;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
@Controller
public class TestController {
// 注入amqpTemplate 模板類
@Autowired
private AmqpTemplate amqpTemplate;
@RequestMapping("/test/toTest")
public void toTest(Object msg) {
// 指定key 和要傳從引數,
amqpTemplate.convertAndSend("buyCarKey", "id=3");
amqpTemplate.convertAndSend("buyHouseKey", "id=4");
System.err.println("進入test方法");
}
}
5.消費者(繼續建立一個相同的maven專案,搭建好ssm框架)jar包同上
更改 web.xml
<?xml version="1.0" encoding="UTF-8"?>
<web-app xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://java.sun.com/xml/ns/javaee" xsi:schemaLocation="http://java.sun.com/xml/ns/javaee http://java.sun.com/xml/ns/javaee/web-app_2_5.xsd" version="2.5">
<!-- spring 監聽 -->
<listener>
<listener-class>org.springframework.web.context.ContextLoaderListener</listener-class>
</listener>
<context-param>
<param-name>contextConfigLocation</param-name>
<param-value>
classpath:spring-base.xml,
classpath:spring-consumer.xml,
classpath:spring-common.xml
</param-value>
</context-param>
<welcome-file-list>
<welcome-file>index.jsp</welcome-file>
</welcome-file-list>
</web-app>
6.消費者pom.xml配置檔案,父節點版本號有改動,spring版本號不能低於4.0,否則少一個工具包,其他無所謂,我這裡使用的是4.2.3
<!-- 父節點 引入jar包 子節點 使用 -->
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<!-- spring版本號 -->
<spring.version>4.2.3.RELEASE</spring.version>
<!-- mybatis版本號 -->
<mybatis.version>3.2.6</mybatis.version>
<!-- log4j日誌檔案管理包版本 -->
<slf4j.version>1.7.7</slf4j.version>
<log4j.version>1.2.17</log4j.version>
</properties>
7.建立消費者配置檔案:spring-consumer.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:aop="http://www.springframework.org/schema/aop"
xmlns:tx="http://www.springframework.org/schema/tx" xmlns:mvc="http://www.springframework.org/schema/mvc"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop.xsd
http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx.xsd
http://www.springframework.org/schema/mvc http://www.springframework.org/schema/mvc/spring-mvc.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
http://www.springframework.org/schema/rabbit
http://www.springframework.org/schema/rabbit/spring-rabbit-1.4.xsd">
<!--配置connection-factory,指定連線rabbit server引數 -->
<rabbit:connection-factory id="connectionFactory"
username="admin" password="admin" host="127.0.0.1" port="5672" />
<!-- 訊息接收者 -->
<bean id="buyCarListener" class="com.mr.jms.BuyCarListener"></bean>
<bean id="buyHouseListener" class="com.mr.jms.BuyHouseListener"></bean>
<!-- 宣告佇列 -->
<rabbit:queue id="buyCar" name="buyCar" durable="true"
auto-delete="false" exclusive="false" />
<rabbit:queue id="buyHouse" name="buyHouse" durable="true"
auto-delete="false" exclusive="false" />
<!-- queue litener 觀察 監聽模式 當有訊息到達時會通知監聽在對應的佇列上的監聽物件 -->
<rabbit:listener-container
connection-factory="connectionFactory">
<rabbit:listener queues="buyCar" ref="buyCarListener" />
<rabbit:listener queues="buyHouse" ref="buyHouseListener" />
</rabbit:listener-container>
</beans>
8.監聽執行類包結構
9.BuyCarListener.java
package com.mr.jms;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
import org.springframework.beans.factory.annotation.Autowired;
import com.mr.service.TestService;
/**
* 這是監聽類,當有訊息事,執行這個方法
* @author Administrator
*
*/
public class BuyCarListener implements MessageListener{
@Autowired
private TestService testService;
@Override
public void onMessage(Message message) {
// message.getBody() 獲得引數
// TODO Auto-generated method stub
System.err.println("接受到訊息,在處理。。。。。"+testService);
}
}
10.BuyHouseListener.java
package com.mr.jms;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
/**
* 這是監聽類,當有訊息事,執行這個方法
* @author Administrator
*
*/
public class BuyHouseListener implements MessageListener{
@Override
public void onMessage(Message message) {
// TODO Auto-generated method stub
System.err.println("接受到買房訊息,在處理。。。。。");
}
}
11.TestService.java
package com.mr.service;
import org.springframework.stereotype.Service;
@Service
public class TestService {
}