1. 程式人生 > 實用技巧 >C# 使用RabbitMQ訊息佇列

C# 使用RabbitMQ訊息佇列

參考文章https://www.cnblogs.com/kiba/p/11703073.htmlhttps://www.cnblogs.com/longlongogo/p/6489574.html

經過程式碼檢測兩份文章的程式碼部分不適用,只參考了RabbitMQ的介紹和安裝部分。

安裝版本是

rabbitmq-server-3.8.9.exe 地址https://www.rabbitmq.com/install-windows.html#installer

otp_win64_23.1.exe

廢話不多說,直入主題,用的是控制檯程式,其他平臺類似,安裝包是RabbitMQ.Client6.2.1版本

程式碼如下:

一、RabbitMQ訊息佇列傳送端的程式碼

 1 using RabbitMQ.Client;
 2 using System;
 3 using System.Collections.Generic;
 4 using System.Linq;
 5 using System.Text;
 6 using System.Threading;
 7 using System.Threading.Tasks;
 8 
 9 namespace MQDemo
10 {
11     class Program
12     {
13         static void Main(string[] args)
14         {
15 for (int i = 1; i < 100; i++) 16 { 17 bool isTrue = SendMsg("first RabbitMQ message"+i, "firstQueue"); 18 string msg = isTrue ? "傳送成功" : "傳送失敗"; 19 Console.WriteLine(msg+i); 20 Thread.Sleep(500); 21 }
22 Console.ReadKey(); 23 24 } 25 26 27 /// <summary> 28 /// RabbitMQ傳送訊息 29 /// </summary> 30 /// <param name="jsonstr">具體json格式的字串</param> 31 /// <param name="queuqname">具體入隊的佇列名稱</param> 32 /// <returns></returns> 33 public static bool SendMsg(string jsonstr, string queuqname) 34 { 35 try 36 { 37 //1.例項化連線工廠 38 var factory = new ConnectionFactory(); 39 factory.HostName = "localhost"; 40 factory.UserName = "guest"; 41 factory.Password = "guest"; 42 factory.AutomaticRecoveryEnabled = true;////設定埠後自動恢復連線屬性 43 //2. 建立連線 44 var connection = factory.CreateConnection(); 45 //3. 建立通道 46 var channel = connection.CreateModel(); 47 try 48 { 49 var queue_name = queuqname;//具體入隊的佇列名稱 50 bool durable = true;//佇列是否持久化 51 bool exclusive = false; 52 //設定 autoDeleted=true 的佇列,當沒有消費者之後,佇列會自動被刪除 53 bool autoDelete = false; 54 //4. 申明佇列 55 channel.QueueDeclare(queue_name, durable, exclusive, autoDelete, null); 56 57 //將訊息標記為永續性 - 將IBasicProperties.SetPersistent設定為true 58 var properties = channel.CreateBasicProperties(); 59 properties.Persistent = true; //持久化的訊息 60 61 string message = jsonstr; //傳遞的訊息內容 62 var body = Encoding.UTF8.GetBytes(message); 63 64 var exchange_name = ""; 65 var routingKey = queue_name;//routingKey=queue_name,則為對應佇列接收=queue_name 66 67 channel.BasicPublish(exchange_name, routingKey, properties, body); //開始傳遞(指定basicProperties) 68 69 return true; 70 } 71 catch (Exception ex) 72 { 73 Console.WriteLine("RabbitMQ 傳送資料異常:" + ex.Message); 74 //PubTool.ConnError("RabbitMQ", "RunLog", "傳送資料異常:" + ex.Message); 75 } 76 finally 77 { 78 connection.Close(); 79 channel.Close(); 80 } 81 } 82 catch (Exception ex) 83 { 84 Console.WriteLine("RabbitMQ 外層呼叫傳送方法,發生異常:" + ex.Message); 85 //PubTool.ConnError("RabbitMQ", "RunLog", "外層呼叫傳送方法,發生異常:" + ex.Message); 86 } 87 return false; 88 } 89 } 90 }

執行結果如下:

一、RabbitMQ訊息佇列接收端的程式碼:

  1 using RabbitMQ.Client;
  2 using RabbitMQ.Client.Events;
  3 using System;
  4 using System.Collections.Generic;
  5 using System.IO;
  6 using System.Linq;
  7 using System.Text;
  8 using System.Threading;
  9 using System.Threading.Tasks;
 10 
 11 namespace RabbitMQReceived
 12 {
 13     class Program
 14     {
 15         static void Main(string[] args)
 16         {
 17             string queuqname = "firstQueue";
 18             ushort limitnum = 3;
 19 
 20             try
 21             {
 22                 #region 構建訊息佇列
 23                 //1.例項化連線工廠
 24                 var factory = new RabbitMQ.Client.ConnectionFactory();
 25                 factory.HostName = "localhost";
 26                 factory.UserName = "guest";
 27                 factory.Password = "guest";
 28                 factory.AutomaticRecoveryEnabled = true;
 29                 //2. 建立連線 
 30                 var connection = factory.CreateConnection();
 31                 //3. 建立通道
 32                 var channel = connection.CreateModel();
 33 
 34                 var queue_name = queuqname;//專案下游上傳的佇列資訊
 35                 bool durable = true;//佇列是否持久化
 36                 bool exclusive = false;
 37                 //設定 autoDeleted=true 的佇列,當沒有消費者之後,佇列會自動被刪除
 38                 bool autoDelete = false;
 39                 //4. 申明佇列
 40                 channel.QueueDeclare(queue_name, durable, exclusive, autoDelete, null);
 41                 //5. 構造消費者例項
 42                 var consumer = new RabbitMQ.Client.Events.EventingBasicConsumer(channel);
 43                 bool autoAck = false;
 44                 //autoAck:true;自動進行訊息確認,當消費端接收到訊息後,就自動傳送ack訊號,不管訊息是否正確處理完畢
 45                 //autoAck:false;關閉自動訊息確認,通過呼叫BasicAck方法手動進行訊息確認 
 46                 //6. 繫結訊息接收後的事件委託
 47 
 48                 //8. 啟動消費者
 49                 //設定prefetchCount : 3 來告知RabbitMQ,在未收到消費端的N條訊息確認時,不再分發訊息,也就確保了當消費端處於忙碌狀態時
 50                 channel.BasicQos(0, limitnum, false);
 51 
 52                 channel.BasicConsume(queue_name, autoAck, consumer);
 53 
 54                 #endregion
 55 
 56                 #region 佇列-接收訊息的處理方法
 57 
 58                 consumer.Received += (model, ea) =>
 59                 {
 60                     try
 61                     {
 62                         //var body = ea.Body.ToArray();
 63                         var message = Encoding.UTF8.GetString(ea.Body.ToArray());
 64                         //獲取訊息後進行操作,do something
 65                         bool flag = false;
 66                         if (!string.IsNullOrEmpty(message))
 67                         {
 68                             try
 69                             {
 70                                 //做其他儲存或處理操作
 71                                 //File.WriteAllText(@"C:\Users\Administrator\Desktop\333.txt", message, Encoding.UTF8);
 72                                 Console.WriteLine("接收訊息:" + message);
 73                                 flag = true;
 74 
 75 
 76                             }
 77                             catch (Exception ex)
 78                             {
 79                             }
 80                         }
 81                         else
 82                         {
 83                             flag = true;
 84                         }
 85                         if (flag)
 86                         {
 87                             //操作完畢,則手動確認訊息可刪除
 88                             // 7. 傳送訊息確認訊號(手動訊息確認)
 89                             channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
 90                         }
 91                     }
 92                     catch (Exception ex)
 93                     {
 94                     }
 95                 };
 96                 #endregion
 97 
 98             }
 99             catch (Exception ex)
100             { 
101             
102             }
103             //finally
104             //{
105             //    connection.Close();//不能關,關了就停止接收訊息了
106             //    channel.Close();
107             //}
108            
109 
110             Console.ReadKey();
111         }
112 
113         static void Methed(object model, BasicDeliverEventArgs ea, IModel channel)
114         {
115             try
116             {
117                 //var body = ea.Body.ToArray();
118                 var message = Encoding.UTF8.GetString(ea.Body.ToArray());
119                 //獲取訊息後進行操作,do something
120                 bool flag = false;
121                 if (!string.IsNullOrEmpty(message))
122                 {
123                     try
124                     {
125                         //做其他儲存或處理操作
126                         File.WriteAllText(@"C:\Users\Administrator\Desktop\333.txt", message, Encoding.UTF8);
127                         Console.WriteLine("ok :" + message);
128                         flag = true;
129 
130 
131                     }
132                     catch (Exception ex)
133                     {
134                     }
135                 }
136                 else
137                 {
138                     flag = true;
139                 }
140                 if (flag)
141                 {
142                     //操作完畢,則手動確認訊息可刪除
143                     // 7. 傳送訊息確認訊號(手動訊息確認)
144                     channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
145                 }
146             }
147             catch (Exception ex)
148             {
149             }
150         }
151        
152     }
153 
154      
155 }

執行結果如下:

下面是安裝教程及介紹

關於訊息佇列

其實訊息佇列沒有那麼神祕,我們這樣想一下,使用者訪問網站,最終是要將資料以HTTP的協議的方式,通過網路傳輸到主機的某個埠上的。

那麼,接收資料的方式是什麼呢?自然是埠監聽啦。

那訊息佇列是什麼就很好解釋了?

它就是埠監聽,接到資料後,將資料排列起來。

那這件事,我們不用中介軟體能做嗎?

當然能做啦,寫個TCP/UDP/Socket的軟體就可以做啦。

舉個簡單的例子,如下圖:

既然自己可以做訊息佇列,那為什麼要用RabbitMQ?

因為,RabbitMQ成熟的開源中介軟體,可靠性有保證,bug少,效能也非常好。

而C#程式碼預設是使用託管記憶體的,所以,想寫出媲美RabbitMQ效能的訊息佇列,就必須離開我們常用的託管記憶體,使用非託管記憶體,但這個代價就太大了;而且最終能否達到RabbitMQ的效能水平還是個未知數。

還有就是RabbitMQ除了基礎的訊息佇列管理,還有很多很強大的額外功能,而自己開發訊息佇列,很難如此盡善盡美。

----------------------------------------------------------------------------------------------------

我們還會發現,在訊息佇列裡有很多概念,什麼訊息匯流排啊,什麼工作佇列啊等等。

要怎麼理解這些概念呢?

很簡單,不要去理解。這些概念其實是人家程式碼架構的模式,不要去理解他們,【記】就完了,人家的中介軟體就是按照這個模式工作的。

比如,我寫了一個接收訊息的總控制器,然後我為他命名為匯流排,那這個控制器就是匯流排,沒有理由,這就是定義。

準備工作

首先,我們訪問官網【https://www.rabbitmq.com/】,點選Get Started。

然後,網站會自動跳轉到當前首頁Get Started的錨點位置,如下圖:

Get Started錨點:

然後我們點選DownLoad+Installation,進入到下載介面。

在下載頁面中,我們找到安裝指南,然後在點選官網推薦的Windows系統的安裝包,如下圖:

現在,我們進入了Windows安裝指南介面了。

首先,我們看一下預覽資訊,如下圖:

在預覽裡,我們得知,安裝RabbitMQ有兩種方法,一種是使用Chocolatey安裝,一種是使用官方安裝包安裝。

Chocolatey是什麼呢?隨手百度一下,原來他是一個軟體包管理工具,也就是說,Chocolatey是類似於Nuget的一種工具。

由於Chocolatey的使用,我不是很熟悉,所以,這裡選擇使用官方安裝包安裝。

點選【Using the official installer】,我們進入了【Using the official installer】對應的錨點,如下圖。

在【Using the official installer】段落裡找到有推薦標誌的安裝包,然後下載。

下載完成後,我們可以得到這樣一個安裝包,如下圖:

除了下載安裝包,我們還會發現,在【Using the official installer】段落裡,有提醒我們,RabbitMQ是有依賴的,依賴一個Erlang語言的框架(類似於C#語言的NetFramework)。

我們可以發現,在依賴的段落裡,官網非常坑的給出了三個連結網址,如下:

supported version of Erlang:https://www.rabbitmq.com/which-erlang.html

Windows installer:https://www.erlang.org/downloads

Erlang Solutions:https://www.erlang-solutions.com/resources/download.html

因為,我們是無法通過文字描述來判斷,哪一個是真的依賴框架的下載地址,所以只好每個都點選進去看看。。。

開啟網址後發現,在後兩個網址中都可以找到框架下載地址,但第二個地址明顯更友好一點,所以我們在第二個網址內下載Erlang的框架。

下載完成得到如下圖檔案:

PS:這裡下載的是OTP的22.1的版本,我的理解是Erlang等於C#語言,而OTP等於NetFramework。

安裝Erlang\OTP

首先,我們執行otp_win64_22.1.exe,安裝依賴框架Erlang\OTP。

安裝完成後,設定環境變數如下:

然後執行CMD,輸入erl,測試安裝是否成功,如下圖:

安裝成功。

安裝rabbitmq-server

安裝完依賴後,我們接著安裝rabbitmq-server-3.8.0.exe。

【rabbitmq-server-3.8.0.exe】?從這個檔名上,我們發現了一個問題,那就是,我們即將安裝的RabbitMQ,是一個服務端啊。

什麼?服務端?難道還有客戶端???

其實這也很好理解,想一下最開始我舉的那個例子,訊息佇列是需要一個監聽埠的服務端的,然後客戶端向這個服務端傳送請求。

這樣是不是就很好的理解RabbitMQ了呢:)

----------------------------------------------------------------------------------------------------

安裝完RabbitMQ服務端後,我們還是啟動CMD,用命令列來檢視下安裝狀態。

首先輸入下面的命令,將路徑定位到RabbitMQ的路徑下:

【CD /D C:\Program Files\RabbitMQ Server\rabbitmq_server-3.8.0\sbin】

然後輸入rabbitmqctl status檢視狀態。

啟動管理工具的命令列:rabbitmq-plugins enable rabbitmq_management。

啟動成功後,在瀏覽器輸入地址http://127.0.0.1:15672/,進入管理頁面,賬戶密碼都是guest。

RabbitMQ還有很多常用命令,大家可以自行百度。

----------------------------------------------------------------------------------------------------

到此,RabbitMQ服務端的環境配置好了,正常情況,這些配置應該在伺服器進行,但我為了測試方便,就把服務端也安裝在本機了,因此我下面呼叫RabbitMQ時,連線的主機IP都是localhost。