Spring Cloud Stream如何實現服務之間的通訊
Spring Cloud Stream
Srping cloud Bus的底層實現就是Spring Cloud Stream,Spring Cloud Stream的目的是用於構建基於訊息驅動(或事件驅動)的微服務架構。Spring Cloud Stream本身對Spring Messaging、Spring Integration、Spring Boot Actuator、Spring Boot Externalized Configuration等模組進行封裝(整合)和擴充套件,下面我們實現兩個服務之間的通訊來演示Spring Cloud Stream的使用方法。
整體概述
服務要想與其他服務通訊要定義通道,一般會定義輸出通道和輸入通道,輸出通道用於傳送訊息,輸入通道用於接收訊息,每個通道都會有個名字(輸入和輸出只是通道型別,可以用不同的名字定義很多很多通道),不同通道的名字不能相同否則會報錯(輸入通道和輸出通道不同型別的通道名稱也不能相同),繫結器是操作RabbitMQ或Kafka的抽象層,為了遮蔽操作這些訊息中介軟體的複雜性和不一致性,繫結器會用通道的名字在訊息中介軟體中定義主題,一個主題內的訊息生產者來自多個服務,一個主題內訊息的消費者也是多個服務,也就是說訊息的釋出和消費是通過主題進行定義和組織的,通道的名字就是主題的名字,在RabbitMQ中主題使用Exchanges實現,在Kafka中主題使用Topic實現。
準備環境
建立兩個專案spring-cloud-stream-a和spring-cloud-stream-b,spring-cloud-stream-a我們用Spring Cloud Stream實現通訊,spring-cloud-stream-b我們用Spring Cloud Stream的底層模組Spring Integration實現通訊。
兩個專案的POM檔案依賴都是:
<dependencies> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-stream</artifactId> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-stream-binder-rabbit</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-stream-test-support</artifactId> <scope>test</scope> </dependency> </dependencies>
spring-cloud-stream-binder-rabbit是指繫結器的實現使用RabbitMQ。
專案配置內容application.properties:
spring.application.name=spring-cloud-stream-a server.port=9010 #設定預設繫結器 spring.cloud.stream.defaultBinder = rabbit spring.rabbitmq.host=127.0.0.1 spring.rabbitmq.port=5672 spring.rabbitmq.username=guest spring.rabbitmq.password=guest
spring.application.name=spring-cloud-stream-b server.port=9011 #設定預設繫結器 spring.cloud.stream.defaultBinder = rabbit spring.rabbitmq.host=127.0.0.1 spring.rabbitmq.port=5672 spring.rabbitmq.username=guest spring.rabbitmq.password=guest
啟動一個rabbitmq:
docker pull rabbitmq:3-management docker run -d --hostname my-rabbit --name rabbit -p 5672:5672 -p 15672:15672 rabbitmq:3-management
編寫A專案程式碼
在A專案中定義一個輸入通道一個輸出通道,定義通道在介面中使用@Input和@Output註解定義,程式啟動的時候Spring Cloud Stream會根據介面定義將實現類自動注入(Spring Cloud Stream自動實現該介面不需要寫程式碼)。
A服務輸入通道,通道名稱ChatExchanges.A.Input,介面定義輸入通道必須返回SubscribableChannel:
public interface ChatInput { String INPUT = "ChatExchanges.A.Input"; @Input(ChatInput.INPUT) SubscribableChannel input(); }
A服務輸出通道,通道名稱ChatExchanges.A.Output,輸出通道必須返回MessageChannel:
public interface ChatOutput { String OUTPUT = "ChatExchanges.A.Output"; @Output(ChatOutput.OUTPUT) MessageChannel output(); }
定義訊息實體類:
public class ChatMessage implements Serializable { private String name; private String message; private Date chatDate; //沒有無引數的建構函式並行化會出錯 private ChatMessage(){} public ChatMessage(String name,String message,Date chatDate){ this.name = name; this.message = message; this.chatDate = chatDate; } public String getName(){ return this.name; } public String getMessage(){ return this.message; } public Date getChatDate() { return this.chatDate; } public String ShowMessage(){ return String.format("聊天訊息:%s的時候,%s說%s。",this.chatDate,this.name,this.message); } }
在業務處理類上用@EnableBinding註解繫結輸入通道和輸出通道,這個繫結動作其實就是建立並註冊輸入和輸出通道的實現類到Bean中,所以可以直接是使用@Autowired進行注入使用,另外訊息的序列化預設使用application/json格式(com.fastexml.jackson),最後用@StreamListener註解進行指定通道訊息的監聽:
//ChatInput.class的輸入通道不在這裡繫結,監聽到資料會找不到AClient類的引用。 //Input和Output通道定義的名字不能一樣,否則程式啟動會拋異常。 @EnableBinding({ChatOutput.class,ChatInput.class}) public class AClient { private static Logger logger = LoggerFactory.getLogger(AClient.class); @Autowired private ChatOutput chatOutput; //StreamListener自帶了Json轉物件的能力,收到B的訊息列印並回復B一個新的訊息。 @StreamListener(ChatInput.INPUT) public void PrintInput(ChatMessage message) { logger.info(message.ShowMessage()); ChatMessage replyMessage = new ChatMessage("ClientA","A To B Message.", new Date()); chatOutput.output().send(MessageBuilder.withPayload(replyMessage).build()); } }
到此A專案程式碼編寫完成。
編寫B專案程式碼
B專案使用Spring Integration實現訊息的釋出和消費,定義通道時我們要交換輸入通道和輸出通道的名稱:
public interface ChatProcessor { String OUTPUT = "ChatExchanges.A.Input"; String INPUT = "ChatExchanges.A.Output"; @Input(ChatProcessor.INPUT) SubscribableChannel input(); @Output(ChatProcessor.OUTPUT) MessageChannel output(); }
訊息實體類:
public class ChatMessage { private String name; private String message; private Date chatDate; //沒有無引數的建構函式並行化會出錯 private ChatMessage(){} public ChatMessage(String name,String message,Date chatDate){ this.name = name; this.message = message; this.chatDate = chatDate; } public String getName(){ return this.name; } public String getMessage(){ return this.message; } public Date getChatDate() { return this.chatDate; } public String ShowMessage(){ return String.format("聊天訊息:%s的時候,%s說%s。",this.chatDate,this.name,this.message); } }
業務處理類用@ServiceActivator註解代替@StreamListener,用@InboundChannelAdapter註解釋出訊息:
@EnableBinding(ChatProcessor.class) public class BClient { private static Logger logger = LoggerFactory.getLogger(BClient.class); //@ServiceActivator沒有Json轉物件的能力需要藉助@Transformer註解 @ServiceActivator(inputChannel=ChatProcessor.INPUT) public void PrintInput(ChatMessage message) { logger.info(message.ShowMessage()); } @Transformer(inputChannel = ChatProcessor.INPUT,outputChannel = ChatProcessor.INPUT) public ChatMessage transform(String message) throws Exception{ ObjectMapper objectMapper = new ObjectMapper(); return objectMapper.readValue(message,ChatMessage.class); } //每秒發出一個訊息給A @Bean @InboundChannelAdapter(value = ChatProcessor.OUTPUT,poller = @Poller(fixedDelay="1000")) public GenericMessage<ChatMessage> SendChatMessage(){ ChatMessage message = new ChatMessage("ClientB","B To A Message.", new Date()); GenericMessage<ChatMessage> gm = new GenericMessage<>(message); return gm; } }
執行程式
啟動A專案和B專案:
原始碼
Github倉庫:https://github.com/sunweisheng/spring-cloud-example
以上就是本文的全部內容,希望對大家的學習有所幫助,也希望大家多多支援碼農教程。