ActiveMQ的訊息儲存持久化(一)
阿新 • • 發佈:2019-02-10
ActiveMQ的訊息儲存持久化(一)
概述
ActiveMQ不僅支援persistent和non-persistent兩種方式,還支援訊息的恢復(recovery)方式。
PTP
Queue的儲存是很簡單的,就是一個FIFO的Queue。
PUB/SUB
對於持久化訂閱主題,每一個消費者將獲得一個訊息的複製。
有效的訊息儲存
ActiveMQ提供了一個外掛式的訊息儲存,類似於訊息的多點傳播,主要實現瞭如下幾種:
- AMQ訊息儲存,基於檔案的儲存方式,是以前的預設訊息儲存。
- KahaDB訊息儲存,提供了容量的提升和恢復能力,是現在的預設儲存方式。
- JDBC訊息儲存,訊息基於JDBC儲存的。
- Memory 訊息儲存,基於記憶體的訊息儲存。
KahaDB Message Store概述
KahaDB是目前預設的儲存方式,可用於任何場景,提高了效能和恢復能力。訊息儲存使用一個事務日誌和僅僅用一個索引檔案來儲存它所有的地址。
KahaDB是一個專門針對訊息持久化的解決方案,它對典型的訊息使用模式進行了優化。在KahaDB中,資料被追加到data logs中。當不再需要log檔案中的資料的時候,log檔案會被丟棄。
KahaDB基本配置示例
<persistenceAdapter> <kahaDB directory="${activemq.data}/kahadb"/> </persistenceAdapter>
可用的屬性有:
- director:KahaDB存放的路徑,預設值activemq-data。
- indexWriteBatchSize: 批量寫入磁碟的索引page數量,預設值1000。
- indexCacheSize:記憶體中快取索引page的數量,預設值10000。
- enableIndexWriteAsync:是否非同步寫出索引,預設false。
- journalMaxFileLength:設定每個訊息data log的大小,預設是32MB。
- enableJournalDiskSyncs:設定是否保證每個沒有事務的內容,被同步寫入磁碟,JMS持久化的時候需要,預設為true。
- cleanupInterval:在檢查到不再使用的訊息後,在具體刪除訊息前的時間,預設30000。
- checkpointInterval:checkpoint的間隔時間,預設5000。
- ignoreMissingJournalfiles:是否忽略丟失的訊息日誌檔案,預設false。
- checkForCorruptJournalFiles:在啟動的時候,將會驗證訊息檔案是否損壞,預設false。
- checksumJournalFiles:是否為每個訊息日誌檔案提供checksum,預設false。
- archiveDataLogs: 是否移動檔案到特定的路徑,而不是刪除它們,預設false。
- directoryArchive:定義訊息已經被消費過後,移動data log到的路徑,預設null。
- databaseLockedWaitDelay:獲得資料庫鎖的等待時間 (used by shared master/slave),預設10000。
- maxAsyncJobs:設定最大的可以儲存的非同步訊息佇列,預設值10000,可以和concurrentMessageProducers 設定成一樣的值。
- concurrentStoreAndDispatchTransactions:是否分發訊息到客戶端,同時事務儲存訊息,預設true。
- concurrentStoreAndDispatchTopics:是否分發Topic訊息到客戶端,同時進行儲存,預設true。
- concurrentStoreAndDispatchQueues:是否分發queue訊息到客戶端,同時進行儲存,預設true。
在Java中內嵌使用Broker,使用KahaDB的示例:
public class EmbeddedBrokerUsingKahaDBStoreExample {
BrokerService createEmbeddedBroker() throws Exception {
BrokerService broker = new BrokerService();
File dataFileDir = new File("target/amq-in-action/kahadb");
KahaDBStore kaha = new KahaDBStore();
kaha.setDirectory(dataFileDir);
// Using a bigger journal file
kaha.setJournalMaxFileLength(1024*100);
// small batch means more frequent and smaller writes
kaha.setIndexWriteBatchSize(100);
// do the index write in a separate thread
kaha.setEnableIndexWriteAsync(true);
broker.setPersistenceAdapter(kaha);
//create a transport connector
broker.addConnector("tcp://localhost:61616");
//start the broker
broker.start();
return broker;
}
}