ActiveMQ的設置消息時長,事務,確認機制 ,持久化
轉載中------------
1.消息事務
消息事務是在生產者producer到broker或broker到consumer過程中同一個session中發生的,保證幾條消息在發送過程中的原子性。(Broker:消息隊列核心,相當於一個控制中心,負責路由消息、保存訂閱和連接、消息確認和控制事務) 在支持事務的session中,producer發送message時在message中帶有transactionID。broker收到message後判斷是否有transactionID,如果有就把message保存在transaction store中,等待commit或者rollback消息。 消息生產者-異步發送消息生產者使用持久(persistent)傳遞模式發送消息的時候,Producer.send() 方法會被阻塞,直到 broker 發送一個確認消息給生產者(ProducerAck),這個確認消息暗示broker已經成功接收到消息並把消息保存到二級存儲中。這個過程通常稱為同步發送。 如果應用程序能夠容忍一些消息的丟失,那麽可以使用異步發送。異步發送不會在受到 broker 的確認之前一直阻塞 Producer.send 方法。但有一個例外,當發送方法在一個事務上下文中時,被阻塞的是 commit 方法而不是 send 方法。commit 方法成功返回意味著所有的持久消息都以被寫到二級存儲中。 想要使用異步,在brokerURL中增加 jms.alwaysSyncSend=false&jms.useAsyncSend=true 如果設置了alwaysSyncSend=true系統將會忽略useAsyncSend設置的值都采用同步 1) 當alwaysSyncSend=false時,“NON_PERSISTENT”(非持久化)、事務中的消息將使用“異步發送” 2) 當alwaysSyncSend=false時,如果指定了useAsyncSend=true,“PERSISTENT”類型的消息使用異步發送。如果useAsyncSend=false,“PERSISTENT”類型的消息使用同步發送。 總結:默認情況(alwaysSyncSend=false,useAsyncSend=false),非持久化消息、事務內的消息均采用異步發送;對於持久化消息采用同步發送。 jms.sendTimeout:發送超時時間,默認等於0,如果jms.sendTimeout>0將會忽略(alwaysSyncSend、useAsyncSend、消息是否持久化)所有的消息都是用同步發送! 即使使用異步發送,也可以通過producerWindowSize來控制發送端無節制的向broker發送消息 producerWindowSize:窗口尺寸,用來約束在異步發送時producer端允許積壓的(尚未ACK)的消息的尺寸,且只對異步發送有意義。每次發送消息之後,都將會導致memoryUsage尺寸增加(+message.size),當broker返回producerAck時,如果達到了producerWindowSize上限,即使是異步調用也會被阻塞,防止不停向broker發送消息。
通過jms.producerWindowSize=。。。來設置
2.消息時長,確認機制
public class TopicPub { public static void main(String[] args) throws JMSException { ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616"); Connection connection = factory.createConnection(); connection.start(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); /** Session javax.jms.Connection.createSession(boolean transacted, int acknowledgeMode) throws JMSException 1.transacted事務,事務成功commit,才會將消息發送到mom中 2.acknowledgeMode消息確認機制 1)、帶事務的session 如果session帶有事務,並且事務成功提交,則消息被自動簽收。如果事務回滾,則消息會被再次傳送。 消息事務是在生產者producer到broker或broker到consumer過程中同一個session中發生的, 保證幾條消息在發送過程中的原子性。 在支持事務的session中,producer發送message時在message中帶有transactionID。 broker收到message後判斷是否有transactionID,如果有就把message保存在transaction store中, 等待commit或者rollback消息。 2)、不帶事務的session 不帶事務的session的簽收方式,取決於session的配置。 Activemq支持一下三種模式: Session.AUTO_ACKNOWLEDGE 消息自動簽收 Session.CLIENT_ACKNOWLEDGE 客戶端調用acknowledge方法手動簽收 Session.DUPS_OK_ACKNOWLEDGE 不是必須簽收,消息可能會重復發送。在第二次重新傳送消息的時候,消息 頭的JmsDelivered會被置為true標示當前消息已經傳送過一次,客戶端需要進行消息的重復處理控制。 代碼示例如下: session = connection.createSession(true, Session.CLIENT_ACKNOWLEDGE); textMsg.acknowledge(); */ Topic topic = session.createTopic("wm5920.topic"); MessageProducer producer = session.createProducer(topic); producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);//設置非持久化 //producer.setTimeToLive(5000);//5秒後過期,這個對點對點模式有效 TextMessage message = session.createTextMessage(); message.setText("message_" + System.currentTimeMillis()); producer.send(message); System.out.println("Sent message: " + message.getText()); //帶有事務得commit //session.commit(); session.close(); connection.stop(); connection.close(); } }
訂閱主題,註:如果在發布主題前,沒有訂閱,是收不到消息的,這跟點對點的隊列模式不同
package com.activemq; import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; public class TopicSubs{ public static void main(String[] args) throws JMSException { ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616"); Connection connection = factory.createConnection(); connection.setClientID("wm5920"); connection.start(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Topic topic = session.createTopic("wm5920.topic"); //持久訂閱方式,不會漏掉信息 TopicSubscriber subs=session.createDurableSubscriber(topic, "wm5920"); subs.setMessageListener(new MessageListener() { public void onMessage(Message message) { TextMessage tm = (TextMessage) message; try { System.out.println("Received message: " + tm.getText()); } catch (JMSException e) { e.printStackTrace(); } } }); //非持久訂閱方式 // MessageConsumer consumer = session.createConsumer(topic); // consumer.setMessageListener(new MessageListener() { // public void onMessage(Message message) { // TextMessage tm = (TextMessage) message; // try { // System.out.println("Received message: " + tm.getText()); // } catch (JMSException e) { // e.printStackTrace(); // } // } // }); // session.commit(); // session.close(); // connection.stop(); // connection.close(); } }
1.ActiveMQ的幾種消息持久化機制
為了避免意外宕機以後丟失信息,需要做到重啟後可以恢復消息隊列,消息系統一般都會采用持久化機制。
ActiveMQ的消息持久化機制有JDBC,AMQ,KahaDB和LevelDB,無論使用哪種持久化方式,消息的存儲邏輯都是一致的。
就是在發送者將消息發送出去後,消息中心首先將消息存儲到本地數據文件、內存數據庫或者遠程數據庫等,然後試圖將消息發送給接收者,發送成功則將消息從存儲中刪除,失敗則繼續嘗試。
消息中心啟動以後首先要檢查指定的存儲位置,如果有未發送成功的消息,則需要把消息發送出去。
>> JDBC持久化方式
使用JDBC持久化方式,數據庫會創建3個表:activemq_msgs,activemq_acks和activemq_lock。
activemq_msgs用於存儲消息,Queue和Topic都存儲在這個表中。
(1)配置方式
配置持久化的方式,都是修改安裝目錄下conf/acticvemq.xml文件,
首先定義一個mysql-ds的MySQL數據源,然後在persistenceAdapter節點中配置jdbcPersistenceAdapter並且引用剛才定義的數據源。
<persistenceAdapter> <jdbcPersistenceAdapter dataSource="#mysql-ds" createTablesOnStartup="false" /> </persistenceAdapter>
dataSource指定持久化數據庫的bean,createTablesOnStartup是否在啟動的時候創建數據表,默認值是true,這樣每次啟動都會去創建數據表了,一般是第一次啟動的時候設置為true,之後改成false。
使用MySQL配置JDBC持久化:
<beans> <broker brokerName="test-broker" persistent="true" xmlns="http://activemq.apache.org/schema/core"> <persistenceAdapter> <jdbcPersistenceAdapter dataSource="#mysql-ds" createTablesOnStartup="false"/> </persistenceAdapter> </broker> <bean id="mysql-ds" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close"> <property name="driverClassName" value="com.mysql.jdbc.Driver"/> <property name="url" value="jdbc:mysql://localhost/activemq?relaxAutoCommit=true"/> <property name="username" value="activemq"/> <property name="password" value="activemq"/> <property name="maxActive" value="200"/> <property name="poolPreparedStatements" value="true"/> </bean> </beans>
(2)數據庫表信息
activemq_msgs用於存儲消息,Queue和Topic都存儲在這個表中:
ID:自增的數據庫主鍵
CONTAINER:消息的Destination
MSGID_PROD:消息發送者客戶端的主鍵
MSG_SEQ:是發送消息的順序,MSGID_PROD+MSG_SEQ可以組成JMS的MessageID
EXPIRATION:消息的過期時間,存儲的是從1970-01-01到現在的毫秒數
MSG:消息本體的Java序列化對象的二進制數據
PRIORITY:優先級,從0-9,數值越大優先級越高
activemq_acks用於存儲訂閱關系。如果是持久化Topic,訂閱者和服務器的訂閱關系在這個表保存:
主要的數據庫字段如下:
CONTAINER:消息的Destination
SUB_DEST:如果是使用Static集群,這個字段會有集群其他系統的信息
CLIENT_ID:每個訂閱者都必須有一個唯一的客戶端ID用以區分
SUB_NAME:訂閱者名稱
SELECTOR:選擇器,可以選擇只消費滿足條件的消息。條件可以用自定義屬性實現,可支持多屬性AND和OR操作
LAST_ACKED_ID:記錄消費過的消息的ID。
表activemq_lock在集群環境中才有用,只有一個Broker可以獲得消息,稱為Master Broker,
其他的只能作為備份等待Master Broker不可用,才可能成為下一個Master Broker。
這個表用於記錄哪個Broker是當前的Master Broker。
>> AMQ方式
性能高於JDBC,寫入消息時,會將消息寫入日誌文件,由於是順序追加寫,性能很高。為了提升性能,創建消息主鍵索引,並且提供緩存機制,進一步提升性能。每個日誌文件的大小都是有限制的(默認32m,可自行配置)。
當超過這個大小,系統會重新建立一個文件。當所有的消息都消費完成,系統會刪除這個文件或者歸檔(取決於配置)。
主要的缺點是AMQ Message會為每一個Destination創建一個索引,如果使用了大量的Queue,索引文件的大小會占用很多磁盤空間。
而且由於索引巨大,一旦Broker崩潰,重建索引的速度會非常慢。
配置片段如下:
<persistenceAdapter> <amqPersistenceAdapter directory="${activemq.data}/activemq-data" maxFileLength="32mb"/> </persistenceAdapter>
雖然AMQ性能略高於下面的Kaha DB方式,但是由於其重建索引時間過長,而且索引文件占用磁盤空間過大,所以已經不推薦使用。
>> KahaDB方式
KahaDB是從ActiveMQ 5.4開始默認的持久化插件,也是我們項目現在使用的持久化方式。
KahaDb恢復時間遠遠小於其前身AMQ並且使用更少的數據文件,所以可以完全代替AMQ。
kahaDB的持久化機制同樣是基於日誌文件,索引和緩存。
配置方式:
<persistenceAdapter> <kahaDB directory="${activemq.data}/activemq-data" journalMaxFileLength="16mb"/> </persistenceAdapter>
directory : 指定持久化消息的存儲目錄
journalMaxFileLength : 指定保存消息的日誌文件大小,具體根據你的實際應用配置
(1)KahaDB主要特性
1、日誌形式存儲消息;
2、消息索引以B-Tree結構存儲,可以快速更新;
3、完全支持JMS事務;
4、支持多種恢復機制;
(2)KahaDB的結構
消息存儲在基於文件的數據日誌中。如果消息發送成功,變標記為可刪除的。系統會周期性的清除或者歸檔日誌文件。
消息文件的位置索引存儲在內存中,這樣能快速定位到。定期將內存中的消息索引保存到metadata store中,避免大量消息未發送時,消息索引占用過多內存空間。
Data logs:
Data logs用於存儲消息日誌,消息的全部內容都在Data logs中。
同AMQ一樣,一個Data logs文件大小超過規定的最大值,會新建一個文件。同樣是文件尾部追加,寫入性能很快。
每個消息在Data logs中有計數引用,所以當一個文件裏所有的消息都不需要了,系統會自動刪除文件或放入歸檔文件夾。
Metadata cache :
緩存用於存放在線消費者的消息。如果消費者已經快速的消費完成,那麽這些消息就不需要再寫入磁盤了。
Btree索引會根據MessageID創建索引,用於快速的查找消息。這個索引同樣維護持久化訂閱者與Destination的關系,以及每個消費者消費消息的指針。
Metadata store
在db.data文件中保存消息日誌中消息的元數據,也是以B-Tree結構存儲的,定時從Metadata cache更新數據。Metadata store中也會備份一些在消息日誌中存在的信息,這樣可以讓Broker實例快速啟動。
即便metadata store文件被破壞或者誤刪除了。broker可以讀取Data logs恢復過來,只是速度會相對較慢些。
>>LevelDB方式
從ActiveMQ 5.6版本之後,又推出了LevelDB的持久化引擎。
目前默認的持久化方式仍然是KahaDB,不過LevelDB持久化性能高於KahaDB,可能是以後的趨勢。
在ActiveMQ 5.9版本提供了基於LevelDB和Zookeeper的數據復制方式,用於Master-slave方式的首選數據復制方案。
ActiveMQ的設置消息時長,事務,確認機制 ,持久化