1. 程式人生 > 實用技巧 >Spring Cloud Stream 訊息驅動

Spring Cloud Stream 訊息驅動

遮蔽底層訊息中介軟體的差異,降低切換成本 , 統一訊息的程式設計模型。

通過定義繫結器Binder 作為中介軟體。 實現應用程式與訊息中介軟體的細節之間的隔離。

訊息傳送端:

<dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-actuator</artifactId> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <
artifactId>spring-cloud-starter-netflix-eureka-client</artifactId> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-stream-rabbit</artifactId> </dependency
> <!--基礎配置--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-devtools</artifactId> <scope>runtime</scope> <optional>true</optional> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <optional>true</optional> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> </dependencies>
dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
主要是這個

訊息傳送實現類:

@EnableBinding(Source.class) public class MessageProviderImpl implements IMessageProvider { @Resource private MessageChannel messageChannel; @Override public void send(String message) { Message<String> message1 = MessageBuilder.withPayload(message).build(); messageChannel.send(message1); } }
server:
  port: 8801


spring:
  application:
    name: cloud-stream-provider
  cloud:
    stream:
      binders: # 在此處配置要繫結的rabbitmq的服務資訊
        defaultRabbit:
          type: rabbit # 訊息元件型別
          environment: # 設定rabbitmq的相關的環境配置
            spring:
              rabbitmq:
                host: localhost
                port: 5672
                username: guest
                password: guest
      bindings: # 服務的整合處理
        output: # 這個名字是一個通道的名稱
          destination: studyExchange # 表示要使用的Exchange名稱定義
          content-type: application/json # 設定訊息型別,本次為json,文字則設定“text/plain”
          binder: defaultRabbit # 設定要繫結的訊息服務的具體設定

eureka:
  client: # 客戶端進行Eureka註冊的配置
    service-url:
      defaultZone: http://localhost:7001/eureka
  instance:
    lease-renewal-interval-in-seconds: 2 # 設定心跳的時間間隔(預設是30秒)
    lease-expiration-duration-in-seconds: 5 # 如果現在超過了5秒的間隔(預設是90秒)
    prefer-ip-address: true     # 訪問的路徑變為IP地址
public interface IMessageProvider {
    void send(String message);
}
@RestController
@Slf4j
public class SendMessageController {
    @Autowired
    private IMessageProvider iMessageProvider;


    @GetMapping("send")
    public String sendmessage(String message){
        iMessageProvider.send(message);
        return "success";
    }
}

訊息接收端:

 <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-actuator</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
        </dependency>
        <!--基礎配置-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-devtools</artifactId>
            <scope>runtime</scope>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>
server:
  port: 8803


spring:
  application:
    name: cloud-stream-consumer
  cloud:
    stream:
      binders: # 在此處配置要繫結的rabbitmq的服務資訊
        defaultRabbit:
          type: rabbit # 訊息元件型別
          environment: # 設定rabbitmq的相關的環境配置
            spring:
              rabbitmq:
                host: localhost
                port: 5672
                username: guest
                password: guest
      bindings: # 服務的整合處理
        input: # 這個名字是一個通道的名稱
          destination: studyExchange # 表示要使用的Exchange名稱定義
          content-type: application/json # 設定訊息型別,本次為物件json,如果是文字則設定“text/plain”
          binder: defaultRabbit # 設定要繫結的訊息服務的具體設定

eureka:
  client: # 客戶端進行Eureka註冊的配置
    service-url:
      defaultZone: http://localhost:7001/eureka
  instance:
    lease-renewal-interval-in-seconds: 2 # 設定心跳的時間間隔(預設是30秒)
    lease-expiration-duration-in-seconds: 5 # 如果現在超過了5秒的間隔(預設是90秒)
    prefer-ip-address: true     # 訪問的路徑變為IP地址
@Slf4j
@Component
@EnableBinding(Sink.class)
public class ReceiveMessageListener {
    @StreamListener(Sink.INPUT)
    public void input(Message<String> message){
        String payload = message.getPayload();
        log.info("消費者 接受資訊------->" + payload);
    }
}
@SpringBootApplication
@Slf4j
@EnableEurekaClient
public class StreamConsumerMain8803 {
    public static void main(String[] args) {
        SpringApplication.run( StreamConsumerMain8803.class,args);
        log.info("****************StreamConsumerMain8803啟動 ************ ");
    }
}