實戰Spring4+ActiveMQ整合實現訊息佇列(生產者+消費者)
引言:
最近公司做了一個以資訊保安為主的專案,其中有一個業務需求就是,專案定時監控操作使用者的行為,對於一些違規操作嚴重的行為,以傳送郵件(FoxMail)的形式進行郵件告警,可能是多人,也可能是一個人,第一次是以單人的形式,,直接在業務層需要告警的地方傳送郵件即可,可是後邊需求變更了,對於某些告警郵件可能會發送多人,這其中可能就會有阻塞發郵件的可能,直到把所有郵件傳送完畢後再繼續做下邊的業務,領導說這樣會影響使用者體驗,發郵件的時候使用者一直處於等待狀態,不能幹別的事情。最後研究說用訊息佇列,當有需要傳送郵件告警的時候,就向佇列中新增一個標識訊息,ActiveMQ通過監聽器的形式,實時監聽佇列裡邊的小時,收到訊息後,判斷是不是需要傳送告警的標識,是的話就自行就行傳送郵件!這是就研究的訊息佇列ActiveMQ,下邊就是具體內容:
1. ActiveMQ
1.1). ActiveMQ
ActiveMQ是Apache所提供的一個開源的訊息系統,完全採用Java來實現,因此,它能很好地支援J2EE提出的JMS(Java Message Service,即Java訊息服務)規範。JMS是一組Java應用程式介面,它提供訊息的建立、傳送、讀取等一系列服務。JMS提供了一組公共應用程式介面和響應的語法,類似於Java資料庫的統一訪問介面JDBC,它是一種與廠商無關的API,使得Java程式能夠與不同廠商的訊息元件很好地進行通訊。
1. 2). Java Message Service(JMS)
JMS支援兩種訊息傳送和接收模型。
-
一種稱為P2P(Ponit to Point)模型(點對點一對一),即採用點對點的方式傳送訊息。P2P模型是基於佇列的,訊息生產者傳送訊息到佇列,訊息消費者從佇列中接收訊息,佇列的存在使得訊息的非同步傳輸稱為可能,P2P模型在點對點的情況下進行訊息傳遞時採用。
-
另一種稱為Pub/Sub(Publish/Subscribe,即釋出-訂閱)模型,釋出-訂閱模型定義瞭如何向一個內容節點發布和訂閱訊息,這個內容節點稱為topic(主題)。主題可以認為是訊息傳遞的中介,訊息釋出這將訊息釋出到某個主題,而訊息訂閱者則從主題訂閱訊息。主題使得訊息的訂閱者與訊息的釋出者互相保持獨立,不需要進行接觸即可保證訊息的傳遞,釋出-訂閱模型在訊息的一對多廣播時採用。
1.3). JMS術語
- Provider/MessageProvider:生產者
- Consumer/MessageConsumer:消費者
- PTP:Point To Point,點對點通訊訊息模型
- Pub/Sub:Publish/Subscribe,釋出訂閱訊息模型
- Queue:佇列,目標型別之一,和PTP結合
- Topic:主題,目標型別之一,和Pub/Sub結合
- ConnectionFactory:連線工廠,JMS用它建立連線
- Connnection:JMS Client到JMS Provider的連線
- Destination:訊息目的地,由Session建立
- Session:會話,由Connection建立,實質上就是傳送、接受訊息的一個執行緒,因此生產者、消費者都是Session建立的
1.4). ActiveMQ應用場景
類似送快遞,快遞員(producer)將快遞(Message)放到指定地點(destination)後,就可以離開了,拿快遞的人(customer)在接收到通知後,到指定地點(destination)去取快遞(Message)就可以了。當然,取快遞時可能要進行身份驗證,這就涉及到建立連線(connection)時,需要指定使用者名稱和密碼了。還有就是,實際生活中,當快遞員把快遞放好之後,照理說應該通知客戶去哪裡取快遞,而ActiveMq幫我們做好了一切,通知的工作Activemq會幫我們實現,而無需我們親自編碼通知消費者,生產者只需要將Message放到Mq中即可,通知消費者的工作,mq會幫我們處理
用途就是用來處理訊息,也就是處理JMS的。訊息佇列在大型電子商務類網站,如京東、淘寶、去哪兒等網站有著深入的應用,佇列的主要作用是消除高併發訪問高峰,加快網站的響應速度。
在不使用訊息佇列的情況下,使用者的請求資料直接寫入資料庫,高發的情況下,會對資料庫造成巨大的壓力,同時也使得系統響應延遲加劇,但使用佇列後,使用者的請求發給佇列後立即返回。
1.5). ActiveMQ下載
1.6). 啟動
/apache-activemq-5.15.3/bin/win64/
目錄下雙擊activemq.bat檔案,在瀏覽器中輸入http://localhost:8161/admin/
, 使用者名稱和密碼輸入admin
即可
2. Srping+ActiveMQ應用例項
2,1). 專案結構
2,2). 匯入maven依賴,pom.xml檔案
1 <?xml version="1.0" encoding="UTF-8"?> 2 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 3 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> 4 <modelVersion>4.0.0</modelVersion> 5 6 <groupId>www.cnblogs.com.hongmoshu</groupId> 7 <artifactId>test_actmq</artifactId> 8 <version>0.0.1-SNAPSHOT</version> 9 <packaging>war</packaging> 10 <name>test_actmq Maven Webapp</name> 11 <url>http://www.example.com</url> 12 13 <!-- 版本管理 --> 14 <properties> 15 <springframework>4.1.8.RELEASE</springframework> 16 </properties> 17 18 19 <dependencies> 20 21 <!-- junit單元測試 --> 22 <dependency> 23 <groupId>junit</groupId> 24 <artifactId>junit</artifactId> 25 <version>4.11</version> 26 <scope>test</scope> 27 </dependency> 28 29 <!-- JSP相關 --> 30 <dependency> 31 <groupId>jstl</groupId> 32 <artifactId>jstl</artifactId> 33 <version>1.2</version> 34 </dependency> 35 <dependency> 36 <groupId>javax.servlet</groupId> 37 <artifactId>servlet-api</artifactId> 38 <scope>provided</scope> 39 <version>2.5</version> 40 </dependency> 41 42 <!-- spring --> 43 <dependency> 44 <groupId>org.springframework</groupId> 45 <artifactId>spring-core</artifactId> 46 <version>${springframework}</version> 47 </dependency> 48 <dependency> 49 <groupId>org.springframework</groupId> 50 <artifactId>spring-context</artifactId> 51 <version>${springframework}</version> 52 </dependency> 53 <dependency> 54 <groupId>org.springframework</groupId> 55 <artifactId>spring-tx</artifactId> 56 <version>${springframework}</version> 57 </dependency> 58 <dependency> 59 <groupId>org.springframework</groupId> 60 <artifactId>spring-webmvc</artifactId> 61 <version>${springframework}</version> 62 </dependency> 63 <dependency> 64 <groupId>org.springframework</groupId> 65 <artifactId>spring-jms</artifactId> 66 <version>${springframework}</version> 67 </dependency> 68 69 <!-- xbean 如<amq:connectionFactory /> --> 70 <dependency> 71 <groupId>org.apache.xbean</groupId> 72 <artifactId>xbean-spring</artifactId> 73 <version>3.16</version> 74 </dependency> 75 76 <!-- activemq --> 77 <dependency> 78 <groupId>org.apache.activemq</groupId> 79 <artifactId>activemq-core</artifactId> 80 <version>5.7.0</version> 81 </dependency> 82 <dependency> 83 <groupId>org.apache.activemq</groupId> 84 <artifactId>activemq-pool</artifactId> 85 <version>5.12.1</version> 86 </dependency> 87 88 <!-- gson --> 89 <dependency> 90 <groupId>com.google.code.gson</groupId> 91 <artifactId>gson</artifactId> 92 <version>1.7.1</version> 93 </dependency> 94 95 <!-- JSON --> 96 <dependency> 97 <groupId>net.sf.json-lib</groupId> 98 <artifactId>json-lib</artifactId> 99 <version>2.4</version> 100 <classifier>jdk15</classifier> 101 </dependency> 102 103 </dependencies> 104 105 <build> 106 <finalName>test_actmq</finalName> 107 108 </build> 109 </project>
2,3). ActiveMQ的配置檔案ActiveMQ.xml
1 <?xml version="1.0" encoding="UTF-8"?> 2 <beans xmlns="http://www.springframework.org/schema/beans" 3 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 4 xmlns:amq="http://activemq.apache.org/schema/core" 5 xmlns:jms="http://www.springframework.org/schema/jms" 6 xmlns:context="http://www.springframework.org/schema/context" 7 xmlns:mvc="http://www.springframework.org/schema/mvc" 8 xsi:schemaLocation=" 9 http://www.springframework.org/schema/beans 10 http://www.springframework.org/schema/beans/spring-beans-4.1.xsd 11 http://www.springframework.org/schema/context 12 http://www.springframework.org/schema/context/spring-context-4.1.xsd 13 http://www.springframework.org/schema/mvc 14 http://www.springframework.org/schema/mvc/spring-mvc-4.1.xsd 15 http://www.springframework.org/schema/jms 16 http://www.springframework.org/schema/jms/spring-jms-4.1.xsd 17 http://activemq.apache.org/schema/core 18 http://activemq.apache.org/schema/core/activemq-core-5.12.1.xsd" 19 > 20 21 <context:component-scan base-package="com.svse.service" /> 22 <mvc:annotation-driven /> 23 24 <!-- jms.useAsyncSend=true 允許非同步接收訊息 --> 25 <amq:connectionFactory id="amqConnectionFactory" 26 brokerURL="tcp://192.168.6.111:61616?jms.useAsyncSend=true" 27 userName="admin" 28 password="admin" /> 29 30 <!-- 配置JMS連線工 廠 --> 31 <bean id="connectionFactory" 32 class="org.springframework.jms.connection.CachingConnectionFactory"> 33 <constructor-arg ref="amqConnectionFactory" /> 34 <property name="sessionCacheSize" value="100" /> 35 </bean> 36 37 <!-- 定義訊息佇列名稱(Queue) --> 38 <bean id="demoQueueDestination" class="org.apache.activemq.command.ActiveMQQueue"> 39 <!-- 設定訊息佇列的名字 --> 40 <constructor-arg> 41 <value>Jaycekon</value> 42 </constructor-arg> 43 </bean> 44 45 <!-- 配置JMS模板(Queue),Spring提供的JMS工具類,它傳送、接收訊息。 --> 46 <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate"> 47 <property name="connectionFactory" ref="connectionFactory" /> 48 <property name="defaultDestination" ref="demoQueueDestination" /> 49 <property name="receiveTimeout" value="10000" /> 50 <!-- true是topic,false是queue,預設是false,此處顯示寫出false --> 51 <property name="pubSubDomain" value="false" /> 52 <!-- 訊息轉換器 --> 53 <property name="messageConverter" ref="userMessageConverter"/> 54 </bean> 55 56 <!-- 型別轉換器 --> 57 <bean id="userMessageConverter" class="com.svse.util.ObjectMessageConverter"/> 58 59 60 <!-- 配置訊息佇列監聽者(Queue) --> 61 <bean id="queueMessageListener" class="com.svse.util.QueueMessageListener" /> 62 63 <!-- 顯示注入訊息監聽容器(Queue),配置連線工廠,監聽的目標是demoQueueDestination,監聽器是上面定義的監聽器 --> 64 <bean id="queueListenerContainer" 65 class="org.springframework.jms.listener.DefaultMessageListenerContainer"> 66 <property name="connectionFactory" ref="connectionFactory" /> 67 <property name="destination" ref="demoQueueDestination" /> 68 <property name="messageListener" ref="queueMessageListener" /> 69 </bean> 70 71 </beans>
2,4). Spring的配置檔案 spring-mvc.xml
1 <?xml version="1.0" encoding="UTF-8"?> 2 <beans xmlns="http://www.springframework.org/schema/beans" 3 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 4 xmlns:context="http://www.springframework.org/schema/context" 5 xmlns:mvc="http://www.springframework.org/schema/mvc" 6 xsi:schemaLocation="http://www.springframework.org/schema/beans 7 http://www.springframework.org/schema/beans/spring-beans.xsd 8 http://www.springframework.org/schema/context 9 http://www.springframework.org/schema/context/spring-context-4.1.xsd 10 http://www.springframework.org/schema/mvc 11 http://www.springframework.org/schema/mvc/spring-mvc-4.1.xsd"> 12 13 <context:component-scan base-package="com.svse.controller" /> 14 <mvc:annotation-driven /> 15 <bean id="viewResolver" class="org.springframework.web.servlet.view.UrlBasedViewResolver"> 16 <property name="viewClass" 17 value="org.springframework.web.servlet.view.JstlView" /> 18 <property name="prefix" value="/WEB-INF/views/" /> 19 <property name="suffix" value=".jsp" /> 20 </bean> 21 22 </beans>
2,5). web.xml
1 <?xml version="1.0" encoding="UTF-8"?> 2 <web-app xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 3 xmlns="http://xmlns.jcp.org/xml/ns/javaee" 4 xsi:schemaLocation="http://xmlns.jcp.org/xml/ns/javaee http://xmlns.jcp.org/xml/ns/javaee/web-app_3_1.xsd" 5 id="WebApp_ID" version="3.1"> 6 <display-name>mydemo</display-name> 7 8 <welcome-file-list> 9 <welcome-file>index.jsp</welcome-file> 10 </welcome-file-list> 11 12 <!-- 載入spring及active的配置檔案,classpath為專案src下的路徑 --> 13 <context-param> 14 <param-name>contextConfigLocation</param-name> 15 <param-value> 16 classpath:spring-mvc.xml; 17 classpath:ActiveMQ.xml; 18 </param-value> 19 </context-param> 20 21 22 <listener> 23 <listener-class>org.springframework.web.context.ContextLoaderListener</listener-class> 24 </listener> 25 26 <servlet> 27 <servlet-name>springMVC</servlet-name> 28 <servlet-class>org.springframework.web.servlet.DispatcherServlet</servlet-class> 29 <init-param> 30 <param-name>contextConfigLocation</param-name> 31 <param-value>classpath:spring-mvc.xml</param-value> 32 </init-param> 33 <load-on-startup>1</load-on-startup> 34 </servlet> 35 <servlet-mapping> 36 <servlet-name>springMVC</servlet-name> 37 <url-pattern>/</url-pattern> 38 </servlet-mapping> 39 40 <!-- 處理編碼格式 --> 41 <filter> 42 <filter-name>characterEncodingFilter</filter-name> 43 <filter-class>org.springframework.web.filter.CharacterEncodingFilter</filter-class> 44 <init-param> 45 <param-name>encoding</param-name> 46 <param-value>UTF-8</param-value> 47 </init-param> 48 <init-param> 49 <param-name>forceEncoding</param-name> 50 <param-value>true</param-value> 51 </init-param> 52 </filter> 53 <filter-mapping> 54 <filter-name>characterEncodingFilter</filter-name> 55 <url-pattern>/*</url-pattern> 56 </filter-mapping> 57 58 </web-app>
2,6). 實體類Users物件
1 package com.svse.entity; 2 import java.io.Serializable; 3 4 public class Users implements Serializable{ 5 6 private String userId; 7 private String userName; 8 private String sex; 9 private String age; 10 private String type; 11 12 13 public Users() { 14 super(); 15 } 16 public Users(String userId, String userName, String sex, String age, 17 String type) { 18 super(); 19 this.userId = userId; 20 this.userName = userName; 21 this.sex = sex; 22 this.age = age; 23 this.type = type; 24 } 25 public String getUserId() { 26 return userId; 27 } 28 public void setUserId(String userId) { 29 this.userId = userId; 30 } 31 public String getUserName() { 32 return userName; 33 } 34 public void setUserName(String userName) { 35 this.userName = userName; 36 } 37 public String getSex() { 38 return sex; 39 } 40 public void setSex(String sex) { 41 this.sex = sex; 42 } 43 public String getAge() { 44 return age; 45 } 46 public void setAge(String age) { 47 this.age = age; 48 } 49 public String getType() { 50 return type; 51 } 52 public void setType(String type) { 53 this.type = type; 54 } 55 @Override 56 public String toString() { 57 return "Users [userId=" + userId + ", userName=" + userName + ", sex=" 58 + sex + ", age=" + age + ", type=" + type + "]"; 59 } 60 61 62 }
2,7). 核心程式碼(生產者ProducerService)
1 package com.svse.service; 2 3 import javax.annotation.Resource; 4 import javax.jms.Destination; 5 import javax.jms.JMSException; 6 import javax.jms.Message; 7 import javax.jms.Session; 8 9 import org.springframework.jms.core.JmsTemplate; 10 import org.springframework.jms.core.MessageCreator; 11 import org.springframework.stereotype.Service; 12 13 import com.svse.entity.Users; 14 15 @Service 16 public class ProducerService { 17 18 @Resource(name="jmsTemplate") 19 private JmsTemplate jmsTemplate; 20 21 22 /** 23 * 向指定佇列傳送訊息 (傳送文字訊息) 24 */ 25 public void sendMessage(Destination destination,final String msg){ 26 27 jmsTemplate.setDeliveryPersistent(true); 28 29 System.out.println(Thread.currentThread().getName()+" 向佇列"+destination.toString()+"傳送訊息---------------------->"+msg); 30 jmsTemplate.send(destination, new MessageCreator() { 31 public Message createMessage(Session session) throws JMSException { 32 return session.createTextMessage(msg); 33 } 34 }); 35 } 36 37 /** 38 * 向指定佇列傳送訊息以物件的方式 (傳送物件訊息) 39 */ 40 public void sendMessageNew(Destination destination,Users user){ 41 System.out.println(Thread.currentThread().getName()+" 向佇列"+destination.toString()+"傳送訊息---------------------->"+user); 42 jmsTemplate.convertAndSend(user); 43 } 44 45 /** 46 * 向預設佇列傳送訊息 47 */ 48 public void sendMessage(final String msg){ 49 String destination = jmsTemplate.getDefaultDestinationName(); 50 System.out.println(Thread.currentThread().getName()+" 向佇列"+destination+"傳送訊息---------------------->"+msg); 51 jmsTemplate.send(new MessageCreator() { 52 public Message createMessage(Session session) throws JMSException { 53 return session.createTextMessage(msg); 54 } 55 }); 56 } 57 }
2,8). 核心程式碼(消費產者ConsumerService)
1 package com.svse.service; 2 3 import javax.annotation.Resource; 4 import javax.jms.Destination; 5 import javax.jms.JMSException; 6 import javax.jms.ObjectMessage; 7 import javax.jms.TextMessage; 8 9 import net.sf.json.JSONObject; 10 11 import org.springframework.jms.core.JmsTemplate; 12 import org.springframework.stereotype.Service; 13 14 import com.svse.entity.Users; 15 16 @Service 17 public class ConsumerService { 18 19 @Resource(name="jmsTemplate") 20 private JmsTemplate jmsTemplate; 21 //接收文字訊息 22 public TextMessage receive(Destination destination){ 23 TextMessage textMessage = (TextMessage) jmsTemplate.receive(destination); 24 try{ 25 JSONObject json=JSONObject.fromObject(textMessage.getText()); 26 System.out.println("name:"+json.getString("userName")); 27 System.out.println("從佇列" + destination.toString() + "收到了訊息:\t" 28 + textMessage.getText()); 29 } catch (JMSException e) { 30 e.printStackTrace(); 31 } 32 return textMessage; 33 } 34 //接收物件訊息 35 public ObjectMessage receiveNew(Destination destination){ 36 ObjectMessage objMsg=(ObjectMessage) jmsTemplate.receive(destination); 38 try{ 39 Users users= (Users) objMsg.getObject(); 44 System.out.println("name:"+users.getUserName()); 47 System.out.println("從佇列" + destination.toString() + "收到了訊息:\t" 48 + users); 49 } catch (JMSException e) { 50 e.printStackTrace(); 51 } 52 return objMsg; 53 } 54 }
2,9). 核心程式碼(控制器ConsumerService)
1 package com.svse.controller.mq; 2 3 import java.io.IOException; 4 import java.text.SimpleDateFormat; 5 import java.util.Date; 7 import javax.annotation.Resource; 8 import javax.jms.DeliveryMode; 9 import javax.jms.Destination; 10 import javax.jms.JMSException; 11 import javax.jms.ObjectMessage; 12 import javax.jms.TextMessage; 13 import javax.management.MBeanServerConnection; 14 import javax.management.remote.JMXConnector; 15 import javax.management.remote.JMXConnectorFactory; 16 import javax.management.remote.JMXServiceURL; 18 import org.springframework.stereotype.Controller; 19 import org.springframework.web.bind.annotation.RequestMapping; 20 import org.springframework.web.bind.annotation.RequestMethod; 21 import org.springframework.web.bind.annotation.RequestParam; 22 import org.springframework.web.servlet.ModelAndView; 24 import com.google.gson.Gson; 25 import com.svse.entity.Users; 26 import com.svse.service.ConsumerService; 27 import com.svse.service.ProducerService; 28 29 @Controller 30 public class DemoController { 35 36 //佇列名Jaycekon (ActiveMQ中設定的佇列的名稱) 37 @Resource(name="demoQueueDestination") 38 private Destination demoQueueDestination; 39 40 //佇列訊息生產者 41 @Resource(name="producerService") 42 private ProducerService producer; 43 44 //佇列訊息消費者 45 @Resource(name="consumerService") 46 private ConsumerService consumer; 47 48 /* 49 * 準備發訊息 50 */ 51 @RequestMapping(value="/producer",method=RequestMethod.GET) 52 public ModelAndView producer(){ 53 System.out.println("------------go producer"); 54 55 Date now = new Date(); 56 SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); 57 String time = dateFormat.format( now ); 58 System.out.println(time); 59 60 ModelAndView mv = new ModelAndView(); 61 mv.addObject("time", time); 62 mv.setViewName("producer"); 63 return mv; 64 } 65 66 /* 67 * 發訊息 68 */ 69 @RequestMapping(value="/onsend",method=RequestMethod.POST) 70 public ModelAndView producer(@RequestParam("message") String message) { 71 System.out.println("------------send to jms"); 72 ModelAndView mv = new ModelAndView(); 73 for(int i=0;i<5;i++){ 74 try { 75 Users users=new Users("10"+(i+1),"趙媛媛"+(i+1),"女","27","影視演員"); 76 Gson gson=new Gson(); 77 String sendMessage=gson.toJson(users); 78 System.out.println("傳送的訊息sendMessage:"+sendMessage.toString()); 79 // producer.sendMessage(demoQueueDestination,sendMessage.toString());//以文字的形式 80 producer.sendMessageNew(demoQueueDestination, users);//以物件的方式 81 82 } catch (Exception e) { 83 e.printStackTrace(); 84 } 85 } 86 mv.setViewName("index"); 87 return mv; 88 } 89 /* 90 * 手動接收訊息 91 */ 92 @RequestMapping(value="/receive",method=RequestMethod.GET) 93 public ModelAndView queue_receive() throws JMSException { 94 System.out.println("------------receive message"); 95 ModelAndView mv = new ModelAndView(); 96 97 //TextMessage tm = consumer.receive(demoQueueDestination);//接收文字訊息 98 99 ObjectMessage objMsg=consumer.receiveNew(demoQueueDestination);//接收物件訊息 100 Users users= (Users) objMsg.getObject(); 101 //mv.addObject("textMessage", tm.getText()); 102 mv.addObject("textMessage", users.getUserId()+" || "+users.getUserName()); 103 mv.setViewName("receive"); 104 return mv; 105 } 106 107 /* 108 * ActiveMQ Manager Test 109 */ 110 @RequestMapping(value="/jms",method=RequestMethod.GET) 111 public ModelAndView jmsManager() throws IOException { 112 System.out.println("------------jms manager"); 113 ModelAndView mv = new ModelAndView(); 114 mv.setViewName("index"); 115 116 JMXServiceURL url = new JMXServiceURL(""); 117 JMXConnector connector = JMXConnectorFactory.connect(url); 118 connector.connect(); 119 MBeanServerConnection connection = connector.getMBeanServerConnection(); 120 121 return mv; 122 } 123 124 }
3. 物件轉換器MessageConverter和訊息監聽器MessageListener
在上邊的ProducerService和ConsumerService中,不論是傳送訊息還是接收訊息,都可以以文字TextMessage的方式和ObjectMessage的方式.如果是簡單的文字訊息可以以TextMessage,但是如果需要傳送的內容比較多,結構比較複雜,這時候就建議用物件文字ObjectMessage的方式向佇列queue中傳送訊息了.但是這時候就需要用到物件訊息轉換器MessageConverter.
3,1). 訊息轉換器MessageageConverte
MessageConverter的作用主要有兩方面,一方面它可以把我們的非標準化Message物件轉換成我們的目標Message物件,這主要是用在傳送訊息的時候;另一方面它又可以把我們的Message物件
轉換成對應的目標物件,這主要是用在接收訊息的時候。
1 package com.svse.util; 2 3 import java.io.Serializable; 4 5 import javax.jms.JMSException; 6 import javax.jms.Message; 7 import javax.jms.ObjectMessage; 8 import javax.jms.Session; 9 10 import org.springframework.jms.support.converter.MessageConversionException; 11 import org.springframework.jms.support.converter.MessageConverter; 12 13 /** 14 *功能說明:通用的訊息物件轉換類 15 *@author:zsq 16 *create date:2019年7月12日 上午9:28:31 17 *修改人 修改時間 修改描述 18 *Copyright (c)2019北京智華天成科技有限公司-版權所有 19 */ 20 public class ObjectMessageConverter implements MessageConverter { 21 22 23 //把一個Java物件轉換成對應的JMS Message (生產訊息的時候) 24 public Message toMessage(Object object, Session session) 25 throws JMSException, MessageConversionException { 26 27 return session.createObjectMessage((Serializable) object); 28 } 29 30 //把一個JMS Message轉換成對應的Java物件 (消費訊息的時候) 31 public Object fromMessage(Message message) throws JMSException, 32 MessageConversionException { 33 ObjectMessage objMessage = (ObjectMessage) message; 34 return objMessage.getObject(); 35 } 36 37 }
注意:寫了訊息轉化器之後還需要的ActiveMQ.xml中進行配置
3,2). 訊息監聽器MessageageListe
MessageageListe作用就是動態的自行監聽訊息佇列的生產者傳送的訊息,不需要人工手動接收!
1 package com.svse.util; 2 import javax.jms.JMSException; 3 import javax.jms.Message; 4 import javax.jms.MessageListener; 5 import javax.jms.ObjectMessage; 6 import javax.jms.TextMessage; 7 8 import com.svse.entity.Users; 9 10 11 public class QueueMessageListener implements MessageListener { 12 13 //添加了監聽器,只要生產者釋出了訊息,監聽器監聽到有訊息消費者就會自動消費(獲取訊息) 14 public void onMessage(Message message) { 15 //(第1種方式)沒加轉換器之前接收到的是文字訊息 16 //TextMessage tm = (TextMessage) message; 17 18 //(第2種方式)加了轉換器之後接收到的ObjectMessage物件訊息 19 ObjectMessage objMsg=(ObjectMessage) message; 20 Users users; 21 try { 22 users = (Users) objMsg.getObject(); 23 //System.out.println("QueueMessageListener監聽到了文字訊息:\t" + tm.getText()); 24 System.out.println("QueueMessageListener監聽到了文字訊息:\t" + users); 25 //do something ... 26 } catch (JMSException e1) { 27 // TODO Auto-generated catch block 28 e1.printStackTrace(); 29 } 30 } 31 32 }
同樣寫好監聽器後也是需在ActiveMQ.xml中進行配置註冊的
小結
(1)註冊JmsTemplate時,pubSubDomain這個屬性的值要特別注意。預設值是false,也就是說預設只是支援queue模式,不支援topic模式。但是,如果將它改為true,則不支援queue模式。因此如果專案需要同時支援queue和topic模式,那麼需要註冊2個JmsTemplate,同時監聽容器(<jms:listener-container>)也需要註冊2個
(2)使用Queue時,生產者只要將Message傳送到MQ伺服器端,消費者就可以進行消費,而無需生產者程式一直執行;
(3)訊息是按照先入先出的順序,一旦有消費者將Message消費,該Message就會從MQ伺服器佇列中刪去;
(4)有文章說,“生產者”<-->"消費者"是一對一的關係,其實並不準確,從應用中可以看出,一個生產者產生的訊息,可以被
多個消費者進行消費,只不過多個消費者在消費訊息時是競爭的關係,先得到的先消費,一旦消費完成,該訊息就會出佇列, 就不能被其他消費者再消費了,即“一次性消費”。就是我們熟悉的“點對點”通訊了;至此,訊息佇列ActiveMQ就講解完了,傳送訊息和接收訊息的頁面在此不沒貼出來了,如疑問或是更好的方式方法的朋友,歡迎留言一起討