RabbitMQ指南(四)交換機
4.1 直連型別
直連型別交換機與佇列通過路由鍵進行繫結,訊息傳送時,需指定交換機和路由鍵,交換機接收到訊息後,根據路由鍵將訊息轉發至相應佇列。
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
// 設定服務端的地址、埠、使用者名稱和密碼...
Connection connection = factory.newConnection();
Channel channel = connection.createChannel ();
// 宣告直連型別交換機
channel.exchangeDeclare("direct.exchange", BuiltinExchangeType.DIRECT);
channel.queueDeclare("direct.queue", true, false, false, null);
// 將交換機和佇列通過路由鍵“key”繫結
channel.queueBind("direct.queue", "direct.exchange", "key");
// 傳送訊息,指定交換機和路由鍵
channel.basicPublish("direct.exchange", "key" , null, "Test".getBytes());
channel.close();
connection.close();
}
執行程式碼後,進入Web管理介面,可以看到,一個名為“direct.exchange”的直連型別交換機已被建立,點選交換機名進入交換機管理介面,Bindings下拉框可檢視當前交換機的繫結情況。佇列“direct.queue”通過路由鍵“key”與當前交換機繫結,並且從佇列“direct.queue”中能找到傳送的訊息。 佇列與直連交換機繫結,可以有更為複雜的情況: (1)一個佇列可通過不同路由鍵與直連交換機進行繫結,指定任意一個路由鍵傳送訊息,訊息都會被轉發到該佇列中;
4.2 廣播型別
廣播型別交換機與佇列繫結時無需指定路由鍵,交換機收到訊息後,會轉發給所有與其繫結的佇列中。
// 宣告廣播型別交換機
channel.exchangeDeclare("fanout.exchange", BuiltinExchangeType.FANOUT);
channel.queueDeclare("fanout.queue1", true, false, false, null);
channel.queueDeclare("fanout.queue2", true, false, false, null);
// 將交換機和佇列繫結
channel.queueBind("fanout.queue1", "fanout.exchange", "");
channel.queueBind("fanout.queue2", "fanout.exchange", "");
// 傳送訊息,指定廣播交換機
channel.basicPublish("fanout.exchange", "", null, "Test".getBytes());
執行程式後,進入Web管理介面,可以看到,一個名為“fanout.exchange”的廣播型別交換機已被建立,點選交換機名進入交換機管理介面,Bindings下拉框可檢視當前交換機的繫結情況。佇列“fanout.queue1”“fanout.queue2”與當前交換機繫結,並且從兩個佇列中都能找到傳送的訊息。 在廣播交換機的宣告和繫結中,應注意: (1)路由鍵不能為null,即使廣播交換機與佇列的繫結與路由鍵無關,也需填寫路由鍵入參,一般以空字串作為路由鍵入參,傳送訊息時亦以空字串作為路由鍵入參; (2)繫結時也可填寫路由鍵入參,在Web管理介面的交換機繫結列表中也會顯示繫結的路由鍵,但這個路由鍵是無效的,向交換機中傳送訊息時,即使指定路由鍵,所有與其繫結的佇列也會收到訊息。所以在RabbitMQ的使用中,應首先關注交換機的型別; (3)若先後以不同的路由鍵入參將佇列多次與廣播交換機繫結,繫結列表中也會顯示佇列與廣播交換機繫結使用的不同路由鍵,但向廣播交換機發送訊息後,佇列也只會收到一次訊息。
4.3 主題型別
主題型別交換機與佇列繫結時也需指定路由鍵。與直連型別交換機不同的是,直連交換機在訊息指定的路由鍵與佇列繫結使用的路由鍵完全相同時,才將訊息轉發到該佇列中;主題型別交換機可在路由鍵匹配過程中使用星號(*)、井號(#)進行模糊匹配,此時,路由鍵應為由英文句點(.)隔開的一個字串,“*”匹配一個單詞,“#”可匹配零至多個任意數量的單詞。 由句點隔開的各單詞通常表示訊息的各類屬性值,稱為訊息的主題,訊息的消費者往往僅關係某幾個維度的特徵,通過模糊匹配,訊息轉發到處理各類主題的佇列中,交由處理相應主題的消費者進行處理。主題型別交換機想必由此得名。 想象如下場景,一個證券行情釋出系統正在向客戶端釋出各類證券的實時行情,這些證券行情訊息可以由三個維度分類:(1)該證券所屬的市場,包括上海(sh)和深圳(sz);(2)該證券的類別,包括股票(stock)、債券(bond)、基金(fund);(3)該行情的型別,包括最新成交價(deal)和買賣兩方五檔價格(buy1 - buy5、sell1 - sell5)。 有3個使用者出於不同的需求,需要不同的行情資訊:甲需要上海市場所有證券的全部行情資訊,乙需要深圳市場所有證券的最新價,丙需要全市場股票的最新價。則3個使用者的訊息佇列與主題交換機繫結的路由鍵為—— 甲(佇列topic.queue.a):sh.# 乙(佇列topic.queue.b):sz.*.deal 丙(佇列topic.queue.c):*.stock.deal 當行情釋出系統釋出不同的行情訊息,3個使用者收到行情訊息也不同。舉例如下,×表示不收到該訊息,√表示會收到該訊息:
甲(sh.#) | 乙(sz.*.deal) | 丙(*.stock.deal) | |
---|---|---|---|
上海某債券買一價sh.bond.buy1 | √ | × | × |
上海某股票最新價sh.stock.deal | √ | × | √ |
深圳某基金最新價sz.fond.deal | × | √ | × |
深圳某股票最新價sz.stock.deal | × | √ | √ |
可通過如下程式進行驗證。
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
// 設定服務端的地址、埠、使用者名稱和密碼...
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
// 宣告主題型別交換機
channel.exchangeDeclare("topic.exchange", BuiltinExchangeType.TOPIC);
channel.queueDeclare("topic.queue.a", true, false, false, null);
channel.queueDeclare("topic.queue.b", true, false, false, null);
channel.queueDeclare("topic.queue.c", true, false, false, null);
// 將交換機和佇列繫結
channel.queueBind("topic.queue.a", "topic.exchange", "sh.#");
channel.queueBind("topic.queue.b", "topic.exchange", "sz.*.deal");
channel.queueBind("topic.queue.c", "topic.exchange", "*.stock.deal");
// 傳送訊息
channel.basicPublish("topic.exchange", "sh.bond.buy1", null, "sh.bond.buy1 message".getBytes());
channel.basicPublish("topic.exchange", "sh.stock.deal", null, "sh.stock.deal message".getBytes());
channel.basicPublish("topic.exchange", "sz.fond.deal", null, "sz.fond.deal message".getBytes());
channel.basicPublish("topic.exchange", "sz.stock.deal", null, "sz.stock.deal message".getBytes());
channel.close();
connection.close();
}
執行程式後,可以看到,交換機列表中出現了建立的主題型別交換機“topic.exchange”,其與佇列的繫結關係與設定相同。進入佇列,檢視佇列中的訊息,與表格所列是一致的。 如此,通過主題交換機,三個佇列都只收到自己需要的訊息。特殊地,路由鍵為“#”的佇列會收到所有訊息。
4.4 首部型別
首部型別交換機定義路由規則時,並非使用一個字串型的路由鍵,而是使用一組鍵值對作為訊息首部,首部中包含一個特殊的鍵“x-match”,該鍵的值有兩個選項“all”和“any”,由此定義首部型別交換機的兩種路由規則:(1)全匹配(all),訊息首部需包含佇列繫結定義的首部的所有鍵,並且對應的值都相同,才匹配;(2)任意一組匹配(any),訊息首部和佇列繫結定義的首部中,只要有一組鍵和值相同,就進行匹配。 仍以證券行情釋出系統為例,證券的行情資訊由兩個維度分類:(1)市場(market),包括上海(sh)和深圳(sz);(2)證券類別(type),股票(stock)、債券(bond)、基金(fund)。 甲只需要上海市場股票的行情資訊,而乙需要滬深兩市的股票以及上海其他證券的所有行情資訊,則兩個消費者佇列繫結時定義的首部如下—— 甲:{“x-match”:”all”, “market”:”sh”, “type”:”stock”} (即market=sh、type=stock的訊息才接收) 乙:{“x-match”:”any”, “market”:”sh”, “type”:”stock”} (即market=sh或type=stock的訊息全部接收) 當行情釋出系統釋出不同的行情訊息,2個使用者收到行情訊息也不同。舉例如下,×表示不收到該訊息,√表示會收到該訊息:
甲x-match:allmarket:shtype:stock | 乙x-match:anymarket:shtype:stock | |
---|---|---|
上海某股票的行情訊息market:shtype:stock | √ | √ |
上海某債券的行情訊息market:shtype:bond | × | √ |
深圳某股票的行情訊息market:sztype:stock | × | √ |
可通過如下程式進行驗證。
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
// 設定服務端的地址、埠、使用者名稱和密碼...
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
// 宣告首部型別交換機
channel.exchangeDeclare("headers.exchange", BuiltinExchangeType.HEADERS);
channel.queueDeclare("headers.queue.a", true, false, false, null);
channel.queueDeclare("headers.queue.b", true, false, false, null);
// 將交換機和佇列繫結
Map<String, Object> heardersMapA = new HashMap<String, Object>();
heardersMapA.put("x-match", "all");
heardersMapA.put("market", "sh");
heardersMapA.put("type", "stock");
channel.queueBind("headers.queue.a", "headers.exchange", "", heardersMapA);
Map<String, Object> heardersMapB = new HashMap<String, Object>();
heardersMapB.put("x-match", "any");
heardersMapB.put("market", "sh");
heardersMapB.put("type", "stock");
channel.queueBind("headers.queue.b", "headers.exchange", "", heardersMapB);
// 傳送訊息
// 訊息1
Map<String, Object> heardersMessage1 = new HashMap<String, Object>();
heardersMessage1.put("market", "sh");
heardersMessage1.put("type", "stock");
AMQP.BasicProperties.Builder properties1 = new AMQP.BasicProperties().builder().headers(heardersMessage1);
channel.basicPublish("headers.exchange", "", properties1.build(), "sh stock".getBytes());
// 訊息2
Map<String, Object> heardersMessage2 = new HashMap<String, Object>();
heardersMessage2.put("market", "sh");
heardersMessage2.put("type", "bond");
AMQP.BasicProperties.Builder properties2 = new AMQP.BasicProperties().builder().headers(heardersMessage2);
channel.basicPublish("headers.exchange", "", properties2.build(), "sh bond".getBytes());
// 訊息3
Map<String, Object> heardersMessage3 = new HashMap<String, Object>();
heardersMessage3.put("market", "sz");
heardersMessage3.put("type", "stock");
AMQP.BasicProperties.Builder properties3 = new AMQP.BasicProperties().builder().headers(heardersMessage3);
channel.basicPublish("headers.exchange", "", properties3.build(), "sz stock".getBytes());
channel.close();
connection.close();
}
執行程式後,可以看到,交換機列表中出現了建立的首部型別交換機“headers.exchange”,其與佇列的繫結關係與設定相同。進入佇列,檢視佇列中的訊息,與表格所列是一致的。 宣告與繫結首部交換機時需注意: (1)佇列繫結首部交換機時無需路由鍵,但路由鍵入參不能為null,一般填空字串; (2)佇列繫結首部交換機時,若首部不包含“x-match”鍵,則其預設為“all”型別的繫結。
4.5 交換機引數
宣告交換機的方法exchangeDeclare()含有一系列過載方法,可通過這些方法設定交換機的各種屬性,包括—— durable:持久化,交換機以及其與佇列的繫結關係預設儲存在記憶體中,RabbitMQ服務端重啟後將丟失,若需將其儲存在磁碟上,需開啟持久化(一般實際使用中都是開啟的);該持久化是針對交換機和佇列繫結關係的,若佇列持久化而交換機未持久化,RabbitMQ重啟後只會丟失交換機而不會丟失佇列; autoDelete:自動刪除,當與該交換機繫結的佇列全部解綁或刪除後,該交換機自動刪除; internal:內部,啟用該引數後,RabbitMQ客戶端無法直接向該交換機發布訊息,只能繫結到另外的交換機使用。
4.6 交換機之間的繫結
不僅只有佇列可以與交換機繫結,交換機之間也可以進行繫結,繫結的交換機之間型別也可不同。利用交換機之間的繫結,可以實現較複雜的轉發規則,形成一個網狀的路由結構。一般只有一個對外開放的交換機,訊息只能發往這個入口交換機,其餘交換機則宣告為內部(internal)的。 利用之前證券行情釋出的例子,定義如下轉發結構。證券所屬市場放在訊息的首部,而證券的型別以及行情的型別作為訊息的主題。傳送如圖所示的訊息時,根據路由規則,訊息將由交換機“headers.exchange”轉發至“topic.exchange.sh”,在根據主題轉發至佇列“queue.sh.b”。 可由以下程式驗證。
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
// 設定服務端的地址、埠、使用者名稱和密碼...
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
// 宣告交換機和佇列
channel.exchangeDeclare("headers.exchange", BuiltinExchangeType.HEADERS, true, false, false, null);
channel.exchangeDeclare("topic.exchange.sh", BuiltinExchangeType.TOPIC, true, false, true, null);
channel.exchangeDeclare("topic.exchange.sz", BuiltinExchangeType.TOPIC, true, false, true, null);
channel.queueDeclare("queue.sh.a", true, false, false, null);
channel.queueDeclare("queue.sh.b", true, false, false, null);
channel.queueDeclare("queue.sz.a", true, false, false, null);
// 繫結
Map<String, Object> headersMapSH = new HashMap<String, Object>();
headersMapSH.put("market", "sh");
channel.exchangeBind("topic.exchange.sh", "headers.exchange", "", headersMapSH);
Map<String, Object> headersMapSZ = new HashMap<String, Object>();
headersMapSZ.put("market", "sz");
channel.exchangeBind("topic.exchange.sz", "headers.exchange", "", headersMapSZ);
channel.queueBind("queue.sh.a", "topic.exchange.sh", "stock.*");
channel.queueBind("queue.sh.b", "topic.exchange.sh", "bond.*");
channel.queueBind("queue.sz.a", "topic.exchange.sz", "#");
// 傳送訊息
Map<String, Object> headersMessage = new HashMap<String, Object>();
headersMessage.put("market", "sh");
AMQP.BasicProperties.Builder properties = new AMQP.BasicProperties().builder().headers(headersMessage);
channel.basicPublish("headers.exchange", "bond.deal", properties.build(), "sh.bond.deal message".getBytes());
channel.close();
connection.close();
}
執行程式後,檢視Web管理介面,與設想的一致,訊息出現在佇列“queue.sh.b”中。
4.7 AE交換機
一般情況下若訊息的路由鍵與所有繫結佇列都不匹配時,該訊息將丟棄。RabbitMQ為交換機提供“AE交換機(Alternate Exchange)”屬性,當傳送到此交換機的訊息沒有轉發到佇列(即沒有任意一個繫結佇列的路由規則與訊息匹配)時,該訊息就被轉發到“AE交換機”屬性指定的交換機裡。 宣告AE交換機,可使用rabbitmqctl工具或客戶端API,其中官方網站推薦使用rabbitmqctl進行宣告。 API呼叫示例——
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
// 設定服務端的地址、埠、使用者名稱和密碼...
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
// 建立廣播交換機ae.exchange和佇列ae.queue並繫結
channel.exchangeDeclare("ae.exchange", BuiltinExchangeType.FANOUT, true, false, null);
channel.queueDeclare("ae.queue", true, false, false, null);
channel.queueBind("ae.queue", "ae.exchange", "");
// 建立直連交換機direct.exchange,其AE交換機為ae.exchange
Map<String, Object> arguments = new HashMap<String, Object>();
arguments.put("alternate-exchange", "ae.exchange");
channel.exchangeDeclare("direct.exchange", BuiltinExchangeType.DIRECT, true, false, arguments);
// 傳送訊息到direct.exchange,路由鍵為不存在的佇列名
channel.basicPublish("direct.exchange", "no_queue", null, "test".getBytes());
channel.close();
connection.close();
}
執行程式後可以看到,交換機“direct.exchange”的特性一列中出現了“AE”,即表示該交換機擁有AE交換機。訊息出現在了佇列“ae.queue”中。這是由於沒有任何佇列與交換機“direct.exchange”繫結,訊息轉發到了它的AE交易機“ae.exchange”中,“ae.exchange”將其廣播到了“ae.queue”。
![AE交換機](https://img-blog.csdnimg.cn/20181102191653327.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3dlaXhpbl80MzUzMzM1OA==,size_16,color_FFFFFF,t_70) 官方網站更推薦使用rabbitmqctl進行AE交換機的設定(可能由於這樣更容易通過指令碼實施),上文的AE交換機的宣告可用以下命令代替:rabbitmqctl set_policy AE "^direct.exchange$" '{"alternate-exchange":"ae.exchange"}'