RabbitMQ學習(3)- RabbitMQ結構
目錄
RabbitMQ幾大元件
- 生產者:訊息建立者,將訊息傳送到訊息中介軟體的。
- 訊息:包括有效載荷與標籤。有效載荷:要傳輸的資料;標籤:描述有效載荷的屬性;RabbitMQ通過標籤決定誰獲得該訊息,消費者只能得到有效載荷。
- 消費者:是接收訊息的。
- Brocker:是訊息中介軟體服務的節點。一個Brocker=一個RabbitMQ,一個伺服器上如果有多個RabbitMQ就有多個Brocker。
- 佇列:是用來儲存訊息的。
- 交換器:交換器僅僅對訊息進行轉發。交換器有不同的型別(如:fanout、direct、topic、headers)
- 路由鍵:交換器與佇列的關係是通過繫結鍵(BindingKey,稱為路由鍵)建立的。
- 繫結
交換器型別
RabbitMQ常用的交換器型別有fanout、direct、topic、headers四種:
- fanout:它會把所有傳送到該交換器的訊息,路由到所有與該交換器繫結的佇列中;
public static void main(String[] args) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setUsername("rabbitstudy"); factory.setPassword("123456"); factory.setHost("192.168.0.1"); factory.setPort(5672); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); channel.queueDeclare(QUEUE_NAME1, false, false, false, null); channel.queueDeclare(QUEUE_NAME2, false, false, false, null); channel.queueBind(QUEUE_NAME1, EXCHANGE_NAME, "com.cdsn.test1"); channel.queueBind(QUEUE_NAME2, EXCHANGE_NAME, "com.cdsn.test2"); channel.basicPublish(EXCHANGE_NAME, "com.cdsn.test", MessageProperties.PERSISTENT_TEXT_PLAIN, "Hello World!".getBytes()); channel.close(); connection.close(); }
- direct:把訊息路由到那些BindingKey和RoutingKey完全匹配的佇列中;
- topic:類似於direct,但可以使用萬用字元匹配規則;("#"可匹配多個或零個單詞,“*”可匹配單個單詞)
public static void main(String[] args) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setUsername("rabbitstudy"); factory.setPassword("123456"); factory.setHost("192.168.0.1"); factory.setPort(5672); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "topic"); channel.queueDeclare(QUEUE_NAME1, false, false, false, null); channel.queueDeclare(QUEUE_NAME2, false, false, false, null); channel.queueBind(QUEUE_NAME1, EXCHANGE_NAME, "*.test"); channel.queueBind(QUEUE_NAME2, EXCHANGE_NAME, "#.test"); channel.basicPublish(EXCHANGE_NAME, "com.cdsn.test", MessageProperties.PERSISTENT_TEXT_PLAIN, "Hello World!".getBytes()); channel.close(); connection.close(); }
- header:訊息不根據路由鍵的匹配規則路由,而是根據傳送的訊息內容中的headers屬性進行匹配。
RabbitMQ執行流程
- 生產者傳送訊息
-
生產者與Broker建立連線(Connection),開啟通道(Channel)
-
生產者宣告交換器(交換器型別、是否持久化、是否自動刪除等)
-
生產者宣告佇列(是否持久化、是否排他、是否自動刪除)
-
生產者通過路由鍵將交換器和佇列繫結
-
生產者傳送訊息至Broker(攜帶路由鍵等)
-
交換器根據接收到的路由鍵,以及交換器型別查詢匹配的佇列
-
找到,將訊息存入相對應的佇列中;找不到,則根據生產者的配置,選擇丟棄還是退回給生產者
-
關閉通道、關閉連線
- 消費者接收訊息
- 消費者與Broker建立連線(Connection),開啟通道(Channel)
- 消費者向Broker請求消費相應佇列的訊息,可能設定回撥函式
- 等待Broker迴應並投遞相應佇列中的訊息,接收訊息
- 消費者確認(ack)接收到的訊息
- RabbitMQ從佇列中刪除相應已經被確認的訊息
- 關閉通道、關閉連線
Connection與Channel
RabbitMQ所有的AMQP指令都是通過通道完成的。
RabbitMQ的Channel與Netty中的NIO模型區別在於Channel是建立在TCP連線之上的。
生產者執行流程
- 當生產者與Broker建立連線時,呼叫factory.newConnection()方法。
- 當客戶端呼叫channel.createChannel()方法時,準備開啟通道。
- 傳送訊息時,呼叫channel.basicPublish()方法。
消費者執行流程
- 當客戶端呼叫channel.basicConsume()方法時,向Broker告知準備好消費訊息。
- 客戶端呼叫channel.basicAck()方法,向Broker傳送確認通知。
細說交換器
交換器
交換器
引數 | 作用 |
exchange | 交換器名稱 |
type | 交換器的型別,常見如fanout、direct、topic、headers |
durable | 設定是否持久化 |
autoDelete | 設定是否自動刪除 |
internal | 設定是否是內建的 |
argument | 其他結構化引數 |
備註:
持久化:交換器的建立持久化到磁碟;
自動刪除:自動刪除的前提是至少有一個佇列或者交換器與這個交換器繫結,之後所有與這個交換器繫結的佇列或者交換器都與此解綁;
channel.exchangeDeclare(EXCHANGE_NAME, "direct", false, true, false, null);// 建立一個自動刪除的交換器
內建交換器:客戶端程式無法直接傳送訊息到交換器,只能通過交換器路由到內建交換器。
(1)不等待伺服器通知建立佇列成功命令:
void channel.exchangeDeclareNoWait(String exchange,String type, boolean durable, boolean autoDelete, boolean internal, Map<String, Object> arguments) throws IOException;
(2)檢測交換器是否存在,如果不存在則丟擲異常:
Exchange.DeclareOK exchangeDeclarePassive(String name) throws IOException;// 如果要用到這方法,需要解決這個異常
(3)刪除交換器:
① Exchange.DeleteOK exchangeDelete(String exchange) throws IOException;
② void exchangeDeleteNoWait(String exchange, boolean ifUnused) throws IOException;
③ Exchange.DeleteOK exchangeDelete(String exchange, boolean ifUnused) throws IOException;
備註:
exchange標識交換器的名稱
ifUnused用來設定是否在交換器沒有被使用的情況下刪除