通過集群的方式解決基於MQTT協議的RabbitMQ消息收發
在完成了基於AMQP協議的RabbitMQ消息收發後,我們要繼續實現基於MQTT協議的RabbitMQ消息收發。
由於C#的RabbitMQ.Client包中只實現了基於AMQP協議的消息收發功能的封裝,所以要實現基於MQTT協議的收發,我們要下載新的包。
在NuGet的解決方案中,我們選擇了簡單實用的M2Mqtt。
關於M2Mqtt的資料,可以參考: https://m2mqtt.wordpress.com/ https://github.com/eclipse/paho.mqtt.m2mqtt
消費者代碼:
using System; using uPLibrary.Networking.M2Mqtt;View Codeusing uPLibrary.Networking.M2Mqtt.Messages; namespace MQTTDemo { class Client { static void Main() { // create client instance MqttClient client = new MqttClient("127.0.0.1"); // register to message received client.MqttMsgPublishReceived += client_MqttMsgPublishReceived;string clientId = Guid.NewGuid().ToString(); client.Connect(clientId); client.Subscribe(new string[] { "test" }, new byte[] { MqttMsgBase.QOS_LEVEL_AT_MOST_ONCE }); } static void client_MqttMsgPublishReceived(object sender, MqttMsgPublishEventArgs e) {string msg = System.Text.Encoding.Default.GetString(e.Message); Console.WriteLine(msg); } } }
生產者代碼:
using System; using System.Text; using uPLibrary.Networking.M2Mqtt; using uPLibrary.Networking.M2Mqtt.Messages; namespace MQTTServer { class Server { static void Main() { // create client instance MqttClient client = new MqttClient("127.0.0.1"); string clientId = Guid.NewGuid().ToString(); client.Connect(clientId); client.Publish("test", Encoding.UTF8.GetBytes("hello"), MqttMsgBase.QOS_LEVEL_AT_MOST_ONCE, false); Console.WriteLine("Publish!!!"); Console.ReadKey(); client.Disconnect(); } } }View Code
消費者監聽的隊列名會基於產生的Guid進行前後封裝,“test”表示的是topic值,選擇QOS_LEVEL_AT_MOST_ONCE而不是QOS_LEVEL_EXACTLY_ONCE是因為測試發現QOS_LEVEL_EXACTLY_ONCE消息會被收到多次(我也不知道為啥)。
消費者監聽的隊列會在消費者程序結束後自動刪除,生產者不產生隊列。
在rabbitmq-plugins enable rabbitmq_mqtt之後,我們就可以愉快地通過MQTT收發消息了。
然而,我們發現只能通過127.0.0.1和localhost訪問RabbitMQ服務器,而本機IP訪問失敗。
查閱了大量資料後,我發現這是由於rabbitmq默認的config中有這麽一段文字,所以我們之能在localhost中訪問服務器。
%% The default "guest" user is only permitted to access the server
%% via a loopback interface (e.g. localhost).
%% {loopback_users, [<<"guest">>]},
所以我們取消了{loopback_users, []}的註釋
%% Uncomment the following line if you want to allow access to the
%% guest user from anywhere on the network.
%% {loopback_users, []},
值得註意的是,由於我們在config中僅僅取消了一行註釋,所以這段代碼是整個代碼塊的最後一行。於是我們應該將句末的逗號一同去掉。
然而,我發現怎麽更改默認啟動的rabbitmq對應的comfig文件,都無法成功地使用我更改後的config文件,察看了log發現用的是不存在的rabbitmq.conf文件。
修改成rabbitmq.conf後服務啟動失敗,所以我放棄了直接在默認啟動服務中更改。
由於之前配置過rabbitmq集群,所以我打算采用集群的方式解決問題。
操作可以參考https://www.cnblogs.com/lucifer1997/p/9324130.html,其中我將ClusterNode1改為了mqtt,同時在rabbitmq-mqtt.config中對{loopback_users, []}進行了更改。
如果要修改默認的mqtt用戶、密碼、虛擬用戶、交換機信息,可以參照http://www.rabbitmq.com/mqtt.html在rabbitmq-mqtt.config中進行修改。
在命令行操作之前先把原來開啟的rabbitmq_mqtt停用,避免兩個服務同時監聽1883端口導致報錯。 rabbitmq-plugins disable rabbitmq_mqtt
同時在操作了rabbitmq-plugins-mqtt enable rabbitmq_management之後執行rabbitmq-plugins-mqtt enable rabbitmq_mqtt。
如此就可以在集群後實現遠程MQTT收發,同時還可以實現AMQP與MQTT之間的收發。
通過集群的方式解決基於MQTT協議的RabbitMQ消息收發