深入學習RabbitMQ(三):channel的confirm模式
上一篇部落格我們介紹了使用RabbitMQ可能會遇到的一個問題,即生產者不知道訊息是否真正到達broker代理伺服器,隨後通過AMQP協議層面為我們提供的事務機制解決了這個問題,但是採用事務機制實現會降低RabbitMQ的訊息吞吐量,那麼有沒有更加高效的解決方式呢?RabbitMQ團隊為我們拿出了更好的方案,即採用傳送方確認模式;
生產者確認模式實現原理:
生產者將通道設定成confirm模式,一旦通道進入confirm模式,所有在該通道上面釋出的訊息都將會被指派一個唯一的ID(從1開始),一旦訊息被投遞到所有匹配的佇列之後,broker就會發送一個確認給生產者(包含訊息的唯一ID),這就使得生產者知道訊息已經正確到達目的隊列了,如果訊息和佇列是可持久化的,那麼確認訊息會在將訊息寫入磁碟之後發出,broker回傳給生產者的確認訊息中delivery-tag域包含了確認訊息的序列號,此外broker也可以設定basic.ack的multiple域,表示到這個序列號之前的所有訊息都已經得到了處理;
confirm模式最大的好處在於他是非同步的,一旦釋出一條訊息,生產者應用程式就可以在等通道返回確認的同時繼續傳送下一條訊息,當訊息最終得到確認之後,生產者應用便可以通過回撥方法來處理該確認訊息,如果RabbitMQ因為自身內部錯誤導致訊息丟失,就會發送一條nack訊息,生產者應用程式同樣可以在回撥方法中處理該nack訊息;
開啟confirm模式的方法:
生產者通過呼叫channel的confirmSelect方法將channel設定為confirm模式,(注意一點,已經在transaction事務模式的channel是不能再設定成confirm模式的,即這兩種模式是不能共存的)
生產者實現confiem模式有三種程式設計方式:
(1):普通confirm模式,每傳送一條訊息,呼叫waitForConfirms()方法等待服務端confirm,這實際上是一種序列的confirm,每publish一條訊息之後就等待服務端confirm,如果服務端返回false或者超時時間內未返回,客戶端進行訊息重傳;
(2):批量confirm模式,每傳送一批訊息之後,呼叫waitForConfirms()方法,等待服務端confirm,這種批量確認的模式極大的提高了confirm效率,但是如果一旦出現confirm返回false或者超時的情況,客戶端需要將這一批次的訊息全部重發,這會帶來明顯的重複訊息,如果這種情況頻繁發生的話,效率也會不升反降;
講完了基本的原理之後,程式碼級別我們該怎麼設定channel通道為confirm模式呢?以及我們該怎麼獲取broker返回給我們的確認訊息呢?
測試1:普通confirm模式
首先從最簡單的開始,僅僅將channel設定成confirm模式,並且生產者每傳送一條訊息就等待broker迴應確認訊息,至於確認訊息是什麼我們不去做任何處理,為了測試方便,此處生產者只發送了5條訊息,實現程式碼如下:
public class ProducerTest {
public static void main(String[] args) {
String exchangeName = "confirmExchange";
String queueName = "confirmQueue";
String routingKey = "confirmRoutingKey";
String bindingKey = "confirmRoutingKey";
int count = 5;
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("172.16.151.74");
factory.setUsername("test");
factory.setPassword("test");
factory.setPort(5672);
//建立生產者
Sender producer = new Sender(factory, count, exchangeName, queueName,routingKey,bindingKey);
producer.run();
}
}
class Sender
{
private ConnectionFactory factory;
private int count;
private String exchangeName;
private String queueName;
private String routingKey;
private String bindingKey;
public Sender(ConnectionFactory factory,int count,String exchangeName,String queueName,String routingKey,String bindingKey) {
this.factory = factory;
this.count = count;
this.exchangeName = exchangeName;
this.queueName = queueName;
this.routingKey = routingKey;
this.bindingKey = bindingKey;
}
public void run() {
Channel channel = null;
try {
Connection connection = factory.newConnection();
channel = connection.createChannel();
//建立exchange
channel.exchangeDeclare(exchangeName, "direct", true, false, null);
//建立佇列
channel.queueDeclare(queueName, true, false, false, null);
//繫結exchange和queue
channel.queueBind(queueName, exchangeName, bindingKey);
channel.confirmSelect();
//傳送持久化訊息
for(int i = 0;i < count;i++)
{
//第一個引數是exchangeName(預設情況下代理伺服器端是存在一個""名字的exchange的,
//因此如果不建立exchange的話我們可以直接將該引數設定成"",如果建立了exchange的話
//我們需要將該引數設定成建立的exchange的名字),第二個引數是路由鍵
channel.basicPublish(exchangeName, routingKey,MessageProperties.PERSISTENT_BASIC, ("第"+(i+1)+"條訊息").getBytes());
if(channel.waitForConfirms())
{
System.out.println("傳送成功");
}
}
final long start = System.currentTimeMillis();
System.out.println("執行waitForConfirmsOrDie耗費時間: "+(System.currentTimeMillis()-start)+"ms");
} catch (Exception e) {
e.printStackTrace();
}
}
}
在第50行呼叫Channel通道的confirmSelect方法將當前通道設定成了confirm模式,第57行通過for迴圈呼叫Channel的basicPublish方法傳送了5條訊息到訊息佇列中,第58行呼叫waitForConfirms方法等待broker服務端返回ack或者nack訊息,這種模式每傳送一條訊息就會等待broker代理伺服器返回訊息,這點我們可以從抓包的角度觀察結果:
可以看到上面生產者通過Confirm.Select將當前Channel通道設定成confirm模式,broker代理伺服器收到之後回傳Confirm.Select-Ok同一將當前Channel設定成confirm模式,此外看到返回5條Basic.Ack訊息;
測試2:批量confirm模式
這種模式生產者不是每傳送一條就等待broker確認,而是傳送一批,實現程式碼見下:
public class ProducerTest {
public static void main(String[] args) {
String exchangeName = "confirmExchange";
String queueName = "confirmQueue";
String routingKey = "confirmRoutingKey";
String bindingKey = "confirmRoutingKey";
int count = 100;
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("172.16.151.74");
factory.setUsername("test");
factory.setPassword("test");
factory.setPort(5672);
//建立生產者
Sender producer = new Sender(factory, count, exchangeName, queueName,routingKey,bindingKey);
producer.run();
}
}
class Sender
{
private ConnectionFactory factory;
private int count;
private String exchangeName;
private String queueName;
private String routingKey;
private String bindingKey;
public Sender(ConnectionFactory factory,int count,String exchangeName,String queueName,String routingKey,String bindingKey) {
this.factory = factory;
this.count = count;
this.exchangeName = exchangeName;
this.queueName = queueName;
this.routingKey = routingKey;
this.bindingKey = bindingKey;
}
public void run() {
Channel channel = null;
try {
Connection connection = factory.newConnection();
channel = connection.createChannel();
//建立exchange
channel.exchangeDeclare(exchangeName, "direct", true, false, null);
//建立佇列
channel.queueDeclare(queueName, true, false, false, null);
//繫結exchange和queue
channel.queueBind(queueName, exchangeName, bindingKey);
channel.confirmSelect();
//傳送持久化訊息
for(int i = 0;i < count;i++)
{
//第一個引數是exchangeName(預設情況下代理伺服器端是存在一個""名字的exchange的,
//因此如果不建立exchange的話我們可以直接將該引數設定成"",如果建立了exchange的話
//我們需要將該引數設定成建立的exchange的名字),第二個引數是路由鍵
channel.basicPublish(exchangeName, routingKey,MessageProperties.PERSISTENT_BASIC, ("第"+(i+1)+"條訊息").getBytes());
}
long start = System.currentTimeMillis();
channel.waitForConfirmsOrDie();
System.out.println("執行waitForConfirmsOrDie耗費時間: "+(System.currentTimeMillis()-start)+"ms");
} catch (Exception e) {
e.printStackTrace();
}
}
}
第50行呼叫channel.confirmSelect將當前channel通道設定成confirm模式,接著在第57行通過for迴圈傳送了100條訊息,第60行呼叫了channel的waitForConfirmsOrDie,從waitForConfirmsOrDie方法的註釋上可以看出,該方法會等到最後一條訊息得到確認或者得到nack才會結束,也就是說在waitForConfirmsOrDie處會造成當前程式的阻塞,以測試1程式傳送100條訊息為例,阻塞時間是135ms,我們再來看看對測試1的抓包情況:
從紅色箭頭的標號1出可以看到:首先是24向74傳送了Confirm.Select訊息表示請求將當前通道設定為confirm模式,接著74向24回送了Confirm.Select-Ok訊息表示同意將通道設定成confirm模式,從紅色標號2處NoWait欄位的值為false也印證了我們如果直接呼叫Channel通道的confirmSelect()方法的話,實際上預設是開啟broker回傳Confirm.Select-Ok確認訊息的;
接下來我們看看broker回傳給客戶端的確認訊息資料包是什麼樣子的呢?同樣通過抓包看看結果:
你會發現,在上面測試1中我們通過for迴圈傳送了100條訊息,但是在抓包的時候我們僅僅看到有兩個Basic.Ack確認訊息回傳回來,原因在於上面截圖的標號3處,你會發現Multiple域的值是True的,之前我們已經講過broker可以設定Multiple域表示broker已經收到當前確認訊息的Delivery-Tag域之前標號的訊息,以上面截圖為例的話表示broker告訴傳送者編號4之前的訊息已經全部收到了,從這點我們看出broker端預設情況下是進行批量回復的,並不是針對每條訊息都發送一條ack訊息;
測試2:
測試1我們僅僅是測試傳送者能夠收到broker的確認訊息以及知道了broker對訊息預設是採用批量回複方式的,那麼在程式中我們該怎麼獲取到broker回傳回來的確認訊息呢,假如我們有時候需要在收到確認訊息之後做一些提示性操作該怎麼辦呢?測試1中,我們採用的是Channel通道的waitForConfirmsOrDie等待broker端回傳回ack確認訊息的,但我們沒法拿到這個ack訊息進行後期操作,要想拿到ack訊息的話,我們可以給當前Channel通道繫結監聽器,具體來說就是呼叫Channel通道的addConfirmListener方法進行設定,Channel通道在收到broker的ack訊息之後會回撥設定在該通道監聽器上的handleAck方法,在收到nack訊息之後會回撥設定在該通道監聽器上的handleNack方法。
實現程式碼:
public class ProducerTest {
public static void main(String[] args) {
String exchangeName = "confirmExchange";
String queueName = "confirmQueue";
String routingKey = "confirmRoutingKey";
String bindingKey = "confirmRoutingKey";
int count = 100;
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("172.16.151.74");
factory.setUsername("test");
factory.setPassword("test");
factory.setPort(5672);
//建立生產者
Sender producer = new Sender(factory, count, exchangeName, queueName,routingKey,bindingKey);
producer.run();
}
}
class Sender
{
private ConnectionFactory factory;
private int count;
private String exchangeName;
private String queueName;
private String routingKey;
private String bindingKey;
public Sender(ConnectionFactory factory,int count,String exchangeName,String queueName,String routingKey,String bindingKey) {
this.factory = factory;
this.count = count;
this.exchangeName = exchangeName;
this.queueName = queueName;
this.routingKey = routingKey;
this.bindingKey = bindingKey;
}
public void run() {
Channel channel = null;
try {
Connection connection = factory.newConnection();
channel = connection.createChannel();
//建立exchange
channel.exchangeDeclare(exchangeName, "direct", true, false, null);
//建立佇列
channel.queueDeclare(queueName, true, false, false, null);
//繫結exchange和queue
channel.queueBind(queueName, exchangeName, bindingKey);
channel.confirmSelect();
//傳送持久化訊息
for(int i = 0;i < count;i++)
{
//第一個引數是exchangeName(預設情況下代理伺服器端是存在一個""名字的exchange的,
//因此如果不建立exchange的話我們可以直接將該引數設定成"",如果建立了exchange的話
//我們需要將該引數設定成建立的exchange的名字),第二個引數是路由鍵
channel.basicPublish(exchangeName, routingKey,MessageProperties.PERSISTENT_BASIC, ("第"+(i+1)+"條訊息").getBytes());
}
long start = System.currentTimeMillis();
channel.addConfirmListener(new ConfirmListener() {
@Override
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
System.out.println("nack: deliveryTag = "+deliveryTag+" multiple: "+multiple);
}
@Override
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
System.out.println("ack: deliveryTag = "+deliveryTag+" multiple: "+multiple);
}
});
System.out.println("執行waitForConfirmsOrDie耗費時間: "+(System.currentTimeMillis()-start)+"ms");
} catch (Exception e) {
e.printStackTrace();
}
}
}
第60行我們呼叫了Channel通道的addConfirmListener設定了監聽器,並且在監聽器的handleAck和handleNack方法中列印了資訊,執行程式檢視輸出:
可以看到,雖然我們還是傳送了100條訊息,同樣我們並沒有收到100個ack訊息 ,只收到兩個ack訊息,並且這兩個ack訊息的multiple域都為true,這點和測試1是相同的,你多次執行程式會發現每次傳送回來的ack訊息中的deliveryTag域的值並不是一樣的,說明broker端批量回傳給傳送者的ack訊息並不是以固定的批量大小回傳的;
也就是我們通過通道Channel的waitForConfirmsOrDie方法或者為通道設定監聽器都可以保證傳送者收到broker回傳的ack或者nack訊息,那麼這兩種方式有什麼區別呢?從測試一的第61行程式碼以及測試2的第72行程式碼處你就能找到答案啦,測試1中呼叫waitForConfirmsOrDie方法傳送100條訊息並且全部收到確認需要135ms,測試2中通過監聽器的方式僅僅需要1ms,說明呼叫waitForConfirmsOrDie會造成程式的阻塞,通過監聽器並不會造成程式的阻塞,下一篇部落格我會試著從RabbitMQ的原始碼層面來分析這兩種方式造成這種區別的原因啦啦;
參考資料: