Spring Cloud Stream + RabbitMQ 訊息生成和訊息消費
在本 DEMO中有兩個節點互為訊息的生產者和訊息消費者。
一、節點1
1. pom.xml
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.sande</groupId> <artifactId>stream-hello</artifactId> <version>0.0.1-SNAPSHOT</version> <packaging>jar</packaging> <name>stream-hello</name> <description>Demo project for Spring Boot</description> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.0.4.RELEASE</version> <relativePath/> <!-- lookup parent from repository --> </parent> <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding> <java.version>1.8</java.version> <spring-cloud.version>Finchley.SR1</spring-cloud.version> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web-services</artifactId> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-config-server</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-stream-rabbit</artifactId> <version>2.0.1.RELEASE</version> </dependency> </dependencies> <dependencyManagement> <dependencies> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-dependencies</artifactId> <version>${spring-cloud.version}</version> <type>pom</type> <scope>import</scope> </dependency> </dependencies> </dependencyManagement> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> </build> </project>
2. src/main/resources/application.properties
spring.cloud.stream.bindings.input.destination=input
spring.cloud.stream.bindings.output.destination=output
server.port=8080
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
配置了 spring.cloud.stream.bindings.input.destination=input、spring.cloud.stream.bindings.output.destination=output 後會在RabbitMQ 中建立一個名為 input 交換器(exchange)和一個名為 output 交換器(exchange)。spring.cloud.stream.bindings.input.destination=input 的意思是把 spring cloud stream 的輸入通道繫結到 RabbitMQ 的 input 交換器,spring.cloud.stream.bindings.output.destination=output 的意思是把 spring cloud stream 的訊息輸出通道繫結到 RabbitMQ 的 output 交換器。
3.訊息生產類
package com.sande.streamhello.ramp; import java.text.SimpleDateFormat; import java.util.Date; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.cloud.stream.annotation.Output; import org.springframework.cloud.stream.annotation.StreamListener; import org.springframework.cloud.stream.messaging.Sink; import org.springframework.cloud.stream.messaging.Source; import org.springframework.context.annotation.Bean; import org.springframework.integration.annotation.InboundChannelAdapter; import org.springframework.messaging.MessageChannel; import org.springframework.messaging.support.GenericMessage; import org.springframework.integration.annotation.Poller; import org.springframework.integration.core.MessageSource; @EnableBinding(value= {Source.class}) public class SinkSender { private static Logger logger = LoggerFactory.getLogger(SinkSender.class); //@StreamListener(Source.OUTPUT) private String format="yyyy-mm-dd HH:mm:ss"; @Bean @InboundChannelAdapter(value = Source.OUTPUT, poller = @Poller(fixedDelay = "2000", maxMessagesPerPoll = "1")) public MessageSource<String> timerMessageSource() { System.out.println("MessageSource"); //return () -> new GenericMessage<>(new SimpleDateFormat(format).format(new Date())); //return () -> new GenericMessage<>("wo ai ni wo de jia"); return () -> new GenericMessage<>("{\"name\":\"didi\",\"age\":30}"); } }
4.訊息消費類
package com.sande.streamhello.ramp;
import java.util.Date;
import java.util.stream.Stream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.integration.annotation.ServiceActivator;
import com.sande.streamhello.StreamHelloApplication;
@EnableBinding(value= {Sink.class})
public class SinkReceiver {
private static Logger logger = LoggerFactory.getLogger(StreamHelloApplication.class);
@StreamListener(Sink.INPUT)
public void receive(String payload) {
logger.info("Received:" + payload);
}
}
5. 應用主類
package com.sande.streamhello;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class StreamHelloApplication {
public static void main(String[] args) {
SpringApplication.run(StreamHelloApplication.class, args);
}
}
二、節點2
1. pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.sande</groupId>
<artifactId>stream-hello3</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging>
<name>stream-hello3</name>
<description>Demo project for Spring Boot</description>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.0.4.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<java.version>1.8</java.version>
<spring-cloud.version>Finchley.SR1</spring-cloud.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web-services</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-config-server</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
<version>2.0.1.RELEASE</version>
</dependency>
</dependencies>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>${spring-cloud.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
2. src/main/resources/application.properties
spring.cloud.stream.bindings.input.destination=output
spring.cloud.stream.bindings.output.destination=input
server.port=8082
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
配置了 spring.cloud.stream.bindings.input.destination=out、spring.cloud.stream.bindings.output.destination=input 後會在RabbitMQ 中建立一個名為 input 交換器(exchange)和一個名為 output 交換器(exchange)。spring.cloud.stream.bindings.input.destination=output 的意思是把 spring cloud stream 的輸入通道繫結到 RabbitMQ 的 output 交換器,spring.cloud.stream.bindings.output.destination=input 的意思是把 spring cloud stream 的訊息輸出通道繫結到 RabbitMQ 的 input 交換器。這樣節點二的輸入通道對應節點一的輸出通道,節點二的輸出通道對應節點一的輸入通道,就可以互相作為對方的消費端和生成端。
3.訊息生產類
package com.sande.streamhello.ramp;
import java.text.SimpleDateFormat;
import java.util.Date;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.annotation.InboundChannelAdapter;
import org.springframework.integration.annotation.Poller;
import org.springframework.integration.annotation.Transformer;
import org.springframework.integration.core.MessageSource;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.GenericMessage;
@EnableBinding(value = {Source.class})
public class SinkSender2 {
private static Logger logger = LoggerFactory.getLogger(SinkSender2.class);
@Bean
@InboundChannelAdapter(value=Source.OUTPUT,[email protected](fixedDelay="2000",
maxMessagesPerPoll = "1"))
public MessageSource<String> timerMessageSource2() {
return () -> new GenericMessage<>(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));
}
/*@Transformer(inputChannel=Source.OUTPUT,outputChannel=Sink.INPUT)
public Object transform(Date mesage) {
return new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(mesage);
}*/
}
4. 訊息消費類
package com.sande.streamhello.ramp;
import java.util.stream.Stream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import com.sande.streamhello.StreamHelloApplication;
@EnableBinding(value= {Sink.class})
public class SinkReceiver {
private static Logger logger = LoggerFactory.getLogger(StreamHelloApplication.class);
@StreamListener(Sink.INPUT)
public void receive(String payload) {
logger.info("Received:" + payload);
}
}
5. 應用主類
package com.sande.streamhello;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class StreamHelloApplication {
public static void main(String[] args) {
SpringApplication.run(StreamHelloApplication.class, args);
}
}
三、測試
1. 啟動 RabbitMQ
2. 啟動節點1
3.啟動節點2
4. 在 RabbitMQ 建立了 input 和 output 交換器
下面是對應的訊息佇列
5. 在節點一和節點2的控制檯可以看到下面的訊息輸出
節點1:
節點2: