1. 程式人生 > >Springboot2整合RocketMQ4.x

Springboot2整合RocketMQ4.x

  Springboot2.x整合RocketMQ4.x實戰,加入相關依賴,開發生產者程式碼
    
    啟動nameser和broker

    微信支付架構圖:

 

 微信支付呼叫相應的API進行支付,支付完成後回撥,把相應的支付資訊封裝成物件發給生產者。生產者把訊息傳送給訊息佇列  broker,通過NameServer獲取相應的路由地址。訊息訂閱者從broker獲取相應的訊息,生成相應的支付憑證,增加積分等

   

 

1、加入相關依賴
        <dependency>  
            <groupId>org.apache.rocketmq</groupId>  
            <artifactId>rocketmq-client</artifactId>  
            <version>${rocketmq.version}</version>  
        </dependency>  
        <dependency>  
            <groupId>org.apache.rocketmq</groupId>  
            <artifactId>rocketmq-common</artifactId>  
            <version>${rocketmq.version}</version>  
        </dependency>  

 

2、application.properties加入配置檔案        
        # 消費者的組名
        apache.rocketmq.consumer.PushConsumer=orderConsumer
        # 生產者的組名
        apache.rocketmq.producer.producerGroup=Producer
        # NameServer地址
        apache.rocketmq.namesrvAddr=127.0.0.1:9876

 


    3、開發MsgProducer
         /**
         * 生產者的組名
         */
        @Value("${apache.rocketmq.producer.producerGroup}")
        private String producerGroup;

        /**
         * NameServer 地址
         */
        @Value("${apache.rocketmq.namesrvAddr}")
        private String namesrvAddr;

        private  DefaultMQProducer producer ;

            
        public DefaultMQProducer getProducer(){
            return this.producer;
        }
        
        @PostConstruct
        public void defaultMQProducer() {
            //生產者的組名
            producer = new DefaultMQProducer(producerGroup);
            //指定NameServer地址,多個地址以 ; 隔開
            //如 producer.setNamesrvAddr("192.168.100.141:9876;192.168.100.142:9876;192.168.100.149:9876"); 
            producer.setNamesrvAddr(namesrvAddr);
            producer.setVipChannelEnabled(false);
            
            try {
                /**
                 * Producer物件在使用之前必須要呼叫start初始化,只能初始化一次
                 */
                producer.start();

            } catch (Exception e) {
                e.printStackTrace();
            } 
            
            // producer.shutdown();  一般在應用上下文,關閉的時候進行關閉,用上下文監聽器

        }


模擬微型支付回撥的controller

@RestController
@RequestMapping("/api/v1")
public class OrderController {
    
    
    @Autowired
    private MsgProducer msgProducer;
    
    /**
     * 功能描述:微信支付回撥介面
     * @param msg 支付資訊
     * @param tag 訊息二級分類
     * @return
     */
    @GetMapping("order")
    public Object order(String msg, String tag) throws MQClientException, RemotingException, MQBrokerException, InterruptedException, UnsupportedEncodingException{
      
        /**
        * 建立一個訊息例項,包含 topic、tag 和 訊息體           
       */
       Message message = new Message("testTopic",tag, msg.getBytes(RemotingHelper.DEFAULT_CHARSET));
       
       SendResult result = msgProducer.getProducer().send(message);
       
       System.out.println("傳送響應:MsgId:" + result.getMsgId() + ",傳送狀態:" + result.getSendStatus());
     
       return JsonData.buildSuccess();
    }
    

5.建立消費者

@Component
public class MsgConsumer {
    /**
     * 消費者的組名
     */
    @Value("${apache.rocketmq.consumer.PushConsumer}")
    private String consumerGroup;

    /**
     * NameServer 地址
     */
    @Value("${apache.rocketmq.namesrvAddr}")
    private String namesrvAddr;

    
    
    
    
    @PostConstruct
    public void defaultMQPushConsumer() {
        //消費者的組名
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerGroup);
        //指定NameServer地址,多個地址以 ; 隔開
        consumer.setNamesrvAddr(namesrvAddr);
        
        try {
            //設定consumer所訂閱的Topic和Tag,*代表全部的Tag
            consumer.subscribe("testTopic", "*");
          
            //CONSUME_FROM_LAST_OFFSET 預設策略,從該佇列最尾開始消費,跳過歷史訊息
            //CONSUME_FROM_FIRST_OFFSET 從佇列最開始開始消費,即歷史訊息(還儲存在broker的)全部消費一遍
            consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
            
                       
            //MessageListenerOrderly 這個是有序的
            //MessageListenerConcurrently 這個是無序的,並行的方式處理,效率高很多
            consumer.registerMessageListener((MessageListenerConcurrently) (list, context) -> {
                try {
                    for (MessageExt messageExt : list) {
                        
                        System.out.println("messageExt: " + messageExt);//輸出訊息內容

                        String messageBody = new String(messageExt.getBody(), RemotingHelper.DEFAULT_CHARSET);
                        
                        System.out.println("消費響應:msgId : " + messageExt.getMsgId() + ",  msgBody : " + messageBody);//輸出訊息內容
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                    return ConsumeConcurrentlyStatus.RECONSUME_LATER; //稍後再試
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; //消費成功
            });
            
            
            consumer.start();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}