ActiveMQ學習筆記(15)----Message Dispatch高階特性(一)
1. Message Cursors
1.1 概述
ActiveMQ傳送持久化訊息的典型的厝裡方式是:當訊息的消費者準備就緒時,訊息傳送系統把儲存的訊息按批次傳送給消費者,在傳送完一個批次的訊息後,指標的標記位置指向下一個批次的待發訊息的位置,進行後續的傳送操作。這是一種 比較健壯和靈活的訊息傳送方式,但是大多數的情況下,訊息的消費者不一定一直都處於這種理想的活躍狀態。
因此,從ActiveMQ5.0.0版本開始,訊息傳送系統採用一種混合型的傳送模式,當訊息消費者處於活躍狀態時,允許訊息傳送系統直接把持久化訊息傳送給消費者,當消費者處於不活躍狀態下時,切換使用Cursors來處理訊息的傳送。
當訊息的消費者處於活躍狀態並且處理能力較強時,被持久化儲存的訊息直接傳送到與消費者相關聯的傳送佇列,如下:
當訊息已經出現積壓,消費者再開始活躍,或者是消費者的消費速度比訊息的傳送速度慢時,訊息將從Pending Cursor中提取,併發送與消費者關聯的傳送佇列。見下圖:
1.) Store-base
從ActiveMQ5.0 開始,預設使用此種類型的cursor,其能夠滿足大多數場景的使用要求。同時支援非持久訊息的處理,Store-based內嵌了File-based的模式,非持久訊息直接被Non-persistent pending Cursor所處理。工作模式見下圖:
2). VM
相關訊息引用儲存在記憶體中,當滿足條件時,訊息直接被髮送到消費者與值相關的傳送佇列,處理速度非常快,但出現慢消費或者消費者長時間處於不活躍狀態的情況下,無法適應。工作模式見下圖:
3) .File-based
當記憶體設定達到設定的限制,訊息被儲存到磁碟中的臨時檔案中。工作模式見下圖:
4). 配置使用
在預設的情況下,ActiveMQ會根據使用的Message Store來決定使用何種型別的Message Cursors,但是可以根據destination來配置Message Cursors,例如:
1. 對Topic subscripbers
<destinationPolicy> <policyMap> <policyEntries> <policyEntry topic="org.apache" producerFlowControl="false" memoryLimit="1mb"> <dispatchPolicy> <<strictOrderDispatchPolicy/> </dispatchPolicy> <deadLetterStrategy> <individualDealLetterStrategy topicPrefix="Test.DLQ"/> </deadLetterStrategy> <pendingSubscriberPolicy> <vmCursor/> </pendingSubscriberPolicy> <pendingDurableSubscriberPolicy> <vmDurableCursor/> </pendingDurableSubscriberPolicy> </policyEntry> </policyEntries> </policyMap> </destinationPolicy>
配置說明:
有效的Subscriber型別是vmCursor和fileCursor,預設是store based cursor。有效的持久化subscribe的cursor types是storeDurableSubscriberCursor,vmDurableCursor和fileDurableSubscriberCursor,預設是store based cursor.
2. 對於queue的配置
<destinationPolicy> <policyMap> <policyEntries> <policyEntry queue="org.apache.>" > <deadLetterStrategy> <individualDealLetterStrategy topicPrefix="Test.DLQ"/> </deadLetterStrategy> <pendingQueuePolicy> <vmQueueCursor/> </pendingQueuePolicy> </policyEntry> </policyEntries> </policyMap> </destinationPolicy>
2. Async Sends
ActiveMQ支援非同步和同步傳送訊息,是可以配置,通常對於快的消費者,是直接把訊息同步傳送過去,但是對於一個慢消費者,你使用同步傳送訊息可能出現producer堵塞等現象,慢消費者適合非同步傳送。
配置使用:
1. ActiveMQ預設設定dispatchAsync=true是最好的效能設定。如果你處理Fast Consumer則使用dispatchAsync=false
2. 在Connection URI級別來配置使用Async Send
factory = new ActiveMQConnectionFactory("tcp://127.0.0.1:61616?jms.useAsyncSend=true");
3. 在ConnectionFactory級別來配置使用Async Send
factory.setUseAsyncSend(true);(此方法是存在於ActiveMQConnectionFactory中的)
4. Connection級別來配置使用Async Send
connection.setUseAsyncSend(true); (此方法存在於ActiveMQConnection中)