在C#中使用消息隊列RabbitMQ
阿新 • • 發佈:2017-08-30
.com 運行 over .text ges 默認安裝 using mstr odi
參考文章:http://www.cnblogs.com/qy1141/p/4054135.html
開發環境&工具: VS2017 RabbitMq Erlang運行環境
先安裝Erlang運行環境然後再安裝RabbitMq
安裝和配置就不說了
默認安裝路徑:C:\Program Files\RabbitMQ Server,在rabbitmq_server-3.6.11\sbin文件夾下有bat文件
默認配置文件路徑: C:\Users\wangshibang\AppData\Roaming\RabbitMQ\rabbitmq.config
下面直接說代碼
客戶端Client把發送的消息儲存到RabbitMq中,服務器開啟的時候會從Rabbitmq中讀取儲存的消息
項目結構圖
Clinet端代碼(記得添加 RabbitMQ.Client的Nuget包)
1 using Newtonsoft.Json; 2 using RabbitMQ.Client; 3 using System; 4 using System.Text; 5 using RabbitMqLib; 6 7 namespace RabbitMqClient 8 { 9 class Program 10 { 11 static void Main(string[] args) 12 {13 try 14 { 15 ConnectionFactory factory = new ConnectionFactory(); 16 factory.HostName = Constants.MqHost; 17 factory.Port = Constants.MqPort; 18 factory.UserName = Constants.MqPwd; 19 using(IConnection conn = factory.CreateConnection()) 20 { 21 using (IModel channel = conn.CreateModel()) 22 { 23 channel.QueueDeclare("MyFirstQueue", true, false, false, null); 24 while (true) 25 { 26 string customStr = Console.ReadLine(); 27 RequestMsg requestMsg = new RequestMsg 28 { 29 Name = $"Name_{customStr}", 30 Code = $"Code_{customStr}" 31 }; 32 string jsonStr = JsonConvert.SerializeObject(requestMsg); 33 byte[] bytes = Encoding.UTF8.GetBytes(jsonStr); 34 35 //設置消息持久化 36 IBasicProperties properties = channel.CreateBasicProperties(); 37 properties.DeliveryMode = 2; 38 channel.BasicPublish("", "MyFirstQueue", properties, bytes); 39 40 Console.WriteLine("消息已經發送:" + requestMsg.ToString()); 41 } 42 } 43 } 44 } 45 catch (Exception ex) 46 { 47 Console.WriteLine(ex.ToString()); 48 } 49 Console.ReadLine(); 50 } 51 } 52 }
Server端代碼(記得添加 RabbitMQ.Client Nuget包)
1 using Newtonsoft.Json; 2 using RabbitMQ.Client; 3 using RabbitMQ.Client.Events; 4 using RabbitMqLib; 5 using System; 6 using System.Text; 7 8 namespace RabbitMqServer 9 { 10 class Program 11 { 12 static void Main(string[] args) 13 { 14 try 15 { 16 ConnectionFactory factory = new ConnectionFactory(); 17 factory.HostName = Constants.MqHost; 18 factory.Port = Constants.MqPort; 19 factory.UserName = Constants.MqUserName; 20 factory.Password = Constants.MqPwd; 21 using (IConnection conn = factory.CreateConnection()) 22 { 23 using (IModel channel = conn.CreateModel()) 24 { 25 //在MQ上定義一個持久化隊列,如果名稱相同不會重復創建 26 channel.QueueDeclare("MyFirstQueue", true, false, false, null); 27 //輸入1,那如果接收一個消息,但是沒有應答,則客戶端不會收到下一個消息 28 channel.BasicQos(0, 1, false); 29 Console.WriteLine("Listening..."); 30 //在隊列上定義一個消費者 31 QueueingBasicConsumer consumer = new QueueingBasicConsumer(channel); 32 //消費隊列,並設置應答模式為程序主動應答 33 channel.BasicConsume("MyFirstQueue", false, consumer); 34 35 while (true) 36 { 37 //阻塞函數,獲取隊列中的消息 38 BasicDeliverEventArgs ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue(); 39 byte[] bytes = ea.Body; 40 string str = Encoding.UTF8.GetString(bytes); 41 RequestMsg msg = JsonConvert.DeserializeObject<RequestMsg>(str); 42 Console.WriteLine("HandleMsg:" + msg.ToString()); 43 //回復確認 44 channel.BasicAck(ea.DeliveryTag, false); 45 } 46 } 47 } 48 } 49 catch (Exception ex) 50 { 51 Console.WriteLine(ex.ToString()); 52 } 53 Console.ReadLine(); 54 } 55 } 56 }
客戶端顯示效果
服務端顯示效果
最後還有類庫的兩個類
1 using System; 2 using System.Collections.Generic; 3 using System.Linq; 4 using System.Text; 5 using System.Threading.Tasks; 6 7 namespace RabbitMqLib 8 { 9 public static class Constants 10 { 11 public static string MqHost { get; } = "localhost"; 12 13 public static int MqPort { get; } = 5672; 14 15 public static string MqPwd { get; } = "guest"; 16 17 public static string MqUserName { get; } = "guest"; 18 } 19 }
1 namespace RabbitMqLib 2 { 3 public class RequestMsg 4 { 5 public string Name { get; set; } 6 7 public string Code { get; set; } 8 9 public override string ToString() 10 { 11 return $"Name: {Name}, Code: {Code}"; 12 } 13 } 14 }
Constants的具體配置可以參考這篇文章
http://www.rabbitmq.com/dotnet-api-guide.html
在C#中使用消息隊列RabbitMQ