rabbit - producer的confirm和consumer的ack模式
本篇和大家分享的是關於rabbit的生產和消費方的一些實用的操作;正如文章標題,主要內容如producer的confirm和consumer的ack,這兩者使用的模式都是用來保證資料完整性,防止資料丟失。
- producer的confirm模式
- consumer的ack模式
producer的confirm模式
首先,有這樣一種業務場景1:a系統在做活動前,需要給使用者的手機發送一條活動內容簡訊希望使用者來參加,因為使用者量有點大,所以通過往簡訊mq中插入資料方式,讓簡訊服務來消費mq發簡訊;
此時插入mq訊息的服務為了保證給所有使用者發訊息,並且要在短時間內插入完成(因此用到了非同步插入方式(快速)),我們就需要知道每次插入mq是否成功,如果不成功那我們可以收集失敗的資訊後補發(因此confirm模式排上了用場);如圖設計:
在springboot中可以使用基於amqp封裝的工廠類來開啟confirm模式,然後通過RabbitTemplate模板來設定回撥函式,如下程式碼:
1 ///region producer生產 - confirm模式 2 3 public RabbitTemplate getRabbitTemplate(RabbitTemplate.ConfirmCallback confirmCallback) { 4 return this.getRabbitTemplate(this.connectionFactory(), confirmCallback); 5 } 6 7 public RabbitTemplate getRabbitTemplate(CachingConnectionFactory connectionFactory, RabbitTemplate.ConfirmCallback confirmCallback) { 8 RabbitTemplate template = new RabbitTemplate(connectionFactory); 9 //product開啟confirm模式 10 connectionFactory.setPublisherConfirms(true); 11 //設定confirm回撥處理 12 template.setConfirmCallback(confirmCallback); 13 return template; 14 } 15 ///endregion
這裡通過RabbitTemplate.ConfirmCallback函式程式設計來傳遞我們自定義的回撥方法,如下收集confirm返回的結果資訊:
1 RabbitUtil rabbitUtil = new RabbitUtil(this.getFirstNode().getLink()); 2 RabbitTemplate template = rabbitUtil.getRabbitTemplate((a, b, c) -> { 3 System.out.println("firstNodeTpl - ConfirmCallback的Id:" + a.getId() + ";狀態:" + b + ";資訊:" + c); 4 });
最後再通過RabbitTemplate例項的convertAndSend方法傳送mq資訊,我們能夠在日誌中看到如下記錄的資訊:
這裡的狀態true:表示send成功,false:表示send失敗;通常false的時候資訊c會有響應的錯誤提示,這裡把網路斷開,如下錯誤提示:
consumer的ack模式
再來,有這樣一種場景2:簡訊服務去消費mq佇列資訊時,倘若服務呼叫的運營商傳送簡訊介面異常了(簡訊運營商介面欠費),我們此時的簡訊是傳送失敗的,使用者也收不到簡訊,但是在預設(預設開啟ack)前提下mq訊息已經被消費了rabbit中沒有記錄了(kafka例外);想要mq訊息在業務邏輯異常時還存在,那麼可以使用ack方式;
在springboot中可以使用基於amqp封裝的工廠類關閉自動ack模式,改為手動ack方式;只有當業務程式碼流程走完後,最後通過程式碼設定ack標識,來通知rabbit訊息可以丟棄了;如果設定了手動模式後,又沒有提交ack標識,那麼mq中的訊息一直存在無法釋放(每次consumer消費後,rabbit會把noack的訊息重複放入佇列中):
1 ///region consumer監聽 - 手動ack 2 public SimpleRabbitListenerContainerFactory listenerContainerFactory() { 3 return this.listenerContainerFactory(this.connectionFactory()); 4 } 5 6 public SimpleRabbitListenerContainerFactory listenerContainerFactory(ConnectionFactory connectionFactory) { 7 SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); 8 factory.setConnectionFactory(connectionFactory); 9 //程式碼手動ack 10 factory.setAcknowledgeMode(AcknowledgeMode.MANUAL); 11 //開啟消費者數量 12 factory.setConcurrentConsumers(2); 13 //每次接受資料量,預設250 14 factory.setPrefetchCount(300); 15 return factory; 16 } 17 ///endregion
通過連線工廠設定手動ack方式,然後獲取mq訊息後,走完正常業務邏輯,最後再手動通知ack釋放訊息,如下:
1 @RabbitListener(containerFactory = "firstNodeListener", queues = {"${shenniu.rabbits.firstNode.queue}"}) 2 private void firstNodeListener(String msg, Channel channel, Message message) { 3 try { 4 long deliverTag = message.getMessageProperties().getDeliveryTag(); 5 System.out.println("firstNodeListener - 消費訊息 [" + deliverTag + "] - " + msg); 6 channel.basicAck(deliverTag, true); 7 } catch (Exception ex) { 8 } 9 }
這裡ack主要根據mq訊息的唯一編號(deliverTag)來通知;如果我們不設定ack確認,那麼訊息狀態會是這樣如下rabbit管理後臺: