1. 程式人生 > >activeMQ中的訊息重試與死信佇列

activeMQ中的訊息重試與死信佇列

activeMQ會在什麼情況下重新發送訊息?

activeMQ中的訊息重發,指的是訊息可以被broker重新分派給消費者,不一定的之前的消費者。重發訊息之後,消費者可以重新消費。訊息重發的情況有以下幾種。

1.事務會話中,當還未進行session.commit()時,進行session.rollback(),那麼所有還沒commit的訊息都會進行重發
2.使用客戶端手動確認的方式時,還未進行確認並且執行Session.recover(),那麼所有還沒acknowledge的訊息都會進行重發
3.所有未ack的訊息,當進行session.closed()關閉事務,那麼所有還沒ack的訊息broker端都會進行重發,而且是馬上重發


4.訊息被消費者拉取之後,超時沒有響應ack,訊息會被broker**重發**。

重發指的是訊息經過broker重新進行轉發給消費者,經過測試,1和2的情況訊息重發會發送給原來的消費者,3和4可以轉發訊息給別的消費者。累計次數超過設定的maximumRedeliveries時訊息都會都會進入死信佇列。

有毒訊息
當一個訊息被接收的次數超過maximumRedeliveries(預設為6次)次數時,會給broker傳送一個poison _ack,這種ack型別告訴broker這個訊息“有毒”,嘗試多次依然失敗,這時broker會將這個訊息傳送到DLQ,以便後續處理。activeMQ預設的死信佇列是ActiveMQ.DLQ,如果沒有特別指定,死信訊息都會被髮送到這個佇列。

預設情況下持久訊息過期都會被送到DLQ,非持久訊息過期預設不會送到DLQ。

可以通過配置檔案為指定佇列建立死信佇列。

重試策略配置

下面是模擬重試的消費者程式碼

public class JMSClientAckConsumer {
    public static void main(String[] args) throws JMSException {
        //根據broker URL建立連線工廠

        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.0.15:61616"
); //建立連線 ActiveMQConnection connection = (ActiveMQConnection) connectionFactory.createConnection(); connection.start(); //建立會話 Session session = connection.createSession(true, Session.CLIENT_ACKNOWLEDGE); //重發策略 RedeliveryPolicy queuePolicy = new RedeliveryPolicy(); queuePolicy.setInitialRedeliveryDelay(0); queuePolicy.setRedeliveryDelay(1000); queuePolicy.setUseExponentialBackOff(false); queuePolicy.setMaximumRedeliveries(2); //建立佇列(有則不建立) Destination destination = session.createQueue("garine-queue"); RedeliveryPolicyMap map = connection.getRedeliveryPolicyMap(); map.put((ActiveMQDestination) destination, queuePolicy); session.createConsumer(destination).setMessageListener(new MessageListener() { @Override public void onMessage(Message message) { TextMessage textMessage = (TextMessage) message; try { System.out.println(textMessage.getText()); session.rollback(); } catch (JMSException e) { e.printStackTrace(); } } }); } }

重試策略配置除了以上的方式,還可以按照url的配置方式,可配置屬性很多,這裡列舉兩個
1.url

tcp://192.168.0.15:61616?jms.redeliveryPolicy.initialRedeliveryDelay=0&jms.redeliveryPolicy.redeliveryDelay=1000

重發策略配置可以在官方文件找到重發策略配置
2.在activeMq broker中配置

上面模擬的是在事務會話中呼叫rollback進行重試,經過模擬,發現訊息每次重試都是直接在原來的消費者進行重試,即使在重試次數內該消費者掛了,訊息依然不會馬上分發給別的消費者重試。

死信佇列配置
如果想自定義死信傳送策略,可以在activeMQ.xml裡面進行配置。

<destinationPolicy> 

           <policyMap> 

             <policyEntries> 

               <policyEntry topic=">" > 

                 <pendingMessageLimitStrategy> 

                   <constantPendingMessageLimitStrategy limit="1000"/> 

                 </pendingMessageLimitStrategy> 

               </policyEntry>  

                // “>”表示對所有佇列生效,如果需要設定指定佇列,則直接寫隊 列名稱 

            <policyEntry queue=">"> 
                  <deadLetterStrategy> 
                 //queuePrefix:設定死信佇列字首 
                    <individualDeadLetterStrategy queuePrefix="DLQ." useQueueForQueueMessages="true" processExpired="false"/> 
                     //是否丟棄過期訊息
                     <!--<sharedDeadLetterStrategy processExpired="false" />-->
                   </deadLetterStrategy> 

               </policyEntry> 

             </policyEntries> 

           </policyMap> 

       </destinationPolicy> 

什麼時候重新消費死信佇列裡面的訊息?
死信佇列也是佇列,訊息當然可以被重新消費。

相關推薦

activeMQ訊息死信佇列

activeMQ會在什麼情況下重新發送訊息? activeMQ中的訊息重發,指的是訊息可以被broker重新分派給消費者,不一定的之前的消費者。重發訊息之後,消費者可以重新消費。訊息重發的情況有以下幾種。 1.事務會話中,當還未進行session.commi

ActiveMQ訊息死信管理(DLQ)

DLQ-死信佇列(Dead Letter Queue)用來儲存處理失敗或者過期的訊息。 出現以下情況時,訊息會被redelivered A transacted session is used and rollback() is called. A transacted session is closed b

RabbitMQ.net core(四) 訊息的優先順序 死信佇列

1.訊息的優先順序 假如現在有個需求,我們需要讓一些優先順序最高的通知推送到客戶端,我們可以使用redis的sortedset,也可以使用我們今天要說的rabbit的訊息優先順序屬性 Producer程式碼 using RabbitMQ.Client; using System; using

ActiveMQ訊息機制

處理失敗時的訊息重發機制 1. 處理失敗 指的是MessageListener的onMessage方法裡丟擲RuntimeException。 2. Message頭裡有兩個相關欄位:Redelivered預設為false,redeliveryCounter預設為0。 3.

ActiveMQ訊息持久化儲存

訊息中介軟體解決方案續   上一篇中我們講到了在Spring工程中基本的使用訊息中介軟體,這裡就不在繼續贅述。   針對訊息中介軟體,這篇講解兩個我們常遇到的問題。   問題1:如果我們的訊息的接收過程中發生異常,怎麼解決?   問題2:釋出訂閱模式(Topic)下如果消費端宕機引起的訊息的丟失,怎麼

springboot 整合 activemq:傳送自定義物件以及失敗訊息

                <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-act

springboot+activemq引入發機制

簡介 一個 value nec ring cto 為我 body over 一、簡介 在使用activemq消息中間件進行消息隊列傳輸時,總會由於各種原因導致消息失敗。 一個經典的場景是一個生成者向Queue中發消息,裏面包含了一組郵件地址和郵件內容。而消費者從Queue中

spring-retry熔斷詳解— 億級流量 內容補充

分享一下我老師大神的人工智慧教程!零基礎,通俗易懂!http://blog.csdn.net/jiangjunshow 也歡迎大家轉載本篇文章。分享知識,造福人民,實現我們中華民族偉大復興!        

RocketMQ-訊息訊息冪等去訊息模式

訊息重試 Rocketmq提供了訊息重試機制,這是一些其他訊息佇列沒有的功能。我們可以依靠這個優秀的機制,而不用在開發中增加更多的業務程式碼去實現 Consumer 消費訊息失敗後,要提供一種重試機制,令訊息再消費一次。Consumer 消費訊息失敗通常可以認為有以下幾種情況  

RocketMQ批量消費、訊息、消費模式、刷盤方式

一、Consumer 批量消費可以通過consumer.setConsumeMessageBatchMaxSize(10);//每次拉取10條這裡需要分為2種情況1、Consumer端先啟動  2、Consumer端後啟動.   正常情況下:應該是Consumer需要先啟動1

spring-retry熔斷詳解

轉至:http://www.broadview.com.cn/article/233 本文是《億級流量》第6章 超時與重試機制補充內容。 spring-retry專案實現了重試和熔斷功能,目前已用於SpringBatch、Spring Integrat

ActiveMQ訊息的持久化和非持久化 以及 持久訂閱者 和 非持久訂閱者之間的區別聯絡

①DeliveryMode 這是傳輸模式。ActiveMQ支援兩種傳輸模式:持久傳輸和非持久傳輸(persistent and non-persistent delivery),預設情況下使用的是持久傳輸。 可以通過MessageProducer類的 setDeliv

PHP使用ActiveMQ實現訊息佇列

前面我們已經學了如何部署ActiveMQ, 我們知道通過ActiveMQ的一個管理後臺可以檢視任務佇列。 今天 用PHP來操作ActiveMQ,我們可以藉助一個第三方擴充套件。 下載: composer require fusesource/s

解決VS2013調ASP.NET無法調的問題:當前不會命中斷點。在 XXXX.dll 找到了 XXX.cs 的副本,但是當前源代碼 XXXX.dll 內置的版本不同。

strong 當前不會命中斷點 導致 隨機 當前日期 目錄 一次 但是 解決 解決思路: 一定是在某個文件夾存在了副本,結果果然不出所料。 當前日期是2016年3月10日,But C:\Windows\Microsoft.NET\Framework\v4.0.3

Java重寫的區別

ref 類對象 就是 不同 3.4 做出 同方 相同 默認 簡單的個人記憶,重寫是子類中的方法與父類的方法參數、返回值一模一樣,重載是對於同一個類中的方法,方法名相同,參數類型、個數不同。當父類的引用指向子類對象的時候,子類中如果有對於父類方法的重寫,則調用的是子類重寫過的

python2MySQLdb加入超時及其功能

python mysqldb 重試定義一個狀態,超時時間和重試的次數限制(此為樣例代碼,你可以把他們包裝到函數或類中)循環判斷_conn_status狀態且判斷最大重試次數在循環中try except 連接如果DB連接成功則_conn_status為False,異常except則_conn_retries_c

Java的Overload(載)Override(重寫、覆蓋)

tro 子類 數列 AD 屬性。 需要 ide per cati java中的方法重載發生在同一個類中兩個或者多個方法的方法名相同但是參數不同的情況,方法重載是指子類與父類之間子類重新定義了父類的方法,重寫的方法與原方法簽名、返回值、參數完全相同。Overload(重載)

SpringCloud | FeignClient和Ribbon機制區別聯系

feign per spec 笛卡爾 making log tag tom str 在spring cloud體系項目中,引入的重試機制保證了高可用的同時,也會帶來一些其它的問題,如冪等操作或一些沒必要的重試。 今天就來分別分析一下 FeignClient 和

springcloud超時時間次數配置

adt second fault .exe 次數 pri ring ati tor #hystrix配置hystrix.command.default.execution.isolation.thread.timeoutInMilliseconds=120000ri

ActiveMQ訊息、收訊息、持久化,查詢佇列剩餘訊息數、出隊數的實現

1.首先啟動activeMQ的服務 public class RunServer {           /** 啟動activeMQ服務&