RabbitMQ簡單模式開發與測試(一)
阿新 • • 發佈:2021-10-18
分析:
生產者傳送訊息到RabbitMQ的佇列(simple_queue);消費者可以從佇列中獲取訊息。可以使用RabbitMQ的簡單模式(simple)。
簡單模式圖示:
P:生產者,也就是要傳送訊息的程式
C:消費者:訊息的接受者,會一直等待訊息到來。
queue:訊息佇列,類似一個郵箱,可以快取訊息;生產者向其中投遞訊息,消費者從其中取出訊息。
操作步驟
1.搭建RabbitMQ入門工程-安裝依賴
使用IDEA建立heima-rabbitmq的maven工程,用於測試RabbitMQ的訊息收發;使用了jdk1.8,在工程中的pom.xml檔案中新增用於操作RabbitMQ的依賴。
<dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.6.0</version> </dependency> <dependency> <groupId>log4j</groupId> <artifactId>log4j</artifactId> <version>1.2.17</version> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> <version>1.7.25</version> </dependency>
程式碼目錄結構:
2.簡單模式-生產者
編寫訊息生產者程式碼,傳送訊息到佇列
生產者實現傳送訊息的步驟:
- 建立連線工廠(設定RabbitMQ的連線引數);
- 建立連線;
- 建立頻道;
- 宣告佇列;
- 傳送訊息;
- 關閉資源
package com.itheima.rabbitmq.simple; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; /** * 簡單模式:傳送訊息 */ public class Producer { static final String QUEUE_NAME = "simple_queue"; public static void main(String[] args) throws Exception { //1. 建立連線工廠(設定RabbitMQ的連線引數); ConnectionFactory connectionFactory = new ConnectionFactory(); //主機;預設localhost connectionFactory.setHost("localhost"); //連線埠;預設5672 connectionFactory.setPort(5672); //虛擬主機;預設/ connectionFactory.setVirtualHost("/itcast"); //使用者名稱;預設guest connectionFactory.setUsername("heima"); //密碼;預設guest connectionFactory.setPassword("heima"); //2. 建立連線; Connection connection = connectionFactory.newConnection(); //3. 建立頻道; Channel channel = connection.createChannel(); //4. 宣告佇列; /** * 引數1:佇列名稱 * 引數2:是否定義持久化佇列(訊息會持久化儲存在伺服器上) * 引數3:是否獨佔本連線 * 引數4:是否在不使用的時候佇列自動刪除 * 引數5:其它引數 */ channel.queueDeclare(QUEUE_NAME, true, false, false, null); //5. 傳送訊息; String message = "你好!簡單的rabbitmq。"; /** * 引數1:交換機名稱;如果沒有則指定空字串(表示使用預設的交換機) * 引數2:路由key,簡單模式中可以使用佇列名稱 * 引數3:訊息其它屬性 * 引數4:訊息內容 */ channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); //6. 關閉資源(頻道關閉 連線關閉) channel.close(); connection.close(); } }
ps:在設定連線工廠的時候;如果沒有指定連線的引數則會有預設值;可以去設定虛擬主機。
3. 入門工程-消費者
編寫訊息消費者程式碼,從佇列中接收訊息並消費
分析:
從RabbitMQ中佇列(與生產者傳送訊息時的佇列一致;simple_queue)接收訊息;
實現消費者步驟:
- 建立連線工廠;
- 建立連線;(抽取一個獲取連線的工具類)
package com.study.rabbitmq.util; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.io.IOException; import java.util.concurrent.TimeoutException; public class ConnectionUtil { public static Connection getConnection() throws IOException, TimeoutException { // 1. 建立連線工廠(設定RabbitMQ的連線引數); ConnectionFactory connectionFactory= new ConnectionFactory(); //主機;預設localhost connectionFactory.setHost("localhost"); //連線埠;預設5672 connectionFactory.setPort(5672); //虛擬主機;預設/ connectionFactory.setVirtualHost("/"); //使用者名稱;預設guest connectionFactory.setUsername("guest"); //密碼;預設guest connectionFactory.setPassword("guest"); // 2. 建立連線 return connectionFactory.newConnection(); } }
- 建立頻道;
- 宣告佇列;
- 建立消費者(接收訊息並處理訊息);
- 監聽佇列 (需要持續監聽佇列訊息,所以不要關閉資源)
package com.itheima.rabbitmq.simple;
import com.itheima.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.*;
import java.io.IOException;
/**
* 簡單模式;消費者接收訊息
*/
public class Consumer {
public static void main(String[] args) throws Exception {
//1. 建立連線工廠;
//2. 建立連線;(抽取一個獲取連線的工具類)
Connection connection = ConnectionUtil.getConnection();
//3. 建立頻道;
Channel channel = connection.createChannel();
//4. 宣告佇列;
/**
* 引數1:佇列名稱
* 引數2:是否定義持久化佇列(訊息會持久化儲存在伺服器上)
* 引數3:是否獨佔本連線
* 引數4:是否在不使用的時候佇列自動刪除
* 引數5:其它引數
*/
channel.queueDeclare(Producer.QUEUE_NAME, true, false, false, null);
//5. 建立消費者(接收訊息並處理訊息);
DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//路由key
System.out.println("路由key為:" + envelope.getRoutingKey());
//交換機
System.out.println("交換機為:" + envelope.getExchange());
//訊息id
System.out.println("訊息id為:" + envelope.getDeliveryTag());
//接收到的訊息
System.out.println("接收到的訊息為:" + new String(body, "utf-8"));
}
};
//6. 監聽佇列
/**
* 引數1:佇列名
* 引數2:是否要自動確認;設定為true表示訊息接收到自動向MQ回覆接收到了,MQ則會將訊息從佇列中刪除;
* 如果設定為false則需要手動確認
* 引數3:消費者
*/
channel.basicConsume(Producer.QUEUE_NAME, true, defaultConsumer);
//不關閉資源,應該一直監聽訊息
//channel.close();
//connection.close();
}
}
4. 入門工程測試
目標:啟動消費者和生產者,到RabbitMQ中查詢佇列並在消費者端IDEA控制檯檢視接收到的訊息
分析:
先啟動生產者:傳送訊息到RabbitMQ佇列(simple_queue)
idea終端結果:
rabbitmq 後臺:http://localhost:15672
消費者:接收RabbitMQ佇列訊息
運行了2次生產著之後啟動消費者:
小結:
簡單模式:生產者傳送訊息到佇列中,一個消費者從佇列中接收訊息。
在RabbitMQ中消費者只能從佇列接收訊息。
如果接收訊息的消費者在同一個佇列中有兩個或多個時;訊息是如何分配的?