1. 程式人生 > >SpringCloud Stream + Rabbit MQ

SpringCloud Stream + Rabbit MQ

介紹

通過stream可以讓程式跟具體佇列元件解耦,程式不用關心佇列元件的使用,只要建立好相應的通道,不論佇列元件怎麼更換,程式都無需關心。stream讓程式通過通道來進行訊息的生產和消費。

 

 

Stream中的input和output只是個別名,不論生產者的輸出通道名和消費者的輸入通道名叫什麼都無所謂,真正實現效果的是通道綁定了哪個交換機哪個佇列。

 

對於通道的命名,為了易於區分,可讓輸入和輸出採用同樣的通道名,表示生產者往這個通道傳送訊息,消費者消費訊息;或者採用channel_output和channel_input的寫法,用於區分輸入和輸出方。

 

在springboot2.x以上對應的springcloud stream版本中,在同個專案中將相同的通道名繫結到不同通道可能會報該通道名已存在的錯誤。在看過的教程裡,作者的版本是2.0.0.M3springcloudFinchley.M2可以將同樣的通道名繫結到不同的通道上(納悶)。

 

如下圖所示,input代表消費者,交換機和佇列是由消費者建立。該配置表示將交換機wdtest_下的average佇列繫結到通道input(命名隨意別重複即可)。在寫程式時只需監聽該通道即可接收到該佇列的訊息。

 

 

output代表生產者,該配置表示將交換機繫結到該通道下,程式只需往該通道傳送訊息即可。

 

搭建

springboot和springcloud的版本:

<parent>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-parent</artifactId>
	<version>2.0.4.RELEASE</version>
	<relativePath/> <!-- lookup parent from repository -->
</parent>

<properties>
	<spring-cloud.version>Finchley.SR1</spring-cloud.version>
</properties>

消費者配置

定義通道名:

package com.sosmmh.product.message;

import org.springframework.cloud.stream.annotation.Input;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.stereotype.Component;

/**
 * @author Lixh
 * @Date: 2018/8/29 14:16
 * @Description:
 */
@Component
public interface CustomSink {

    String INPUT = "input3";

    String INPUT0 = "input0";

    String INPUT1 = "input1";

    String INPUT2 = "input2";

    @Input(INPUT)
    SubscribableChannel input();

    @Input(INPUT0)
    SubscribableChannel input0();

    @Input(INPUT1)
    SubscribableChannel input1();

    @Input(INPUT2)
    SubscribableChannel input2();
}

 

監聽通道:

package com.sosmmh.product.message;

import com.sosmmh.product.dto.CartDTO;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.messaging.support.GenericMessage;

/**
 * @author Lixh
 * @Date: 2018/8/29 14:16
 * @Description:
 */
@EnableBinding({ CustomSink.class })
public class Consumer {

    @StreamListener(CustomSink.INPUT)
    public synchronized void listen_average(CartDTO cartDTO) {
        System.out.println("Order Received For Average : " + cartDTO);
    }

    @StreamListener(CustomSink.INPUT0)
    public synchronized void listen_hdfsWrite(CartDTO cartDTO) {
        System.out.println("Order Received For hdfsWrite : " + cartDTO);
    }


    @StreamListener(CustomSink.INPUT1)
    public synchronized void receive(CartDTO cartDTO) {
        System.out.println("Item Received: " + cartDTO);
    }

    @StreamListener(CustomSink.INPUT2)
    public synchronized <T> void get(GenericMessage<T> msg) {
        System.out.println("Msg Received: " + msg.getPayload());
    }
}

 

配置檔案:這裡使用了通用配置的rabbitmq

spring:
  cloud:
    stream:
      bindings:
        input:
          destination: wdtest_
          content-type: application/json
          group: average
        input3:
          destination: wdtest0
          content-type: application/json
          group: average
        input0:
          destination: wdtest0
          content-type: application/json
          group: hdfsWrite
        input1:
          destination: wdtest1
          content-type: application/json
          group: average
        input2:
          destination: wdtest2
          content-type: application/json
          group: average
        input11:
          destination: wdtest11
          content-type: application/json
          group: average
        input22:
          destination: wdtest22
          content-type: application/json
          group: average
  rabbitmq:
    host: 192.168.61.150
    port: 5672
    username: guest
    password: guest



 

生產者配置

定義通道名:

package com.sosmmh.order.message;

import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
import org.springframework.stereotype.Component;

/**
 * @author Lixh
 * @Date: 2018/8/29 14:12
 * @Description:
 */
@Component
public interface CustomSource {

    String OUTPUT = "output3";

    String OUTPUT1 = "output1";

    String OUTPUT2 = "output2";

    @Output(OUTPUT)
    MessageChannel output();

    @Output(OUTPUT1)
    MessageChannel output1();

    @Output(OUTPUT2)
    MessageChannel output2();
}

 

傳送訊息:

package com.sosmmh.order.message;

import com.sosmmh.order.dto.CartDTO;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

/**
 * @author Lixh
 * @Date: 2018/8/29 14:13
 * @Description:
 */
@EnableBinding({ CustomSource.class })
@Component
public class Producer {

    @Autowired
    private CustomSource source;

    @Scheduled(fixedRate = 5000)
    public void produceHotDrinks() {
        source.output().send(
                MessageBuilder.withPayload(new CartDTO("produceHotDrinks", 1)).build());
    }

    @Scheduled(fixedRate = 3000)
    public void produceColdDrinks() {
        source.output().send(MessageBuilder
                .withPayload(new CartDTO("produceColdDrinks", 2)).build());

    }

    @Scheduled(fixedRate = 3000)
    public void produceItem() {
        source.output1()
                .send(MessageBuilder.withPayload(new CartDTO("produceItem", 3)).build());

    }

    @Scheduled(fixedRate = 3000)
    public void produceMsg() {
        source.output2().send(MessageBuilder.withPayload("produceMsg").build());
    }
}

 

配置檔案:

spring:
  cloud:
    stream:
      bindings:
        output:
          destination: wdtest_
          content-type: application/json
        output3:
          destination: wdtest0
          content-type: application/json
        output1:
          destination: wdtest1
          content-type: application/json
        output2:
          destination: wdtest2
          content-type: application/json
        output11:
          destination: wdtest11
          content-type: application/json
        output22:
          destination: wdtest22
          content-type: application/json
  rabbitmq:
    host: 192.168.61.150
    port: 5672
    username: guest
    password: guest