1. 程式人生 > 其它 >RabbitMQ的幾種模型和spring整合

RabbitMQ的幾種模型和spring整合

技術標籤:rabbitmqspring

如何從零搭建RabbitMQ,詳見此處
依賴:

<parent>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-parent</artifactId>
		<version>2.1.4.RELEASE</version>
	</parent>
	<properties>
		<java.version>1.8</java.version
>
</properties> <dependencies> <dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-lang3</artifactId> <version>3.3.2</version> </dependency> <dependency> <groupId>org.springframework.boot</
groupId
>
<artifactId>spring-boot-starter-amqp</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> </dependency> </dependencies>

連線工具類:

public class
ConnectionUtil { /** * 建立與RabbitMQ的連線 * @return * @throws Exception */ public static Connection getConnection() throws Exception { //定義連線工廠 ConnectionFactory factory = new ConnectionFactory(); //設定服務地址 factory.setHost("192.168.233.128"); //埠 factory.setPort(5672); //設定賬號資訊,使用者名稱、密碼、vhost factory.setVirtualHost("/"); factory.setUsername("admin"); factory.setPassword("111111"); // 通過工程獲取連線 Connection connection = factory.newConnection(); return connection; } }

1.work模型

最簡單的模型我這就不列舉了,work解決的是佇列和消費者之間的堆擠,和下面的幾種模型不衝突
在這裡插入圖片描述

demo程式碼:
生產者

// 生產者
public class Send {
    private final static String QUEUE_NAME = "test_work_queue";

    public static void main(String[] argv) throws Exception {
        // 獲取到連線
        Connection connection = ConnectionUtil.getConnection();
        // 獲取通道
        Channel channel = connection.createChannel();
        // 宣告佇列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // 迴圈釋出任務
        for (int i = 0; i < 50; i++) {
            // 訊息內容
            String message = "task .. " + i;
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
            System.out.println(" [x] Sent '" + message + "'");

            Thread.sleep(i * 2);
        }
        // 關閉通道和連線
        channel.close();
        connection.close();
    }
}

消費者1

// 消費者1
public class Recv {
    private final static String QUEUE_NAME = "test_work_queue";

    public static void main(String[] argv) throws Exception {
        // 獲取到連線
        Connection connection = ConnectionUtil.getConnection();
        // 獲取通道
        final Channel channel = connection.createChannel();
        // 宣告佇列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // 設定每個消費者同時只能處理一條訊息,能者多勞
        channel.basicQos(1);
        // 定義佇列的消費者
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            // 獲取訊息,並且處理,這個方法類似事件監聽,如果有訊息的時候,會被自動呼叫
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
                    byte[] body) throws IOException {
                // body 即訊息體
                String msg = new String(body);
                System.out.println(" [消費者1] received : " + msg + "!");
                try {
                    // 模擬完成任務的耗時:1000ms
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                }
                // 手動ACK
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };
        // 監聽佇列。
        channel.basicConsume(QUEUE_NAME, false, consumer);
    }
}

消費者2

//消費者2
public class Recv2 {
    private final static String QUEUE_NAME = "test_work_queue";

    public static void main(String[] argv) throws Exception {
        // 獲取到連線
        Connection connection = ConnectionUtil.getConnection();
        // 獲取通道
        final Channel channel = connection.createChannel();
        // 宣告佇列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // 設定每個消費者同時只能處理一條訊息(能者多勞模式)
        channel.basicQos(1);
        // 定義佇列的消費者
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            // 獲取訊息,並且處理,這個方法類似事件監聽,如果有訊息的時候,會被自動呼叫
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
                    byte[] body) throws IOException {
                // body 即訊息體
                String msg = new String(body);
                System.out.println(" [消費者2] received : " + msg + "!");
                // 手動ACK
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };
        // 監聽佇列。
        channel.basicConsume(QUEUE_NAME, false, consumer);
    }
}

2.fanout模型

在這裡插入圖片描述
一個消費者將訊息首先發送到交換器,交換器繫結到多個佇列,然後被監聽該佇列的消費者所接收並消費。
生產者

public class Send {

    private final static String EXCHANGE_NAME = "fanout_exchange_test";

    public static void main(String[] argv) throws Exception {
        // 獲取到連線
        Connection connection = ConnectionUtil.getConnection();
        // 獲取通道
        Channel channel = connection.createChannel();
        
        // 宣告exchange,指定型別為fanout
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
        
        // 訊息內容
        String message = "Hello everyone";
        // 釋出訊息到Exchange
        channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
        System.out.println(" [生產者] Sent '" + message + "'");

        channel.close();
        connection.close();
    }
}

消費者1

//消費者1
public class Recv {
    private final static String QUEUE_NAME = "fanout_exchange_queue_1";

    private final static String EXCHANGE_NAME = "fanout_exchange_test";

    public static void main(String[] argv) throws Exception {
        // 獲取到連線
        Connection connection = ConnectionUtil.getConnection();
        // 獲取通道
        Channel channel = connection.createChannel();
        // 宣告佇列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        // 繫結佇列到交換機
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");

        // 定義佇列的消費者
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            // 獲取訊息,並且處理,這個方法類似事件監聽,如果有訊息的時候,會被自動呼叫
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
                    byte[] body) throws IOException {
                // body 即訊息體
                String msg = new String(body);
                System.out.println(" [消費者1] received : " + msg + "!");
            }
        };
        // 監聽佇列,自動返回完成
        channel.basicConsume(QUEUE_NAME, true, consumer);
    }
}

消費者2

// 消費者2
public class Recv2 {
    private final static String QUEUE_NAME = "fanout_exchange_queue_2";

    private final static String EXCHANGE_NAME = "fanout_exchange_test";

    public static void main(String[] argv) throws Exception {
        // 獲取到連線
        Connection connection = ConnectionUtil.getConnection();
        // 獲取通道
        Channel channel = connection.createChannel();
        // 宣告佇列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        // 繫結佇列到交換機
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
        
        // 定義佇列的消費者
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            // 獲取訊息,並且處理,這個方法類似事件監聽,如果有訊息的時候,會被自動呼叫
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
                    byte[] body) throws IOException {
                // body 即訊息體
                String msg = new String(body);
                System.out.println(" [消費者2] received : " + msg + "!");
            }
        };
        // 監聽佇列,手動返回完成
        channel.basicConsume(QUEUE_NAME, true, consumer);
    }
}

3.direct模型

在這裡插入圖片描述
生成者生產訊息的時候帶了一個routingKey,佇列繫結到交換機的時候也要設定一個routingKey,如果這2個routingKey一致就可以把訊息推入佇列
生產者

public class Send {
    private final static String EXCHANGE_NAME = "direct_exchange_test";

    public static void main(String[] argv) throws Exception {
        // 獲取到連線
        Connection connection = ConnectionUtil.getConnection();
        // 獲取通道
        Channel channel = connection.createChannel();
        // 宣告exchange,指定型別為direct
        channel.exchangeDeclare(EXCHANGE_NAME, "direct",true);
        // 訊息內容
        String message = "商品新增了, id = 1001";
        // 傳送訊息,並且指定routing key 為:insert ,代表新增商品
        channel.basicPublish(EXCHANGE_NAME, "insert", null, message.getBytes());
        System.out.println(" [商品服務:] Sent '" + message + "'");

        channel.close();
        connection.close();
    }
}

消費者1

public class Recv {
    private final static String QUEUE_NAME = "direct_exchange_queue_1";
    private final static String EXCHANGE_NAME = "direct_exchange_test";

    public static void main(String[] argv) throws Exception {
        // 獲取到連線
        Connection connection = ConnectionUtil.getConnection();
        // 獲取通道
        Channel channel = connection.createChannel();
        // 宣告佇列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        
        // 繫結佇列到交換機,同時指定需要訂閱的routing key。假設此處需要update和delete訊息
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "update");
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "delete");

        // 定義佇列的消費者
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            // 獲取訊息,並且處理,這個方法類似事件監聽,如果有訊息的時候,會被自動呼叫
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
                    byte[] body) throws IOException {
                // body 即訊息體
                String msg = new String(body);
                System.out.println(" [消費者1] received : " + msg + "!");
            }
        };
        // 監聽佇列,自動ACK
        channel.basicConsume(QUEUE_NAME, true, consumer);
    }
}

消費者2

public class Recv2 {
    private final static String QUEUE_NAME = "direct_exchange_queue_2";
    private final static String EXCHANGE_NAME = "direct_exchange_test";

    public static void main(String[] argv) throws Exception {
        // 獲取到連線
        Connection connection = ConnectionUtil.getConnection();
        // 獲取通道
        Channel channel = connection.createChannel();
        // 宣告佇列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        
        // 繫結佇列到交換機,同時指定需要訂閱的routing key。訂閱 insert、update、delete
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "insert");
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "update");
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "delete");

        // 定義佇列的消費者
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            // 獲取訊息,並且處理,這個方法類似事件監聽,如果有訊息的時候,會被自動呼叫
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
                    byte[] body) throws IOException {
                // body 即訊息體
                String msg = new String(body);
                System.out.println(" [消費者2] received : " + msg + "!");
            }
        };
        // 監聽佇列,自動ACK
        channel.basicConsume(QUEUE_NAME, true, consumer);
    }
}

生產者的routingKey是insert,消費者1沒有這個routingKey,無法接收,消費者2可以

4. topic模型

在這裡插入圖片描述
topic模型其實就是direct模型的升級版,direct模型的routingKey是要完全一致的,topic模型可以用類似正則的方式匹配,item.*可以代表item.delete,item.insert,但不能代表item.group.insert;item.#則可以代表item.insert或者item.group.insert
生產者

public class Send {
    private final static String EXCHANGE_NAME = "topic_exchange";

    public static void main(String[] argv) throws Exception {
        // 獲取到連線
        Connection connection = ConnectionUtil.getConnection();
        // 獲取通道
        Channel channel = connection.createChannel();
        // 宣告exchange,指定型別為topic
        channel.exchangeDeclare(EXCHANGE_NAME, "topic",true);
        // 訊息內容
        String message = "新增商品 : id = 1001";
        // 傳送訊息,並且指定routing key 為:insert ,代表新增商品 MessageProperties.PERSISTENT_TEXT_PLAIN持久化
        channel.basicPublish(EXCHANGE_NAME, "item.insert", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
        System.out.println(" [商品服務:] Sent '" + message + "'");

        channel.close();
        connection.close();
    }
}

消費者1

public class Recv {
    private final static String QUEUE_NAME = "topic_exchange_queue_1";
    private final static String EXCHANGE_NAME = "topic_exchange_test";

    public static void main(String[] argv) throws Exception {
        // 獲取到連線
        Connection connection = ConnectionUtil.getConnection();
        // 獲取通道
        Channel channel = connection.createChannel();
        // 宣告佇列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        
        // 繫結佇列到交換機,同時指定需要訂閱的routing key。需要 update、delete
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "item.update");
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "item.delete");

        // 定義佇列的消費者
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            // 獲取訊息,並且處理,這個方法類似事件監聽,如果有訊息的時候,會被自動呼叫
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
                    byte[] body) throws IOException {
                // body 即訊息體
                String msg = new String(body);
                System.out.println(" [消費者1] received : " + msg + "!");
            }
        };
        // 監聽佇列,自動ACK
        channel.basicConsume(QUEUE_NAME, true, consumer);
    }
}

消費者2

public class Recv2 {
    private final static String QUEUE_NAME = "topic_exchange_queue_22";
    private final static String EXCHANGE_NAME = "topic_exchange";

    public static void main(String[] argv) throws Exception {
        // 獲取到連線
        Connection connection = ConnectionUtil.getConnection();
        // 獲取通道
        Channel channel = connection.createChannel();
        // 宣告佇列(durable:訊息持久化)
        channel.queueDeclare(QUEUE_NAME, true, false, false, null);
        
        // 繫結佇列到交換機,同時指定需要訂閱的routing key。訂閱 insert、update、delete
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "item.*");

        // 定義佇列的消費者
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            // 獲取訊息,並且處理,這個方法類似事件監聽,如果有訊息的時候,會被自動呼叫
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
                    byte[] body) throws IOException {
                // body 即訊息體
                String msg = new String(body);
                System.out.println(" [消費者2] received : " + msg + "!");
            }
        };
        // 監聽佇列,自動ACK
        channel.basicConsume(QUEUE_NAME, true, consumer);
    }
}

5.spring整合RabbitMQ

配置檔案

spring:
  rabbitmq:
    host: 192.168.233.128
    username: admin
    password: 111111
    virtual-host: /

生產者:

@RunWith(SpringRunner.class)
@SpringBootTest(classes = Application.class)
public class MqDemo {

    @Autowired
    private AmqpTemplate amqpTemplate;

    @Test
    public void testSend() throws InterruptedException {
        String msg = "100,商品更新成功";
        this.amqpTemplate.convertAndSend("tntab_ex","item.update", msg);
        // 等待10秒後再結束
        Thread.sleep(10000);
    }
}

消費者

@Component
public class Listener {
    //  建立交換機佇列然後繫結
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = "tntab_queue", durable = "true"),
            exchange = @Exchange(
                    value = "tntab_ex",
                    ignoreDeclarationExceptions = "true",
                    type = ExchangeTypes.TOPIC
            ),
            key = {"#.#"}))
//    @RabbitListener(queues = "topic_exchange_queue_2")
    public void listen(String msg){
        System.out.println("接收到訊息:" + msg);
    }
}