Spring Cloud:Stream基礎知識
阿新 • • 發佈:2020-09-07
背景
訊息中介軟體有多種,rabbitmq,rocketmq,activemq,kafka等。
不同的訊息中介軟體具體細節不一樣。那麼有沒有一種新的技術誕生,讓我們不再關注具體MQ細節,我們只需要用一種適配繫結的方式,自動給我們在各種MQ內切換。
一句話:遮蔽底層訊息中介軟體的差異,降低切換成本,統一訊息的程式設計模型。Spring Cloud Stream 因此誕生。
官方定義 Spring Cloud Stream是一個構建訊息驅動微服務的框架。
應用程式使用inputs或者outputs來與springcloud stream中binder互動。
通過我們配置來bingding(繫結),而stream的binder物件負責與訊息中介軟體互動。
所以,我們只需要搞清楚如何與springcloud stream互動就可以方便使用訊息驅動方式。
SpringCloud Stream為一些供應商的訊息中介軟體產品提供了個性化的自動化配置實現,引用釋出-訂閱、消費組、分割槽三個核心概念。
目前僅支援RabbitMQ和Kafka。
Stream處理架構:
通過定義繫結器Binder作為中間層,實現了應用程式與訊息中介軟體之間的隔離。
stream標準流程套路:
Binder:很方便的連線中介軟體,遮蔽差異。
channel:通道,是佇列Queue的一種抽象,在訊息通訊系統中就是實現儲存和轉發的媒介,通過channel對佇列進行配置。
source和sink:簡單理解為訊息的輸入輸出。
編碼API和註解:
Stream訊息生產者
pom依賴
<dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-stream-rabbit</artifactId> </dependency>
yml配置
server: port: 8801 spring: application: name: cloud-stream-provider cloud: stream: bindings: output: destination: studyExchange content-type: application/json binder: defaultRabbit binders: defaultRabbit: type: rabbit environment: spring: rabbitmq: host: 192.168.10.132 port: 5672 username: guest password: guest eureka: client: #是否將自己註冊到Eureka Server 預設為true register-with-eureka: true #是否從EurekaServer抓取已有的註冊資訊,預設為true,單節點無所謂,叢集必須設定true才能配合ribbon做負載均衡 fetch-registry: true service-url: #設定eureka server互動的地址查詢服務和註冊服務都需要依賴這個地址 defaultZone: http://localhost:7001/eureka instance: lease-renewal-interval-in-seconds: 2 lease-expiration-duration-in-seconds: 5 instance-id: send-8801.com prefer-ip-address: true
傳送訊息的介面:
public interface IMessageProvider {
public String send();
}
傳送訊息的實現:
@EnableBinding(Source.class)//定義訊息的推送管道
public class MessageProvider implements IMessageProvider {
@Autowired
private MessageChannel output;
@Override
public String send() {
String serial = IdUtil.simpleUUID();
System.out.println(serial+"============");
output.send(MessageBuilder.withPayload(serial).build());
return serial;
}
}
controller
@RestController
public class SendController {
@Autowired
private IMessageProvider messageProvider;
@GetMapping("sendMessage")
public String sendMessage(){
return messageProvider.send();
}
}
呼叫介面,觀察效果
Stream訊息消費者
配置yml
server:
port: 8802
spring:
application:
name: cloud-stream-consumer
cloud:
stream:
bindings:
input:
destination: studyExchange
content-type: application/json
binder: defaultRabbit
binders:
defaultRabbit:
type: rabbit
environment:
spring:
rabbitmq:
host: 192.168.10.132
port: 5672
username: guest
password: guest
eureka:
client:
#是否將自己註冊到Eureka Server 預設為true
register-with-eureka: true
#是否從EurekaServer抓取已有的註冊資訊,預設為true,單節點無所謂,叢集必須設定true才能配合ribbon做負載均衡
fetch-registry: true
service-url:
#設定eureka server互動的地址查詢服務和註冊服務都需要依賴這個地址
defaultZone: http://localhost:7001/eureka
instance:
lease-renewal-interval-in-seconds: 2
lease-expiration-duration-in-seconds: 5
instance-id: receive-8802.com
prefer-ip-address: true
接收訊息:
@EnableBinding(Sink.class)
@RestController
public class ReceiveMessageListenerController {
@Value("${server.port}")
private String serverPort;
@StreamListener(value = Sink.INPUT)
public void input(Message<String> message){
System.out.println("訊息:"+message.getPayload()+"serverPort:"+serverPort);
}
}
stream重複消費
下預設配置中,多個消費者存在,會存在重複消費問題
原因:預設分組group是不同的,組流水號不一樣,被認為是不同組,可以消費,所以要自定義配置分組。
yml配置:
cloud:
stream:
bindings:
input:
destination: studyExchange
content-type: application/json
binder: defaultRabbit
group: wen.jie
通過配置後,兩個消費者被分配到一組,就不存在重複消費的問題。