1. 程式人生 > >架構師日記——ActiveMQ使用場景和優化建議

架構師日記——ActiveMQ使用場景和優化建議

ActiveMQ主要有以下幾種使用場景

  • 非同步呼叫
  • 一對多通訊
  • 做多個系統的整合,同構、異構
  • 作為RPC的替代
  • 多個應用相互解耦
  • 作為事件驅動架構的幕後支撐
  • 為了提高系統的可伸縮性

ActiveMQ優化可以從以下幾個方面

  • ActiveMQ的效能依賴於很多因素,比如:
    1:網路拓撲結構,比如:嵌入、主從複製、網路連線
    2:transport協議
    3:service的質量,比如topic還是queue,是否持久化,是否需要重新投遞,訊息超時等
    4:硬體、網路、JVM和作業系統等
    5:生產者的數量,消費者的數量
    6:訊息分發要經過的destination數量,以及訊息的大小等
  • 非持久化訊息比持久化訊息更快,原因如下:
    1:非持久化傳送訊息是非同步的,Producer不需要等待Consumer的receipt訊息
    2:而持久化是要把訊息先儲存起來,然後再傳遞
  • 儘量使用非同步投遞訊息,示例如:
cf.setUseAsyncSend(true);
  • TransactionNon-transaction更快
  • 可以考慮內嵌啟動broker,這樣應用和Broker之間可以使用VM協議通訊,速度快
  • 儘量使用基於檔案的訊息儲存方案,比如使用KahaDB的方式
  • 調整Prefetch Limit,ActiveMQ預設的prefetch大小不同的:
    1:Queue Consumer
    預設1000
    2:Queue Browser Consumer預設500
    3:Persistent Topic Consumer預設1000
    4:Non-persistent Topic Consumer預設32767
    Prefecth policy設定示例如下:
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory();
Properties props = new Properties();
props.setProperty("prefetchPolicy.queuePrefetch", "1000");
props.setProperty
("prefetchPolicy.queueBrowserPrefetch", "500"); props.setProperty("prefetchPolicy.durableTopicPrefetch", "1000"); props.setProperty("prefetchPolicy.topicPrefetch", "32767"); cf.setProperties(props);

也可以在建立Destination的時候設定prefetch size,示例如下:

Queue queue = new ActiveMQQueue("TEST.QUEUE-consumer.prefetchSize=10");
MessageConsumer consumer = session.createConsumer(queue);
  • 可以考慮生產者流量控制,可以通過xml配置,程式碼開啟方式如下:
cf.setProducerWindowSize(1024000);
  • 可以考慮關閉訊息的複製功能,也能部分提高心能,在連線工廠上設定,如下:
cf.setCopyMessageOnSend(false);
  • 調整TCP協議
    TCP協議是ActiveMQ中最常使用的協議,常見有如下配置會影響協議效能:
    1:socketBufferSize:socket的快取大小,預設是65536
    2:tcpNoDelay:預設是false
    示例如:
String url = "failover://(tcp://localhost:61616-tcpNoDelay=true)";
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(url);
  • 訊息投遞和訊息確認
    官方建議使用自動確認的模式,同時還可以開啟優化確認的選項,如下:
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory();
cf.setOptimizeAcknowledge(true);
  • 在消費者這邊,session會在一個單獨的執行緒中分發訊息給消費者,如果你使用的自動確認模
    式,為了增加吞吐量,你可以直接通過session傳遞訊息給消費者,示例如下:
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory();
cf.setAlwaysSessionAsync(false);
  • 如果使用KahaDB進行訊息儲存的話,可以調整如下選項來優化效能:
    1:indexCacheSize:預設為10000,用來設定快取頁的個數,預設情況一頁是4KB,一般來說快取的大小盡可能的設定大一些,以避免記憶體不足時頻繁的交換。
    2:indexWriteBatchSize:預設1000,用來設定髒索引(髒索引就是cache中的index和訊息儲存中的index狀態不一樣)達到多少之後,就需要把索引儲存起來。如果你想最大化broker的速度,那麼就把這個值設定的儘可能的大一些,這樣的話,僅會在到達checkpoint的時候,索引才會被儲存起來。但是這樣會增大系統出錯的時候,丟失大量的元資料的風險。
    3:journalMaxFileLength:預設32mb,當broker的吞吐量特別大的時候,日誌檔案會很快被寫滿,這樣會因為頻繁的關閉檔案,開啟檔案而導致效能低下。你可以通過調整檔案的size,減少檔案切換的頻率,從而獲得輕微的效能改善。
    4:enableJournalDiskSyncs:預設為true,通常,broker會在給producer確認之前,把訊息同步到磁碟上(並且確保訊息物化到磁碟上)。你可以通過設定這個選項為false,從而獲得本質的效能改善。但是這樣的話,多少會降低broker的可靠性。