聊聊RabbitMQ那一些事兒之一基礎應用
聊聊RabbitMQ那一些事兒之一基礎應用
Hi,各位熱愛技術的小夥伴您們好,今年的疫情害人啊,真心祝願您和您的家人大家都平平安安,健健康康。年前到現在一直沒有總結點東西,寫點東西,不然久了自己感覺自己都要被廢啦。這個週末花了一些時間來梳理了一下RabbitMQ的相關知識點。先來一個基礎篇,先用起來。我也是一個邊學習邊梳理的過程,如果有什麼梳理的不妥之處,多多指點,相互學習,謝謝!
在使用前,我們首先第一件事情就是環境搭建。至於RabbitMQ的環境搭建,我就不在此囉嗦了,網上一搜一大堆,還沒有搭建環境的小夥伴,可以網上找度娘哈,嘿嘿。
一、什麼是MQ
MQ簡單的說就是佇列,佇列的特性就是先進先出。我們其實可以把佇列理解為一個訊息管道,通過訊息管道實現訊息傳遞。最終達到不同的程序間、不同服務間的通訊需要。
在一個程式中,我們 可以通過MQ實現不同程序間的通訊。在不同程式/服務間,我們同樣可以通過MQ來實現相互通訊,這也是本文的重點,這個時候就該今天的主角登場了。
二、RabbitMQ介紹
RabbitMQ是一個開源的,在AMQP基礎完整的,可複用的企業訊息系統。我個人的簡單的理解就是,實現訊息的接收、儲存、管理、分發。在作業系統支援上,支援主流的作業系統(Linux、Windows);在開發語言介面支援上,支援所有的主流開發語言;在效能上,支援訊息持久化、叢集化、高併發等等。
三、RabbitMQ關鍵詞介紹
Broker(Server):接受客戶端連線,實現AMQP訊息佇列和路由功能的程序,我們可以把Broker叫做RabbitMQ伺服器。
Virtual Host:一個虛擬概念,其實簡單的理解你可以認為是在邏輯上對MQ進行分割槽隔離,這樣避免不同業務的MQ直接交叉感染。一個Virtual Host裡面可以有若干個Exchange和Queue,主要用於許可權控制,隔離應用。如應用程式A使用VhostA,應用程式B使用VhostB,那麼我們在VhostA中只存放應用程式A的exchange,queue和訊息,應用程式A的使用者只能訪問VhostA,不能訪問VhostB中的資料。
Exchange:接受生產者傳送的訊息,並根據Binding規則將訊息路由給伺服器中的佇列。ExchangeType決定了Exchange路由訊息的行為,例如,在RabbitMQ中,ExchangeType有Direct、Fanout、Topic和Header四種,不同型別的Exchange路由規則是不一樣的(這些以後會詳細介紹)。
Queue:訊息佇列,用於儲存還未被消費者消費的訊息,佇列是先進先出的,預設情況下先儲存的訊息先被處理。
Message:就是訊息,由Header和Body組成,Header是由生產者新增的各種屬性的集合,包括Message是否被持久化、由哪個Message Queue接受、優先順序是多少等,Body是真正傳輸的資料,內容格式為byte[]。
Connection:連線,對於RabbitMQ而言,其實就是一個位於客戶端和Broker之間的TCP連線。
Channel:通道,僅僅建立了客戶端到Broker之間的連線Connection後,客戶端還是不能傳送訊息的。需要在Connection的基礎上建立Channel,AMQP協議規定只有通過Channel才能執行AMQP的命令,一個Connection可以包含多個Channel。之所以需要Channel,是因為TCP連線的建立和釋放都是十分昂貴的。
四、RabbitMQ三大角色介紹
通過上面的一些簡單介紹,我相信你對MQ有了一個初步的印象。也許你會雲裡霧裡的,到底是怎麼執行起來的啊,來一個實際點的。哈哈,不急,下面馬上進入RabbitMQ跑起來階段。其實要跑起來,我們還要簡單介紹一下RabbitMQ重要的三個角色:生產者、伺服器、消費者。
生產者:也就是訊息生產方,通過RabbitMQ提高的API,將訊息推送到RabbitMQ伺服器。
伺服器:RabbitMQ的服務中心,接收生產者生產的訊息,並根據分發規則,將訊息推送到對應的消費者。
消費者:顧名思義,就是訊息的最終接收處理者。
這樣一來,我相信大家腦海裡面已經有一個畫面了,生產者--生成訊息-->伺服器--轉發-->消費者(最終處理訊息)。這就是一個訊息的整體流程和生命週期。
五、RabbitMQ跑起來
通過上面的介紹,我們應該知道MQ的簡單的訊息互動的流程。有了這個基礎,下面我們就分類來介紹一下三大角色的資料交付方式。整體上來說,資料互動方式上有以下5種方式(5種工作模式),在網上找了一張圖,很方便的供大家參考。
其實通過上面的圖,我們會發現,前兩種情況,消費者和生成者之間都是直接通過連線,後面三種情況,消費者和生產者直接有一層交換機(Exchange)。這樣一來,我們可以從整體上分為兩個大類:其一、訊息直推佇列;其二、訊息推送給交換機,交換機根據路由規則轉發至佇列。
其實在實際的工作中,第一大類,我們是不會使用到的,都是採用的第二大類來實現實際的專案開發需求。但是第一大類,能夠很好的將我們先領我們入門,先簡單的把程式跑起來。由於時間原因,今天我們也就先實現第一大類的兩種情況,第二大類的,明後天在專門的文章來詳細介紹。
簡單模式:
簡單模式就是隻有一個生產者,一個消費者。這個很簡單,下面用一個實際例子來說明。直接貼程式碼:
生產者程式碼:
/// <summary> /// 訊息生成者 /// </summary> public class Program { static void Main(string[] args) { // rabbitMQ連結物件 var factory = new ConnectionFactory(); // RabbitMQ服務在本地執行 factory.HostName = "192.168.1.1"; // RabbitMQ服務埠 factory.Port = 5672; // 使用者名稱 factory.UserName = "guest"; // 密碼 factory.Password = "guest"; // 虛擬主機名稱 factory.VirtualHost = "/"; // 佇列名稱 string queueName = "hello"; // 建立連結 using (var connection = factory.CreateConnection()) { // 建立通道 using (var channel = connection.CreateModel()) { // 建立一個名稱為hello的訊息佇列--當然一步也可以通過RabbitMQ管理後臺新增 // 當已經存在該佇列時,不會重複新增,但是如果已存在的佇列和新建的佇列存在屬性差異時,會建立失敗,會拋異常,所以在實際使用時,如果要通過程式建立佇列,最好要捕捉異常,避免因為這樣的問題而導致程式崩潰。 channel.QueueDeclare(queueName, false, false, false, null); Console.WriteLine("我是生成者"); while (true) { Console.WriteLine("請輸入你要傳送的訊息,並按Enter鍵結束"); // 接收使用者輸入的訊息 string message = Console.ReadLine(); // 訊息編碼 var body = Encoding.UTF8.GetBytes(message); // 向訊息伺服器推送訊息 channel.BasicPublish("", queueName, null, body); Console.WriteLine($"已傳送 {System.DateTime.Now.ToString("HH:mm:ss")}: {message}"); } } } } }
消費者程式碼:
/// <summary> /// 訊息消費者 /// </summary> public class Program { static void Main(string[] args) { // rabbitMQ連結物件 var factory = new ConnectionFactory(); // RabbitMQ服務在本地執行 factory.HostName = "192.168.1.1"; // RabbitMQ服務埠 factory.Port = 5672; // 使用者名稱 factory.UserName = "guest"; // 密碼 factory.Password = "guest"; // 虛擬主機名稱 factory.VirtualHost = "/"; // 佇列名稱 string queueName = "hello"; // 建立連結 using (var connection = factory.CreateConnection()) { // 建立通道 using (var channel = connection.CreateModel()) { // 建立一個名稱為hello的訊息佇列--當然一步也可以通過RabbitMQ管理後臺新增 // 當已經存在該佇列時,不會重複新增,但是如果已存在的佇列和新建的佇列存在屬性差異時,會建立失敗,會拋異常,所以在實際使用時,如果要通過程式建立佇列,最好要捕捉異常,避免因為這樣的問題而導致程式崩潰。 channel.QueueDeclare(queueName, false, false, false, null); Console.WriteLine("我是消費者"); // 建立一個消費者 var consumer = new EventingBasicConsumer(channel); // 訂閱對應的訊息 autoAck:是否自動確認 channel.BasicConsume(queueName, autoAck:false, consumer); consumer.Received += (model, ea) => { var body = ea.Body; var message = Encoding.UTF8.GetString(body); Console.WriteLine($"已接收 {System.DateTime.Now.ToString("HH:mm:ss")}: {message}"); // 為了模擬推送過程,在此程式休息1分鐘 Thread.Sleep(6000); // 確認消費 channel.BasicAck(ea.DeliveryTag, false); }; Console.ReadLine(); } } } }
執行結果:
通過實際的執行結果圖,我們很清楚的知道,生產者的訊息發生順序,和消費者消費的順序是一直的,這也就MQ的基本原理所在。
上面介紹了簡單模式,下面我在來介紹一下比簡單模式複雜一點的工作模式。
工作模式:
我理解的簡單模式,只是帶我們入門,讓我們明白MQ的執行效果是咋樣的。但是在實際工作中,不可能只會有一個消費者,在實際的生產環境中生產者、消費者都可能會有多個存在,這也就是我們說的工作模式。那麼,有多個生成的者的時候,不同的生產者之間又是怎麼來消費訊息的呢?下面我們先通過實踐的例子來說明:
具體的程式碼和上面的程式碼是一樣的,我們可以直接開兩個消費者就可以實現資料模擬,直接看執行結果:
同上面的實際執行結果我們可以簡單的得出以下結論:
當一個佇列有多個消費者時,在生成的實時訊息時,訊息佇列伺服器會輪詢的均勻的分發給每一個消費者。
哈哈哈,注意了,上面的結論我說的是實時訊息哦,這裡面就包含了一個坑,在實際的使用過程中要特別注意。那就是歷史訊息處理上,在實際專案使用過程中,我們經常會遇到,當消費者開啟時,佇列中已經有很多訊息待消費,這個時候又該如何保證多個消費均勻分配訊息呢?避免忙綠的消費者累死現象。其實很簡單,只需在消費端加上如下一個配置即可:
// 通過Qos設定每次接收訊息的條數 // 三個引數說明 // prefetchSize:為預取的長度,一般設定為0即可,表示長度不限 // prefetchCount:表示預取的條數,即傳送的最大訊息條數 // global表示是否在Connection中全域性設定,true表示Connetion下的所有channel都設定為這個配置。 channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);
上面的配置中,最關鍵的一個引數就是prefetchCount,當我們設定為1時,就是能夠實現均勻的分發。下面分別對prefetchCount設定不同的值,來看看不同的效果:
例項一:將prefetchCount設定為10,並生成3條歷史訊息,然後同時開啟兩個消費者,看看3條訊息的分發消費情況:
通過圖,我們得出,3條歷史訊息全部推送給了一個消費者,這樣就導致了一個消費者累死,一個消費者閒的慌。
例項二:將prefetchCount設定為1,並生成4條歷史訊息,然後同時開啟兩個消費者,看看3條訊息的分發消費情況:
通過圖,我們得出,4條歷史訊息平均的分發給了兩個消費者,這也是我們想要的效果。
所以在實際工作中,一定要注意這一個細節,不然有可能導致在伺服器重啟時,有的伺服器直接卡死現象。
好了,時間不早了,今天就先寫到這,明天我們繼續分享後面的幾種模式。在分析完每一種模式後,我還好結合實際,封裝一個dll出來,供大家參考,到時候也會直接把原始碼提出來。歡迎大家關注,持續交流。疫情無情,我們學習不能停。加油吧,每一個小夥伴!
END
為了更高的交流,歡迎大家關注我的公眾號,掃描下面二維碼即可關注,謝謝: