1. 程式人生 > >JMS之activemq訊息持久化

JMS之activemq訊息持久化

JMS 即 java message service 是為java提供了一種建立、傳送、接收訊息的通用方法。可以將複雜的系統進行業務分離,變成靈活的高度解耦合的佈局。同時對我們的日常業務需求開發,提供了非常靈活的業務解決方案。比如繳費還款送積分,送積分的業務邏輯不能影響到繳費還款的業務邏輯,所以最好的,就是繳費/還款邏輯執行完成之後,通過一種方式告訴積分系統,給使用者傳送積分,傳送積分的結果不要影響到複雜的繳費還款的過程。這種情況下,採用jms進行非同步處理,便是一個很好的選擇。

要使用訊息的方式來進行系統互動,我們需要一個訊息中間平臺,來進行訊息的接受轉發,同時處理複雜的訊息持久化等問題。本文我們採用activemq來做實驗。這樣的架構下,我們的系統通常會變成如下架構:

 訊息產生者 -> 訊息中心 -> 訊息消費者

1、訊息的兩種傳播方式

JMS支援兩種訊息傳播:PTP 和 PUB/SUB

PTP : 點對點發送。訊息的傳送方將訊息放入管道中,訊息的接收方從管道中取出訊息並處理。

PUB/SUB : 釋出/訂閱方式。訊息的釋出者將自己的主題放入訊息中心,同時進行訊息投遞,訊息訂閱者只獲取自己訂閱的訊息。

jms為了支援上述兩種模式,提供了兩套針對同樣介面的實現,對照關係如下:

ConnectionFacatory:被管理的物件,由客戶端(釋出者/接受者)使用,用來建立一個連結。

Connection:提供一個JMS訊息的活動連結。

Destination:封裝了訊息目的地,或者主題型別。

Session:一個用來發送和接受訊息的線上上下文。

MessageProducer:由session建立的,用來發送訊息的物件。

MessageConsumer:由session建立的用來接受訊息的物件。

2、jms訊息模型

Jms的訊息分為三部分:訊息頭、訊息屬性、訊息體

訊息頭:包含了訊息的客戶端和提供者用來路由和識別訊息的資料。

 訊息頭包含的欄位:

 JMSDestination:包含了訊息發往的目的地或者主題資訊。

 JMSDeliveryMode:訊息傳送模式。spring提供額jms模板提供了2種模式(有預設模式):DEFAULT_DELEVERY_MODE 預設模式、DEFAULT_PRIORITY、DEFAULT_TIME_TO_LIVE

 JMSMessageID:訊息標示,唯一性,每個訊息都不同,即便是承載者相同訊息體的訊息。

 JMSTimestamp:傳送時間

 JMSCorrelationID:與當前訊息關聯的其他訊息的標示

 JMSReplyTo:回覆訊息的目的地。帶有這樣屬性的訊息通常是傳送方希望有一個響應,這個響應是可選的。

 JMSRedelivered:帶有該欄位的訊息通常過去傳送過但是沒有被確認,如果要再次傳送,提供者必須設定該欄位。如果true,則訊息接受者必須進行訊息重複處理的邏輯。

 JMSType:訊息型別標示。官方文件的解釋:

JMSType頭欄位包含了由客戶端在傳送訊息時提供的訊息型別標識。一些訊息提供者使用訊息庫來儲存由應用傳送的訊息定義。type頭欄位可以引用提供者庫中的訊息定義。JMS沒有定義一個標準的訊息定義庫,也沒有定義這個庫中所包含的各種定義的命名策略。一些訊息系統要求每個被建立的應用訊息都必須有一個訊息型別定義,並且每個訊息都指定它的型別。為了能夠使JMS工作於這些訊息系統提供者,無論應用是否使用,JMS客戶端最好賦值JMSType ,這樣可以保證為需要該頭欄位的提供者提供了正確的設定。為了保證移植性,JMS客戶端應使用安裝時在提供者訊息庫中定義的語義值作為JMSType的值。 

 JMSExpiration :過期時間。

 JMSPriority:優先順序。

訊息屬性:包括了標準投資段之外的額外新增給訊息的可選的欄位。比如 應用指定的屬性。

訊息體:訊息攜帶的內容。

3、訊息傳輸程式設計步驟

 1)使用jndi獲取一個ConnectionFacatory物件;

 2)使用jndi獲取一個或者多個Destination物件;

 3)使用ConnectionFactory建立一個JMS連線;

 4)使用連線建立Jms session;

 5)使用session和destination建立MessageProducers和MessageConsumers

 6)使用Connection進行傳輸訊息;

上述是jms的基礎知識,簡單瞭解可以便於下面的應用。jms本身提供了jar可以下載並使用相關配置,結合訊息系統來完成訊息的傳送和接受等操作。但是一種便捷的方式,為加快開發,可以使用spring提供的jms模板,即JmsTemplate,這個類似於jdbcTemplate。

我們演示PTP和PUB/SUB兩種模式的配置。

先看下基礎公用的類:

我們定義:訊息傳送者、訊息接受者、訊息轉換器

[java] view plain copy print?
  1. /** 
  2.  * message sender 
  3.  * @description <p></p> 
  4.  * @author quzishen 
  5.  * @project NormandyPositionII 
  6.  * @class MessageSender.java 
  7.  * @version 1.0 
  8.  * @time 2011-1-11 
  9.  */
  10. publicclass MessageSender {  
  11.     // ~~~ jmsTemplate
  12.     public JmsTemplate jmsTemplate;  
  13.     /** 
  14.      * send message 
  15.      */
  16.     publicvoid sendMessage(){  
  17.         jmsTemplate.convertAndSend("hello jms!");  
  18.     }  
  19.     publicvoid setJmsTemplate(JmsTemplate jmsTemplate) {  
  20.         this.jmsTemplate = jmsTemplate;  
  21.     }  
  22. }  
[java] view plain copy print?
  1. /** 
  2.  * message receiver 
  3.  * @description <p></p> 
  4.  * @author quzishen 
  5.  * @project NormandyPositionII 
  6.  * @class MessageReceiver.java 
  7.  * @version 1.0 
  8.  * @time 2011-1-11 
  9.  */
  10. publicclass MessageReceiver implements MessageListener {  
  11.     /* (non-Javadoc) 
  12.      * @see javax.jms.MessageListener#onMessage(javax.jms.Message) 
  13.      */
  14.     publicvoid onMessage(Message m) {  
  15.         System.out.println("[receive message]");  
  16.         ObjectMessage om = (ObjectMessage) m;  
  17.         try {  
  18.             String key1 = om.getStringProperty("key1");  
  19.             System.out.println(key1);  
  20.             System.out.println("model:"+om.getJMSDeliveryMode());  
  21.             System.out.println("destination:"+om.getJMSDestination());  
  22.             System.out.println("type:"+om.getJMSType());  
  23.             System.out.println("messageId:"+om.getJMSMessageID());  
  24.             System.out.println("time:"+om.getJMSTimestamp());  
  25.             System.out.println("expiredTime:"+om.getJMSExpiration());  
  26.             System.out.println("priority:"+om.getJMSPriority());  
  27.         } catch (JMSException e) {  
  28.             e.printStackTrace();  
  29.         }  
  30.     }  
  31. }  
[java] view plain copy print?
  1. /** 
  2.  * message converter 
  3.  * @description <p></p> 
  4.  * @author quzishen 
  5.  * @project NormandyPositionII 
  6.  * @class MessageConvertForSys.java 
  7.  * @version 1.0 
  8.  * @time 2011-1-11 
  9.  */
  10. publicclass MessageConvertForSys implements MessageConverter {  
  11.     /* (non-Javadoc) 
  12.      * @see org.springframework.jms.support.converter.MessageConverter#toMessage(java.lang.Object, javax.jms.Session) 
  13.      */
  14.     public Message toMessage(Object object, Session session)  
  15.             throws JMSException, MessageConversionException {  
  16.         System.out.println("[toMessage]");  
  17.         ObjectMessage objectMessage = session.createObjectMessage();  
  18.         objectMessage.setJMSExpiration(1000);  
  19.         objectMessage.setStringProperty("key1", object+"_add");  
  20.         return objectMessage;  
  21.     }  
  22.     /* (non-Javadoc) 
  23.      * @see org.springframework.jms.support.converter.MessageConverter#fromMessage(javax.jms.Message) 
  24.      */
  25.     public Object fromMessage(Message message) throws JMSException,  
  26.             MessageConversionException {  
  27.         System.out.println("[fromMessage]");  
  28.         ObjectMessage objectMessage = (ObjectMessage) message;  
  29.         return objectMessage.getObjectProperty("key1");  
  30.     }  
  31. }  

第一種,PTP方式的配置:

[java] view plain copy print?
  1. <?xml version="1.0" encoding="UTF-8"?>  
  2. <beans xmlns="http://www.springframework.org/schema/beans"
  3.     xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:p="http://www.springframework.org/schema/p"
  4.     xmlns:context="http://www.springframework.org/schema/context"
  5.     xmlns:aop="http://www.springframework.org/schema/aop"
  6.     xsi:schemaLocation="http://www.springframework.org/schema/beans 
  7.     http://www.springframework.org/schema/beans/spring-beans-2.5.xsd 
  8.     http://www.springframework.org/schema/context 
  9.     http://www.springframework.org/schema/context/spring-context-2.5.xsd "
  10.     default-autowire="byName">  
  11.     <!-- JMS PTP MODEL -->  
  12.     <!-- PTP連結工廠 -->  
  13.     <bean id="queueConnectionFactory"class="org.apache.activemq.spring.ActiveMQConnectionFactory">  
  14.         <property name="brokerURL" value="tcp://127.0.0.1:61616" />  
  15.         <!-- <property name="brokerURL" value="vm://normandy.notify" /> -->  
  16.         <property name="useAsyncSend" value="true" />  
  17.     </bean>  
  18.     <!-- 定義訊息佇列 -->  
  19.     <bean id="dest"class="org.apache.activemq.command.ActiveMQQueue">  
  20.         <constructor-arg value="queueDest" />  
  21.     </bean>  
  22.     <!-- PTP jms模板 -->  
  23.     <bean id="jmsTemplate"class="org.springframework.jms.core.JmsTemplate">  
  24.         <property name="connectionFactory" ref="queueConnectionFactory"></property>  
  25.         <property name="defaultDestination" ref="dest" />  
  26.         <property name="messageConverter" ref="messageConvertForSys" />  
  27.         <property name="pubSubDomain" value="false" />  
  28.     </bean>  
  29.     <!-- 訊息轉換器 -->  
  30.     <bean id="messageConvertForSys"class="com.normandy.tech.test.MessageConvertForSys"></bean>  
  31.     <!-- 訊息傳送方 -->  
  32.     <bean id="messageSender"class="com.normandy.tech.test.MessageSender"></bean>  
  33.     <!-- 訊息接收方 -->  
  34.     <bean id="messageReceiver"class="com.normandy.tech.test.MessageReceiver"></bean>  
  35.     <!-- 訊息監聽容器 -->  
  36.     <bean id="listenerContainer"
  37.         class="org.springframework.jms.listener.DefaultMessageListenerContainer">    
  38.         <property name="connectionFactory" ref="queueConnectionFactory" />    
  39.         <property name="destination" ref="dest" />    
  40.         <property name="messageListener" ref="messageReceiver" />    
  41.     </bean>  
  42. </beans>  

第二種:PUB/SUB方式的配置

我們配置兩個訊息訂閱者,分別訂閱不同的訊息,這樣用於判斷是否成功執行了訊息的釋出和訊息的訂閱

[java] view plain copy print?
  1. <?xml version="1.0" encoding="UTF-8"?>  
  2. <beans xmlns="http://www.springframework.org/schema/beans"
  3.     xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:p="http://www.springframework.org/schema/p"
  4.     xmlns:context="http://www.springframework.org/schema/context"
  5.     xmlns:aop="http://www.springframework.org/schema/aop"
  6.     xsi:schemaLocation="http://www.springframework.org/schema/beans 
  7.     http://www.springframework.org/schema/beans/spring-beans-2.5.xsd 
  8.     http://www.springframework.org/schema/context 
  9.     http://www.springframework.org/schema/context/spring-context-2.5.xsd "
  10.     default-autowire="byName">  
  11.     <!-- JMS TOPIC MODEL -->  
  12.     <!-- TOPIC連結工廠 -->  
  13.     <bean id="topicSendConnectionFactory"class="org.apache.activemq.spring.ActiveMQConnectionFactory">  
  14.         <property name="brokerURL" value="tcp://127.0.0.1:61616" />  
  15.         <property name="useAsyncSend" value="true" />  
  16.     </bean>  
  17.     <bean id="topicListenConnectionFactory"class="org.apache.activemq.spring.ActiveMQConnectionFactory">  
  18.         <property name="brokerURL" value="tcp://127.0.0.1:61616" />  
  19.     </bean>  
  20.     <!-- 定義主題 -->  
  21.     <bean id="myTopic"class="org.apache.activemq.command.ActiveMQTopic">  
  22.       <constructor-arg value="normandy.topic"/>  
  23.     </bean>  
  24.     <bean id="myTopic2"class="org.apache.activemq.command.ActiveMQTopic">  
  25.       <constructor-arg value="normandy.topic2"/>  
  26.     </bean>  
  27.     <!-- 訊息轉換器 -->  
  28.     <bean id="messageConvertForSys"class="com.normandy.tech.test.MessageConvertForSys"></bean>  
  29.     <!-- TOPIC send jms模板 -->  
  30.     <bean id="topicSendJmsTemplate"class="org.springframework.jms.core.JmsTemplate">  
  31.         <property name="connectionFactory" ref="topicSendConnectionFactory"></property>  
  32.         <property name="defaultDestination" ref="myTopic" />  
  33.         <property name="messageConverter" ref="messageConvertForSys" />  
  34.         <!-- 開啟訂閱模式 -->  
  35.         <property name="pubSubDomain" value="true"/>  
  36.     </bean>  
  37.     <!-- 訊息傳送方 -->  
  38.     <bean id="topicMessageSender"class="com.normandy.tech.test.MessageSender">  
  39.         <property name="jmsTemplate" ref="topicSendJmsTemplate"></property>  
  40.     </bean>  
  41.     <!-- 訊息接收方 -->  
  42.     <bean id="topicMessageReceiver"class="com.normandy.tech.test.MessageReceiver">  
  43.     </bean>  
  44.     <!-- 主題訊息監聽容器 -->  
  45.     <bean id="listenerContainer"
  46.         class="org.springframework.jms.listener.DefaultMessageListenerContainer">    
  47.         <property name="connectionFactory" ref="topicListenConnectionFactory" />    
  48.         <property name="pubSubDomain" value="true"/><!-- default is false -->  
  49.         <property name="destination" ref="myTopic" />  <!-- listen topic: myTopic -->  
  50.         <property name="subscriptionDurable" value="true"/>  
  51.         <property name="clientId" value="clientId_001"/>  
  52.         <property name="messageListener" ref="topicMessageReceiver" />    
  53.     </bean>  
  54.     <!-- 主題訊息監聽容器2 -->  
  55.     <bean id="listenerContainer2"
  56.         class="org.springframework.jms.listener.DefaultMessageListenerContainer">    
  57.         <property name="connectionFactory" ref="topicListenConnectionFactory" />    
  58.         <property name="pubSubDomain" value="true"/><!-- default is false -->  
  59.         <property name="destination" ref="myTopic2" />  <!-- listen topic: myTopic2 -->  
  60.         <property name="subscriptionDurable" value="true"/>  
  61.         <property name="clientId" value="clientId_002"/>  
  62.         <property name="messageListener" ref="topicMessageReceiver" />    
  63.     </bean>  
  64. </beans>  

單元測試程式碼:

[java] view plain copy print?
  1. publicclass TechTest extends TestCase {  
  2.     ApplicationContext ptpApplicationContext;  
  3.     ApplicationContext topicApplicationContext;  
  4.     @Override
  5.     protectedvoid setUp() throws Exception {  
  6.         super.setUp();  
  7.         ptpApplicationContext = new ClassPathXmlApplicationContext(  
  8.                 "com/normandy/tech/test/ptpContext.xml");  
  9.         topicApplicationContext = new ClassPathXmlApplicationContext(  
  10.                 "com/normandy/tech/test/topicContext.xml");  
  11.     }  
  12.     protected Object getPtpBean(String name) {  
  13.         return ptpApplicationContext.getBean(name);  
  14.     }  
  15.     protected Object getTopicBean(String name) {  
  16.         return topicApplicationContext.getBean(name);  
  17.     }  
  18. }  
[java] view plain copy print?
  1. /** 
  2.  * 測試訊息傳送 
  3.  * @description <p></p> 
  4.  * @author quzishen 
  5.  * @project NormandyPositionII 
  6.  * @class JmsQueueTest.java 
  7.  * @version 1.0 
  8.  * @time 2011-1-11 
  9.  */
  10. publicclass JmsQueueTest extends TechTest {  
  11.     /** 
  12.      * 測試訊息傳送 
  13.      */
  14.     publicvoid testQueueSend() {  
  15.         long beginTime = System.currentTimeMillis();  
  16.         // PTP
  17. //      MessageSender messageSender = (MessageSender) getPtpBean("messageSender");
  18. //      messageSender.sendMessage();
  19.         // TOPIC
  20.         MessageSender messageSender = (MessageSender) getTopicBean("topicMessageSender");  
  21.         messageSender.sendMessage();  
  22.         System.out.println("cost time:"+ (System.currentTimeMillis() - beginTime));  
  23.     }  
  24. }  

測試結果執行便可。

在這裡,訊息系統我們採用的是activemq,試想一個問題,如果訊息過多,這個時候發生了宕機,訊息是否會丟失?

這裡就涉及到了一個新問題,即訊息持久化。

activemq的訊息持久化分成兩種:檔案和資料庫(支援MySQL/oracle)。可以再其配置檔案中進行配置,activemq配置檔案採用的是spring的方式,所以配置起來非常的方便。

通常下載了activemq後,會有一系列的配置檔案demo,可以參照其中的樣例修改即可。

這裡我們使用mysql作為訊息持久化的資料庫伺服器。

將mysql的驅動包,拷貝到activemq的lib目錄,配置如下:

conf/activemq.xml

[xhtml] view plain copy print?
  1. <persistenceAdapter>
  2.             <!--<kahaDB directory="${activemq.base}/data/kahadb"/>-->
  3.              <jdbcPersistenceAdapterdataDirectory="${activemq.base}/data"dataSource="#mysql-ds"/>
  4.         </persistenceAdapter>
[java] view plain copy print?
  1. <bean id="mysql-ds"class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close">  
  2.         <property name="driverClassName" value="com.mysql.jdbc.Driver"/>  
  3.         <property name="url" value="jdbc:mysql://localhost/activemq?relaxAutoCommit=true"/>  
  4.         <property name="username" value="root"/>  
  5.         <property name="password" value="root"/>  
  6.         <property name="maxActive" value="200"/>  
  7.         <property name="poolPreparedStatements" value="true"/>  
  8.     </bean>  

特別注意的是,這裡指定的資料庫名稱,需要事先在mysql中建立好schema。

執行activemq,可以發現自動建立了三張表:

activemq_acks

activemq_lock

activemq_msgs