1. 程式人生 > >RabbitMQ學習與深入思考

RabbitMQ學習與深入思考

最近一段專案實踐中大量使用了基於RabbitMQ的訊息中介軟體,也積累的一些經驗和思考,特此成文,望大家不吝賜教。
本文包括RabbitMQ基本概念、進階概念、實踐與思考等三部分,著重強調相關概念基於RabbitMQ進行擴充套件開發的思路,並簡要展示RabbitMQ客戶端的編碼,接下來通過一個思維導圖來展示整體思路,紅星表示重點部分。

 

1.基本概念

官方文件: http://www.rabbitmq.com/#getstarted

1.1.核心實體

進入詳細介紹之前,先來看一張簡化版的訊息流轉的模型圖。

  • a.生產者Producer傳送訊息,訊息分為訊息標識和訊息體(payLoad有效載荷)兩部分,訊息標識中包含Exchange交換器的名字和RoutingKey路由鍵資訊。
  • b.由於交換器之前已經通過2個不同的BindingKey繫結鍵分別綁定了兩個佇列,因此交換器可以對比路由鍵和繫結鍵,之後將訊息路由到匹配的佇列中。
  • c.佇列將訊息推送到指定的一個或多個消費者中,多個消費者會選擇最簡單的RoundRobin輪訓方式進行選擇。

Exchange交換器

核心概念,可以簡化理解為路由器,其不儲存資料,其通常會和一個佇列繫結,但也可以繫結到另一個交換器上。其包括4種類型的交換器型別,生產實踐中主要使用可以精細管理的direct和topic兩種。direct,路由規則為完全匹配;topic,支援完全匹配,也支援模糊匹配;fanout,會將訊息轉發到該交換器繫結的所有佇列中;header,實際中無應用。

Queue佇列

用於儲存訊息,和Kafka的訊息模型完全不同,其會將訊息儲存在Topic中。因此在實現類似ConsumerGroup概念時差異很大,Kafka是可以回溯訊息的,但Rabbit新繫結的佇列的資料是空的,不能回溯。

Binding繫結

其通過繫結鍵將交換器和佇列關聯起來

RouteKey & BindingKey 路由鍵和繫結鍵

通常會將路由鍵和繫結鍵都稱為路由鍵,其差異是路由鍵是包含在訊息標識中的,而繫結鍵是用於在交換器和佇列間建立繫結關係的,訊息會通過它們的匹配情況進行路由。

1.2.通訊

通訊實體

包括Connection連線和Channel通道,連線通常對應一個基於TCP的Socket,建立Connection的關鍵引數包括使用者名稱、密碼、虛擬主機、主機地址和埠。一個連線可以建立多個Channel例項,推薦控制數量(比如10個),但Channel例項不能線上程間共享,應用程式需要為每一個執行緒開闢一個Channel。

AMQP協議

Java技術棧匯中,關於訊息通訊聽到比較多的是JMS,而AMQP協議相對更加嚴格一些,其包括Module Layer,Session Layer, Transport Layer三個層次,業務開發主要接觸到的是Module Layer,客戶端可以通過Queue.Declare、Basic.Consume等命令進行操作。

1.3.虛擬主機與使用者

vhost

虛擬主機,可以在邏輯上看做一臺RabbitMQ伺服器,其擁有自己的交換器、佇列和繫結關係等。RabbitMQ對許可權的管理就是基於vhost進行的,預設會建立一個全域性的/虛擬主機,通常不推薦直接使用該vhost,而是需要自定義一個vhost便於管理。

User
對於某一個使用者,通常包括3種類型的許可權:read,允許讀取佇列資料;write,允許向佇列傳送資料;config,允許建立佇列,如果客戶端需要支援新增佇列,需要新增該許可權,否則會報無許可權錯誤【踩過坑】。

 

2.進階概念

2.1.交換器與佇列增強

TTL過期時間

目前在兩個不同的粒度設定訊息的TTL,分別是佇列粒度和訊息粒度。由於RabbitMQ實際機制的原因,通常都選擇的是佇列粒度,對於佇列粒度來說,佇列頭的訊息一定是最先失效的,因此可以高效的判斷和丟棄。而對於訊息粒度,其需要在訊息真正投遞到消費者時進行判斷,如果該訊息之前的訊息並沒有失效,那麼它將一直存活。

死信交換器DLX

全稱為 Dead-Letter-Exchange,也是RabbitMQ擴充套件開發的核心概念,當一條訊息在一個佇列中變成死信之後,它能自動的被轉發到一個交換器中,這個交換器就是DLX,很多地方稱和這個交換器繫結的佇列是死信佇列, 我並不是完全認同。

訊息變為死信的原因

訊息被拒絕Nack,Reject,並且requeue引數為false(重點強調一下,生產實踐中通常不能開啟requeue,因為開啟後佇列中的訊息就會出現亂序的情況,且效能很差);訊息過期;佇列達到最大長度。
DLX 也是一個正常的交換器,和一般的交換器沒有區別,它能在任何的佇列上被指定 ,實際上就是設定某個佇列的屬性。

Map<String, Object> args = new HashMap<String, Object>();
args.put("x-dead-letter-exchange", "dlx_exchange");
args.put("x-dead-letter-routing-key" , "dlx-routing-key");
channel.exchangeDeclare("dlx_exchange" , "direct"); //建立 DLX: 
channel.queueDeclare("normal_queue", false, false, false, args); //為佇列normal_queue新增 DLX

命名規範:佇列型別,[生產者.消費者.佇列名字尾];Topic型別,[生產者.exchange.佇列名字尾]

延遲佇列

延遲佇列儲存的物件是對應的延遲訊息,所謂"延遲訊息"是指當訊息被髮送以後,並不想讓消費者立刻拿到訊息,而是等待特定時間後,消費者才能拿到這個訊息進行消費。使用延遲訊息場景如在訂單系統中,希望使用者下單後30分鐘內支付,否則取消訂單。那麼業務系統可以在下單後,傳送延遲訊息,到達指定時間後消費該訊息來判斷是否支援。該方式在資料量比較大的場景中比通過Job掃描資料表合適。

在AMQP協議中,或者RabbitMQ本身沒有直接支援延遲佇列的功能,但是可以通過前面 所介紹的DLX和TTL模擬出延遲佇列的功能,這部分在實踐與思考部分進行介紹。

持久化

交換器和佇列元資料持久化和訊息的持久化,訊息的持久化可以直接使用MessageProperties.PERSISTENT_TEXT_PLAIN

2.2.生產者

生產者客戶端的程式碼比較簡潔,如下所示。

byte[] messageBodyBytes = "Hello , Xionger!".getBytes();
channel.basicPublish(exchangeName, routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN,
        messageBodyBytes);

從高可用HA的角度不經要問,訊息的生產者將訊息傳送出去之後,Broker是否收到訊息。RabbitMQ針對這個問題提供了兩種解決方案,分別是事務機制和傳送方確認PublisherConfirm。傳送者確認的實現繼續細分為3種形式,包括單條同步、批量同步和非同步方式。事務機制和單條同步確認方式的效能都比較差,通常只能達到2000QPS左右,因此通常推薦使用傳送方確認的批量方式和非同步方式,其QPS可以達到8000QPS以上。其中批量方式也存在一個隱患,即傳送一批訊息到服務端時,如果有一條訊息失敗,那麼該批次所有訊息都需要重試。因此目前生產實踐中 ,使用的是非同步方式,簡化的程式碼實踐如下所示。

SortedSet confirmSet = Sets.newTreeSet();
channel.confirmSelect();
channel.addConfirmListener(new ConfirmListener() {
    @Override
    public void handleAck(long deliveryTag, boolean multiple) throws IOException {
        if (multiple) {
            confirmSet.headSet(deliveryTag - 1);
        } else {
            confirmSet.remove(deliveryTag);
        }
    }
    @Override
    public void handleNack(long deliveryTag, boolean multiple) throws IOException {
        //omit
        //訊息重新投遞處理
    }
});

tip: 這部分在服務端ack時有一個優化,只會回傳當前最大的標識,可以有效減少比對次數。

2.3.消費者

消費模式:拉模式,推模式,RabbitMQ推薦推模式,保持訊息消費的有序性。

boolean autoAck = false;
channel.basicQos(64);//prefetchCount
channel.basicConsume(queueName, autoAck, "myConsumerTag",
        new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String routingKey = envelope.getRoutingKey();
                String contentType = properties.getContentType();
                long deliveryTag = envelope.getDeliveryTag();
                channel.basicAck(deliveryTag, false);
            }
        });

tip:

對於訊息生產者,過去還有一個訊息投遞不可達被返回的概念,涉及mandatory和immediate兩個引數,但其在生產實踐中並不常用。

 

3.實踐與思考

3.1.環境搭建

安裝:Mac環境 brew install rabbitmq,非常簡便

管理介面

  • unacked: 消費端沒有Ack的數量
  • Publish: 推送訊息的QPS
  • Deliver(manual ack): 手動Ack
  • durable: 持久化
  • Policy: 佇列的規則
  • Mirrors: 映象Broker

3.2.Client元件開發

在介紹了RabbitMQ主要知識後,擴充套件的分享一個簡易的基於RabbitMQ訊息中介軟體的思路。由於RabbitMQ是基於Erlang開發,雖然很棒但畢竟比較小眾,Java技術棧的工程師一般很難去修改RabbitMQ的原始碼,因此通常只是通過構建一個合理的客戶端SDK來支援業務開發。

生產者

生產者目標比較簡單,需要實現健壯性強的的傳送者確認機制【非同步】和支援佇列分片,佇列分片可以給佇列加上字尾標識,然後輪訓處理即可。

消費者

消費者部分希望支援消費失敗的重試機制、死信佇列及其報警機制,以支援3次重試消費為例,整體思路如下圖所示。【藉助之前介紹的TTL和DLX】

3.3.場景思考

RabbitMQ最大的特點是成熟度高,管理功能全面,近似開箱即用,二次開發實現一個簡單靠譜的客戶端就足以滿足大部分的場景,尤其對於初創企業、中小企業來說是一個非常棒的選擇。

  • 高可用HA:源生支援映象伺服器、同步模型等機制
  • 高吞吐:可以通過佇列分片的方式支援大量的QPS,比如單個佇列推薦QPS4000以下,健康的水位在2000左右,那麼就需要通過二次開發客戶端來實現佇列分片。

參考資料

[1]朱忠華.RabbitMQ實戰指南[M].電子工業出版社:北京,2017.11:.

下期預告:深入理解MySQL索引機制
善良比聰明更重要--張小龍

作  者:熊二哥 
出  處:http://www.cnblogs.com/xiong2ge/ 

鄭州男科醫院

鄭州看婦科哪家醫院好

鄭州哪家不孕不育醫院好

鄭州人流醫院