1. 程式人生 > >RabbitMQ(二):理解消息通信RabbitMQ

RabbitMQ(二):理解消息通信RabbitMQ

代理服 style 操作 https ring 集群 odi 列名 都是

原文:RabbitMQ(二):理解消息通信RabbitMQ

一、消費者、生產者和信道

  生產者(producer):生產者創建消息,然後發布(發送)到代理服務器(RabbitMQ),可以說發送消息的程序就是生產者。什麽是消息?消息包含兩部分:有效載荷和標簽。有效載荷就是傳輸的數據,可以是任何內容,包括json數據和圖片等等。而標簽(一個叫交換器名稱和可選的主題標記)描述了有效載荷,RabbitMQ用它來決定誰將獲得這個消息。

  消費者(consumer):消費者就是接收消息並處理消息的程序,他們連接到代理服務器上,並訂閱到隊列上。當消費者接收消息時,它只是得到消息的有效載荷。整個過程很簡單:生產者創建消息,消費者接收消息。你的應用程序可以作為生產者也可以作為消費者,在兩者之間切換。但是消息的傳輸必定會通過某一介質傳遞,此處的消息就通過信道傳遞。

  信道(channel):不管是發布消息、訂閱隊列還是接收消息,這些動作都是通過信道完成。應用程序和Rabbit代理服務器之間會創建一條TCP連接,TCP連接就像電纜,信道相當於電纜中的光束。在一條TCP連接上創建多少條信道是沒有限制的,所以不會對操作系統的TCP棧造成額外的負擔。

二、隊列、交換器和綁定

  Rabbit的消息路由分為三部分:交換器、隊列和綁定。生產者把消息發布到交換器上;消息最終到達隊列,並被消費者接收;綁定決定了消息如何從路由器路由到特定的隊列。

  隊列(queue):隊列是一個棧先進先出,為生產者發布的消息提供了保存的處所,消息在此等待消費。本質上隊列可以存儲無限的消息,但是需要視系統內存而定。

  綁定(binding):隊列通過路由鍵綁定到交換器,路由鍵就是消息通過交換器投遞到那個隊列的規則。

  交換器(exchange):交換器有四種類型:direct、fanout、topic和headers。一個Exchange可以和多個Queue進行綁定,producer在傳遞消息的時候,會傳遞一個路由鍵,Exchange會根據這個路由鍵按照特定的路由算法,將消息路由給指定的queue。

  (1)direct:如果路由鍵匹配的話,消息就被投遞到對應的隊列。類似於單播。

  (2)fanout:將消息廣播到綁定的隊列上,不管路由鍵是什麽,綁定的隊列都會收到消息。

  (3)topic:類似與組播,和正則表達式類似,發送給路由鍵符合一定規則的隊列。如:路由鍵為user.stock的消息會轉發給綁定匹配模式為 * .stock,user.stock, * . * 和#.user.stock.#的隊列(* 表是匹配一個任意詞組,#表示匹配0個或多個詞組)。

  (4)headers:不通過路由鍵匹配而是通過消息的header匹配,其他與direct交換器一致,但是性能上會差很多。

三、vhost

  每一個RabbitMQ服務器都能創建虛擬消息服務器,我們稱之為虛擬主機(vhost)。每一個vhost本質上是一個mini版的RabbitMQ服務器,擁有自己的隊列、交換器和綁定,最重要的是擁有自己的權限機制。邏輯上vhost之間是相互獨立的分離的,保證了安全性和可移植性。RabbitMQ包含了開箱即用的默認的vhost:"/",因此使用起來非常簡單。當在RabbitMQ集群中創建vhost時,整個集群上都會創建該vhost,避免了vhost的重復創建。

四、第一次嘗試消息通信

  交換器或隊列的時候,如果交換器或隊列已經存在,則直接返回結束,不會重復創建。無法承擔消息丟失,則生產者和消費者中都需要嘗試去創建交換器和隊列,如果可以承擔則可以由消費者來聲明隊列。我們創建生產者和消費者兩個控制臺程序分別運行以下代碼。

  生產者:

        static void Main(string[] args)
        {
            FirstProducer();
        }

        /// <summary>
        /// 第一個生產者
        /// </summary>
        private static void FirstProducer() 
        {
            //1.連接到服務器
            var conn_factory = new ConnectionFactory() {
                HostName = "localhost",UserName="guest",Password="guest",Port=5672//默認端口5672
            };
            using (IConnection conn = conn_factory.CreateConnection()) 
            {
                //2.創建信道
                using (IModel channel = conn.CreateModel())
                {
                    //3.聲明交換器
                    channel.ExchangeDeclare(
                        "HelloExchange",    //交換器名稱
                        ExchangeType.Direct,//交換器類型
                        true,              //是否持久話
                        false,              //是否自動刪除
                        null                //關於交換器的詳細設置,鍵值對形式
                        );
                    //4.聲明隊列
                    channel.QueueDeclare(
                        "HelloQueue",//隊列名稱
                        false,       //是否持久化
                        false,       //是否只對首次聲明的隊列可見
                        false,       //是否自動刪除
                        null         ////關於隊列和隊列內消息的詳細設置,鍵值對形式
                        );
                    //5.綁定交換器和隊列
                    channel.QueueBind(
                        "HelloQueue",    //隊列名
                        "HelloExchange", //交換器名
                        "hola"           //路由鍵
                        );
                    //6.發布消息
                    string msg_str = "這是生產者第一次發布的消息";
                    IBasicProperties msg_pro = channel.CreateBasicProperties();
                    msg_pro.ContentType = "text/plain";//發布的數據類型
                    for(int i = 0; i < 5; i++)
                    {
                        channel.BasicPublish(
                            "HelloExchange",                    //消息發送目標交換器名稱
                            "hola",                             //路由鍵
                            msg_pro,                            //消息的發布屬性
                            Encoding.UTF8.GetBytes(msg_str)    //消息
                            );
                    }
                }
            }
        }

  消費者:

        static void Main(string[] args)
        {
            FirstCousmer();
        }
        /// <summary>
        /// 第一個消費者
        /// </summary>
        private static void FirstCousmer()
        {
            //1.鏈接到服務器
            var conn_factory = new ConnectionFactory()
            {
                HostName = "localhost",UserName = "guest",Password = "guest",Port = 5672
            };
            using (var conn = conn_factory.CreateConnection()) 
            {
                //2.創建信道
                using(IModel channel = conn.CreateModel())
                {
                    //3.聲明交換器
                    channel.ExchangeDeclare(
                        "HelloExchange",    //交換器名稱
                        ExchangeType.Direct,//交換器類型
                        true,              //是否持久話
                        false,              //是否自動刪除
                        null                //關於交換器的詳細設置,鍵值對形式
                        );
                    //4.聲明隊列
                    channel.QueueDeclare(
                        "HelloQueue",//隊列名稱
                        false,       //是否持久化
                        false,       //是否只對首次聲明的隊列可見
                        false,       //是否自動刪除
                        null         ////關於隊列和隊列內消息的詳細設置,鍵值對形式
                        );
                    //5.綁定交換器和隊列
                    channel.QueueBind(
                        "HelloQueue",    //隊列名
                        "HelloExchange", //交換器名
                        "hola"           //路由鍵
                        );
                    //6.獲取消息
                    var consumer = new EventingBasicConsumer(channel);
                    consumer.Received += (ch, ea) =>        //消費者消息接收處理事件
                    {
                        var body = Encoding.UTF8.GetString(ea.Body);
                        Console.WriteLine(body);
                        channel.BasicAck(ea.DeliveryTag, false); //確認接收消息,從隊列中刪除
                    };
                    //7.啟動消費者
                    string consumer_tag = channel.BasicConsume(
                        "HelloQueue", //獲取的隊列名稱
                        false,       //是否自動確認接收消息,從隊列中刪除
                        consumer     //消費者對象
                        );
                    channel.BasicCancel(consumer_tag);//調用消費
                    //var consumer = new QueueingBasicConsumer(channel);
                    //channel.BasicConsume("HelloQueue", false, consumer);
                    //while (true)
                    //{
                    //    var ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue();
                    //    var body = ea.Body;
                    //    var message = Encoding.UTF8.GetString(body);
                    //    Console.WriteLine("Received {0}", message);
                    //    channel.BasicAck(ea.DeliveryTag, false);
                    //}
                }
            }
            Console.ReadLine();
        }

  運行消費者後可以看到,以下結果:

技術分享圖片

RabbitMQ(二):理解消息通信RabbitMQ