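RabbitMQ Exchange Types Explained
In the previous article we saw how a message flows through RabbitMQ: a producer publishes it to an exchange, the exchange routes it to queues through bindings, and consumers read it from the queues.
In practice we also need to know the type of the exchange, because each type applies different rules for routing messages to queues.
RabbitMQ has four exchange types: direct, topic, fanout, and headers.
direct exchange
The routing rule for this type of exchange is simple:
when a queue is bound to the exchange, the binding is given a routing key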
    channel.QueueBind(queue: "create_pdf_queue", exchange: "pdf_events", routingKey: "pdf_create", arguments: null);
Then, when we publish a message to the exchange, we set a routing key on the message as well:
    channel.BasicPublish(exchange: "pdf_events", routingKey: "pdf_create", basicProperties: properties, body: body);
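With a direct exchange, the exchange routes the message through a binding only when these two routing keys are exactly equal.
The following code walks through the complete flow and may make it easier to understand: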
    using System.Text;
    using RabbitMQ.Client;

    var factory = new ConnectionFactory() { HostName = "localhost" };
    using (var connection = factory.CreateConnection())
    using (var channel = connection.CreateModel())
    {
        // Declare a direct exchange named pdf_events
        channel.ExchangeDeclare(exchange: "pdf_events", type: ExchangeType.Direct, durable: true, autoDelete: false, arguments: null);

        // Declare the create_pdf_queue queue
        channel.QueueDeclare(queue: "create_pdf_queue", durable: true, exclusive: false, autoDelete: false, arguments: null);

        // Declare the pdf_log_queue queue
        channel.QueueDeclare(queue: "pdf_log_queue", durable: true, exclusive: false, autoDelete: false, arguments: null);

        // Bind pdf_events --> create_pdf_queue with routing key pdf_create
        channel.QueueBind(queue: "create_pdf_queue", exchange: "pdf_events", routingKey: "pdf_create", arguments: null);

        // Bind pdf_events --> pdf_log_queue with routing key pdf_log
        channel.QueueBind(queue: "pdf_log_queue", exchange: "pdf_events", routingKey: "pdf_log", arguments: null);

        var message = "Demo some pdf creating...";
        var body = Encoding.UTF8.GetBytes(message);
        var properties = channel.CreateBasicProperties();
        properties.Persistent = true;

        // Publish to exchange pdf_events with routing key pdf_create.
        // The routing key equals the binding key of create_pdf_queue,
        // so this message is routed to create_pdf_queue.
        channel.BasicPublish(exchange: "pdf_events", routingKey: "pdf_create", basicProperties: properties, body: body);

        message = "pdf logging ...";
        body = Encoding.UTF8.GetBytes(message);
        properties = channel.CreateBasicProperties();
        properties.Persistent = true;

        // Publish to exchange pdf_events with routing key pdf_log.
        // The routing key equals the binding key of pdf_log_queue,
        // so this message is routed to pdf_log_queue.
        channel.BasicPublish(exchange: "pdf_events", routingKey: "pdf_log", basicProperties: properties, body: body);
    }
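topic exchange
This type of exchange is similar to the direct type above. The difference is that a direct exchange requires the routing keys to be exactly equal, whereas the binding key of a topic exchange may contain the wildcards '*' and '#'.
A routing key is treated as a sequence of words separated by dots. '*' matches exactly one word, and '#' matches zero or more words.
Consider three bindings on the same exchange. The first binding: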
- exchange: agreements
- queue A: berlin_agreements
- binding routingkey: agreements.eu.berlin.#
The second binding:
- exchange: agreements
- queue B: all_agreements
- binding routingkey: agreements.#
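The third binding:
- exchange: agreements
- queue C: headstore_agreements
- binding routingkey: agreements.eu.*.headstore
So a message whose routing key is agreements.eu.berlin matches the first and second bindings. It does not match the third, because that pattern requires a fourth word, headstore. The code is as follows: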
    using System.Text;
    using RabbitMQ.Client;

    var factory = new ConnectionFactory() { HostName = "localhost" };
    using (var connection = factory.CreateConnection())
    using (var channel = connection.CreateModel())
    {
        // Declare a topic exchange named agreements
        channel.ExchangeDeclare(exchange: "agreements", type: ExchangeType.Topic, durable: true, autoDelete: false, arguments: null);

        // Declare the berlin_agreements queue
        channel.QueueDeclare(queue: "berlin_agreements", durable: true, exclusive: false, autoDelete: false, arguments: null);

        // Declare the all_agreements queue
        channel.QueueDeclare(queue: "all_agreements", durable: true, exclusive: false, autoDelete: false, arguments: null);

        // Declare the headstore_agreements queue
        channel.QueueDeclare(queue: "headstore_agreements", durable: true, exclusive: false, autoDelete: false, arguments: null);

        // Bind agreements --> berlin_agreements with routing key agreements.eu.berlin.#
        channel.QueueBind(queue: "berlin_agreements", exchange: "agreements", routingKey: "agreements.eu.berlin.#", arguments: null);

        // Bind agreements --> all_agreements with routing key agreements.#
        channel.QueueBind(queue: "all_agreements", exchange: "agreements", routingKey: "agreements.#", arguments: null);

        // Bind agreements --> headstore_agreements with routing key agreements.eu.*.headstore
        channel.QueueBind(queue: "headstore_agreements", exchange: "agreements", routingKey: "agreements.eu.*.headstore", arguments: null);

        var message = "hello world";
        var body = Encoding.UTF8.GetBytes(message);
        var properties = channel.CreateBasicProperties();
        properties.Persistent = true;

        // Publish to exchange agreements with routing key agreements.eu.berlin.
        // agreements.eu.berlin matches agreements.eu.berlin.# and agreements.#
        // agreements.eu.berlin does not match agreements.eu.*.headstore
        // The message is therefore routed to the queues berlin_agreements
        // (agreements.eu.berlin.#) and all_agreements (agreements.#).
        channel.BasicPublish(exchange: "agreements", routingKey: "agreements.eu.berlin", basicProperties: properties, body: body);
    }
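The wildcard rule can also be checked without a broker. The following is a minimal in-memory sketch of the matching rule for '*' and '#', not RabbitMQ's actual implementation. The `TopicMatcher` class and its `Matches` method are hypothetical names introduced only for illustration.

```csharp
using System;

// Hypothetical helper: an in-memory illustration of topic matching,
// not part of the RabbitMQ client library.
public static class TopicMatcher
{
    // Returns true when routingKey satisfies the binding pattern.
    public static bool Matches(string pattern, string routingKey)
    {
        return Match(SplitWords(pattern), 0, SplitWords(routingKey), 0);
    }

    // Splits a key into dot-separated words; an empty key has zero words.
    private static string[] SplitWords(string key)
    {
        return key.Length == 0 ? Array.Empty<string>() : key.Split('.');
    }

    private static bool Match(string[] p, int pi, string[] k, int ki)
    {
        // Pattern exhausted: match only if the key is exhausted too.
        if (pi == p.Length) return ki == k.Length;

        if (p[pi] == "#")
        {
            // '#' either absorbs zero words (move past it)
            // or absorbs one more word (stay on '#').
            return Match(p, pi + 1, k, ki)
                || (ki < k.Length && Match(p, pi, k, ki + 1));
        }

        // Any other pattern word needs a key word to compare against.
        if (ki == k.Length) return false;

        // '*' accepts exactly one arbitrary word; a literal must be equal.
        if (p[pi] == "*" || p[pi] == k[ki]) return Match(p, pi + 1, k, ki + 1);

        return false;
    }
}
```

With the three bindings above, `TopicMatcher.Matches("agreements.eu.berlin.#", "agreements.eu.berlin")` and `TopicMatcher.Matches("agreements.#", "agreements.eu.berlin")` return true, while `TopicMatcher.Matches("agreements.eu.*.headstore", "agreements.eu.berlin")` returns false. A routing key of agreements.eu.berlin.headstore would satisfy all three patterns.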
fanout exchange
The routing rule of this exchange is the simplest: it routes every message to all queues bound to it, without matching the message's routing key at all.
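A fanout setup might look like the sketch below. It uses only the client calls that appear in the other examples in this article and, like them, assumes a broker running on localhost. The exchange name `pdf_broadcast` and the queue names `pdf_archive_queue` and `pdf_notify_queue` are hypothetical, chosen only for illustration.

```csharp
using System.Text;
using RabbitMQ.Client;

var factory = new ConnectionFactory() { HostName = "localhost" };
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{
    // Declare a fanout exchange (pdf_broadcast is a hypothetical name)
    channel.ExchangeDeclare(exchange: "pdf_broadcast", type: ExchangeType.Fanout, durable: true, autoDelete: false, arguments: null);

    // Declare two queues (hypothetical names)
    channel.QueueDeclare(queue: "pdf_archive_queue", durable: true, exclusive: false, autoDelete: false, arguments: null);
    channel.QueueDeclare(queue: "pdf_notify_queue", durable: true, exclusive: false, autoDelete: false, arguments: null);

    // Bind both queues; the routing key is left empty because
    // a fanout exchange does not use it for matching
    channel.QueueBind(queue: "pdf_archive_queue", exchange: "pdf_broadcast", routingKey: string.Empty, arguments: null);
    channel.QueueBind(queue: "pdf_notify_queue", exchange: "pdf_broadcast", routingKey: string.Empty, arguments: null);

    var body = Encoding.UTF8.GetBytes("hello world");
    var properties = channel.CreateBasicProperties();
    properties.Persistent = true;

    // Both pdf_archive_queue and pdf_notify_queue receive a copy of this message
    channel.BasicPublish(exchange: "pdf_broadcast", routingKey: string.Empty, basicProperties: properties, body: body);
}
```

Publishing with a non-empty routing key would give the same result, since the key plays no part in fanout routing.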
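headers exchange
This type of exchange differs from the three above: it routes by headers rather than by routing key. The headers to match are the ones passed as the arguments parameter of the binding call: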
    Dictionary<string, object> aHeader = new Dictionary<string, object>();
    aHeader.Add("format", "pdf");
    aHeader.Add("type", "report");
    aHeader.Add("x-match", "all");
    channel.QueueBind(queue: "queue.A", exchange: "agreements", routingKey: string.Empty, arguments: aHeader);
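Here x-match is a special header. With the value all, every header in the binding must match; with the value any, it is enough for one of them to match.
When publishing a message, the header values must be supplied with it: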
    var properties = channel.CreateBasicProperties();
    properties.Persistent = true;
    Dictionary<string, object> mHeader1 = new Dictionary<string, object>();
    mHeader1.Add("format", "pdf");
    mHeader1.Add("type", "report");
    properties.Headers = mHeader1;
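The all/any rule can be sketched as a small in-memory check. This is a simplified illustration, not the broker's implementation: `HeadersMatcher` and `Matches` are hypothetical names, binding keys starting with "x-" are assumed to be skipped during comparison, a missing x-match is assumed to mean all, and values are compared with `object.Equals`, whereas a real broker compares typed AMQP values.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper: an in-memory illustration of headers matching,
// not part of the RabbitMQ client library.
public static class HeadersMatcher
{
    // Returns true when the message headers satisfy the binding arguments.
    public static bool Matches(
        IDictionary<string, object> bindingArgs,
        IDictionary<string, object> messageHeaders)
    {
        // Assumption: x-match defaults to "all" when it is not set to "any".
        bool matchAny = bindingArgs.TryGetValue("x-match", out var mode)
            && object.Equals(mode, "any");

        int compared = 0;
        int matched = 0;
        foreach (var pair in bindingArgs)
        {
            // Assumption: keys starting with "x-" (such as x-match) are not compared.
            if (pair.Key.StartsWith("x-", StringComparison.Ordinal)) continue;

            compared++;
            if (messageHeaders != null
                && messageHeaders.TryGetValue(pair.Key, out var value)
                && object.Equals(value, pair.Value))
            {
                matched++;
            }
        }

        // any: at least one pair matched; all: every compared pair matched.
        return matchAny ? matched > 0 : matched == compared;
    }
}
```

For the binding of queue.A shown earlier (format=pdf, type=report, x-match=all), a message carrying format=pdf and type=report matches, while a message carrying only format=pdf does not. If the same binding used x-match=any, the second message would match as well.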
The following code shows the rules in detail:
    using System.Collections.Generic;
    using System.Text;
    using RabbitMQ.Client;

    var factory = new ConnectionFactory() { HostName = "localhost" };
    using (var connection = factory.CreateConnection())
    using (var channel = connection.CreateModel())
    {
        // Declare a headers exchange named agreements
        channel.ExchangeDeclare(exchange: "agreements", type: ExchangeType.Headers, durable: true, autoDelete: false, arguments: null);

        // Declare the queue.A queue
        channel.QueueDeclare(queue: "queue.A", durable: true, exclusive: false, autoDelete: false, arguments: null);

        // Declare the queue.B queue
        channel.QueueDeclare(queue: "queue.B", durable: true, exclusive: false, autoDelete: false, arguments: null);

        // Declare the queue.C queue
        channel.QueueDeclare(queue: "queue.C", durable: true, exclusive: false, autoDelete: false, arguments: null);

        // Bind agreements --> queue.A with arguments (format=pdf, type=report, x-match=all)
        Dictionary<string, object> aHeader = new Dictionary<string, object>();
        aHeader.Add("format", "pdf");
        aHeader.Add("type", "report");
        aHeader.Add("x-match", "all");
        channel.QueueBind(queue: "queue.A", exchange: "agreements", routingKey: string.Empty, arguments: aHeader);

        // Bind agreements --> queue.B with arguments (format=pdf, type=log, x-match=any)
        Dictionary<string, object> bHeader = new Dictionary<string, object>();
        bHeader.Add("format", "pdf");
        bHeader.Add("type", "log");
        bHeader.Add("x-match", "any");
        channel.QueueBind(queue: "queue.B", exchange: "agreements", routingKey: string.Empty, arguments: bHeader);

        // Bind agreements --> queue.C with arguments (format=zip, type=report, x-match=all)
        Dictionary<string, object> cHeader = new Dictionary<string, object>();
        cHeader.Add("format", "zip");
        cHeader.Add("type", "report");
        cHeader.Add("x-match", "all");
        channel.QueueBind(queue: "queue.C", exchange: "agreements", routingKey: string.Empty, arguments: cHeader);

        string message1 = "hello world";
        var body = Encoding.UTF8.GetBytes(message1);
        var properties = channel.CreateBasicProperties();
        properties.Persistent = true;
        Dictionary<string, object> mHeader1 = new Dictionary<string, object>();
        mHeader1.Add("format", "pdf");
        mHeader1.Add("type", "report");
        properties.Headers = mHeader1;

        // This message is routed to queue.A and queue.B:
        // queue.A binding (format=pdf, type=report, x-match=all)
        // queue.B binding (format=pdf, type=log, x-match=any)
        channel.BasicPublish(exchange: "agreements", routingKey: string.Empty, basicProperties: properties, body: body);

        string message2 = "hello world";
        body = Encoding.UTF8.GetBytes(message2);
        properties = channel.CreateBasicProperties();
        properties.Persistent = true;
        Dictionary<string, object> mHeader2 = new Dictionary<string, object>();
        mHeader2.Add("type", "log");
        properties.Headers = mHeader2;

        // This message matches queue.B only:
        // queue.B binding (format=pdf, type=log, x-match=any)
        channel.BasicPublish(exchange: "agreements", routingKey: string.Empty, basicProperties: properties, body: body);

        string message3 = "hello world";
        body = Encoding.UTF8.GetBytes(message3);
        properties = channel.CreateBasicProperties();
        properties.Persistent = true;
        Dictionary<string, object> mHeader3 = new Dictionary<string, object>();
        mHeader3.Add("format", "zip");
        properties.Headers = mHeader3;

        // No binding matches, so this message is not routed to any queue
        channel.BasicPublish(exchange: "agreements", routingKey: string.Empty, basicProperties: properties, body: body);
    }
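Summary
That covers the exchange types. In general, direct and topic exchanges are used to route messages to specific queues, while a fanout exchange is the usual choice for broadcasting messages.
The headers type is used less often, but it is still worth knowing about.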