1. 程式人生 > 實用技巧 >RabbitMQ安裝與訊息模型

RabbitMQ安裝與訊息模型

RabbitMQ安裝與訊息模型

通過docker安裝rabbitmq

#拉起映象-management帶控制檯
docker pull rabbitmq:management
#建立需要對映的目錄
mkdir -p rabbitmq/{etc,lib,var/{lib,log}}
#執行容器
docker run -d --hostname "主機名" --name mq1 -e RABBITMQ_DEFAULT_USER="賬號" -e RABBITMQ_DEFAULT_PASS="密碼" -p 15672:15672 -p 5672:5672 rabbitmq:management
#進入容器
docker exec -it mq1 bash
#檢視服務狀態
rabbitmqctl status
#檢視外掛列表
rabbitmq-plugins list
#測試訪問
curl localhost:15672

訊息模型

依賴

<!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client -->
<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.9.0</version>
</dependency>

WEB操作

新建一個虛擬主機

新建一個使用者,並點選使用者名稱

設定使用者的許可權,允許其訪問ems主機

封裝工具類

private static ConnectionFactory connectionFactory;

static {
    connectionFactory = new ConnectionFactory();
    connectionFactory.setHost("122.51.70.176");
    connectionFactory.setPort(5672);
    connectionFactory.setVirtualHost("/ems");
    connectionFactory.setUsername("ems");
    connectionFactory.setPassword("123456");
}

public static Connection getConnection() {
    try {
        return connectionFactory.newConnection();
    } catch (Exception e) {
        e.printStackTrace();
    }
    return null;
}

public static void close(Channel channel, Connection connection) {
    try {
        if (channel != null) {
            channel.close();
        }
        if (connection != null) {
            connection.close();
        }
    } catch (Exception e) {
        e.printStackTrace();
    }
}

直連模型

生產者傳送訊息到佇列, 消費者從中取出訊息.

生產者
@Test
public void helloWorld() throws IOException{
    Connection connection = MQUtils.getConnection();
    //建立通道
    Channel channel = connection.createChannel();
    //通道繫結訊息佇列, 佇列名hello, 佇列不持久化, 非獨佔, 不自動刪除, 附加引數
    channel.queueDeclare("hello", false, false, false, null);
    //釋出訊息, 不指定交換機, 佇列名, 傳遞訊息的額外設定, 訊息內容
    channel.basicPublish("", "hello", null, "hello rabbitmq".getBytes());
    //關閉通道
    MQUtils.close(channel, connection);
}
消費者
public static void main(String[] args) throws IOException {
    Connection connection = MQUtils.getConnection();
    Channel channel = connection.createChannel();
    channel.queueDeclare("hello", false, false, false, null);
    //消費訊息, 佇列名, 訊息自動確認, 消費的回撥介面
    channel.basicConsume("hello", true, new DefaultConsumer(channel){
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
            //列印獲得的訊息
            System.out.println(new String(body));
        }
    });
}

工作佇列(work queues)

讓多個消費者繫結到一個佇列, 共同消費佇列中的訊息, 該模型下預設消費者獲取訊息的方式是輪詢. 消費者消費的資訊數量是平均的.

生產者
@Test
public void workQueues() throws IOException {
    Connection connection = MQUtils.getConnection();
    Channel channel = connection.createChannel();
    channel.queueDeclare("work", true, false, false, null);
    for (int i = 0; i < 50; i++) {
        channel.basicPublish("", "work", null, ("hello work queues" + i).getBytes());
    }
    MQUtils.close(channel, connection);
}
消費者
//消費者1
public static void main(String[] args) throws IOException {
    Connection connection = MQUtils.getConnection();
    Channel channel = connection.createChannel();
    channel.queueDeclare("work", true, false, false, null);
    channel.basicConsume("work", true, new DefaultConsumer(channel){
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body){
            System.out.println("消費者1" + new String(body));
        }
    });
}
//消費者2
public static void main(String[] args) throws IOException {
    Connection connection = MQUtils.getConnection();
    Channel channel = connection.createChannel();
    channel.queueDeclare("work", true, false, false, null);
    channel.basicConsume("work", true, new DefaultConsumer(channel){
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body){
            System.out.println("消費者2" + new String(body));
            try {
                //慢處理
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    });
}

兩個消費者獲得的訊息數量都是25條, 消費者1很快就完成了任務, 但是消費者2加班了很久. 需要優化改進,讓消費者能者多勞.

改進後的消費者
//消費者1
public static void main(String[] args) throws IOException {
    Connection connection = MQUtils.getConnection();
    final Channel channel = connection.createChannel();
    //設通道上的訊息容量為1
    channel.basicQos(1);
    channel.queueDeclare("work", true, false, false, null);
    //把自動確認設定為false
    channel.basicConsume("work", false, new DefaultConsumer(channel) {
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
            System.out.println("消費者1" + new String(body));
            //手動確認訊息, 不開啟多條訊息確認
            channel.basicAck(envelope.getDeliveryTag(), false);
        }
    });
}
//消費者2
public static void main(String[] args) throws IOException {
    Connection connection = MQUtils.getConnection();
    final Channel channel = connection.createChannel();
    channel.basicQos(1);
    channel.queueDeclare("work", true, false, false, null);
    channel.basicConsume("work", false, new DefaultConsumer(channel) {
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
            System.out.println("消費者2" + new String(body));
            channel.basicAck(envelope.getDeliveryTag(), false);
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    });
}

廣播模型(釋出-訂閱模型)

可以有多個消費者, 每個消費者都有自己的消費佇列, 佇列繫結到交換機上, 交換機從生產者接收訊息. 最終可以實現一條訊息被多個消費者消費.

生產者
@Test
public void fanout() throws IOException {
    Connection connection = MQUtils.getConnection();
    Channel channel = connection.createChannel();
    //指定交換機, 交換機名為news, 交換機型別為fanout
    channel.exchangeDeclare("news","fanout");
    for (int i = 0; i < 10; i++) {
        //訊息釋出, 選擇交換機, 無路由key, 無其他設定, 訊息內容
        channel.basicPublish("news","",null, ("hello fanout" + i).getBytes());
    }
    MQUtils.close(channel, connection);
}
n個消費者
public static void main(String[] args) throws IOException {
    Connection connection = MQUtils.getConnection();
    Channel channel = connection.createChannel();
    //臨時佇列
    String queue = channel.queueDeclare().getQueue();
    //繫結佇列
    channel.queueBind(queue, "news", "");
    channel.basicConsume(queue, true, new DefaultConsumer(channel) {
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) {
            System.out.println("消費者n" + new String(body));
        }
    });
}

直連模型(靜態路由模型)

在路由的直連(Direct)模式下, 佇列與交換機之間不再是任意綁定了, 而是要指定一個路由Key, 訊息的傳送也必須要指定上路由Key, 交換機根據對訊息路由Key進行判斷, 只有相匹配的情況下消費者才會接收到訊息.

生產者
@Test
public void direct() throws IOException {
    Connection connection = MQUtils.getConnection();
    Channel channel = connection.createChannel();
    //指定交換機, 交換機名為mail, 交換機型別為direct
    channel.exchangeDeclare("mail", "direct");
    for (int i = 0; i < 10; i++) {
        //訊息釋出, 選擇交換機, 無路由key, 無其他設定, 訊息內容
        if (i == 4 | i == 8) {
            channel.basicPublish("mail", "vip", null, ("vip訊息" + i).getBytes());
            continue;
        }
        channel.basicPublish("mail", "user", null, ("使用者訊息" + i).getBytes());
    }
    MQUtils.close(channel, connection);
}
消費者
//消費者1-普通使用者
public static void main(String[] args) throws IOException {
    Connection connection = MQUtils.getConnection();
    Channel channel = connection.createChannel();
    //臨時佇列
    String queue = channel.queueDeclare().getQueue();
    //繫結佇列
    channel.queueBind(queue, "mail", "user");
    channel.basicConsume(queue, true, new DefaultConsumer(channel) {
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) {
            System.out.println("消費者1" + new String(body));
        }
    });
}
//消費者1-vip使用者
public static void main(String[] args) throws IOException {
    Connection connection = MQUtils.getConnection();
    Channel channel = connection.createChannel();
    //臨時佇列
    String queue = channel.queueDeclare().getQueue();
    //繫結多個佇列
    channel.queueBind(queue, "mail", "user");
    channel.queueBind(queue, "mail", "vip");
    channel.basicConsume(queue, true, new DefaultConsumer(channel) {
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) {
            System.out.println("消費者2" + new String(body));
        }
    });
}

話題型別(動態路由模型)

在路由的話題(Topic)模式下, 也是使用路由Key來分配訊息的, 不過在繫結路由Key的時候可以使用萬用字元, 一般路由Key可以由一到多個單片語成, 單詞之間使用"."進行分割.

釋出者
@Test
public void Topic() throws IOException {
    Connection connection = MQUtils.getConnection();
    Channel channel = connection.createChannel();
    //指定交換機, 交換機名為notice, 交換機型別為topic
    channel.exchangeDeclare("notice", "topic");
    for (int i = 0; i < 10; i++) {
        //訊息釋出, 選擇交換機, 無路由key, 無其他設定, 訊息內容
        if (i == 4) {
            channel.basicPublish("notice", "user.vip.msg", null, ("vip訊息" + i).getBytes());
            continue;
        }else if (i == 8) {
            channel.basicPublish("notice", "user.vip.present", null, ("vip禮物" + i).getBytes());
            continue;
        }
        channel.basicPublish("notice", "user.msg", null, ("使用者訊息" + i).getBytes());
    }
    MQUtils.close(channel, connection);
}
消費者
//消費者1
public static void main(String[] args) throws IOException {
    Connection connection = MQUtils.getConnection();
    Channel channel = connection.createChannel();
    //臨時佇列
    String queue = channel.queueDeclare().getQueue();
    //繫結佇列, *任意一個字詞, #任意數量字詞
    channel.queueBind(queue, "notice", "user.*");
    channel.basicConsume(queue, true, new DefaultConsumer(channel) {
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) {
            System.out.println("消費者1" + new String(body));
        }
    });
}
//消費者2
public static void main(String[] args) throws IOException {
    Connection connection = MQUtils.getConnection();
    Channel channel = connection.createChannel();
    //臨時佇列
    String queue = channel.queueDeclare().getQueue();
    //繫結佇列, *任意一個字詞, #任意數量字詞
    channel.queueBind(queue, "notice", "user.#");
    channel.basicConsume(queue, true, new DefaultConsumer(channel) {
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) {
            System.out.println("消費者2" + new String(body));
        }
    });
}