
A Comparative Analysis of Ways to Guarantee Message Delivery in RabbitMQ

Tags: java, rabbitmq, producer, message delivery reliability

  • pom.xml

Add the jar dependency that RabbitMQ requires.

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>fun.gosuncn</groupId>
    <artifactId>rabbitmq</artifactId>
    <version>1.0</version>
    <dependencies>
        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>5.10.0</version>
        </dependency>
    </dependencies>
</project>
  • RabbitmqUtils

Set the connection details and return a connection factory.

import com.rabbitmq.client.ConnectionFactory;

public class RabbitmqUtils {
    public static ConnectionFactory getConnectionFactory() {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("10.0.8.6");
        factory.setPort(5672);
        factory.setUsername("guest");
        factory.setPassword("guest");
        factory.setVirtualHost("/");
        return factory;
    }
}
  • Receiver

The consumer: it keeps listening on the queue and consumes messages as they arrive.

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class Receiver {
    public static void main(String[] args) {
        ConnectionFactory factory = RabbitmqUtils.getConnectionFactory();
        try {
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();
            channel.queueDeclare("direct-queue", false, false, false, null);
            channel.basicConsume("direct-queue", true, (consumerTag, message) -> {
                System.out.println("DeliverCallback's consumerTag is {" + consumerTag + "}");
                System.out.println("Consumed message body = " + new String(message.getBody()));
            }, consumerTag -> System.out.println("CancelCallback's consumerTag is {" + consumerTag + "}"));
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
  • Approach 1 - Transactions
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Sender {
    public static void main(String[] args) {
        ConnectionFactory factory = RabbitmqUtils.getConnectionFactory();
        Connection connection = null;
        Channel channel = null;
        try {
            connection = factory.newConnection();
            channel = connection.createChannel();
            channel.queueDeclare("direct-queue", false, false, false, null);
            channel.txSelect();
            channel.basicPublish("", "direct-queue", null, "hello world".getBytes());
            //int i = 1 / 0;
            channel.txCommit();
        } catch (Exception e) {
            if (channel != null) {
                try {
                    channel.txRollback();
                    // TODO rollback logic [resend, persist to a database, etc.]
                    System.out.println("Transaction rolled back: run rollback logic [resend, persist to a database, etc.]");
                } catch (IOException ioException) {
                    ioException.printStackTrace();
                }
            }
            e.printStackTrace();
        } finally {
            if (channel != null) {
                try {
                    channel.close();
                } catch (IOException | TimeoutException e) {
                    e.printStackTrace();
                }
            }
            if (connection != null) {
                try {
                    connection.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

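channel.txSelect(); opens a transaction on the channel.

channel.basicPublish("", "direct-queue", null, "hello world".getBytes()); publishes the message.

Uncommenting int i = 1 / 0; simulates a failure right after the publish step (it throws an ArithmeticException, standing in for an IO error during delivery), so channel.txCommit(); is never executed.

Control passes to the catch block instead, where channel.txRollback(); runs, followed by whatever rollback logic is needed.

If no exception occurs, channel.txCommit(); is executed.

How does this process confirm that the message was delivered?

The client opens a transaction and publishes the message to the server. If an IOException occurs during publishing, for example because of a network problem, execution enters the channel.txRollback(); branch. If the message did reach the server but later logic throws before the commit, execution also enters channel.txRollback();, which still counts as a failed delivery: the consumer never receives the message.

The transactional approach therefore gives the producer a place to handle failed sends (resend, persist to a database, and so on), so that it stays in control of what happens to each message.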
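The commit/rollback behaviour described above can be sketched without a broker. The following is only a toy in-memory model, not the RabbitMQ client: ToyTxChannel, ToyTxDemo and visibleToConsumer are hypothetical names invented for this illustration, and the model assumes that messages published inside a transaction become visible to consumers only on commit.

```java
import java.util.ArrayList;
import java.util.List;

// Toy in-memory model of the transaction semantics; NOT the RabbitMQ client.
class ToyTxChannel {
    private final List<String> pending = new ArrayList<>(); // published, not yet committed
    private final List<String> queue = new ArrayList<>();   // what a consumer could read

    void basicPublish(String body) { pending.add(body); }

    // Commit moves every pending message into the queue.
    void txCommit() { queue.addAll(pending); pending.clear(); }

    // Rollback discards every pending message.
    void txRollback() { pending.clear(); }

    List<String> visibleToConsumer() { return new ArrayList<>(queue); }
}

class ToyTxDemo {
    // Publishes one message. When failAfterPublish is true, an exception is
    // thrown before the commit, playing the role of "int i = 1 / 0;".
    static List<String> send(ToyTxChannel channel, String body, boolean failAfterPublish) {
        try {
            channel.basicPublish(body);
            if (failAfterPublish) {
                throw new IllegalStateException("simulated failure before commit");
            }
            channel.txCommit();
        } catch (RuntimeException e) {
            channel.txRollback(); // rollback logic (resend, persist, ...) would go here
        }
        return channel.visibleToConsumer();
    }

    public static void main(String[] args) {
        ToyTxChannel channel = new ToyTxChannel();
        System.out.println(send(channel, "hello world", true));  // rolled back
        System.out.println(send(channel, "hello world", false)); // committed
    }
}
```

In this model the first call leaves the queue empty and the second makes the message visible, which matches the two branches walked through above.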

  • Approach 2 - Synchronous confirm
package fun.gosuncn.test1;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Sender {
    public static void main(String[] args) {
        ConnectionFactory factory = RabbitmqUtils.getConnectionFactory();
        Connection connection = null;
        Channel channel = null;
        try {
            connection = factory.newConnection();
            channel = connection.createChannel();
            channel.queueDeclare("direct-queue", false, false, false, null);
            channel.confirmSelect();
            channel.basicPublish("", "direct-queue", null, "hello world".getBytes());
            //int i = 1 / 0;
            if (channel.waitForConfirms()) {
                System.out.println("Message delivery confirmed");
                Thread.sleep(3000L);
            }
            System.out.println("Verifying that waitForConfirms is a blocking call");
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (channel != null) {
                try {
                    channel.close();
                } catch (IOException | TimeoutException e) {
                    e.printStackTrace();
                }
            }
            if (connection != null) {
                try {
                    connection.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

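channel.confirmSelect(); puts the channel into confirm mode.

channel.waitForConfirms() blocks the calling thread until the broker's confirmation result comes back; it returns true only if every message published since the last call was acked, after which the follow-up logic can run.

If any exception occurs between channel.basicPublish and channel.waitForConfirms(), the message may still have been delivered successfully, so the producer cannot treat the exception as proof of failure.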
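To make the blocking wait and the "outcome unknown" case concrete, here is a small sketch that uses only the JDK. ToyConfirm and PublishOutcome are hypothetical names and do not exist in amqp-client; a CompletableFuture stands in for the broker's answer, and the timeout is an assumption added for the illustration (the real client also offers waitForConfirms(long timeout)).

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

enum PublishOutcome { ACKED, NACKED, UNKNOWN }

// Hypothetical stand-in for a broker confirm; not part of amqp-client.
class ToyConfirm {
    private final CompletableFuture<Boolean> result = new CompletableFuture<>();

    // Called from the "broker" side: true = ack, false = nack.
    void complete(boolean ack) { result.complete(ack); }

    // Blocks the caller until a confirm arrives or the timeout elapses.
    PublishOutcome await(long timeoutMillis) {
        try {
            boolean ack = result.get(timeoutMillis, TimeUnit.MILLISECONDS);
            return ack ? PublishOutcome.ACKED : PublishOutcome.NACKED;
        } catch (TimeoutException | ExecutionException e) {
            // No usable answer: the message may or may not have been delivered.
            return PublishOutcome.UNKNOWN;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return PublishOutcome.UNKNOWN;
        }
    }
}

class ToyConfirmDemo {
    public static void main(String[] args) {
        ToyConfirm confirm = new ToyConfirm();
        // Simulated broker that acks after a short delay.
        Thread broker = new Thread(() -> {
            try {
                Thread.sleep(200);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
            confirm.complete(true);
        });
        broker.start();
        // The publisher thread is parked here until the ack arrives.
        System.out.println(confirm.await(2000));
    }
}
```

The UNKNOWN branch is the situation described above: the producer has no answer either way, so it should not assume the message was lost.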

  • Approach 3 - Synchronous confirm (batch)
package fun.gosuncn.test1;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Sender {
    public static void main(String[] args) {
        ConnectionFactory factory = RabbitmqUtils.getConnectionFactory();
        Connection connection = null;
        Channel channel = null;
        try {
            connection = factory.newConnection();
            channel = connection.createChannel();
            channel.queueDeclare("direct-queue", false, false, false, null);
            channel.confirmSelect();
            channel.basicPublish("", "direct-queue", null, "hello world1".getBytes());
            channel.basicPublish("", "direct-queue", null, "hello world2".getBytes());
            channel.basicPublish("", "direct-queue", null, "hello world3".getBytes());
            if (channel.waitForConfirms()) {
                System.out.println("Message delivery confirmed");
                Thread.sleep(3000L);
            }
            System.out.println("Verifying that waitForConfirms is a blocking call");
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (channel != null) {
                try {
                    channel.close();
                } catch (IOException | TimeoutException e) {
                    e.printStackTrace();
                }
            }
            if (connection != null) {
                try {
                    connection.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

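In fact, the only difference between batch confirmation and serial (one-by-one) confirmation is that batch confirmation calls basicPublish several times and then calls waitForConfirms just once.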
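The difference comes down to how many blocking waits are paid for. A minimal sketch, assuming a hypothetical helper publishInBatches whose publish and waitForConfirms arguments are placeholders for the real channel calls (they are no-ops here, so no broker is needed):

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

class BatchConfirmSketch {
    // Publishes every message and waits once per batchSize messages.
    // Returns how many blocking waits were performed.
    static int publishInBatches(List<String> messages, int batchSize,
                                Consumer<String> publish, Runnable waitForConfirms) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        int waits = 0;
        int unconfirmed = 0;
        for (String message : messages) {
            publish.accept(message);
            unconfirmed++;
            if (unconfirmed == batchSize) {
                waitForConfirms.run();
                waits++;
                unconfirmed = 0;
            }
        }
        if (unconfirmed > 0) { // confirm the final, partially filled batch
            waitForConfirms.run();
            waits++;
        }
        return waits;
    }

    public static void main(String[] args) {
        List<String> messages = Arrays.asList("hello world1", "hello world2", "hello world3");
        int serial = publishInBatches(messages, 1, m -> { }, () -> { });
        int batched = publishInBatches(messages, 3, m -> { }, () -> { });
        System.out.println("serial waits = " + serial + ", batched waits = " + batched);
    }
}
```

For the three messages in the listing above this gives three waits in serial mode and one in batch mode. The trade-off is that when a batch is not confirmed, the producer cannot tell which message in it failed and may have to resend the whole batch.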

  • Approach 4 - Asynchronous confirm
package fun.gosuncn.test1;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmListener;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;

public class Sender {
    public static void main(String[] args) {
        ConnectionFactory factory = RabbitmqUtils.getConnectionFactory();
        Connection connection = null;
        Channel channel = null;
        try {
            connection = factory.newConnection();
            channel = connection.createChannel();
            channel.queueDeclare("direct-queue", false, false, false, null);
            channel.confirmSelect();
            channel.addConfirmListener(new ConfirmListener() {
                public void handleAck(long deliveryTag, boolean multiple) throws IOException {
                    System.out.println("ack:deliveryTag:" + deliveryTag + ",multiple:" + multiple);
                }

                public void handleNack(long deliveryTag, boolean multiple) throws IOException {
                    System.out.println("nack:deliveryTag:" + deliveryTag + ",multiple:" + multiple);
                }
            });
            channel.basicPublish("", "direct-queue", null, "hello world1".getBytes());
            channel.basicPublish("", "direct-queue", null, "hello world2".getBytes());
            channel.basicPublish("", "direct-queue", null, "hello world3".getBytes());
        } catch (Exception e) {
            e.printStackTrace();
        }
//        finally {
//            if (channel != null) {
//                try {
//                    channel.close();
//                } catch (IOException | TimeoutException e) {
//                    e.printStackTrace();
//                }
//            }
//            if (connection != null) {
//                try {
//                    connection.close();
//                } catch (IOException e) {
//                    e.printStackTrace();
//                }
//            }
//        }
    }
}

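When using asynchronous confirms, note that the connection must not be closed before the broker's confirms have arrived (which is why the finally block above is commented out); otherwise an exception may occur.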
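One way to know when it is safe to close is to track which messages are still unconfirmed. The sketch below is JDK-only and hypothetical: OutstandingConfirms and its methods are names made up for this illustration. With the real client, the sequence number would presumably come from channel.getNextPublishSeqNo() before each basicPublish, and settle would be called from handleAck and handleNack.

```java
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;

// Hypothetical tracker for messages that have been published but not yet confirmed.
class OutstandingConfirms {
    private final ConcurrentNavigableMap<Long, String> outstanding = new ConcurrentSkipListMap<>();

    // Record a message under its publish sequence number before sending it.
    void track(long sequenceNumber, String body) {
        outstanding.put(sequenceNumber, body);
    }

    // Mirrors the (deliveryTag, multiple) arguments of handleAck/handleNack:
    // multiple = true settles every tag up to and including deliveryTag.
    void settle(long deliveryTag, boolean multiple) {
        if (multiple) {
            outstanding.headMap(deliveryTag, true).clear();
        } else {
            outstanding.remove(deliveryTag);
        }
    }

    int pending() { return outstanding.size(); }

    // True once every tracked message has been acked or nacked.
    boolean allSettled() { return outstanding.isEmpty(); }
}

class OutstandingConfirmsDemo {
    public static void main(String[] args) {
        OutstandingConfirms confirms = new OutstandingConfirms();
        confirms.track(1L, "hello world1");
        confirms.track(2L, "hello world2");
        confirms.track(3L, "hello world3");

        confirms.settle(2L, true);  // one ack covering tags 1 and 2
        System.out.println("pending = " + confirms.pending());
        confirms.settle(3L, false); // a single ack for tag 3
        System.out.println("safe to close = " + confirms.allSettled());
    }
}
```

Closing the channel and connection only after allSettled() returns true is one possible way to satisfy the warning above without leaving the connection open forever.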

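Transactions: 42891 ms

Serial confirm: 40551 ms

Batch confirm: 688 ms

Asynchronous confirm: 318 ms

From these results, the transactional approach is the slowest and asynchronous confirm is the fastest.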
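To put the gap in perspective, the measured times can be divided by the fastest one. The snippet below only does that arithmetic on the four numbers reported above; TimingRatios and relativeTo are hypothetical names.

```java
import java.util.LinkedHashMap;
import java.util.Locale;
import java.util.Map;

class TimingRatios {
    // Divides each timing by the baseline to show how many times slower it is.
    static Map<String, Double> relativeTo(long baselineMillis, Map<String, Long> timings) {
        Map<String, Double> ratios = new LinkedHashMap<>();
        for (Map.Entry<String, Long> entry : timings.entrySet()) {
            ratios.put(entry.getKey(), (double) entry.getValue() / baselineMillis);
        }
        return ratios;
    }

    public static void main(String[] args) {
        // Timings in milliseconds, copied from the measurements above.
        Map<String, Long> timings = new LinkedHashMap<>();
        timings.put("transactions", 42891L);
        timings.put("serial confirm", 40551L);
        timings.put("batch confirm", 688L);
        timings.put("asynchronous confirm", 318L);

        for (Map.Entry<String, Double> entry : relativeTo(318L, timings).entrySet()) {
            System.out.println(String.format(Locale.ROOT, "%s: %.1fx", entry.getKey(), entry.getValue()));
        }
    }
}
```

In this test run, transactions and serial confirm were each well over a hundred times slower than asynchronous confirm, while batch confirm was only about twice as slow.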