1. 程式人生 > 實用技巧 >RabbitMQ-基礎

RabbitMQ-基礎

最近在學習訊息中介軟體RabbitMq,把學習的一些筆記記錄下來,,因此有寫的不足之處,歡迎大家指導!

1.RabbitMQ是什麼?

RabbitMQ是一個基於AMQP協議的高階訊息中介軟體,它主要的技術特點是可用性,安全性,叢集,多協議支援,視覺化的客戶端,活躍的社群。

2.我們為什麼選擇RabbitMQ

RabbitMQ優勢

1.同步變非同步

可以使用執行緒池解決,但是缺點很明顯:要自己實現執行緒池,並且強耦合

大多數是使用訊息佇列來解決

2.低內聚高耦合:解耦----減少強依賴.

3.流量削峰

通過訊息佇列設定請求最大值,超過閥值的拋棄或者轉到錯誤介面.

4.rabbitmq採用通道通訊。不採用tcp直接通訊

1.tcp的建立和銷燬開銷大,建立3次握手,銷燬4次分手

2.高峰時成千上萬條的連結會造成資源的巨大浪費,而且作業系統沒秒處理tcp的數量也是有數量限制的,必定造成效能瓶頸

3.一條執行緒一條通道,多條執行緒多條通道,公用一個tcp連線。一條tcp連線可以容納無限條通道(硬碟容量足夠的話),不會造成效能瓶頸。

總結4個詞:解耦、提速、廣播、削峰!

劣勢

1.系統的可用性降低,一旦Rabbitmq 掛了,訊息無法繼續傳輸 2.系統的穩定性降低,如:系統傳送了一條訊息到中介軟體,突然由於網路故障等問題,資料丟失了;插入重複資料,髒資料,大量的積壓訊息所以我們要考慮,如何保證訊息高可靠傳遞(0丟失)、訊息傳遞(不重複);百萬訊息積壓的線上故障處理;
3.分散式一致性問題 3.RabbitMQ模型

  Broker(訊息代理) : 實際上就是訊息伺服器實體。

  Exchange(交換機) : 用來發送訊息的AMQP實體,它指定訊息按什麼路由規則,路由到哪個佇列。

  Queue(訊息佇列) :每個訊息都會被投入到一個或多個佇列。

  Binding(繫結) : 它的作用就是把交換機(Exchange)和佇列(Queue)按照路由規則繫結起來。

  Routing Key(路由關鍵字) :路交換機(Exchange)根據這個關鍵字進行訊息投遞。

  vhost(虛擬主機) : 虛擬主機,一個訊息代理(Broker)裡可以開設多個虛擬主機(vhost),用作不同使用者的許可權分離。

  Connection(連線) :AMQP連線通常是長連線,Producer和Consumer都是通過TCP連線到RabbitMQ Server的。

  Channel(通道) : AMQP通過通道(channels)來處理多連線,可以把通道理解成共享一個TCP連線的多個輕量化連線。

4.交換機型別介紹

  直連交換機(Direct):將訊息推送到binding key與該訊息的routing key相同的佇列。直連交換機要求Publisher和Consumer的路由關鍵字(routingKey)完全相同才會將訊息路由到繫結的佇列,直連交換機用來處理訊息的單播路由(unicast routing),但也可以進行多播。

  扇型交換機(funout exchange):扇形交換機會將訊息路由給繫結到它身上的所有佇列,而不理會繫結的路由鍵,所以扇型一般用來交換機處理訊息的廣播路由(broadcast routing)。

  主題交換機(Topic):主題交換機通過對訊息的路由鍵和佇列到交換機的繫結模式之間的匹配,將訊息路由給一個或多個佇列,屬於多播路。topic可以進行模糊匹配(如sql的模糊查詢),可以使用*星號和井號#這兩個萬用字元來進行模糊匹配,其中*(星號)可以代替一個單詞 # 號可以代替任意個單詞,但是需要注意的是topic交換機的路由鍵也不是可以隨意設定的,必須是由點隔開的一系列的識別符號組成,有效的路由鍵示例:“ new.user.id”,“ sys.mq”,“ orange.rabbitmq ”................,可以定義任意數量的識別符號,上限為255個位元組

  首部交換機(Header):首部交換機和扇形交換機都不需要路由鍵routingKey,交換機是通過Headers頭部來將訊息對映到佇列的,有點像HTTP的Headers,Hash結構中要求攜帶一個鍵“x-match”,這個鍵的Value可以是any或者all,這代表訊息攜帶的Hash是需要全部匹配(all),還是僅匹配一個鍵(any)就可以了,首部交換機的最大特點就是匹配規則不被限制為string,而是object。

5.佇列屬性

  queue : 佇列名稱,佇列在宣告(declare)後才能被使用。如果一個佇列尚不存在,宣告一個佇列會建立它。如果宣告的佇列已經存在,並且屬性完全相同,那麼此次宣告不會對原有佇列產生任何影響。如果宣告中的屬性與已存在佇列的屬性有差異,那麼將會丟擲一個406通道級異常。
  durable:佇列的宣告預設是存放到記憶體中的,稱為暫存佇列,訊息代理重啟會丟失。如果想重啟之後還存在就要使佇列持久化,儲存到Erlang自帶的Mnesia資料庫中,當rabbitmq重啟之後會讀取該資料庫。當durable設定為true時佇列持久化,但這並不意味著訊息持久化,當訊息代理重啟後訊息依舊會丟失。
  exclusive :是否排外的,有兩個作用,一:當連線關閉時connection.close()該佇列是否會自動刪除;二:該佇列是否是私有的private,如果不是排外的,可以使用兩個消費者都訪問同一個佇列,沒有任何問題,如果是排外的,會對當前佇列加鎖,其他通道channel是不能訪問的。
  autoDelete :當最後一個消費者斷開連線之後佇列是否自動被刪除。
arguments :

  1. x-message-ttl(Time-To-Live):置佇列中的所有訊息的生存週期(統一為整個佇列的所有訊息設定生命週期), 也可以在釋出訊息的時候單獨為某個訊息指定剩餘生存時間,單位毫秒, 類似於redis中的ttl,生存時間到了,訊息會被從隊裡中刪除,注意是訊息被刪除,而不是佇列被刪除,

  2.x-expires:當佇列在指定的時間沒有被訪問就會被刪除。

  3.x-max-length: 限定佇列的訊息的最大值長度,超過指定長度將會把最早的幾條刪除掉, 類似於mongodb中的固定集合,例如儲存最新的100條訊息。

  4.x-max-length-bytes:限定佇列最大佔用的空間大小, 一般受限於記憶體、磁碟的大小。

  5.x-dead-letter-exchange:當佇列訊息長度大於最大長度、或者過期的等,將從佇列中刪除的訊息推送到指定的交換機中去而不是丟棄掉。

  6.x-dead-letter-routing-key:將刪除的訊息推送到指定交換機的指定路由鍵的佇列中去。

  7.x-max-priority:優先順序佇列,宣告佇列時先定義最大優先順序值(定義最大值一般不要太大),在釋出訊息的時候指定該訊息的優先順序, 優先順序更高(數值更大的)的訊息先被消費。

  8.x-queue-mode=lazy:先將訊息儲存到磁碟上,不放在記憶體中,當消費者開始消費的時候才載入到記憶體中。

6.訊息確認

在消費者處理訊息的時候偶爾會失敗或者有時訊息代理會直接崩潰掉。有時會因為網路原因後者其他原因引起的各種問題,那麼為了保證訊息不會丟失保證訊息被正確處理,RabbitMQ提供了兩種訊息確認機制:

自動確認:當消費者連線上佇列,因為沒有指定消費者一次獲取訊息的條數,所以會把佇列中的所有訊息一下子推送到消費者端,當訊息從佇列被推出的時的那一刻就表示已經對訊息進行自動確認了,訊息就會從佇列中刪除。

===>channel.BasicConsume(queue: "NewsQueue", autoAck: true, consumer: consumer);

手動確認:

    consumer.Received += (m, item) =>
    {
      Console.WriteLine(Encoding.UTF8.GetString(item.Body.ToArray()));
      //手動確認訊息
      channel.BasicAck(item.DeliveryTag, false);
    };

每執行一次channel.BasicAck(item.DeliveryTag, false);;Unacked和Total就會減去1,直到兩個值都為0

訊息持久化:

  設定訊息持久化必須先設定佇列持久化,要不然佇列不持久化,訊息持久化,佇列都不存在了,訊息存在還有什麼意義。訊息持久化需要將交換機持久化、佇列持久化、訊息持久化,才能最終達到持久化的目的

佇列持久化:

  方法一:設定deliveryMode=2

    AMQP.BasicProperties.Builder properties = new AMQP.BasicProperties().builder();
    properties.deliveryMode(2);
    // 設定訊息是否持久化,1: 非持久化 2:持久化
    channel.basicPublish(EXCHANGE_NAME, "", properties.build(), message.getBytes("UTF-8"));
 方法二:設定BasicProperties為MessageProperties.PERSISTENT_TEXT_PLAIN
   channel.BasicPublish(exchange: "QueueExchange", routingKey: "queueKey", basicProperties:MessageProperties.PERSISTENT_TEXT_PLAIN, body: Encoding.UTF8.GetBytes(messages));

這是我寫的一個dome,不足之處,還請教給位大佬指點:

連結:https://pan.baidu.com/s/1ZP1_67Nuu1pvflS1PKK7Wg
提取碼:2180

後面還會寫一些其他交換機的dome,望各位大佬指點。