1. 程式人生 > 程式設計 >RabbitMQ(三) -- 訊息與佇列進階

RabbitMQ(三) -- 訊息與佇列進階

一:摘要概述

二:基礎生產應用

2.1 基礎生產API

前面兩篇文章已經或多或少接觸到生產者與服務端互動,核心在於交換器,生產者不會與資料儲存的佇列直接耦合。即訊息生產傳送時必要結構應該有目標交換器、routingKey、訊息體

在這裡插入圖片描述

引數 含義
exchange 交換器名稱
routingKey 交換器路由訊息的規則
BasicProperties 訊息的一些屬性特點,後續再深入跟進理解
body 訊息內容byte陣列
		// 前提已經建立好佇列與交換器並進行繫結
        String exchangeName = "exchangeName";
        String routingKey = "routingKey";
        String message = "message";
        // 基礎投遞訊息
        channel.basicPublish(exchangeName,routingKey,null
,message.getBytes()); 複製程式碼
2.2 mandatory詳解
引數 含義
mandatory 至少匹配一個,若交換器中訊息未匹配到任意一個佇列路由,可以通過ReturnListener監控獲取
immediate 活躍匹配,不僅僅要求匹配到佇列,且要求該佇列有消費者連線。若無消費者連線則不會路由至該佇列,若最後沒有路由到任意一個佇列,可以通過ReturnListener獲取。3.0已經廢棄,設定為true使用將發生異常資訊
        String exchangeName = "exchangeName";
        String routingKey = "routingKey"
; String message = "message"; // 測試mandatory boolean mandatory = true,immediate = false; String badRoutingKey = "badRoutingKey"; channel.basicPublish(exchangeName,badRoutingKey,mandatory,immediate,basicProperties,message.getBytes()); // 增加ReturnListener ReturnListener returnListener = new ReturnListener() { @Override public void handleReturn(int replyCode,String replyText,String exchange,String routingKey,AMQP.BasicProperties properties,byte[] body){ System.out.println(new String(body)); } }; channel.addReturnListener(returnListener); 複製程式碼
2.3 備胎交換器

通過新增ReturnListener確實可以保證交換器中未被路由的訊息不丟失,但是引發的血案就是客戶端邏輯複雜化,反正個人不是很喜歡在正常邏輯中處理意外情況,這樣會導致後期迭代、維護成本上升。這時候就可以通過備胎交換器解決

交換器建立時繫結一個備胎交換器,當訊息沒有對應路由佇列時就會轉發到這個備胎交換器,這與後面講得死信交換器佇列有一定相似之處。通過前面交換器建立程式碼一直設定為null的第五個引數Map實現,該引數通過key-value形式設定一些屬性特徵,後面佇列也會使用到這個Map

key為 alternate-exchange,value為備胎交換器名稱

        // 建立備胎交換器與佇列
        String alternateExchangeName = "alternateExchange",alternateQueueName = "alternateQueue";
        String alternateBinding = "alternateBinding";
        channel.exchangeDeclare(alternateExchangeName,BuiltinExchangeType.DIRECT,true,false,null);
        channel.queueDeclare(alternateQueueName,null);
        channel.queueBind(alternateQueueName,alternateExchangeName,alternateBinding);

        // 將備胎交換器設定到引數中
        Map<String,Object> exchangeArgumentMap = new HashMap<>();
        exchangeArgumentMap.put("alternate-exchange",alternateExchangeName);
        channel.exchangeDeclare("exchangeName",exchangeArgumentMap);

複製程式碼
  • 通過控制面板檢視交換器會有AE標識標識該交換器繫結有備胎交換器
  • 備胎交換器路由到佇列使用的routingKey為訊息攜帶的routingKey
  • 若原訊息設定有訊息過期等屬性轉發到備胎交換器路由後依然具備該屬性
    在這裡插入圖片描述

三:Durable持久化

前面接觸到建立交換器Exchange、佇列Queue時都會有引數Durable持久化的存在,但是思考一下,佇列持久化以後訊息就會持久化儲存?答案是否定的,可以測試一下這個結論重啟RabbitMQ服務應用即可

若想要實現訊息的持久化則還需要在傳送訊息時通過上一節講到的BasicProperties屬性物件完成。該物件中存在諸多屬性表示訊息的特性,不做集中講解,拆分到具體特性單元中講解。回到訊息持久化操作上,BasicProperties類中存在屬性deliveryMod

  • 1表示訊息不持久化
  • 2表示訊息持久化

如下所示程式碼Demo,BasicProperties是一個介面,例項化需要藉助與AMQP介面的靜態內部類即AMQP.BasicProperties實現。當然還有一點就是該內部類設計了建造者模式,大大方便客戶端API操作

	AMQP.BasicProperties basicProperties = new AMQP.BasicProperties
											  .Builder()
											  .deliveryMode(2)
											  .build();
	channel.basicPublish(exchangeName,message.getBytes());
複製程式碼

如果單純僅僅為了訊息持久化特性,可以使用MessageProperties列舉類,該類中封裝了幾種常用BasicProperties物件例項,主要是針對持久化、優先順序兩個屬性。最後提一點就是傳送一條訊息後可以重啟伺服器進行驗證測試,測試過程結果本文不再贅述

	// deliveryMod 為 2表示持久化
	// priority 為 0 表示最低優先順序
	// contentType 為 text/palin表示文字訊息
    AMQP.BasicProperties persistentTextPlain = MessageProperties.PERSISTENT_TEXT_PLAIN;
複製程式碼

四:Priority優先順序

生活中尊卑有序、長幼有別,技術上自然存在優先順序概念,比如滴滴打車那種加錢插隊操作就可以使用訊息優先順序實現。上一節中提到了優先順序屬性priority,看過我前面Java佇列的文章其中就有優先順序佇列ProrityBlockingQueue,當然這只是聯想而已,RabbitMQ中實現訊息優先順序還是依賴於物件BasicProperties

	AMQP.BasicProperties basicProperties = new AMQP.BasicProperties
											  .Builder()
											  .priority(9)
											  .build();
	channel.basicPublish(exchangeName,message.getBytes());
複製程式碼

數字越大優先順序別越高,當然這個範圍一般在0-9之間即可。測試時傳送不同優先順序訊息到佇列中消費檢視結果即可,下圖所示發現只是按照訊息插入的順序消費,優先順序並未生效,你在搞什麼飛機?稍安勿躁,若實現訊息優先順序則必須設定佇列為優先順序佇列

在這裡插入圖片描述
設定優先順序佇列操作在佇列例項化時通過引數map實現,前面一直展示程式碼時queueDeclare()第五個引數都設定為null,其實該引數為Map集合,表示可以通過key-value的形式設定佇列的其它屬性。其中優先順序key為x-max-priority,value表示優先順序最大數值。通過控制檯可以看到佇列屬性多了Pri標誌,表示該佇列為優先順序佇列

        Map<String,Object> argumentMap = new HashMap<>();
        argumentMap.put("x-max-priority",10);
        channel.queueDeclare("queueName",argumentMap);
複製程式碼

在這裡插入圖片描述
再使用程式碼進行測試將會得到如下示例,訊息生產是安裝優先順序倒序,即0-9進行投遞。最後消費結果為9-0,表示優先順序生效,且證明優先順序策略為數值越大優先順序越高
在這裡插入圖片描述

五:TTL 自動刪除過期

反正個人在學習這裡時腦海中總是聯想到Redis的TTL,實踐場景個人理解可以用到具備一定時效性訊息上。如上層業務呼叫簡訊交付模組功能,需要非同步通過RabbitMQ通訊交換訊息,上層業務常見會Redis快取生成校驗碼,且設定一定過期時效,若超過時限則使用者接收到簡訊也是無用的。這時就可以考慮將Redis中的TTL與RabbitMQ中的TTL時效一致,保證使用者接收到的驗證簡訊都具備可用性

自然,相對於Durable、Priority等特性需要佇列與訊息合作完成而言,TTL自動刪除過期在這點上具有獨立性。可以做如下三個方面設定:

  • 設定某個單獨訊息的過期時間
  • 設定某個佇列的過期時間
  • 設定某個佇列中訊息過期時間(這樣相當於統一設定佇列中所有訊息過期TTL)
5.1 單訊息TTL

某條訊息的TTL時間藉助於BasicProperties 例項物件的expiration屬性完成,單位為ms。其餘的就不多贅述,測試時傳送一條20s訊息驗證即可

	AMQP.BasicProperties basicProperties = new AMQP.BasicProperties
											  .Builder()
											  .expiration("20000")
											  .build();
	channel.basicPublish(exchangeName,message.getBytes());
複製程式碼
5.2 佇列TTL

某個佇列長時間不進行操作後需要被刪除則可以使用設定佇列TTL時間實現,這個實現方式依舊採用Map方式,key為x-expires,value為過期時限,單位ms

        Map<String,Object> argumentMap = new HashMap<>();
        argumentMap.put("x-expires",2000);
        channel.queueDeclare("queueName",argumentMap);
複製程式碼

最後通過面板可以看到佇列屬性中多了Exp標識,該標識標識佇列為自動過期刪除佇列

在這裡插入圖片描述

5.3 佇列訊息TTL

某個佇列中所有訊息過期時長一致,有必要在每個訊息生產是設定單訊息TTL?自然這時候就可以採用佇列訊息TTL策略實現,其實現不言而喻自然是依賴於佇列例項化時設定

        Map<String,Object> argumentMap = new HashMap<>();
        argumentMap.put("x-message-ttl",argumentMap);
複製程式碼

通過控制面板可以觀察到對列屬性多了TTL標識,該標識表示佇列中訊息到期自動刪除

在這裡插入圖片描述

六:MaxLength、MaxLengthBytes

訊息可以無限積壓?可以傳輸任意大小訊息?自然,RabbitMQ的設計考慮到這兩點,可以通過在佇列初始化的操作中設定。其實個人理解這兩個屬性作用意義不大,仁者見仁智者見智,知識完整性還是需要考慮的

  • key為x-max-length表示佇列最大積壓訊息數量限制
  • key為x-max-length-bytes表示單訊息最大位元組數量
        Map<String,Object> argumentMap = new HashMap<>();
        argumentMap.put("x-max-length",1);
        argumentMap.put("x-max-length-bytes",1024);
        channel.queueDeclare("queueName",argumentMap);
複製程式碼

Lim標識標識佇列限制最大訊息積壓數量,Lim B表示佇列限制單訊息最大位元組數。兩個策略可以同時設定,共同生效。測試發現當超過最大積壓數量時會刪除頭部訊息騰出空間存放新訊息,這是RabbitMQ預設採用的策略drop-head

在這裡插入圖片描述
如果想實現拒絕新訊息加入則可以使用引數x-overflow實現,key為x-overflow,value為reject-publish表示拒絕接收新訊息加入佇列。可以採用不同策略然後超過最大積壓數量設定值,檢視最後消費到的資料是否與策略實現理論結果一致

        argumentMap.put("x-overflow","reject-publish");
複製程式碼

七:Dead-Letter 死信轉發

前面講到單訊息自動過期TTL策略實現使用BasicProperties類屬性expiration即可,同時也說到了佇列積壓訊息最大數量限制在佇列例項化時依賴x-max-length屬性實現,採用預設策略drop-head會刪除頭部訊息。問題來了,這些操作都會導致訊息丟失,這時候若想換個地方儲存這些訊息怎麼辦?可採用佇列死信轉發實現,場景如下:

  • 使用BasicProperties屬性expiration設定的TTL到期自動刪除訊息
  • 使用x-max-length限制訊息最大積壓數量,且採用預設策略drop-head刪除的頭部訊息
  • 後面講消費者會講到的訊息確認機制中拒絕確認訊息,且未將訊息放回佇列中刪除的訊息
        // 設定死信交換器
        channel.exchangeDeclare("deadExchange",null);
        // 設定死信佇列
        channel.queueDeclare("deadQueue",null);
        // 繫結死信交換器與佇列
        channel.queueBind("deadQueue","deadExchange","deadBinding");
        // 設定正常佇列的死信交換器
        Map<String,Object> argumentMap = new HashMap<>();
        argumentMap.put("x-dead-letter-exchange","deadExchange");
        argumentMap.put("x-dead-letter-routing-key","deadBinding");
        channel.queueDeclare("queueName",argumentMap);
複製程式碼

首先在控制面板上可以看到正常佇列有DLXDLK兩個標識,表示該佇列設定了死信交換器和死信路由routingKey。其次最終結果是TTL到期自動刪除的訊息轉發到了deadQueue中。最後需要說明若不設定x-dead-letter-routing-key引數,死信交換器將採用訊息自身攜帶的routingKey進行路由

在這裡插入圖片描述

思考題:

	如何用死信佇列實現延遲佇列,並考慮具體場景如訂單倒計時關閉
複製程式碼