1. 程式人生 > 實用技巧 >RabbitMQ(dotnet基本使用)

RabbitMQ(dotnet基本使用)

前言

RabbitMQ環境環境搭建及基本配置,在此不討論。網上一大堆。

NET環境下,Rabbit庫可以在官網或NUGET上查詢得到。

生產者

static void Main(string[] args)
{
    var factory = new ConnectionFactory();//連線工廠
    factory.HostName = "127.0.0.1";//地址
    factory.UserName = "Test";//登入名
    factory.Password = "t123456!";//密碼
 
    using (var connection = factory.CreateConnection())//
建立連線 { using (var channel = connection.CreateModel())//建立通道 { //建立佇列,第二個引數bool:是否佇列持久化 channel.QueueDeclare( queue: "test", //訊息佇列名稱 durable: false,//訊息佇列是否持久化 exclusive: false,//訊息佇列是否被本次連線connection獨享。(本次連線 //connection建立的通道可以共用).排外的queue在當前連線被斷開的時候會 //自動消失(清除)無論是否設定了持久化. autoDelete:
false,//訊息佇列是否自動刪除。也就是說queue會清理自己,但 是是在最後一個connection斷開的時候。 arguments: null);//引數對 var properties = channel.CreateBasicProperties();//可以為null properties.DeliveryMode = 2;//多個消費工作佇列時,設定此屬性 properties.SetPersistent(true);//訊息持久化
string message = "Hello World"; var body = Encoding.UTF8.GetBytes(message); //釋出訊息 //第一個引數:交換器,空預設為direct //第二個引數:direct時,為佇列名 //第三個引數:通道屬性,可以是BasicProperties,也可以是屬性介面 //第四個引數:訊息正文 channel.BasicPublish("", "hello", properties, body); Console.WriteLine(" set {0}", message); } } }

消費者

static void Main(string[] args)
{
    var factory = new ConnectionFactory();//連線工廠
    factory.HostName = "127.0.0.1";//地址
    factory.UserName = "Test";//登入名
    factory.Password = "t123456!";//密碼
 
    using (var connection = factory.CreateConnection())//建立連線
    {
        using (var channel = connection.CreateModel())//建立通道
        {
            //建立佇列,第二個引數bool:是否佇列持久化
            channel.QueueDeclare(
               queue: "test", //訊息佇列名稱
               durable: false,//訊息佇列是否持久化
               exclusive: false,//訊息佇列是否被本次連線connection獨享。(本次連線 
                  //connection建立的通道可以共用).排外的queue在當前連線被斷開的時候會 
                  //自動消失(清除)無論是否設定了持久化.
               autoDelete: false,//訊息佇列是否自動刪除。也就是說queue會清理自己,但 
                  是是在最後一個connection斷開的時候。
               arguments: null);//引數對
            
             //第一種接收方式:迴圈接收方式
             var consumer = new QueueingBasicConsumer(channel);//消費者例項
             channel.BasicConsume(
                 queue: "Test", //佇列名稱
                 autoAck: false, //是否開啟收到訊息自動回覆
                 consumer: consumer//消費者
             );
             //多個消費者確寶公平,設定同一時間一個消費只能接收一個訊息;此方法慎用
             channel.BasicQos(0, 1, false); 
             while (true)
            {
                //佇列訊息物件
                var ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue();
                var body = ea.Body;
                var message = Encoding.UTF8.GetString(body);
                //設定睡眠時間,可模擬工作佇列多個訊息者模式,自行決定
                Thread.Sleep(5* 1000);
                //假如,前面未設定自動回覆,則可以手動;
                //響應給RabbitMQ服務:收到並處理了訊息。        
                channel.BasicAck(ea.DeliveryTag, false);
                //遇到無法處理的訊息,拒絕且此訊息是否放回佇列中,傳送給其他消費者
                channel.BasicReject(ea.DeliveryTag, false);
                Console.WriteLine("Received {0}", message);
                Console.WriteLine("Done");
            }
            //第二種接收方式:事件方式
            //例項化一個事件型消費者
            var consumer = new EventingBasicConsumer(channel);
            //訂閱消費者接收訊息的事件
            consumer.Received += (model, ea) =>
           {
               //獲取並解析資料
              var body = ea.Body;
              var message = Encoding.UTF8.GetString(body);
              //響應給RabbitMQ服務:收到並處理了訊息。        
              channel.BasicAck(ea.DeliveryTag, false);
              //遇到無法處理的訊息,拒絕且此訊息是否放回佇列中,傳送給其他消費者
              channel.BasicReject(ea.DeliveryTag, false);
              Console.WriteLine($"收到: {message}");
           };
        }
    }
}