RabbitMQ使用示例
阿新 • • 發佈:2019-01-02
首先搭建RabbitMQ開發環境,不知道流程的自行百度。
搭建完成後預設訪問地址為:http://localhost:15672 管理介面如下圖所示:
公共類:
public class RequestMsg { /// <summary> /// 名稱 /// </summary> public string Name { get; set; } /// <summary> /// 編碼 /// </summary> public string Code { get; set; } } public class Constants { /// <summary> /// Host地址 /// </summary> public const string MqHost = "127.0.0.1"; /// <summary> /// 埠號 /// </summary> public const int MqPort = 5672; /// <summary> /// 使用者名稱 /// </summary> public const string UserName = "guest"; /// <summary> /// 密碼 /// </summary> public const string Password = "guest"; }
訊息生產者控制檯執行程式碼:
//建立RabbitMQ連線和通道 ConnectionFactory connectionFactory = new ConnectionFactory { HostName = "127.0.0.1", Port = 5672, UserName = "guest", Password = "guest", Protocol = Protocols.DefaultProtocol, AutomaticRecoveryEnabled = true, //自動重連 RequestedFrameMax = UInt32.MaxValue, RequestedHeartbeat = UInt16.MaxValue //心跳超時時間 }; try { using (var connection = connectionFactory.CreateConnection()) { using (var channel = connection.CreateModel()) { //建立一個新的Direct模式的持久交換區 exchange持久化 SISOExchange表示Exchange名稱 durable: true, autoDelete: false, arguments: null channel.ExchangeDeclare("SISOExchange", ExchangeType.Direct, true, false, null); //建立一個新的,持久的佇列, 沒有排他性,與不自動刪除 queue持久化 durable: true, exclusive: false, autoDelete: false, arguments: null channel.QueueDeclare("SISOqueue", true, false, false, null); // 繫結佇列到交換區 Binding:繫結,它的作用就是把exchange和queue按照路由規則繫結起來. 如果exchange和queue都是持久化的,那麼它們之間的binding也是持久化的,如果exchange和queue兩者之間有一個持久化,一個非持久化,則不允許建立繫結. //注意:一旦建立了佇列和交換機,就不能修改其標誌了,例如,建立了一個non-durable的佇列,然後想把它改變成durable的,唯一的辦法就是刪除這個佇列然後重現建立。 //channel.queueBind("queue_name","exchange_name","route_key"); channel.QueueBind("SISOqueue", "SISOExchange", "optionalRoutingKey"); // 設定訊息屬性 var properties = channel.CreateBasicProperties(); properties.DeliveryMode = 2; //訊息是持久的,存在並不會受伺服器重啟影響 1是非持久化 //準備開始推送 //釋出的訊息可以是任何一個(可以被序列化的)位元組陣列,如序列化物件,一個實體的ID,或只是一個字串 var encoding = new UTF8Encoding(); for (var i = 0; i < 10; i++) { var msg = string.Format("這是訊息 #{0}--{1}?", i + 1,DateTime.Now.ToString()); var msgBytes = encoding.GetBytes(msg); //var msgBytes = Encoding.UTF8.GetBytes(msg); //RabbitMQ訊息模型的核心思想就是,生產者不把訊息直接傳送給佇列。實際上,生產者在很多情況下都不知道訊息是否會被髮送到一個佇列中。取而代之的是,生產者將訊息傳送到交換區。交換區是一個非常簡單的東西,它一端接受生產者的訊息,另一端將他們推送到佇列中。交換區必須要明確的指導如何處理它接受到的訊息。是放到一個佇列中,還是放到多個佇列中,亦或是被丟棄。這些規則可以通過交換區的型別來定義。 //可用的交換區型別有:direct,topic,headers,fanout。 //Exchange:用於接收訊息生產者傳送的訊息,有三種類型的exchange:direct, fanout,topic,不同型別實現了不同的路由演算法; //RoutingKey:是RabbitMQ實現路由分發到各個佇列的規則,並結合Binging提供於Exchange使用將訊息推送入佇列; //Queue:是訊息佇列,可以根據需要定義多個佇列,設定佇列的屬性,比如:訊息移除、訊息快取、回撥機制等設定,實現與Consumer通訊; channel.BasicPublish("SISOExchange", "optionalRoutingKey", properties, msgBytes); } channel.Close(); } } } catch (Exception ex) { Console.WriteLine(ex.Message); } Console.WriteLine("訊息釋出!"); Console.ReadKey(true);
消費者控制檯程式碼:
// 建立RabbitMQ連線和通道 由ConnectionFactory建立,每個connection只與一個物理的Server進行連線,此連線是基於Socket進行連線的 ConnectionFactory connectionFactory = new ConnectionFactory { HostName = "127.0.0.1", Port = 5672, UserName = "guest", Password = "guest", Protocol = Protocols.AMQP_0_9_1, RequestedFrameMax = UInt32.MaxValue, RequestedHeartbeat = UInt16.MaxValue }; using (var connection = connectionFactory.CreateConnection()) //連線通道 一個Connection可以有多個Channel using (var channel = connection.CreateModel()) { // 這指示通道不預取超過1個訊息 channel.BasicQos(0, 1, false); //建立一個新的,持久的交換區 channel.ExchangeDeclare("SISOExchange", ExchangeType.Direct, true, false, null); //建立一個新的,持久的佇列 sample-queue為佇列名稱 channel.QueueDeclare("sample-queue", true, false, false, null); //繫結佇列到交換區 channel.QueueBind("SISOqueue", "SISOExchange", "optionalRoutingKey"); using (var subscription = new Subscription(channel, "SISOqueue", false)) { Console.WriteLine("等待訊息..."); var encoding = new UTF8Encoding(); while (channel.IsOpen) { BasicDeliverEventArgs eventArgs; var success = subscription.Next(2000, out eventArgs); if (success == false) continue; var msgBytes = eventArgs.Body; var message = encoding.GetString(msgBytes); Console.WriteLine(message); channel.BasicAck(eventArgs.DeliveryTag, false); } } }
執行結果:
示例二:
訊息生產者控制檯程式碼: /*
try
{
ConnectionFactory factory = new ConnectionFactory();
factory.HostName = Constants.MqHost;
factory.Port = Constants.MqPort;
factory.UserName = Constants.UserName;
factory.Password = Constants.Password;
using (IConnection conn = factory.CreateConnection())
{
using (IModel channel = conn.CreateModel())
{
channel.QueueDeclare("myFirstQueue", true, false, false, null);
while (true)
{
string customStr = Console.ReadLine();
RequestMsg requestMsg = new RequestMsg();
requestMsg.Name = string.Format("Name_{0}", customStr);
requestMsg.Code = string.Format("Code_{0}", customStr);
string jsonStr = JsonConvert.SerializeObject(requestMsg);
byte[] bytes = Encoding.UTF8.GetBytes(jsonStr);
//設定訊息持久化
IBasicProperties properties = channel.CreateBasicProperties();
properties.DeliveryMode = 2;
channel.BasicPublish("", "MyFirstQueue", properties, bytes);
//channel.BasicPublish("", "MyFirstQueue", null, bytes);
Console.WriteLine("訊息已傳送:" + requestMsg.ToString());
}
}
}
}
catch (Exception e)
{
Console.WriteLine(e);
throw;
}
*/
訊息消費者控制檯程式碼: /*
try
{
ConnectionFactory factory = new ConnectionFactory();
factory.HostName = Constants.MqHost;
factory.Port = Constants.MqPort;
factory.UserName = Constants.UserName;
factory.Password = Constants.Password;
using (IConnection conn = factory.CreateConnection())
{
using (IModel channel = conn.CreateModel())
{
//在MQ上定義一個持久化佇列,如果名稱相同不會重複建立
channel.QueueDeclare("MyFirstQueue", true, false, false, null);
//輸入1,那如果接收一個訊息,但是沒有應答,則客戶端不會收到下一個訊息
channel.BasicQos(0, 1, false);
Console.WriteLine("Listening...");
//在佇列上定義一個消費者
QueueingBasicConsumer consumer = new QueueingBasicConsumer(channel);
//消費佇列,並設定應答模式為程式主動應答
channel.BasicConsume("MyFirstQueue", false, consumer);
while (true)
{
//阻塞函式,獲取佇列中的訊息
BasicDeliverEventArgs ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue();
byte[] bytes = ea.Body;
string str = Encoding.UTF8.GetString(bytes);
RequestMsg msg = JsonConvert.DeserializeObject<RequestMsg>(str);
Console.WriteLine("HandleMsg:" + msg.ToString());
//回覆確認
channel.BasicAck(ea.DeliveryTag, false);
}
}
}
}
catch (Exception e1)
{
Console.WriteLine(e1.ToString());
}
Console.ReadLine();
*/