1. 程式人生 > >【ActiveMQ】五 基本元素高階特性

【ActiveMQ】五 基本元素高階特性

Destination 高階特性

一 組合佇列

組合佇列 Composite Destinations 組合佇列允許用一個虛擬的destination代表多個destinations。這樣就可以通過composite destinations在一個操作中同時向多個queue傳送訊息。

1.1 程式碼實現

客戶端實現的方式 composite destinations中,多個destination之間採用“,”分割。例如:

Queue queue= new ActiveMQQueue("FOO.A,FOO.B,FOO.C");

如果你希望使用不同型別的destination

,那麼需要加上字首如queue:// topic://,例如: Queue queue= new ActiveMQQueue("FOO.A,topic://NOTIFY.FOO.A");

Destination destination = session.createQueue("my-queue,my-queue2");

1.2 xml配置

<destinationInterceptors>

<virtualDestinationInterceptor>

<virtualDestinations>

<compositeQueuename="MY.QUEUE">

<forwardTo>

<queue physicalName="my-queue" />

<queue physicalName="my-queue2" />

                                                       </forwardTo>

                                    </compositeQueue>

</virtualDestinations>

</virtualDestinationInterceptor>

 </destinationInterceptors>

 

二 佇列失效

一般情況下,ActiveMQqueue在不使用之後,可以通過web控制檯或是JMX方式來刪除掉。當然,也可以通過配置,使得broker可以自動探測到無用的佇列(一定時間內為空的佇列)並刪除掉,回 收響應資源。可以如下配置:

<broker xmlns="http://activemq.apache.org/schema/core" schedulePeriodForDestinationPurge="10000">

   <destinationPolicy>    

<policyMap>       

<policyEntries>       

                             <policyEntry queue=">" gcInactiveDestinations="true" inactiveTimoutBeforeGC="30000"/>     

                                          </policyEntries>   

             </policyMap>

</destinationPolicy>   

</broker>

說明:

schedulePeriodForDestinationPurge:設定多長時間檢查一次,這裡是10秒,預設為0 inactiveTimoutBeforeGC:設定當Destination為空後,多長時間被刪除,這裡是30秒,預設為60

gcInactiveDestinations 設定刪除掉不活動佇列,預設為false

 

 

三 虛擬目的地

3.1 為何使用虛擬主題

虛擬Destinations用來建立邏輯Destinations,客戶端可以通過它來生產和消費訊息,它會把訊息對映到物理DestinationsActiveMQ支援兩種方式:

1:虛擬主題(Virtual Topics

2:組合 DestinationsComposite Destinations

ActiveMQ中,topic只有在持久訂閱下才是持久化的。持久訂閱時,每個持久訂閱者,都相當於一個queue的客戶端,它會收取所有訊息。這種情況下存在兩個問題:

1:同一應用內consumer端負載均衡的問題:也即是同一個應用上的一個持久訂閱不能使用多個consumer來共同承擔訊息處理功能。因為每個consumer都會獲取所有訊息。 queue模式可以解決這個問題,但broker端又不能將訊息傳送到多個應用端。所以, 既要釋出訂閱,又要讓消費者分組,這個功能JMS規範本身是沒有的。

2:同一應用內consumerfailover的問題:由於只能使用單個的持久訂閱者,如果這個訂閱者出錯,則應用就無法處理訊息了,系統的健壯性不高,為了解決這兩個問題,ActiveMQ中實現了虛擬Topic的功能

3.2 如何使用虛擬主題

1:對於訊息釋出者來說,就是一個正常的Topic,名稱以VirtualTopic.開頭。例如 VirtualTopic.Orders,程式碼示例如下:

 Topic destination = session.createTopic("VirtualTopic.Orders");

2:對於訊息接收端來說,是個佇列,不同應用裡使用不同的字首作為佇列的名稱,即可表明 自己的身份即可實現消費端應用分組。 例如Consumer.A.VirtualTopic.Orders,說明它是名稱為A的消費端,同理 Consumer.B.VirtualTopic.Orders說明是一個名稱為B的客戶端。可以在同一個應用裡使用多個consumer消費此queue,則可以實現上面兩個功能。又因為不同應用使用的queue名稱不同(字首不同),所以不同的應用中都可以接收到全部的訊息。每個客戶端相當於一個持久訂閱者,而且這個客戶端可以使用多個消費者 共同來承擔消費任務。

Destination destination= session.createQueue("Consumer.A.VirtualTopic.Orders");

 

 

 

3.3 設定虛擬地址字首

預設虛擬主題的字首是:VirtualTopic.> 自定義消費虛擬地址預設格式:Consumer.*.VirtualTopic.> 自定義消費虛擬地址可以改,比如下面的配置就把它修改了。 xml配置示例如下:

<broker xmlns="http://activemq.apache.org/schema/core">

<destinationInterceptors>   

         <virtualDestinationInterceptor>   

              <virtualDestinations>     

<virtualTopicname=">" prefix="VirtualTopicConsumers.*." selectorAware="false"/>     

</virtualDestinations>     

</virtualDestinationInterceptor> 

</destinationInterceptors>

</broker>

 

3.3.1  producer

public class VirtualSender {

   public static void main(String[] args) throws JMSException {

         ActiveMQConnectionFactory factory=new ActiveMQConnectionFactory("tcp://192.168.232.128:61617");

         Connection createConnection = factory.createConnection();

         Session createSession = createConnection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);

         Topic createTopic = createSession.createTopic("VirtualTopic.Test");

         MessageProducer createProducer = createSession.createProducer(createTopic);

           createProducer.setDeliveryMode(DeliveryMode.PERSISTENT);

           createConnection.start();

           for(int i=0;i<3;i++){

                 TextMessage createTextMessage = createSession.createTextMessage("message"+i);

                 createProducer.send(createTextMessage);

           }

           createSession.commit();

           createSession.close();

           createConnection.close();

   }

}

3.3.2  receiver

(1) queueA

public class PersisiTopicReceiver {

     public static void main(String[] args) throws JMSException, InterruptedException {

         ActiveMQConnectionFactory activeMQConnectionFactory=new ActiveMQConnectionFactory("tcp://192.168.232.128:61616");

         Connection createConnection = activeMQConnectionFactory.createConnection();

         createConnection.setClientID("B_ID");

         createConnection.start();

         final Session createSession = createConnection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);

         for(int i=0;i<1;i++){

             Queue destination = createSession.createQueue("Consumer.A.VirtualTopic.Test");

              MessageConsumer consumer = createSession.createConsumer(destination);

              consumer.setMessageListener(new MessageListener() {

                   public void onMessage(Message message) {

                       TextMessage msg = (TextMessage)message;

                       System.out.println(msg);

                       try {

                            Thread.sleep(2000);

                            createSession.commit();

                       } catch (InterruptedException e) {

                            // TODO Auto-generated catch block

                            e.printStackTrace();

                       } catch (JMSException e) {

                            // TODO Auto-generated catch block

                            e.printStackTrace();

                       }

                   }

              });

         }

     }

}

(2) queueB

public class PersisiTopicReceiver2 {

      public static void main(String[] args) throws JMSException, InterruptedException {

           ActiveMQConnectionFactory activeMQConnectionFactory=new ActiveMQConnectionFactory("tcp://192.168.232.128:61617");

           Connection createConnection = activeMQConnectionFactory.createConnection();

           createConnection.setClientID("B_ID2");

           createConnection.start();

           final Session createSession = createConnection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);

           for(int i=0;i<1;i++){

               Queue destination = createSession.createQueue("Consumer.B.VirtualTopic.Test");

                 MessageConsumer consumer = createSession.createConsumer(destination);

                 consumer.setMessageListener(new MessageListener() {

                      public void onMessage(Message message) {

                            TextMessage msg = (TextMessage)message;

                            System.out.println(msg);

                            try {

                                  createSession.commit();

                                  Thread.sleep(2000);

                            } catch (InterruptedException e) {

                                  // TODO Auto-generated catch block

                                  e.printStackTrace();

                            } catch (JMSException e) {

                                  // TODO Auto-generated catch block

                                  e.printStackTrace();

                            }

                      }

                 });

           }

      }

}

 

3.3.3  result

        如下是通過一個構建一個虛擬主題,生成多個消費佇列,實現負載均衡,

 

3.4 映象佇列

ActiveMQ中每個queue中的訊息只能被一個consumer消費。然而,有時候你可能希望 能夠監視生產者和消費者之間的訊息流。你可以通過使用Virtual Destinations 來建立一個virtual queue 來把訊息轉發到多個queues中。但是為系統中每個queue都進行如此的配置可能會很麻煩。

使用 ActiveMQ支援Mirrored QueuesBroker會把傳送到某個queue的所有訊息轉發到一 個名稱類似的topic,因此監控程式只需要訂閱這個mirrored queue topic。為了啟用 Mirrored Queues,首先要將BrokeruseMirroredQueues屬性設定成true,然後可

以通過destinationInterceptors設定其它屬性,如mirror topic的字首,預設是 VirtualTopic.Mirror.”。 比如修改後綴的配置示例如下:

 

<broker>

<destinationInterceptors>

<mirroredQueuecopyMessage="true"  postfix=".qmirror"  prefix=""/>

</destinationInterceptors>

</broker>

 

3.4.1 producer

public class MsgSendder {

      public static void main(String[] args) throws Exception {

           ActiveMQConnectionFactory ConnectionFactoryconnectionFactory = new ActiveMQConnectionFactory("failover:(tcp://192.168.232.128:61616,tcp://192.168.232.128:61617)?randomize=false");

           Connection connection = ConnectionFactoryconnectionFactory.createConnection();

           connection.start();

           Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);

           Destination destination = session.createQueue("my-queue,my-queue2");

           MessageProducer producer = session.createProducer(destination);

                 for (int i = 0; i < 40; i++) {

                      TextMessage message = session.createTextMessage("message--" + i);

                            producer.send(message);

                 }

                 session.commit();

                 session.close();

                 connection.close();

           }

}

3.4.2 consumer

public class PersisiTopicReceiver {

     public static void main(String[] args) throws JMSException {

         ActiveMQConnectionFactory activeMQConnectionFactory=new ActiveMQConnectionFactory("tcp://192.168.232.128:61617");

         Connection createConnection = activeMQConnectionFactory.createConnection();

         createConnection.setClientID("訂閱者B_ID");

         createConnection.start();

         Session createSession = createConnection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);

         Topic createTopic = createSession.createTopic("my-queue.qmirror");

         TopicSubscriber createDurableSubscriber = createSession.createDurableSubscriber(createTopic, "T1");

         TextMessage message = (TextMessage)createDurableSubscriber.receive();

         while(message!=null){

              System.out.println(message.getText());

              message = (TextMessage)createDurableSubscriber.receive();

         }

         createSession.commit();

         createSession.close();

         createConnection.close();

     }

}

3.4.3 result

 

 

 

MessageDispatch 高階特性

一 非同步傳送

         AciveMQ支援非同步和同步傳送訊息,是可以配置的。通常對於快的消費者, 是直接把訊息同步傳送過去,但對於一個Slow Consumer,你使用同步傳送訊息 可能出現Producer堵塞等現象,慢消費者適合使用非同步傳送

        

配置使用

1:ActiveMQ預設設定dispatcheAsync=true是最好的效能設定。如果你處理的是 Fast Consumer則使用dispatcheAsync=false

2:在Connection URI級別來配置使用AsyncSendcf= new ActiveMQConnectionFactory("tcp://locahost:61616?jms.useAsyncSend=true");

3:在ConnectionFactory級別來配置使用AsyncSend ((ActiveMQConnectionFactory)connectionFactory).setUseAsyncSend(true);

4:在Connection級別來配置使用AsyncSend ((ActiveMQConnection)connection).setUseAsyncSend(true);

二 嚴格的順序

嚴格順序分發策略(Strict Order Dispatch Policy) 通常ActiveMQ會保證topic consumer以相同的順序接收來自同一個producer的訊息,但有時候也需要保證不同的topic consumer以相同的順序接收訊息,然而,由於多執行緒和非同步處理,不同的topic consumer可能會以不同的順序接收來自不同producer的訊息。 Strict order dispatch policy 會保證每個topic consumer會以相同的順序接收消 息,代價是效能上的損失。

以下是一個配置例子

<policyEntry topic="ORDERS.>">

           <dispatchPolicy> 

                     <strictOrderDispatchPolicy/>  

         </dispatchPolicy>

</policyEntry>

對於Queue的配置為: <policyEntryqueue=">" strictOrderDispatch="false" />

三 輪詢策略

輪詢分發策略(Round Robin Dispatch Policy) ActiveMQ的prefetch預設引數是針對處理大量訊息時的高效能和高吞吐量而設定的,所以預設的prefetch引數比較大。而且預設的dispatch policies會嘗試儘可能快的填滿prefetch緩衝。 然而在有些情況下,例如只有少量的訊息而且單個訊息的處理時間比較長,那麼在預設的prefetch和dispatch policies下,這些少量的訊息總是傾向於被分發到個別的 consumer上。這樣就會因為負載的不均衡分配而導致處理時間的增加。

Round robin dispatch policy會嘗試平均分發訊息,以下是一個例子:

<policyEntrytopic="ORDERS.>">

           <dispatchPolicy>  

                       <roundRobinDispatchPolicy/>

</dispatchPolicy>

 </policyEntry>

Message 高階特性

一 基本屬性

1:Queue的訊息預設是持久化的。

2:訊息的優先順序預設是4。

3:訊息傳送時設定了時間戳。

4:訊息的過期時間預設是永不過期,過期的訊息進入DLQ,可以配置DLQ及其處理策略。

5:如果訊息時重發的,將會標記出來。

6:JMSReplyTo標識響應訊息傳送到哪個Queue。

7:JMSCorelationID標識此訊息相關聯的訊息id,可以用這個標識把多個訊息連線起來。

8:JMS同時也記錄了訊息重發的次數,預設是6次。

9:如果有一組關聯的訊息需要處理,可以分組:只需要設定訊息組的名字和這個訊息是第幾個訊息。

10:如果訊息中一個事務環境,則TXID將被設定。

11:此外ActiveMQ在伺服器端額外設定了訊息入列和出列的時間戳。

12:ActiveMQ裡訊息屬性的值,不僅可以用基本型別,還可以用List或Map型別。

二 Advisory 系統訊息

         Advisory Message是ActiveMQ自身的系統訊息地址,可以監聽該地址來獲取activemq的系統資訊。

2.1 系統訊息

1:consumers, producers 和 connections的啟動和停止

2:建立和銷燬temporary destinations

3:topics 和 queues的訊息過期

4:brokers 傳送訊息給 destinations,但是沒有consumers

5:connections 啟動和停止

2.2 開啟Advisories

 預設Advisory的功能是關閉的

<destinationPolicy>  

<policyMap>

<policyEntries>      

<policyEntrytopic=">" advisoryForConsumed="true" />

                </policyEntries>

</policyMap>

</destinationPolicy>

2.3 關閉Advisories

1:<broker advisorySupport="false">

2:也可在Java中寫:

BrokerService broker = new BrokerService();

broker.setAdvisorySupport(false);

broker.start();

3:也可以在ActiveMQConnectionFactory上設定‘watchTopicAdvisories’ 屬性

 ActiveMQConnectionFactoryfactory=newActiveMQConnectionFactory(); factory.setWatchTopicAdvisories(false);

4:也可在ConnectionURl上寫: "tcp://localhost:61616?jms.watchTopicAdvisories=false"

2.4 Advisories的使用步驟

使用的方法和步驟:

1:要在配置檔案裡面開啟Advisories

2:訊息傳送端沒有變化

3:訊息接收端:

  1. 根據你要接收的資訊型別,來設定不同的topic,當然也可以使用AdvisorySupport這個類來輔助建立,比如你想要得到訊息生產者的資訊,你可以:

Topic d=session.createTopic("ActiveMQ.Advisory.Producer.Topic.MyTopic"); 

Topic d = session.createTopic("MyTopic");

Destination d2 = AdvisorySupport.getProducerAdvisoryTopic(destination); 

(2)由於這個topic預設不是持久化的,所以應該先開啟接收端,然後再發送topic資訊

(3)接收訊息的時候,接收到的訊息型別是ActiveMQMessage,所以型別轉換的時候,要轉換成 ActiveMQMessage,然後再通過getDataStructure方法來得到具體的資訊物件,如: if (message instanceof ActiveMQMessage) {

try {

                 ActiveMQMessagea Msg= (ActiveMQMessage) message;

                 ProducerIn foprod = (ProducerInfo) aMsg.getDataStructure();

System.out.println("count==="+aMsg.getProperty("producerCount"));

System.out.println("prodd==="+prod.getProducerId());

        } catch (Exception e) {

e.printStackTrace();

}

}

三 延時與定時訊息

         有時候我們不希望訊息馬上被broker投遞出去,而是想要訊息60秒以後發給消費者,或者我們想 讓訊息沒隔一定時間投遞一次,一共投遞指定的次數,類似這種需求,ActiveMQ提供了一種broker 端訊息定時排程機制。 我們只需要把幾個描述訊息定時排程方式的引數作為屬性新增到訊息,broker端的排程器就會按 照我們想要的行為去處理訊息。當然需要在xml中配置schedulerSupport屬性為true。

3.1  4個屬性

1:AMQ_SCHEDULED_DELAY :延遲投遞的時間

2:AMQ_SCHEDULED_PERIOD :重複投遞的時間間隔

3:AMQ_SCHEDULED_REPEAT:重複投遞次數

4:AMQ_SCHEDULED_CRON:Cron表示式

 

ActiveMQ也提供了一個封裝的訊息型別:org.apache.activemq.ScheduledMessage,可以使用這個類來 輔助設定,使用例子如:延遲60秒

MessageProducerproducer = session.createProducer(destination);

TextMessage message = session.createTextMessage("testmsg");

long time = 60 * 1000;

message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY,time);

producer.send(message);

 

3.2 例子1

延遲30秒,投遞10次,間隔10秒:

TextMessagemessage = session.createTextMessage("testmsg");

        long delay = 30 * 1000;

long period = 10 * 1000;

        int repeat = 9;

message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY,delay); message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD,period); message.setIntProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT, repeat); 

3.3 例子2

使用 CRON 表示式,每個小時傳送一次

TextMessagemessage=session.createTextMessage("testmsg"); message.setStringProperty(ScheduledMessage.AMQ_SCHEDULED_CRON, "0 * * * *");

CRON表示式的優先順序高於另外三個引數,如果在設定了CRON的同時,也有repeat和period引數, 則會在每次CRON執行的時候,重複投遞repeat次,每次間隔為period。就是說設定是疊加的效果。例如 每小時都會發生訊息被投遞10次,延遲1秒開始,每次間隔1秒:  TextMessage message=session.createTextMessage("testmsg");

message.setStringProperty(ScheduledMessage.AMQ_SCHEDULED_CRON, "0 * * * *"); message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY,1000); message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD,1000); message.setIntProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT, 9);

四 檔案流傳輸(重要)

有些時候,我們需要傳遞Blob(Binary Large OBjects)訊息,可以按照如下方式:

配置BLOB Transfer Policy,可以在傳送方的連線URI上設定,如:

"tcp://192.168.232.128:61616?jms.blobTransferPolicy.uploadUrl=http://192.168.232.128:8171/fileserver/"

 

Sending BlobMessages,有幾種方式:

1:如果你傳送到的檔案或者URL存在,比如發給共享檔案系統或者是Web server上的web應 用,那麼你可以使用如下方式:

BlobMessage message = session.createBlobMessage(new URL("http://some.shared.site.com"); producer.send(message);

 

2:也可以在客戶端動態的建立檔案流,如下:

BlobMessage message = session.createBlobMessage(newFile("/foo/bar");

InputStreamin = ...;

BlobMessage message = session.createBlobMessage(in);

Receiving BlobMessages示例:

if (message instanceofBlobMessage) { 

BlobMessageblobMessage= (BlobMessage) message;

InputStreamin = blobMessage.getInputStream(); 

// process the stream...   

}

 

有時候需要在JMS provider內部進行message的轉換。從4.2版本起,ActiveMQ 提供了一個MessageTransformer 介面用於進行訊息轉換,可以在如下物件上調 用:

ActiveMQConnectionFactory

ActiveMQConnection

ActiveMQSession

ActiveMQMessageConsumer

ActiveMQMessageProducer

在訊息被髮送到JMS provider的訊息匯流排前進行轉換。通過producerTransform 方法

在訊息到達訊息匯流排後,但是在consumer接收到訊息前進行轉換。通過 consumerTransform方法

當然MessageTransformer 介面的實現,需要你自己來提供。

Consumer 高階特性

一 Client導致訊息重發

1:Client用了transactions,且在session中呼叫了rollback()

2:Client用了transactions,且在呼叫commit()之前關閉

3:Client在CLIENT_ACKNOWLEDGE的傳遞模式下,在session中呼叫了recover()

 

二 ActiveMQConnectionFactory重發

1:collisionAvoidanceFactor:設定防止衝突範圍的正負百分比,只有啟用useCollisionAvoidance引數時 才生效。也就是在延遲時間上再加一個時間波動範圍。預設值為0.15

2:maximumRedeliveries:最大重傳次數,達到最大重連次數後丟擲異常。為-1時不限制次數,為0時表示 不進行重傳。預設值為6。

3:maximumRedeliveryDelay:最大傳送延遲,只在useExponentialBackOff為true時有效(V5.5),假設首 次重連間隔為10ms,倍數為2,那麼第二次重連時間間隔為 20ms,第三次重連時間間隔為40ms,當重連 時間間隔大的最大重連時間間隔時,以後每次重連時間間隔都為最大重連時間間隔。預設為-1。

4:initialRedeliveryDelay:初始重發延遲時間,預設1000L

5:redeliveryDelay:重發延遲時間,當initialRedeliveryDelay=0時生效,預設1000L

6:useCollisionAvoidance:啟用防止衝突功能,預設false

7:useExponentialBackOff:啟用指數倍數遞增的方式增加延遲時間,預設false

8:backOffMultiplier:重連時間間隔遞增倍數,只有值大於1和啟用useExponentialBackOff引數時才生

三 consumer重發策略

 

在接受的Client可以如下設定:

ActiveMQConnectionFactory cf=new ActiveMQConnectionFactory( "failover:(tcp://192.168.1.106:61679,tcp://192.168.1.106:61819)?randomize=false");

RedeliveryPolicypolicy = new RedeliveryPolicy();

policy.setMaximumRedeliveries(3);

cf.setRedeliveryPolicy(policy);

當訊息試圖被傳遞的次數超過配置中maximumRedeliveries屬性的值時,那麼,broker會認 定該訊息是一個死訊息,並被把該訊息傳送到死佇列中。 預設,aciaveMQ中死佇列被宣告 為“ActivemMQ.DLQ”,所有不能消費的訊息都被傳遞到該死佇列中。 你可以在 acivemq.xml中配置individualDeadLetterStrategy屬性,示例如下:

<policyEntry queue= "> " >

 <deadLetterStrategy>

   <individualDeadLetterStrategy queuePrefix= "DLQ." useQueueForQueueMessages= "true" /> </deadLetterStrategy>

 </policyEntry>

 

四 自動刪除過期訊息

 有時需要直接刪除過期的訊息而不需要傳送到死佇列中,可以使用屬性 processExpired=false來設定,示例如下:

 <policyEntryqueue= "> " >

 <deadLetterStrategy>

<sharedDeadLetterStrategy processExpired= "false" />

 </deadLetterStrategy>

</policyEntry> 

五 存放非持久訊息到死佇列中

預設情況下,Activemq不會把非持久的死訊息傳送到死佇列中。非永續性如果你想 把非持久的訊息傳送到死佇列中,需要設定屬性processNonPersistent=“true”,示例如 下: <policyEntryqueue= "> " >

<deadLetterStrategy>

 <sharedDeadLetterStrategyprocessNonPersistent= "true" />

 </deadLetterStrategy>

</policyEntry>

 

六 消費端快取

ActiveMQ通過Prefetch機制來提高效能,方式是在客戶端的記憶體裡可能會快取一定數量的訊息。快取訊息的數量由prefetch limit來控制。當某個consumer的prefetch buffer已經達到上限,那麼broker不會再向consumer分發訊息,直到consumer向broker傳送訊息的確認,確認後的訊息將會從快取中去掉。 可以通過在ActiveMQConnectionFactory或者ActiveMQConnection上設定 ActiveMQPrefetchPolicy物件來配置prefetchpolicy。

也可以通過connection options或者destination options來配置。例如: tcp://localhost:61616?jms.prefetchPolicy.all=50 tcp://localhost:61616?jms.prefetchPolicy.queuePrefetch=1

queue = new ActiveMQQueue("TEST.QUEUE?consumer.prefetchSize=10");

prefetchsize的預設值如下:

1:persistent queues (default value: 1000)

2:non-persistent queues (default value: 1000)

3:persistent topics (default value: 100)

4:non-persistent topics (default value: Short.MAX_VALUE-1

 

樣例程式碼 [email protected]:wornxiao/framwork_bymyself.git