Spring Cloud Stream 訊息驅動
阿新 • • 發佈:2020-08-28
遮蔽底層訊息中介軟體的差異,降低切換成本 , 統一訊息的程式設計模型。
通過定義繫結器Binder 作為中介軟體。 實現應用程式與訊息中介軟體的細節之間的隔離。
訊息傳送端:
<dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency><dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-actuator</artifactId> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-stream-rabbit</artifactId> </dependency> <!--基礎配置--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-devtools</artifactId> <scope>runtime</scope> <optional>true</optional> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <optional>true</optional> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> </dependencies>
dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
主要是這個
訊息傳送實現類:
@EnableBinding(Source.class) public class MessageProviderImpl implements IMessageProvider { @Resource private MessageChannel messageChannel; @Override public void send(String message) { Message<String> message1 = MessageBuilder.withPayload(message).build(); messageChannel.send(message1); } }
server:
port: 8801
spring:
application:
name: cloud-stream-provider
cloud:
stream:
binders: # 在此處配置要繫結的rabbitmq的服務資訊
defaultRabbit:
type: rabbit # 訊息元件型別
environment: # 設定rabbitmq的相關的環境配置
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
bindings: # 服務的整合處理
output: # 這個名字是一個通道的名稱
destination: studyExchange # 表示要使用的Exchange名稱定義
content-type: application/json # 設定訊息型別,本次為json,文字則設定“text/plain”
binder: defaultRabbit # 設定要繫結的訊息服務的具體設定
eureka:
client: # 客戶端進行Eureka註冊的配置
service-url:
defaultZone: http://localhost:7001/eureka
instance:
lease-renewal-interval-in-seconds: 2 # 設定心跳的時間間隔(預設是30秒)
lease-expiration-duration-in-seconds: 5 # 如果現在超過了5秒的間隔(預設是90秒)
prefer-ip-address: true # 訪問的路徑變為IP地址
public interface IMessageProvider { void send(String message); }
@RestController @Slf4j public class SendMessageController { @Autowired private IMessageProvider iMessageProvider; @GetMapping("send") public String sendmessage(String message){ iMessageProvider.send(message); return "success"; } }
訊息接收端:
<dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-actuator</artifactId> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-stream-rabbit</artifactId> </dependency> <!--基礎配置--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-devtools</artifactId> <scope>runtime</scope> <optional>true</optional> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <optional>true</optional> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> </dependencies>
server:
port: 8803
spring:
application:
name: cloud-stream-consumer
cloud:
stream:
binders: # 在此處配置要繫結的rabbitmq的服務資訊
defaultRabbit:
type: rabbit # 訊息元件型別
environment: # 設定rabbitmq的相關的環境配置
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
bindings: # 服務的整合處理
input: # 這個名字是一個通道的名稱
destination: studyExchange # 表示要使用的Exchange名稱定義
content-type: application/json # 設定訊息型別,本次為物件json,如果是文字則設定“text/plain”
binder: defaultRabbit # 設定要繫結的訊息服務的具體設定
eureka:
client: # 客戶端進行Eureka註冊的配置
service-url:
defaultZone: http://localhost:7001/eureka
instance:
lease-renewal-interval-in-seconds: 2 # 設定心跳的時間間隔(預設是30秒)
lease-expiration-duration-in-seconds: 5 # 如果現在超過了5秒的間隔(預設是90秒)
prefer-ip-address: true # 訪問的路徑變為IP地址
@Slf4j @Component @EnableBinding(Sink.class) public class ReceiveMessageListener { @StreamListener(Sink.INPUT) public void input(Message<String> message){ String payload = message.getPayload(); log.info("消費者 接受資訊------->" + payload); } }
@SpringBootApplication @Slf4j @EnableEurekaClient public class StreamConsumerMain8803 { public static void main(String[] args) { SpringApplication.run( StreamConsumerMain8803.class,args); log.info("****************StreamConsumerMain8803啟動 ************ "); } }