C# 使用RabbitMQ訊息佇列
參考文章https://www.cnblogs.com/kiba/p/11703073.html和https://www.cnblogs.com/longlongogo/p/6489574.html
經過程式碼檢測兩份文章的程式碼部分不適用,只參考了RabbitMQ的介紹和安裝部分。
安裝版本是
rabbitmq-server-3.8.9.exe 地址https://www.rabbitmq.com/install-windows.html#installer
otp_win64_23.1.exe
廢話不多說,直入主題,用的是控制檯程式,其他平臺類似,安裝包是RabbitMQ.Client6.2.1版本
程式碼如下:
一、RabbitMQ訊息佇列傳送端的程式碼
1 using RabbitMQ.Client; 2 using System; 3 using System.Collections.Generic; 4 using System.Linq; 5 using System.Text; 6 using System.Threading; 7 using System.Threading.Tasks; 8 9 namespace MQDemo 10 { 11 class Program 12 { 13 static void Main(string[] args) 14 {15 for (int i = 1; i < 100; i++) 16 { 17 bool isTrue = SendMsg("first RabbitMQ message"+i, "firstQueue"); 18 string msg = isTrue ? "傳送成功" : "傳送失敗"; 19 Console.WriteLine(msg+i); 20 Thread.Sleep(500); 21 }22 Console.ReadKey(); 23 24 } 25 26 27 /// <summary> 28 /// RabbitMQ傳送訊息 29 /// </summary> 30 /// <param name="jsonstr">具體json格式的字串</param> 31 /// <param name="queuqname">具體入隊的佇列名稱</param> 32 /// <returns></returns> 33 public static bool SendMsg(string jsonstr, string queuqname) 34 { 35 try 36 { 37 //1.例項化連線工廠 38 var factory = new ConnectionFactory(); 39 factory.HostName = "localhost"; 40 factory.UserName = "guest"; 41 factory.Password = "guest"; 42 factory.AutomaticRecoveryEnabled = true;////設定埠後自動恢復連線屬性 43 //2. 建立連線 44 var connection = factory.CreateConnection(); 45 //3. 建立通道 46 var channel = connection.CreateModel(); 47 try 48 { 49 var queue_name = queuqname;//具體入隊的佇列名稱 50 bool durable = true;//佇列是否持久化 51 bool exclusive = false; 52 //設定 autoDeleted=true 的佇列,當沒有消費者之後,佇列會自動被刪除 53 bool autoDelete = false; 54 //4. 申明佇列 55 channel.QueueDeclare(queue_name, durable, exclusive, autoDelete, null); 56 57 //將訊息標記為永續性 - 將IBasicProperties.SetPersistent設定為true 58 var properties = channel.CreateBasicProperties(); 59 properties.Persistent = true; //持久化的訊息 60 61 string message = jsonstr; //傳遞的訊息內容 62 var body = Encoding.UTF8.GetBytes(message); 63 64 var exchange_name = ""; 65 var routingKey = queue_name;//routingKey=queue_name,則為對應佇列接收=queue_name 66 67 channel.BasicPublish(exchange_name, routingKey, properties, body); //開始傳遞(指定basicProperties) 68 69 return true; 70 } 71 catch (Exception ex) 72 { 73 Console.WriteLine("RabbitMQ 傳送資料異常:" + ex.Message); 74 //PubTool.ConnError("RabbitMQ", "RunLog", "傳送資料異常:" + ex.Message); 75 } 76 finally 77 { 78 connection.Close(); 79 channel.Close(); 80 } 81 } 82 catch (Exception ex) 83 { 84 Console.WriteLine("RabbitMQ 外層呼叫傳送方法,發生異常:" + ex.Message); 85 //PubTool.ConnError("RabbitMQ", "RunLog", "外層呼叫傳送方法,發生異常:" + ex.Message); 86 } 87 return false; 88 } 89 } 90 }
執行結果如下:
一、RabbitMQ訊息佇列接收端的程式碼:
1 using RabbitMQ.Client; 2 using RabbitMQ.Client.Events; 3 using System; 4 using System.Collections.Generic; 5 using System.IO; 6 using System.Linq; 7 using System.Text; 8 using System.Threading; 9 using System.Threading.Tasks; 10 11 namespace RabbitMQReceived 12 { 13 class Program 14 { 15 static void Main(string[] args) 16 { 17 string queuqname = "firstQueue"; 18 ushort limitnum = 3; 19 20 try 21 { 22 #region 構建訊息佇列 23 //1.例項化連線工廠 24 var factory = new RabbitMQ.Client.ConnectionFactory(); 25 factory.HostName = "localhost"; 26 factory.UserName = "guest"; 27 factory.Password = "guest"; 28 factory.AutomaticRecoveryEnabled = true; 29 //2. 建立連線 30 var connection = factory.CreateConnection(); 31 //3. 建立通道 32 var channel = connection.CreateModel(); 33 34 var queue_name = queuqname;//專案下游上傳的佇列資訊 35 bool durable = true;//佇列是否持久化 36 bool exclusive = false; 37 //設定 autoDeleted=true 的佇列,當沒有消費者之後,佇列會自動被刪除 38 bool autoDelete = false; 39 //4. 申明佇列 40 channel.QueueDeclare(queue_name, durable, exclusive, autoDelete, null); 41 //5. 構造消費者例項 42 var consumer = new RabbitMQ.Client.Events.EventingBasicConsumer(channel); 43 bool autoAck = false; 44 //autoAck:true;自動進行訊息確認,當消費端接收到訊息後,就自動傳送ack訊號,不管訊息是否正確處理完畢 45 //autoAck:false;關閉自動訊息確認,通過呼叫BasicAck方法手動進行訊息確認 46 //6. 繫結訊息接收後的事件委託 47 48 //8. 啟動消費者 49 //設定prefetchCount : 3 來告知RabbitMQ,在未收到消費端的N條訊息確認時,不再分發訊息,也就確保了當消費端處於忙碌狀態時 50 channel.BasicQos(0, limitnum, false); 51 52 channel.BasicConsume(queue_name, autoAck, consumer); 53 54 #endregion 55 56 #region 佇列-接收訊息的處理方法 57 58 consumer.Received += (model, ea) => 59 { 60 try 61 { 62 //var body = ea.Body.ToArray(); 63 var message = Encoding.UTF8.GetString(ea.Body.ToArray()); 64 //獲取訊息後進行操作,do something 65 bool flag = false; 66 if (!string.IsNullOrEmpty(message)) 67 { 68 try 69 { 70 //做其他儲存或處理操作 71 //File.WriteAllText(@"C:\Users\Administrator\Desktop\333.txt", message, Encoding.UTF8); 72 Console.WriteLine("接收訊息:" + message); 73 flag = true; 74 75 76 } 77 catch (Exception ex) 78 { 79 } 80 } 81 else 82 { 83 flag = true; 84 } 85 if (flag) 86 { 87 //操作完畢,則手動確認訊息可刪除 88 // 7. 傳送訊息確認訊號(手動訊息確認) 89 channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false); 90 } 91 } 92 catch (Exception ex) 93 { 94 } 95 }; 96 #endregion 97 98 } 99 catch (Exception ex) 100 { 101 102 } 103 //finally 104 //{ 105 // connection.Close();//不能關,關了就停止接收訊息了 106 // channel.Close(); 107 //} 108 109 110 Console.ReadKey(); 111 } 112 113 static void Methed(object model, BasicDeliverEventArgs ea, IModel channel) 114 { 115 try 116 { 117 //var body = ea.Body.ToArray(); 118 var message = Encoding.UTF8.GetString(ea.Body.ToArray()); 119 //獲取訊息後進行操作,do something 120 bool flag = false; 121 if (!string.IsNullOrEmpty(message)) 122 { 123 try 124 { 125 //做其他儲存或處理操作 126 File.WriteAllText(@"C:\Users\Administrator\Desktop\333.txt", message, Encoding.UTF8); 127 Console.WriteLine("ok :" + message); 128 flag = true; 129 130 131 } 132 catch (Exception ex) 133 { 134 } 135 } 136 else 137 { 138 flag = true; 139 } 140 if (flag) 141 { 142 //操作完畢,則手動確認訊息可刪除 143 // 7. 傳送訊息確認訊號(手動訊息確認) 144 channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false); 145 } 146 } 147 catch (Exception ex) 148 { 149 } 150 } 151 152 } 153 154 155 }
執行結果如下:
下面是安裝教程及介紹
關於訊息佇列
其實訊息佇列沒有那麼神祕,我們這樣想一下,使用者訪問網站,最終是要將資料以HTTP的協議的方式,通過網路傳輸到主機的某個埠上的。
那麼,接收資料的方式是什麼呢?自然是埠監聽啦。
那訊息佇列是什麼就很好解釋了?
它就是埠監聽,接到資料後,將資料排列起來。
那這件事,我們不用中介軟體能做嗎?
當然能做啦,寫個TCP/UDP/Socket的軟體就可以做啦。
舉個簡單的例子,如下圖:
既然自己可以做訊息佇列,那為什麼要用RabbitMQ?
因為,RabbitMQ成熟的開源中介軟體,可靠性有保證,bug少,效能也非常好。
而C#程式碼預設是使用託管記憶體的,所以,想寫出媲美RabbitMQ效能的訊息佇列,就必須離開我們常用的託管記憶體,使用非託管記憶體,但這個代價就太大了;而且最終能否達到RabbitMQ的效能水平還是個未知數。
還有就是RabbitMQ除了基礎的訊息佇列管理,還有很多很強大的額外功能,而自己開發訊息佇列,很難如此盡善盡美。
----------------------------------------------------------------------------------------------------
我們還會發現,在訊息佇列裡有很多概念,什麼訊息匯流排啊,什麼工作佇列啊等等。
要怎麼理解這些概念呢?
很簡單,不要去理解。這些概念其實是人家程式碼架構的模式,不要去理解他們,【記】就完了,人家的中介軟體就是按照這個模式工作的。
比如,我寫了一個接收訊息的總控制器,然後我為他命名為匯流排,那這個控制器就是匯流排,沒有理由,這就是定義。
準備工作
首先,我們訪問官網【https://www.rabbitmq.com/】,點選Get Started。
然後,網站會自動跳轉到當前首頁Get Started的錨點位置,如下圖:
Get Started錨點:
然後我們點選DownLoad+Installation,進入到下載介面。
在下載頁面中,我們找到安裝指南,然後在點選官網推薦的Windows系統的安裝包,如下圖:
現在,我們進入了Windows安裝指南介面了。
首先,我們看一下預覽資訊,如下圖:
在預覽裡,我們得知,安裝RabbitMQ有兩種方法,一種是使用Chocolatey安裝,一種是使用官方安裝包安裝。
Chocolatey是什麼呢?隨手百度一下,原來他是一個軟體包管理工具,也就是說,Chocolatey是類似於Nuget的一種工具。
由於Chocolatey的使用,我不是很熟悉,所以,這裡選擇使用官方安裝包安裝。
點選【Using the official installer】,我們進入了【Using the official installer】對應的錨點,如下圖。
在【Using the official installer】段落裡找到有推薦標誌的安裝包,然後下載。
下載完成後,我們可以得到這樣一個安裝包,如下圖:
除了下載安裝包,我們還會發現,在【Using the official installer】段落裡,有提醒我們,RabbitMQ是有依賴的,依賴一個Erlang語言的框架(類似於C#語言的NetFramework)。
我們可以發現,在依賴的段落裡,官網非常坑的給出了三個連結網址,如下:
supported version of Erlang:https://www.rabbitmq.com/which-erlang.html
Windows installer:https://www.erlang.org/downloads
Erlang Solutions:https://www.erlang-solutions.com/resources/download.html
因為,我們是無法通過文字描述來判斷,哪一個是真的依賴框架的下載地址,所以只好每個都點選進去看看。。。
開啟網址後發現,在後兩個網址中都可以找到框架下載地址,但第二個地址明顯更友好一點,所以我們在第二個網址內下載Erlang的框架。
下載完成得到如下圖檔案:
PS:這裡下載的是OTP的22.1的版本,我的理解是Erlang等於C#語言,而OTP等於NetFramework。
安裝Erlang\OTP
首先,我們執行otp_win64_22.1.exe,安裝依賴框架Erlang\OTP。
安裝完成後,設定環境變數如下:
然後執行CMD,輸入erl,測試安裝是否成功,如下圖:
安裝成功。
安裝rabbitmq-server
安裝完依賴後,我們接著安裝rabbitmq-server-3.8.0.exe。
【rabbitmq-server-3.8.0.exe】?從這個檔名上,我們發現了一個問題,那就是,我們即將安裝的RabbitMQ,是一個服務端啊。
什麼?服務端?難道還有客戶端???
其實這也很好理解,想一下最開始我舉的那個例子,訊息佇列是需要一個監聽埠的服務端的,然後客戶端向這個服務端傳送請求。
這樣是不是就很好的理解RabbitMQ了呢:)
----------------------------------------------------------------------------------------------------
安裝完RabbitMQ服務端後,我們還是啟動CMD,用命令列來檢視下安裝狀態。
首先輸入下面的命令,將路徑定位到RabbitMQ的路徑下:
【CD /D C:\Program Files\RabbitMQ Server\rabbitmq_server-3.8.0\sbin】
然後輸入rabbitmqctl status檢視狀態。
啟動管理工具的命令列:rabbitmq-plugins enable rabbitmq_management。
啟動成功後,在瀏覽器輸入地址http://127.0.0.1:15672/,進入管理頁面,賬戶密碼都是guest。
RabbitMQ還有很多常用命令,大家可以自行百度。
----------------------------------------------------------------------------------------------------
到此,RabbitMQ服務端的環境配置好了,正常情況,這些配置應該在伺服器進行,但我為了測試方便,就把服務端也安裝在本機了,因此我下面呼叫RabbitMQ時,連線的主機IP都是localhost。