1. 程式人生 > >RabbitMQ使用示例

RabbitMQ使用示例

首先搭建RabbitMQ開發環境,不知道流程的自行百度。

搭建完成後預設訪問地址為:http://localhost:15672    管理介面如下圖所示:


公共類:

  public class RequestMsg
    {
        /// <summary>
        /// 名稱
        /// </summary>
        public string Name { get; set; }

        /// <summary>
        /// 編碼
        /// </summary>
        public string Code { get; set; }
    }



    public class Constants
    {
        /// <summary>
        /// Host地址
        /// </summary>
        public const string MqHost = "127.0.0.1";

        /// <summary>
        /// 埠號
        /// </summary>
        public const int MqPort = 5672;

        /// <summary>
        /// 使用者名稱
        /// </summary>
        public const string UserName = "guest";

        /// <summary>
        /// 密碼
        /// </summary>
        public const string Password = "guest";
    }

訊息生產者控制檯執行程式碼:

   //建立RabbitMQ連線和通道  
            ConnectionFactory connectionFactory = new ConnectionFactory
            {
                HostName = "127.0.0.1",
                Port = 5672,
                UserName = "guest",
                Password = "guest",
                Protocol = Protocols.DefaultProtocol,
                AutomaticRecoveryEnabled = true, //自動重連  
                RequestedFrameMax = UInt32.MaxValue,
                RequestedHeartbeat = UInt16.MaxValue //心跳超時時間  
            };
            try
            {
                using (var connection = connectionFactory.CreateConnection())
                {
                    using (var channel = connection.CreateModel())
                    {
                        //建立一個新的Direct模式的持久交換區  exchange持久化    SISOExchange表示Exchange名稱    durable: true, autoDelete: false, arguments: null
                        channel.ExchangeDeclare("SISOExchange", ExchangeType.Direct, true, false, null);
                        //建立一個新的,持久的佇列, 沒有排他性,與不自動刪除   queue持久化   durable: true, exclusive: false, autoDelete: false, arguments: null
                        channel.QueueDeclare("SISOqueue", true, false, false, null);

                        // 繫結佇列到交換區  Binding:繫結,它的作用就是把exchange和queue按照路由規則繫結起來.   如果exchange和queue都是持久化的,那麼它們之間的binding也是持久化的,如果exchange和queue兩者之間有一個持久化,一個非持久化,則不允許建立繫結. 
                        //注意:一旦建立了佇列和交換機,就不能修改其標誌了,例如,建立了一個non-durable的佇列,然後想把它改變成durable的,唯一的辦法就是刪除這個佇列然後重現建立。
                        //channel.queueBind("queue_name","exchange_name","route_key");
                        channel.QueueBind("SISOqueue", "SISOExchange", "optionalRoutingKey");

                        // 設定訊息屬性  
                        var properties = channel.CreateBasicProperties();
                        properties.DeliveryMode = 2; //訊息是持久的,存在並不會受伺服器重啟影響   1是非持久化 

                        //準備開始推送  
                        //釋出的訊息可以是任何一個(可以被序列化的)位元組陣列,如序列化物件,一個實體的ID,或只是一個字串  
                        var encoding = new UTF8Encoding();
                        for (var i = 0; i < 10; i++)
                        {
                            var msg = string.Format("這是訊息 #{0}--{1}?", i + 1,DateTime.Now.ToString());
                            var msgBytes = encoding.GetBytes(msg);
                            //var msgBytes = Encoding.UTF8.GetBytes(msg);
                            
                            //RabbitMQ訊息模型的核心思想就是,生產者不把訊息直接傳送給佇列。實際上,生產者在很多情況下都不知道訊息是否會被髮送到一個佇列中。取而代之的是,生產者將訊息傳送到交換區。交換區是一個非常簡單的東西,它一端接受生產者的訊息,另一端將他們推送到佇列中。交換區必須要明確的指導如何處理它接受到的訊息。是放到一個佇列中,還是放到多個佇列中,亦或是被丟棄。這些規則可以通過交換區的型別來定義。  
                            //可用的交換區型別有:direct,topic,headers,fanout。  
                            //Exchange:用於接收訊息生產者傳送的訊息,有三種類型的exchange:direct, fanout,topic,不同型別實現了不同的路由演算法;  
                            //RoutingKey:是RabbitMQ實現路由分發到各個佇列的規則,並結合Binging提供於Exchange使用將訊息推送入佇列;  
                            //Queue:是訊息佇列,可以根據需要定義多個佇列,設定佇列的屬性,比如:訊息移除、訊息快取、回撥機制等設定,實現與Consumer通訊;  
                            channel.BasicPublish("SISOExchange", "optionalRoutingKey", properties, msgBytes);
                        }
                        channel.Close();
                    }
                }
            }
            catch (Exception ex)
            {
                Console.WriteLine(ex.Message);
            }
            Console.WriteLine("訊息釋出!");
            Console.ReadKey(true);

消費者控制檯程式碼:

 // 建立RabbitMQ連線和通道 由ConnectionFactory建立,每個connection只與一個物理的Server進行連線,此連線是基於Socket進行連線的 
            ConnectionFactory connectionFactory = new ConnectionFactory
            {
                HostName = "127.0.0.1",
                Port = 5672,
                UserName = "guest",
                Password = "guest",
                Protocol = Protocols.AMQP_0_9_1,
                RequestedFrameMax = UInt32.MaxValue,
                RequestedHeartbeat = UInt16.MaxValue
            };

            using (var connection = connectionFactory.CreateConnection())
            //連線通道 一個Connection可以有多個Channel
            using (var channel = connection.CreateModel())
            {
                // 這指示通道不預取超過1個訊息  
                channel.BasicQos(0, 1, false);

                //建立一個新的,持久的交換區  
                channel.ExchangeDeclare("SISOExchange", ExchangeType.Direct, true, false, null);
                //建立一個新的,持久的佇列    sample-queue為佇列名稱
                channel.QueueDeclare("sample-queue", true, false, false, null);
                //繫結佇列到交換區  
                channel.QueueBind("SISOqueue", "SISOExchange", "optionalRoutingKey");
                using (var subscription = new Subscription(channel, "SISOqueue", false))
                {
                    Console.WriteLine("等待訊息...");
                    var encoding = new UTF8Encoding();
                    while (channel.IsOpen)
                    {
                        BasicDeliverEventArgs eventArgs;
                        var success = subscription.Next(2000, out eventArgs);
                        if (success == false) continue;
                        var msgBytes = eventArgs.Body;
                        var message = encoding.GetString(msgBytes);
                        Console.WriteLine(message);
                        channel.BasicAck(eventArgs.DeliveryTag, false);
                    }
                }
            }

執行結果:


示例二:

訊息生產者控制檯程式碼:   /*
                try
                {
                    ConnectionFactory factory = new ConnectionFactory();
                    factory.HostName = Constants.MqHost;
                    factory.Port = Constants.MqPort;
                    factory.UserName = Constants.UserName;
                    factory.Password = Constants.Password;
                    using (IConnection conn = factory.CreateConnection())
                    {
                        using (IModel channel = conn.CreateModel())
                        {
                            channel.QueueDeclare("myFirstQueue", true, false, false, null);
                            while (true)
                            {
                                string customStr = Console.ReadLine();
                                RequestMsg requestMsg = new RequestMsg();
                                requestMsg.Name = string.Format("Name_{0}", customStr);
                                requestMsg.Code = string.Format("Code_{0}", customStr);
                                string jsonStr = JsonConvert.SerializeObject(requestMsg);
                                byte[] bytes = Encoding.UTF8.GetBytes(jsonStr);

                                //設定訊息持久化
                                IBasicProperties properties = channel.CreateBasicProperties();
                                properties.DeliveryMode = 2;
                                channel.BasicPublish("", "MyFirstQueue", properties, bytes);

                                //channel.BasicPublish("", "MyFirstQueue", null, bytes);

                                Console.WriteLine("訊息已傳送:" + requestMsg.ToString());
                            }
                        }
                    }
                }
                catch (Exception e)
                {
                    Console.WriteLine(e);
                    throw;
                }
            */
 訊息消費者控制檯程式碼:  /*
            try
            {
                ConnectionFactory factory = new ConnectionFactory();
                factory.HostName = Constants.MqHost;
                factory.Port = Constants.MqPort;
                factory.UserName = Constants.UserName;
                factory.Password = Constants.Password;
                using (IConnection conn = factory.CreateConnection())
                {
                    using (IModel channel = conn.CreateModel())
                    {
                        //在MQ上定義一個持久化佇列,如果名稱相同不會重複建立
                        channel.QueueDeclare("MyFirstQueue", true, false, false, null);

                        //輸入1,那如果接收一個訊息,但是沒有應答,則客戶端不會收到下一個訊息
                        channel.BasicQos(0, 1, false);

                        Console.WriteLine("Listening...");

                        //在佇列上定義一個消費者
                        QueueingBasicConsumer consumer = new QueueingBasicConsumer(channel);
                        //消費佇列,並設定應答模式為程式主動應答
                        channel.BasicConsume("MyFirstQueue", false, consumer);

                        while (true)
                        {
                            //阻塞函式,獲取佇列中的訊息
                            BasicDeliverEventArgs ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue();
                            byte[] bytes = ea.Body;
                            string str = Encoding.UTF8.GetString(bytes);
                            RequestMsg msg = JsonConvert.DeserializeObject<RequestMsg>(str);
                            Console.WriteLine("HandleMsg:" + msg.ToString());
                            //回覆確認
                            channel.BasicAck(ea.DeliveryTag, false);
                        }
                    }
                }
            }
            catch (Exception e1)
            {
                Console.WriteLine(e1.ToString());
            }
            Console.ReadLine();
            */