1. 程式人生 > >rabbit - producer的confirm和consumer的ack模式

rabbit - producer的confirm和consumer的ack模式

本篇和大家分享的是關於rabbit的生產和消費方的一些實用的操作;正如文章標題,主要內容如producer的confirm和consumer的ack,這兩者使用的模式都是用來保證資料完整性,防止資料丟失。

  • producer的confirm模式
  • consumer的ack模式

producer的confirm模式

首先,有這樣一種業務場景1:a系統在做活動前,需要給使用者的手機發送一條活動內容簡訊希望使用者來參加,因為使用者量有點大,所以通過往簡訊mq中插入資料方式,讓簡訊服務來消費mq發簡訊;

此時插入mq訊息的服務為了保證給所有使用者發訊息,並且要在短時間內插入完成(因此用到了非同步插入方式(快速)),我們就需要知道每次插入mq是否成功,如果不成功那我們可以收集失敗的資訊後補發(因此confirm模式排上了用場);如圖設計:

 在springboot中可以使用基於amqp封裝的工廠類來開啟confirm模式,然後通過RabbitTemplate模板來設定回撥函式,如下程式碼:

 1     ///region producer生產 - confirm模式
 2 
 3     public RabbitTemplate getRabbitTemplate(RabbitTemplate.ConfirmCallback confirmCallback) {
 4         return this.getRabbitTemplate(this.connectionFactory(), confirmCallback);
 5     }
 6 
 7     public RabbitTemplate getRabbitTemplate(CachingConnectionFactory connectionFactory, RabbitTemplate.ConfirmCallback confirmCallback) {
 8         RabbitTemplate template = new RabbitTemplate(connectionFactory);
 9         //product開啟confirm模式
10         connectionFactory.setPublisherConfirms(true);
11         //設定confirm回撥處理
12         template.setConfirmCallback(confirmCallback);
13         return template;
14     }
15     ///endregion

這裡通過RabbitTemplate.ConfirmCallback函式程式設計來傳遞我們自定義的回撥方法,如下收集confirm返回的結果資訊:

1         RabbitUtil rabbitUtil = new RabbitUtil(this.getFirstNode().getLink());
2         RabbitTemplate template = rabbitUtil.getRabbitTemplate((a, b, c) -> {
3             System.out.println("firstNodeTpl - ConfirmCallback的Id:" + a.getId() + ";狀態:" + b + ";資訊:" + c);
4         });

最後再通過RabbitTemplate例項的convertAndSend方法傳送mq資訊,我們能夠在日誌中看到如下記錄的資訊:

這裡的狀態true:表示send成功,false:表示send失敗;通常false的時候資訊c會有響應的錯誤提示,這裡把網路斷開,如下錯誤提示:

consumer的ack模式

再來,有這樣一種場景2:簡訊服務去消費mq佇列資訊時,倘若服務呼叫的運營商傳送簡訊介面異常了(簡訊運營商介面欠費),我們此時的簡訊是傳送失敗的,使用者也收不到簡訊,但是在預設(預設開啟ack)前提下mq訊息已經被消費了rabbit中沒有記錄了(kafka例外);想要mq訊息在業務邏輯異常時還存在,那麼可以使用ack方式;

在springboot中可以使用基於amqp封裝的工廠類關閉自動ack模式,改為手動ack方式;只有當業務程式碼流程走完後,最後通過程式碼設定ack標識,來通知rabbit訊息可以丟棄了;如果設定了手動模式後,又沒有提交ack標識,那麼mq中的訊息一直存在無法釋放(每次consumer消費後,rabbit會把noack的訊息重複放入佇列中):

 1     ///region consumer監聽 - 手動ack
 2     public SimpleRabbitListenerContainerFactory listenerContainerFactory() {
 3         return this.listenerContainerFactory(this.connectionFactory());
 4     }
 5 
 6     public SimpleRabbitListenerContainerFactory listenerContainerFactory(ConnectionFactory connectionFactory) {
 7         SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
 8         factory.setConnectionFactory(connectionFactory);
 9         //程式碼手動ack
10         factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
11         //開啟消費者數量
12         factory.setConcurrentConsumers(2);
13         //每次接受資料量,預設250
14         factory.setPrefetchCount(300);
15         return factory;
16     }
17     ///endregion

通過連線工廠設定手動ack方式,然後獲取mq訊息後,走完正常業務邏輯,最後再手動通知ack釋放訊息,如下:

1     @RabbitListener(containerFactory = "firstNodeListener", queues = {"${shenniu.rabbits.firstNode.queue}"})
2     private void firstNodeListener(String msg, Channel channel, Message message) {
3         try {
4             long deliverTag = message.getMessageProperties().getDeliveryTag();
5             System.out.println("firstNodeListener - 消費訊息 [" + deliverTag + "] - " + msg);
6             channel.basicAck(deliverTag, true);
7         } catch (Exception ex) {
8         }
9     }

這裡ack主要根據mq訊息的唯一編號(deliverTag)來通知;如果我們不設定ack確認,那麼訊息狀態會是這樣如下rabbit管理後臺: