RabbitMQ(二):理解消息通信RabbitMQ
一、消費者、生產者和信道
生產者(producer):生產者創建消息,然後發布(發送)到代理服務器(RabbitMQ),可以說發送消息的程序就是生產者。什麽是消息?消息包含兩部分:有效載荷和標簽。有效載荷就是傳輸的數據,可以是任何內容,包括json數據和圖片等等。而標簽(一個叫交換器名稱和可選的主題標記)描述了有效載荷,RabbitMQ用它來決定誰將獲得這個消息。
消費者(consumer):消費者就是接收消息並處理消息的程序,他們連接到代理服務器上,並訂閱到隊列上。當消費者接收消息時,它只是得到消息的有效載荷。整個過程很簡單:生產者創建消息,消費者接收消息。你的應用程序可以作為生產者也可以作為消費者,在兩者之間切換。但是消息的傳輸必定會通過某一介質傳遞,此處的消息就通過信道傳遞。
信道(channel):不管是發布消息、訂閱隊列還是接收消息,這些動作都是通過信道完成。應用程序和Rabbit代理服務器之間會創建一條TCP連接,TCP連接就像電纜,信道相當於電纜中的光束。在一條TCP連接上創建多少條信道是沒有限制的,所以不會對操作系統的TCP棧造成額外的負擔。
二、隊列、交換器和綁定
Rabbit的消息路由分為三部分:交換器、隊列和綁定。生產者把消息發布到交換器上;消息最終到達隊列,並被消費者接收;綁定決定了消息如何從路由器路由到特定的隊列。
隊列(queue):隊列是一個棧先進先出,為生產者發布的消息提供了保存的處所,消息在此等待消費。本質上隊列可以存儲無限的消息,但是需要視系統內存而定。
綁定(binding):隊列通過路由鍵綁定到交換器,路由鍵就是消息通過交換器投遞到那個隊列的規則。
交換器(exchange):交換器有四種類型:direct、fanout、topic和headers。一個Exchange可以和多個Queue進行綁定,producer在傳遞消息的時候,會傳遞一個路由鍵,Exchange會根據這個路由鍵按照特定的路由算法,將消息路由給指定的queue。
(1)direct:如果路由鍵匹配的話,消息就被投遞到對應的隊列。類似於單播。
(2)fanout:將消息廣播到綁定的隊列上,不管路由鍵是什麽,綁定的隊列都會收到消息。
(3)topic:類似與組播,和正則表達式類似,發送給路由鍵符合一定規則的隊列。如:路由鍵為user.stock的消息會轉發給綁定匹配模式為 * .stock,user.stock, * . * 和#.user.stock.#的隊列(* 表是匹配一個任意詞組,#表示匹配0個或多個詞組)。
(4)headers:不通過路由鍵匹配而是通過消息的header匹配,其他與direct交換器一致,但是性能上會差很多。
三、vhost
每一個RabbitMQ服務器都能創建虛擬消息服務器,我們稱之為虛擬主機(vhost)。每一個vhost本質上是一個mini版的RabbitMQ服務器,擁有自己的隊列、交換器和綁定,最重要的是擁有自己的權限機制。邏輯上vhost之間是相互獨立的分離的,保證了安全性和可移植性。RabbitMQ包含了開箱即用的默認的vhost:"/",因此使用起來非常簡單。當在RabbitMQ集群中創建vhost時,整個集群上都會創建該vhost,避免了vhost的重復創建。
四、第一次嘗試消息通信
交換器或隊列的時候,如果交換器或隊列已經存在,則直接返回結束,不會重復創建。無法承擔消息丟失,則生產者和消費者中都需要嘗試去創建交換器和隊列,如果可以承擔則可以由消費者來聲明隊列。我們創建生產者和消費者兩個控制臺程序分別運行以下代碼。
生產者:
static void Main(string[] args) { FirstProducer(); } /// <summary> /// 第一個生產者 /// </summary> private static void FirstProducer() { //1.連接到服務器 var conn_factory = new ConnectionFactory() { HostName = "localhost",UserName="guest",Password="guest",Port=5672//默認端口5672 }; using (IConnection conn = conn_factory.CreateConnection()) { //2.創建信道 using (IModel channel = conn.CreateModel()) { //3.聲明交換器 channel.ExchangeDeclare( "HelloExchange", //交換器名稱 ExchangeType.Direct,//交換器類型 true, //是否持久話 false, //是否自動刪除 null //關於交換器的詳細設置,鍵值對形式 ); //4.聲明隊列 channel.QueueDeclare( "HelloQueue",//隊列名稱 false, //是否持久化 false, //是否只對首次聲明的隊列可見 false, //是否自動刪除 null ////關於隊列和隊列內消息的詳細設置,鍵值對形式 ); //5.綁定交換器和隊列 channel.QueueBind( "HelloQueue", //隊列名 "HelloExchange", //交換器名 "hola" //路由鍵 ); //6.發布消息 string msg_str = "這是生產者第一次發布的消息"; IBasicProperties msg_pro = channel.CreateBasicProperties(); msg_pro.ContentType = "text/plain";//發布的數據類型 for(int i = 0; i < 5; i++) { channel.BasicPublish( "HelloExchange", //消息發送目標交換器名稱 "hola", //路由鍵 msg_pro, //消息的發布屬性 Encoding.UTF8.GetBytes(msg_str) //消息 ); } } } }
消費者:
static void Main(string[] args) { FirstCousmer(); } /// <summary> /// 第一個消費者 /// </summary> private static void FirstCousmer() { //1.鏈接到服務器 var conn_factory = new ConnectionFactory() { HostName = "localhost",UserName = "guest",Password = "guest",Port = 5672 }; using (var conn = conn_factory.CreateConnection()) { //2.創建信道 using(IModel channel = conn.CreateModel()) { //3.聲明交換器 channel.ExchangeDeclare( "HelloExchange", //交換器名稱 ExchangeType.Direct,//交換器類型 true, //是否持久話 false, //是否自動刪除 null //關於交換器的詳細設置,鍵值對形式 ); //4.聲明隊列 channel.QueueDeclare( "HelloQueue",//隊列名稱 false, //是否持久化 false, //是否只對首次聲明的隊列可見 false, //是否自動刪除 null ////關於隊列和隊列內消息的詳細設置,鍵值對形式 ); //5.綁定交換器和隊列 channel.QueueBind( "HelloQueue", //隊列名 "HelloExchange", //交換器名 "hola" //路由鍵 ); //6.獲取消息 var consumer = new EventingBasicConsumer(channel); consumer.Received += (ch, ea) => //消費者消息接收處理事件 { var body = Encoding.UTF8.GetString(ea.Body); Console.WriteLine(body); channel.BasicAck(ea.DeliveryTag, false); //確認接收消息,從隊列中刪除 }; //7.啟動消費者 string consumer_tag = channel.BasicConsume( "HelloQueue", //獲取的隊列名稱 false, //是否自動確認接收消息,從隊列中刪除 consumer //消費者對象 ); channel.BasicCancel(consumer_tag);//調用消費 //var consumer = new QueueingBasicConsumer(channel); //channel.BasicConsume("HelloQueue", false, consumer); //while (true) //{ // var ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue(); // var body = ea.Body; // var message = Encoding.UTF8.GetString(body); // Console.WriteLine("Received {0}", message); // channel.BasicAck(ea.DeliveryTag, false); //} } } Console.ReadLine(); }
運行消費者後可以看到,以下結果:
RabbitMQ(二):理解消息通信RabbitMQ