1. 程式人生 > 其它 >RabbitMQ簡單模式開發與測試(一)

RabbitMQ簡單模式開發與測試(一)

分析:

生產者傳送訊息到RabbitMQ的佇列(simple_queue);消費者可以從佇列中獲取訊息。可以使用RabbitMQ的簡單模式(simple)。

簡單模式圖示:

P:生產者,也就是要傳送訊息的程式

C:消費者:訊息的接受者,會一直等待訊息到來。

queue:訊息佇列,類似一個郵箱,可以快取訊息;生產者向其中投遞訊息,消費者從其中取出訊息。

操作步驟

1.搭建RabbitMQ入門工程-安裝依賴

使用IDEA建立heima-rabbitmq的maven工程,用於測試RabbitMQ的訊息收發;使用了jdk1.8,在工程中的pom.xml檔案中新增用於操作RabbitMQ的依賴。

        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>5.6.0</version>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.17</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.25</version>
        </dependency>

程式碼目錄結構:

2.簡單模式-生產者

編寫訊息生產者程式碼,傳送訊息到佇列

生產者實現傳送訊息的步驟:

  1. 建立連線工廠(設定RabbitMQ的連線引數);
  2. 建立連線;
  3. 建立頻道;
  4. 宣告佇列;
  5. 傳送訊息;
  6. 關閉資源
package com.itheima.rabbitmq.simple;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

/**
 * 簡單模式:傳送訊息
 */
public class Producer {
    static final String QUEUE_NAME = "simple_queue";

    public static void main(String[] args) throws Exception {
        //1. 建立連線工廠(設定RabbitMQ的連線引數);
        ConnectionFactory connectionFactory = new ConnectionFactory();
        //主機;預設localhost
        connectionFactory.setHost("localhost");
        //連線埠;預設5672
        connectionFactory.setPort(5672);
        //虛擬主機;預設/
        connectionFactory.setVirtualHost("/itcast");
        //使用者名稱;預設guest
        connectionFactory.setUsername("heima");
        //密碼;預設guest
        connectionFactory.setPassword("heima");

        //2. 建立連線;
        Connection connection = connectionFactory.newConnection();
        //3. 建立頻道;
        Channel channel = connection.createChannel();
        //4. 宣告佇列;
        /**
         * 引數1:佇列名稱
         * 引數2:是否定義持久化佇列(訊息會持久化儲存在伺服器上)
         * 引數3:是否獨佔本連線
         * 引數4:是否在不使用的時候佇列自動刪除
         * 引數5:其它引數
         */
        channel.queueDeclare(QUEUE_NAME, true, false, false, null);
        //5. 傳送訊息;
        String message = "你好!簡單的rabbitmq。";

        /**
         * 引數1:交換機名稱;如果沒有則指定空字串(表示使用預設的交換機)
         * 引數2:路由key,簡單模式中可以使用佇列名稱
         * 引數3:訊息其它屬性
         * 引數4:訊息內容
         */
        channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
         
        //6. 關閉資源(頻道關閉 連線關閉)
        channel.close();
        connection.close();
    }
}

ps:在設定連線工廠的時候;如果沒有指定連線的引數則會有預設值;可以去設定虛擬主機。

3. 入門工程-消費者

編寫訊息消費者程式碼,從佇列中接收訊息並消費

分析

從RabbitMQ中佇列(與生產者傳送訊息時的佇列一致;simple_queue)接收訊息;

實現消費者步驟:

  1. 建立連線工廠;
  2. 建立連線;(抽取一個獲取連線的工具類)
package com.study.rabbitmq.util;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class ConnectionUtil {
    public static Connection getConnection() throws IOException, TimeoutException {
        // 1. 建立連線工廠(設定RabbitMQ的連線引數);
        ConnectionFactory connectionFactory= new ConnectionFactory();
        //主機;預設localhost
        connectionFactory.setHost("localhost");
        //連線埠;預設5672
        connectionFactory.setPort(5672);
        //虛擬主機;預設/
        connectionFactory.setVirtualHost("/");
        //使用者名稱;預設guest
        connectionFactory.setUsername("guest");
        //密碼;預設guest
        connectionFactory.setPassword("guest");
//   2. 建立連線
        return connectionFactory.newConnection();
    }
}
  1. 建立頻道;
  2. 宣告佇列;
  3. 建立消費者(接收訊息並處理訊息);
  4. 監聽佇列 (需要持續監聽佇列訊息,所以不要關閉資源)
package com.itheima.rabbitmq.simple;

import com.itheima.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.*;

import java.io.IOException;

/**
 * 簡單模式;消費者接收訊息
 */
public class Consumer {
    public static void main(String[] args) throws Exception {
        //1. 建立連線工廠;
        //2. 建立連線;(抽取一個獲取連線的工具類)
        Connection connection = ConnectionUtil.getConnection();
        //3. 建立頻道;
        Channel channel = connection.createChannel();
        //4. 宣告佇列;
        /**
         * 引數1:佇列名稱
         * 引數2:是否定義持久化佇列(訊息會持久化儲存在伺服器上)
         * 引數3:是否獨佔本連線
         * 引數4:是否在不使用的時候佇列自動刪除
         * 引數5:其它引數
         */
        channel.queueDeclare(Producer.QUEUE_NAME, true, false, false, null);
        //5. 建立消費者(接收訊息並處理訊息);
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                //路由key
                System.out.println("路由key為:" + envelope.getRoutingKey());
                //交換機
                System.out.println("交換機為:" + envelope.getExchange());
                //訊息id
                System.out.println("訊息id為:" + envelope.getDeliveryTag());
                //接收到的訊息
                System.out.println("接收到的訊息為:" + new String(body, "utf-8"));
            }
        };
        //6. 監聽佇列
        /**
         * 引數1:佇列名
         * 引數2:是否要自動確認;設定為true表示訊息接收到自動向MQ回覆接收到了,MQ則會將訊息從佇列中刪除;
         * 如果設定為false則需要手動確認
         * 引數3:消費者
         */
        channel.basicConsume(Producer.QUEUE_NAME, true, defaultConsumer);
      //不關閉資源,應該一直監聽訊息
      //channel.close(); 
      //connection.close();
    }
}

4. 入門工程測試

目標:啟動消費者和生產者,到RabbitMQ中查詢佇列並在消費者端IDEA控制檯檢視接收到的訊息

分析

先啟動生產者:傳送訊息到RabbitMQ佇列(simple_queue)

idea終端結果:

rabbitmq 後臺:http://localhost:15672

消費者:接收RabbitMQ佇列訊息

運行了2次生產著之後啟動消費者:

小結:

簡單模式:生產者傳送訊息到佇列中,一個消費者從佇列中接收訊息。

在RabbitMQ中消費者只能從佇列接收訊息。
如果接收訊息的消費者在同一個佇列中有兩個或多個時;訊息是如何分配的?