1. 程式人生 > >SpringCloud Stream生產者配置RabbitMq的動態路由鍵

SpringCloud Stream生產者配置RabbitMq的動態路由鍵

在寫這個文章前不得不吐槽目前國內一些blog的文章,盡是些複製貼上的文章,提到點上但沒任何的深入和例子。.........

經過測試下來總結一下RabbitMQ的Exchange的特性:

1、direct

生產者可以指定路由鍵,消費者可以指定路由鍵,但不能講路由鍵設定為#(全部)。

2、topic

生產者可以指定路由鍵,消費者可以指定路由鍵,也可以不指定(或者是#)。

3、fanout

生產者和消費都忽略路由鍵。

 

在現實的場景裡,通常是生產者會生產多個路由鍵的消費,然後多個消費來消費指定路由鍵的訊息,但通常生產者的生產程式碼是同一份,如何在發訊息的時候動態指定當前訊息的路由鍵呢?

例子:門店平臺系統集中處理多個門店的資料,然後分別將不同門店的資料傳送到不同的門店(即:A門店只消費屬於A門店的訊息,B門店只消費屬於B的訊息)

看例子:

application.yml

 1 spring:    
 2     cloud:
 3         stream:
 4             # 設定預設的binder
 5             default-binder: pos
 6             binders:
 7                 scm:
 8                     type: rabbit
 9
environment: 10 spring: 11 rabbitmq: 12 # 連線到scm的host和exchange 13 virtual-host: scm 14 pos: 15 type: rabbit 16 environment: 17 spring:
18 rabbitmq: 19 # 連線到pos的host和exchange 20 virtual-host: pos 21 22 shop: 23 type: rabbit 24 environment: 25 spring: 26 rabbitmq: 27 # 連線到shop的host和exchange 28 virtual-host: shop 29 30 bindings: 31 # ---------訊息消費------------ 32 33 # 集單開始生產消費 34 order_set_start_produce_input: 35 binder: pos 36 destination: POS_ORDER_SET_STRAT_PRODUCE 37 group: pos_group 38 39 # 門店ID為1的消費者 40 shop_consumer_input_1: 41 binder: shop 42 destination: POS_ORDER_SET_STRAT_PRODUCE 43 group: shop_1_group 44 45 46 #-----------訊息生產----------- 47 # 集單開始生產通知生產 48 order_set_start_produce_output: 49 binder: pos 50 destination: POS_ORDER_SET_STRAT_PRODUCE 51 52 rabbit: 53 bindings: 54 # 集單開始生產消費者 55 order_set_start_produce_input: 56 consumer: 57 exchangeType: topic 58 autoBindDlq: true 59 republishToDlq: true 60 deadLetterExchange: POS_ORDER_SET_STRAT_PRODUCE_POS_DLX 61 #bindingRoutingKey: '#' 62 # 門店1的消費者 63 shop_consumer_input_1: 64 consumer: 65 exchangeType: topic 66 autoBindDlq: true 67 republishToDlq: true 68 deadLetterExchange: POS_ORDER_SET_STRAT_PRODUCE_SHOP_1_DLX 69 bindingRoutingKey: 1 70 deadLetterRoutingKey: 1 71 72 # 生產者配置 73 order_set_start_produce_output: 74 producer: 75 exchangeType: topic 76 routingKeyExpression: headers.shopId 77 # routingKeyExpression: headers['shopId']

 

上面的配置檔案配置了一個動態的基於shopId做路由的生產者配置,一個消費全部路由鍵的消費者,如果要配置指定路由鍵的可以在配置檔案裡設定bindingRoutingKey屬性的值。

 

生產者java程式碼:

import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.MessageBuilder;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.longge.pos.production.mq.dto.OrderSetProductionMsg;

import lombok.extern.slf4j.Slf4j;

@Slf4j
public class MqSendUtil {
    private static MessageChannel orderSetStartProduceChannel;

    public static void setSfOrderCreateChannel(MessageChannel channel) {
        sfOrderCreateProduceChannel = channel;
    }
    
    public static void sendOrderSetPrintMsg(OrderSetProductionMsg msg) {
        // add kv pair - routingkeyexpression (which matches 'type') will then evaluate
        // and add the value as routing key
        log.info("傳送開始生產的MQ:{}", JSONObject.toJSONString(msg));
        orderSetStartProduceChannel.send(MessageBuilder.withPayload(JSON.toJSONString(msg)).setHeader("shopId", msg.getOrderSet().getShopId()).build());
        //orderSetStartProduceChannel.send(MessageBuilder.withPayload(JSON.toJSONString(msg)).build());
    }
}

動態路由的核心在於上面那個紅色的字型的地方,這個是和配置檔案裡的  routingKeyExpression 的配置是匹配的。