1. 程式人生 > 其它 >5.rabbitmq的work queues模式

5.rabbitmq的work queues模式

技術標籤:rabbitmq佇列javarabbitmq

1.模型圖

在這裡插入圖片描述
適用場景:消費者消費速度慢導致訊息在佇列中堆積

2.實現程式碼

工具類

public class MqUtil {
    // 建立連線工廠
    private static final ConnectionFactory  factory = new ConnectionFactory();

    static {
        // 設定連線mq的主機
        factory.setHost("192.168.175.159");
        // 設定連線埠號
        factory.
setPort(5672); // 設定連線的虛擬主機 factory.setVirtualHost("javatest"); // 設定訪問虛擬主機的使用者名稱 factory.setUsername("javatest"); // 設定訪問虛擬主機的密碼 factory.setPassword("javatest"); } // 獲取連線 public static Connection getConnection(){ try
{ return factory.newConnection(); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } return null; } // 關閉資源 public static void close(Channel channel,Connection connection)
{ try { if (channel != null) { channel.close(); } if (connection != null) { connection.close(); } } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } } }

生產者

public class Send {
    public static void main(String[] args) throws IOException {
        // 獲取連線
        Connection connection = MqUtil.getConnection();
        // 建立通道
        Channel channel = connection.createChannel();
        // 將通道與佇列繫結
        channel.queueDeclare("work",true,false,false,null);
        // 釋出多條訊息
        for (int i=0;i<20;i++) {
            channel.basicPublish("","work",null,("work" + i).getBytes());
        }
        // 關閉資源
        MqUtil.close(channel,connection);
    }
}

消費者1

public class Recv1 {
    public static void main(String[] args) throws IOException {
        Connection connection = MqUtil.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare("work",true,false,false,null);
        channel.basicConsume("work", true, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println(new String(body));
            }
        });
    }
}

消費者2

public class Recv2 {
    public static void main(String[] args) throws IOException {
        Connection connection = MqUtil.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare("work",true,false,false,null);
        channel.basicConsume("work", true, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println(new String(body));
            }
        });
    }
}

結果展示

消費者2
在這裡插入圖片描述
消費者1
在這裡插入圖片描述

3.消費者的自動確認機制

詳情連線:link
從上面的最終結果可以看出,rabbitmq將訊息已迴圈的方式分配給兩個消費者進行消費。這是rabbitmq的一個預設機制。在生產實踐中存在兩個問題:

  • 設定消費為自動確認,當消費者出現異常時會導致資料的丟失
  • 沒有進行通道的預取設定,將佇列中所有的訊息全部分配給消費者,導致記憶體 的消耗增加

程式碼展示

public class Recv1 {
    public static void main(String[] args) throws IOException {
        Connection connection = MqUtil.getConnection();
        final Channel channel = connection.createChannel();
        channel.queueDeclare("work",true,false,false,null);
        // 設定通道的預取數量為1,官方推薦100到300,資料會影響其吞吐量
        channel.basicQos(4);
        // 關閉訊息的自動確認機制
        channel.basicConsume("work", false, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println(new String(body));
                // 在處理完訊息後手動進行確認
                /*
                * 引數1: 訊息標籤
                * 引數2: 是否批量進行確認
                * */
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        });
    }
}