JMS學習七(ActiveMQ訊息持久化)
ActiveMQ的訊息持久化機制有JDBC,AMQ,KahaDB和LevelDB,還有一種記憶體儲存的方式,由於記憶體不屬於持久化範疇,而且如果使用記憶體佇列,可以考慮使用更合適的產品,如ZeroMQ。所以記憶體儲存不在討論範圍內。
無論使用哪種持久化方式,訊息的儲存邏輯都是一致的。
訊息分為Queue和Topic兩種,Queue是點對點消費,傳送者傳送一條訊息,只有一個且唯一的一個消費者能對其進行消費。
Topic是訂閱式消費,一個訊息可以被很多的訂閱者消費,其中定閱者又分為持久化訂閱和非持久化訂閱。持久化訂閱是指即使訂閱者當前不線上,其訂閱之後,傳送方發到Broker的訊息,也會在持久化訂閱者再次上線的時候完成消費,不會丟失訊息。而非持久化訂閱者,只有訂閱者線上時才會消費,不線上時,即使Broker收到新的訊息,當其再次上線時,也不會收到錯過的訊息。
ActiveMQ的持久化機制,對於Queue型別的訊息,將儲存在Broker,但是一旦其中一個消費者完成消費,則立即刪除這條訊息。對於Topic型別的訊息,即使所有的訂閱者都完成了消費,Broker也不一定會馬上刪除無用訊息,而是保留推送歷史,之後會非同步清除無用訊息。而每個訂閱者消費到了哪條訊息的offset會記錄在Broker,以免下次重複消費。因為訊息是順序消費,先進先出,所以只需要記錄上次訊息消費到哪裡就可以了。
配置持久化的方式,都是修改%ACTIVEMQ_HOME%conf/acticvemq.xml檔案。
下面分別介紹幾種持久化方式的特點:
JDBC:很多企業級應用比較喜歡這種儲存方式。優點是大多數企業都有專門的DBA,以資料庫作為儲存介質,會讓有這方面人才的公司比較放心。另外,資料庫的儲存方式,可以看到訊息是如何儲存的,可以通過SQL查詢訊息消費狀態,可以檢視訊息內容,這是其他持久化方式所不具備的。還有一個優點就是資料庫可以支援強一致性事務,支援兩階段提交的分散式事務。缺點是效能問題,資料庫持久化是效能最低的一種方式。
之所以最先介紹資料庫的持久化方式,是因為我們可以通過表結構很好的理解ActiveMQ是怎麼儲存和消費訊息的。
資料庫會建立3個表:activemq_msgs,activemq_acks和activemq_lock。
activemq_msgs用於儲存訊息,Queue和Topic都儲存在這個表中。
下面介紹一下主要的資料庫欄位:
ID:自增的資料庫主鍵
CONTAINER:訊息的Destination
MSGID_PROD:訊息傳送者客戶端的主鍵
MSG_SEQ:是傳送訊息的順序,MSGID_PROD+MSG_SEQ可以組成JMS的MessageID
EXPIRATION:訊息的過期時間,儲存的是從1970-01-01到現在的毫秒數
MSG:訊息本體的Java序列化物件的二進位制資料
PRIORITY:優先順序,從0-9,數值越大優先順序越高
activemq_acks用於儲存訂閱關係。如果是持久化Topic,訂閱者和伺服器的訂閱關係在這個表儲存。
主要的資料庫欄位如下:
CONTAINER:訊息的Destination
SUB_DEST:如果是使用Static叢集,這個欄位會有叢集其他系統的資訊
CLIENT_ID:每個訂閱者都必須有一個唯一的客戶端ID用以區分
SUB_NAME:訂閱者名稱
SELECTOR:選擇器,可以選擇只消費滿足條件的訊息。條件可以用自定義屬性實現,可支援多屬性AND和OR操作
LAST_ACKED_ID:記錄消費過的訊息的ID。
表activemq_lock在叢集環境中才有用,只有一個Broker可以獲得訊息,稱為Master Broker,其他的只能作為備份等待Master Broker不可用,才可能成為下一個Master Broker。這個表用於記錄哪個Broker是當前的Master Broker。
配置如下:
<beans>
<broker brokerName="test-broker" persistent="true" xmlns="http://activemq.apache.org/schema/core">
<persistenceAdapter>
<jdbcPersistenceAdapter dataSource="#mysql-ds"/>
</persistenceAdapter>
</broker>
<bean id="mysql-ds" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close">
<property name="driverClassName" value="com.mysql.jdbc.Driver"/>
<property name="url" value="jdbc:mysql://localhost/activemq?relaxAutoCommit=true"/>
<property name="username" value="activemq"/>
<property name="password" value="activemq"/>
<property name="maxActive" value="200"/>
<property name="poolPreparedStatements" value="true"/>
</bean>
</beans>
首先定義一個mysql-ds的MySQL資料來源,然後在persistenceAdapter節點中配置jdbcPersistenceAdapter並且引用剛才定義的資料來源。
AMQ:效能高於JDBC,寫入訊息時,會將訊息寫入日誌檔案,由於是順序追加寫,效能很高。為了提升效能,建立訊息主鍵索引,並且提供快取機制,進一步提升效能。每個日誌檔案的大小都是有限制的(預設32m,可自行配置)。當超過這個大小,系統會重新建立一個檔案。當所有的訊息都消費完成,系統會刪除這個檔案或者歸檔(取決於配置)。主要的缺點是AMQ Message會為每一個Destination建立一個索引,如果使用了大量的Queue,索引檔案的大小會佔用很多磁碟空間。而且由於索引巨大,一旦Broker崩潰,重建索引的速度會非常慢。
配置片段如下:
<persistenceAdapter>
<amqPersistenceAdapter directory="${activemq.data}/activemq-data" maxFileLength="32mb"/>
</persistenceAdapter>
雖然AMQ效能略高於Kaha DB,但是由於其重建索引時間過長,而且索引檔案佔用磁碟空間過大,所以已經不推薦使用。這裡就不在詳細介紹AMQ持久化的資料結構。在新版本的ActiveMQ中,AMQ已經被刪除。
KahaDB:從ActiveMQ 5.4開始預設的持久化外掛,KahaDb恢復時間遠遠小於其前身AMQ並且使用更少的資料檔案,所以可以完全代替AMQ。kahaDB的持久化機制和AMQ非常像。同樣是基於日誌檔案,索引和快取。和AMQ不同,KahaDB所有的Destination都使用一個索引檔案。《ActiveMQ In Action》表示其可以支援10000個連線,每個連線都是一個獨立的Queue,足以滿足大部分應用場景。
Data logs用於儲存訊息日誌,訊息的全部內容都在Data logs中。同AMQ一樣,一個Data logs檔案大小超過規定的最大值,會新建一個檔案。同樣是檔案尾部追加,寫入效能很快。每個訊息在Data logs中有計數引用,所以當一個檔案裡所有的訊息都不需要了,系統會自動刪除檔案或放入歸檔資料夾。
快取用於存放線上消費者的訊息。如果消費者已經快速的消費完成,那麼這些訊息就不需要再寫入磁碟了。
Btree索引會根據MessageID建立索引,用於快速的查詢訊息。這個索引同樣維護持久化訂閱者與Destination的關係,以及每個消費者消費訊息的指標。
Redo log用於系統崩潰後,重建Btree索引。因為Redo log的存在,使得重建索引時不需要讀取Data logs的全量資料,大大提升效能。
KahaDB的目錄結構:
db log檔案,以db-<Number>.log命名。archive目錄用於存檔歸檔的資料。db.data和db.redo分別是Btree索引和redo log。
由於是ActiveMQ的預設持久化機制,所以不需要修改配置檔案就可以使用KahaDB,但是還是貼出配置片段:
<persistenceAdapter>
<kahaDB directory="${activemq.data}/activemq-data" journalMaxFileLength="16mb"/>
</persistenceAdapter>
LevelDB:從ActiveMQ 5.6版本之後,又推出了LevelDB的持久化引擎。LevelDB持久化效能高於KahaDB,雖然目前預設的持久化方式仍然是KahaDB,但是LevelDB是將來的趨勢。並且,在ActiveMQ 5.9版本提供了基於LevelDB和Zookeeper的資料複製方式,用於Master-slave方式的首選資料複製方案。LevelDB使用自定義的索引代替常用的BTree索引。
通過上圖可以看出LevelDB主要由6部分組成:記憶體中的MemTable和ImmutableMemTable,還有硬碟上的log檔案,manifest檔案,current檔案和SSTable檔案。還有一些其他的輔助檔案,暫時不做說明。
每寫入一次資料,需要寫入log檔案,和MemTable,也就是說,只需要一次硬碟的順序寫入,和一個記憶體寫入,如果系統崩潰,可以通過log檔案恢復資料。每次寫入會先寫log檔案,後寫MemTable來保證不丟失資料。
當MemTable到達記憶體閥值,LevelDB會建立一個新的MemTable和log檔案,而舊的MemTable會變成ImmutableMemTable,ImmutableMemTable的內容是隻讀的。然後系統會定時的非同步的把ImmutableMemTable的資料寫入新的SSTable檔案。
SSTable檔案和MemTable,ImmutableMemTable的資料結構相同,都是key,value的資料,按照key排序。
manifest檔案用於記錄每個SSTable的key的起始值和結束值,有點類似於B-tree索引。而manifest同樣會生成新檔案,舊的檔案不再使用。current檔案就是指定哪個manifest檔案是現在正在使用的。
配置片段如下:
<persistenceAdapter>
<levelDB directory="${activemq.data}/activemq-data"/>
</persistenceAdapter>
在目前的ActiveMQ 5.10版本中,直接使用LevelDB會導致服務不能啟動,丟擲java.io.IOException: com.google.common.base.Objects.firstNonNull(Ljava/lang/Object;Ljava/lang/Object;)Ljava/lang/Object;
原因是有兩個Guava cache導致版本衝突,解決的辦法是:
-
刪除%ACTIVEMQ_HOME%lib下面的pax-url-aether-1.5.2.jar
-
註釋掉%ACTIVEMQ_HOME%conf/activemq.xml的下面幾行:
<bean id="logQuery" class="org.fusesource.insight.log.log4j.Log4jLogQuery"
lazy-init="false" scope="singleton"
init-method="start" destroy-method="stop">
</bean>
下面是跑在我機器上的效能測試,實際資料意義不大,因為每個環境的配置都不同,但是可以通過對比看出幾種持久化方式的效能對比。
傳送1000條訊息(毫秒) | 傳送10000條訊息(毫秒) | 消費1000條訊息的時間(毫秒) | 消費10000條訊息的時間(毫秒) | |
JDBC-Mysql | 43009 | 369802 | 610 | 509338 |
KahaDB | 34227 | 360493 | 208 | 2224 |
LevelDB | 34032 | 347712 | 220 | 2877 |
通過這個表格可以看出來,傳送訊息LevelDB最快,KahaDB稍微慢點,JDBC最慢,但是也不會慢太多,是一個數量級。消費訊息,KahaDB最快,LevelDB稍微慢點,JDBC慢的讓人不能忍受,差好幾個數量級。LevelDB並沒有顯現出比KahaDB更多速度上的優勢。但是由於LevelDB支援高可用的複製資料,所以首選肯定還是LevelDB。
上面對幾種持久化方案講解的很詳細下面在看看著另一種
JDBC Message Store with ActiveMQ Journal
1、這種方式客服了jdbc store 的不足,使用快速的快取寫入技術,大大提高了效能,具體配置如下:
<persistenceFactory>
<journalPersistenceAdapterFactory journalLogFiles="2" journalLogFileSize="16" useJournal="true" useQuickJournal="true" dataSource="#mysql-ds" dataDirectory="${activemq.data}/data"/>
</persistenceFactory>
其他的和jdbc store 是一樣的。
優點: 比jdbdc store 寫入速度快
缺點:不用用於master/slave 模式
注意:如果使用資料庫持久化方案則 記得在activemq的lib資料夾下新增相關資料庫的驅動!
原文地址:https://my.oschina.net/u/719192/blog/287434