A Comparison of Ways to Guarantee Message Delivery in RabbitMQ
Tags: java, rabbitmq, producer message delivery reliability
- pom.xml
Add the JAR dependency that RabbitMQ requires.
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd" >
<modelVersion>4.0.0</modelVersion>
<groupId>fun.gosuncn</groupId>
<artifactId>rabbitmq</artifactId>
<version>1.0</version>
<dependencies>
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.10.0</version>
</dependency>
</dependencies>
</project>
- RabbitmqUtils
Sets the connection details and returns a connection factory.
import com.rabbitmq.client.ConnectionFactory;
public class RabbitmqUtils {
public static ConnectionFactory getConnectionFactory () {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("10.0.8.6");
factory.setPort(5672);
factory.setUsername("guest");
factory.setPassword("guest");
factory.setVirtualHost("/");
return factory;
}
}
- Receiver
Acts as the consumer: it keeps listening on the queue and consumes messages as they arrive.
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class Receiver {
public static void main(String[] args) {
ConnectionFactory factory = RabbitmqUtils.getConnectionFactory();
try {
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare("direct-queue", false, false, false, null);
channel.basicConsume("direct-queue", true, (consumerTag, message) -> {
System.out.println("DeliverCallback's consumerTag is {" + consumerTag + "}");
System.out.println("Message body = " + new String(message.getBody()));
}, consumerTag -> System.out.println("CancelCallback's consumerTag is {" + consumerTag + "}"));
} catch (Exception e) {
e.printStackTrace();
}
}
}
- Approach 1 - Transactions
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Sender {
public static void main(String[] args) {
ConnectionFactory factory = RabbitmqUtils.getConnectionFactory();
Connection connection = null;
Channel channel = null;
try {
connection = factory.newConnection();
channel = connection.createChannel();
channel.queueDeclare("direct-queue", false, false, false, null);
channel.txSelect();
channel.basicPublish("", "direct-queue", null, "hello world".getBytes());
//int i = 1 / 0;
channel.txCommit();
} catch (Exception e) {
if (channel != null) {
try {
channel.txRollback();
// TODO rollback handling (resend, persist to a database, etc.)
System.out.println("Rollback handling (resend, persist to a database, etc.)");
} catch (IOException ioException) {
ioException.printStackTrace();
}
}
e.printStackTrace();
} finally {
if (channel != null) {
try {
channel.close();
} catch (IOException | TimeoutException e) {
e.printStackTrace();
}
}
if (connection != null) {
try {
connection.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}
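channel.txSelect(); opens a transaction on the channel.
channel.basicPublish("", "direct-queue", null, "hello world".getBytes()); publishes the message.
The commented-out int i = 1 / 0; simulates a failure after the publish call and before the commit. It throws an ArithmeticException rather than an IOException, but the effect is the same: channel.txCommit(); is never executed.
Control passes to the catch block instead, where channel.txRollback(); runs and the rollback handling takes over.
If no exception occurs, channel.txCommit(); is executed.
How does this sequence confirm that the message was delivered?
The client opens a transaction and publishes the message to the server. If publishing fails with an IOException, for example because of a network problem, control enters the channel.txRollback(); branch. If the message reaches the server but later logic throws before the commit, control also enters channel.txRollback(); and the publish still counts as failed: consumers never see the message.
Transactions therefore give the producer a defined place to handle a failed send, so that it keeps full control over every message.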
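The "rollback handling" placeholder in the catch block could be a bounded retry. The following is a minimal sketch under stated assumptions, not the author's implementation: TxRetry, TxAttempt, publishWithRetry and maxAttempts are hypothetical names, and the interface only stands in for the channel calls used above so that the retry logic can be read on its own.

```java
// Hypothetical helper, not part of amqp-client: retries one transactional publish.
final class TxRetry {

    // publishAndCommit() stands in for basicPublish followed by txCommit;
    // rollback() stands in for txRollback.
    interface TxAttempt {
        void publishAndCommit() throws Exception;
        void rollback() throws Exception;
    }

    // Returns the number of the attempt that succeeded, or -1 if none did.
    static int publishWithRetry(TxAttempt attempt, int maxAttempts) {
        for (int i = 1; i <= maxAttempts; i++) {
            try {
                attempt.publishAndCommit();
                return i; // commit returned normally, so this attempt succeeded
            } catch (Exception publishFailure) {
                try {
                    attempt.rollback();
                } catch (Exception rollbackFailure) {
                    // The channel may no longer be usable; stop retrying and
                    // let the caller fall back to persisting the message.
                    return -1;
                }
            }
        }
        return -1;
    }
}
```

With a real channel, publishAndCommit would wrap channel.basicPublish(...) and channel.txCommit(), and rollback would wrap channel.txRollback(). A return value of -1 is the point at which the message would be stored for later handling.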
- Approach 2 - Synchronous confirm
package fun.gosuncn.test1;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Sender {
public static void main(String[] args) {
ConnectionFactory factory = RabbitmqUtils.getConnectionFactory();
Connection connection = null;
Channel channel = null;
try {
connection = factory.newConnection();
channel = connection.createChannel();
channel.queueDeclare("direct-queue", false, false, false, null);
channel.confirmSelect();
channel.basicPublish("", "direct-queue", null, "hello world".getBytes());
//int i = 1 / 0;
if (channel.waitForConfirms()) {
System.out.println("Message delivery confirmed");
Thread.sleep(3000L);
}
System.out.println("Shows that waitForConfirms is a blocking call");
} catch (Exception e) {
e.printStackTrace();
} finally {
if (channel != null) {
try {
channel.close();
} catch (IOException | TimeoutException e) {
e.printStackTrace();
}
}
if (connection != null) {
try {
connection.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}
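channel.confirmSelect(); puts the channel into confirm mode.
channel.waitForConfirms() blocks the calling thread until the broker has acked or nacked every message published since the previous call. It returns true only if none of them was nacked, and the program then continues accordingly.
If an exception occurs between channel.basicPublish and channel.waitForConfirms(), the message may still have been delivered: the publish has already been sent, and the producer simply never learns the outcome.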
- Approach 3 - Synchronous confirm (batch)
package fun.gosuncn.test1;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Sender {
public static void main(String[] args) {
ConnectionFactory factory = RabbitmqUtils.getConnectionFactory();
Connection connection = null;
Channel channel = null;
try {
connection = factory.newConnection();
channel = connection.createChannel();
channel.queueDeclare("direct-queue", false, false, false, null);
channel.confirmSelect();
channel.basicPublish("", "direct-queue", null, "hello world1".getBytes());
channel.basicPublish("", "direct-queue", null, "hello world2".getBytes());
channel.basicPublish("", "direct-queue", null, "hello world3".getBytes());
if (channel.waitForConfirms()) {
System.out.println("Message delivery confirmed");
Thread.sleep(3000L);
}
System.out.println("Shows that waitForConfirms is a blocking call");
} catch (Exception e) {
e.printStackTrace();
} finally {
if (channel != null) {
try {
channel.close();
} catch (IOException | TimeoutException e) {
e.printStackTrace();
}
}
if (connection != null) {
try {
connection.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}
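The only difference between batch confirmation and serial (one-by-one) confirmation is that batch confirmation calls basicPublish several times and then calls waitForConfirms once.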
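For a longer stream of messages, the same idea can be applied per batch rather than once at the end. The sketch below is illustrative only: BatchConfirm, ConfirmingPublisher, publishInBatches and batchSize are hypothetical names, and the interface merely mimics the two Channel calls used above so the batching logic is self-contained.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of "publish N messages, then wait once".
final class BatchConfirm {

    // Mimics the two Channel calls used in the article; not an amqp-client type.
    interface ConfirmingPublisher {
        void publish(String body) throws Exception;   // stands in for channel.basicPublish
        boolean waitForConfirms() throws Exception;   // stands in for channel.waitForConfirms
    }

    // Publishes in batches of batchSize and returns every message whose batch
    // was not confirmed, so the caller can resend or persist them.
    static List<String> publishInBatches(ConfirmingPublisher publisher,
                                         List<String> messages,
                                         int batchSize) throws Exception {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        List<String> unconfirmed = new ArrayList<>();
        List<String> batch = new ArrayList<>();
        for (String message : messages) {
            publisher.publish(message);
            batch.add(message);
            if (batch.size() == batchSize) {
                if (!publisher.waitForConfirms()) {
                    unconfirmed.addAll(batch); // cannot tell which one failed
                }
                batch.clear();
            }
        }
        // Confirm the final, possibly smaller, batch.
        if (!batch.isEmpty() && !publisher.waitForConfirms()) {
            unconfirmed.addAll(batch);
        }
        return unconfirmed;
    }
}
```

The trade-off is visible in the failure branch: a false result says only that at least one message in the batch was nacked, so the whole batch has to be treated as unconfirmed. Larger batches cost fewer round trips but mean more messages to resend after a failure.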
- Approach 4 - Asynchronous confirm
package fun.gosuncn.test1;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmListener;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
public class Sender {
public static void main(String[] args) {
ConnectionFactory factory = RabbitmqUtils.getConnectionFactory();
Connection connection = null;
Channel channel = null;
try {
connection = factory.newConnection();
channel = connection.createChannel();
channel.queueDeclare("direct-queue", false, false, false, null);
channel.confirmSelect();
channel.addConfirmListener(new ConfirmListener() {
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
System.out.println("ack:deliveryTag:" + deliveryTag + ",multiple:" + multiple);
}
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
System.out.println("nack:deliveryTag:" + deliveryTag + ",multiple:" + multiple);
}
});
channel.basicPublish("", "direct-queue", null, "hello world1".getBytes());
channel.basicPublish("", "direct-queue", null, "hello world2".getBytes());
channel.basicPublish("", "direct-queue", null, "hello world3".getBytes());
} catch (Exception e) {
e.printStackTrace();
}
// finally {
// if (channel != null) {
// try {
// channel.close();
// } catch (IOException | TimeoutException e) {
// e.printStackTrace();
// }
// }
// if (connection != null) {
// try {
// connection.close();
// } catch (IOException e) {
// e.printStackTrace();
// }
// }
// }
}
}
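One caveat with asynchronous confirms: the listener callbacks are invoked later, after basicPublish has already returned, so the connection must not be closed before the confirms have arrived. Closing it too early can cause exceptions or lost confirms, which is why the finally block above is commented out.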
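To know when every confirm has arrived, and which messages a nack refers to, the producer can keep a map of outstanding messages keyed by sequence number. The sketch below is one possible way to do this, not the article's code: OutstandingConfirms, track, settle and allSettled are hypothetical names, and only the meaning of deliveryTag and multiple is taken from the ConfirmListener callbacks shown above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;

// Hypothetical tracker for messages that were published but not yet confirmed.
final class OutstandingConfirms {

    // Sorted by sequence number so that "everything up to tag N" is a head view.
    private final ConcurrentNavigableMap<Long, String> pending = new ConcurrentSkipListMap<>();

    // Call just before publishing, with the sequence number of that message.
    void track(long sequenceNumber, String body) {
        pending.put(sequenceNumber, body);
    }

    // Call from handleAck or handleNack. With multiple == true the broker is
    // settling every message up to and including deliveryTag.
    // Returns the bodies that were settled, e.g. to resend them after a nack.
    List<String> settle(long deliveryTag, boolean multiple) {
        List<String> settled = new ArrayList<>();
        if (multiple) {
            ConcurrentNavigableMap<Long, String> covered = pending.headMap(deliveryTag, true);
            settled.addAll(covered.values());
            covered.clear(); // clearing the view removes the entries from pending
        } else {
            String body = pending.remove(deliveryTag);
            if (body != null) {
                settled.add(body);
            }
        }
        return settled;
    }

    // True once every tracked message has been acked or nacked.
    boolean allSettled() {
        return pending.isEmpty();
    }
}
```

With the real client, the sequence number would come from channel.getNextPublishSeqNo() called before each basicPublish, handleAck would call settle and discard the result, and handleNack would call settle and resend what it returns. Once allSettled() is true, closing the channel and connection no longer risks missing a confirm.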
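Elapsed time measured for each approach:
Transactions: 42891 ms
Serial confirm: 40551 ms
Batch confirm: 688 ms
Asynchronous confirm: 318 ms
These results show that the transaction approach is the slowest and the asynchronous confirm approach is the fastest.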