1. 程式人生 > >ActiveMQ學習筆記(15)----Message Dispatch高階特性(一)

ActiveMQ學習筆記(15)----Message Dispatch高階特性(一)

1. Message Cursors

  1.1 概述

  ActiveMQ傳送持久化訊息的典型的厝裡方式是:當訊息的消費者準備就緒時,訊息傳送系統把儲存的訊息按批次傳送給消費者,在傳送完一個批次的訊息後,指標的標記位置指向下一個批次的待發訊息的位置,進行後續的傳送操作。這是一種 比較健壯和靈活的訊息傳送方式,但是大多數的情況下,訊息的消費者不一定一直都處於這種理想的活躍狀態。

  因此,從ActiveMQ5.0.0版本開始,訊息傳送系統採用一種混合型的傳送模式,當訊息消費者處於活躍狀態時,允許訊息傳送系統直接把持久化訊息傳送給消費者,當消費者處於不活躍狀態下時,切換使用Cursors來處理訊息的傳送。

  當訊息的消費者處於活躍狀態並且處理能力較強時,被持久化儲存的訊息直接傳送到與消費者相關聯的傳送佇列,如下:

  

  當訊息已經出現積壓,消費者再開始活躍,或者是消費者的消費速度比訊息的傳送速度慢時,訊息將從Pending Cursor中提取,併發送與消費者關聯的傳送佇列。見下圖:

  

  1.) Store-base

    從ActiveMQ5.0 開始,預設使用此種類型的cursor,其能夠滿足大多數場景的使用要求。同時支援非持久訊息的處理,Store-based內嵌了File-based的模式,非持久訊息直接被Non-persistent pending Cursor所處理。工作模式見下圖:

  

  2). VM

  相關訊息引用儲存在記憶體中,當滿足條件時,訊息直接被髮送到消費者與值相關的傳送佇列,處理速度非常快,但出現慢消費或者消費者長時間處於不活躍狀態的情況下,無法適應。工作模式見下圖:

  

  3) .File-based

   當記憶體設定達到設定的限制,訊息被儲存到磁碟中的臨時檔案中。工作模式見下圖:

  

  4). 配置使用

  在預設的情況下,ActiveMQ會根據使用的Message Store來決定使用何種型別的Message Cursors,但是可以根據destination來配置Message Cursors,例如:

  1. 對Topic subscripbers

 <destinationPolicy>
            <policyMap>
              <policyEntries>
                <policyEntry topic="org.apache" producerFlowControl="false" memoryLimit="1mb">
                    <dispatchPolicy>
                        <<strictOrderDispatchPolicy/>
                    </dispatchPolicy>
                    <deadLetterStrategy>
                        <individualDealLetterStrategy  topicPrefix="Test.DLQ"/>
                    </deadLetterStrategy>
                    <pendingSubscriberPolicy>
                        <vmCursor/>
                    </pendingSubscriberPolicy>
                    <pendingDurableSubscriberPolicy>
                        <vmDurableCursor/>
                    </pendingDurableSubscriberPolicy>
                </policyEntry>
              </policyEntries>
            </policyMap>
        </destinationPolicy>

  配置說明:

  有效的Subscriber型別是vmCursor和fileCursor,預設是store based cursor。有效的持久化subscribe的cursor types是storeDurableSubscriberCursor,vmDurableCursor和fileDurableSubscriberCursor,預設是store based cursor.

  2. 對於queue的配置

<destinationPolicy>
            <policyMap>
              <policyEntries>
        
                <policyEntry queue="org.apache.>" >
                    <deadLetterStrategy>
                        <individualDealLetterStrategy  topicPrefix="Test.DLQ"/>
                    </deadLetterStrategy>
                    <pendingQueuePolicy>
                        <vmQueueCursor/>
                    </pendingQueuePolicy>
                </policyEntry>
              </policyEntries>
            </policyMap>
        </destinationPolicy>

2. Async Sends

  ActiveMQ支援非同步和同步傳送訊息,是可以配置,通常對於快的消費者,是直接把訊息同步傳送過去,但是對於一個慢消費者,你使用同步傳送訊息可能出現producer堵塞等現象,慢消費者適合非同步傳送。

  配置使用:

  1. ActiveMQ預設設定dispatchAsync=true是最好的效能設定。如果你處理Fast Consumer則使用dispatchAsync=false

  2. 在Connection URI級別來配置使用Async Send

    factory = new ActiveMQConnectionFactory("tcp://127.0.0.1:61616?jms.useAsyncSend=true");

  3. 在ConnectionFactory級別來配置使用Async Send

    factory.setUseAsyncSend(true);(此方法是存在於ActiveMQConnectionFactory中的)

  4. Connection級別來配置使用Async Send

    connection.setUseAsyncSend(true); (此方法存在於ActiveMQConnection中)