譯:基於Spring Cloud Stream構建和測試 message-driven 微服務
阿新 • • 發佈:2018-07-14
sage rabbit 下一步 總結 sting partition for 入參 tde 原文鏈接:https://piotrminkowski.wordpress.com/2018/06/15/building-and-testing-message-driven-microservices-using-spring-cloud-stream/
作者: Piotr Mińkowski
譯者: helloworldtang
img
Spring Boot和Spring Cloud為您提供了一個利用不同的通信方式快速構建微服務的解決方案。您可以基於Spring Cloud Netflix庫創建同步REST微服務,正如我在之前的一篇文章中所展示的那樣 使用Spring Boot 2.0, Eureka and Spring Cloud快速搭建微服務指南。您可以使用Spring WebFlux項目在Netty上創建異步的、響應式的微服務,並將其與一些Spring Cloud庫相結合,如我的文章所示 使用Spring WebFlux and Spring Cloud搭建響應式微服務。最後,您可以使用Spring Cloud Stream和類似Apache Kafka或RabbitMQ這樣的broker來實現基於發布/訂閱模型的message-driven微服務。構建微服務的最後一種方法是本文的主要主題。我將向您展示如何在RabbitMQ broker的基礎上有效地構建、擴展、運行和測試消息傳遞微服務。
體系結構
為了演示Spring Cloud Stream的特性,我們將設計一個示例系統,該系統使用發布/訂閱模型進行跨服務通信。我們有三個微服務:order-service、product-service和account-service。應用程序order-service暴露了負責處理發送到我們系統的訂單的HTTP endpoint。所有傳入的訂單都是異步處理的——order-service準備並發送消息到RabbitMQ exchange,然後就對調用的客戶端進行響應,不需要等到消息被消費後再響應。應用程序的account-service和product-service正在偵聽進入該RabbitMQ exchange的訂單消息。微服務account-service負責檢查客戶賬戶是否有足夠的資金來支付該訂單需要的金額,如果有就從該賬戶扣款。微服務product-service檢查是否有足夠的庫存,並在處理訂單後改變可用產品的數量。account-service 和 product-service 都通過RabbitMQ exchange(這一次是使用direct exchange的一對一通信)發送帶有操作狀態的異步響應。微服務 order-service根據接收到的響應消息來更新訂單狀態,並通過REST endpoint GET /order/{id}提供給外部客戶端。
如果您覺得我們的示例描述有點難以理解,這裏有一個用於澄清的架構圖。
stream-1
啟用 Spring Cloud Stream
在項目中使用Spring Cloud Stream的推薦方法是使用依賴管理系統。Spring Cloud Stream有一個與整個Spring Cloud framework相關,並且獨立發布的依賴管理。然而,如果我們已經在Elmhurst.RELEASE版本的dependencyManagement部分聲明了spring-cloud-dependencies,就不需要在pom.xml中聲明任何其他內容。如果您喜歡只使用Spring Cloud Stream項目,那麽您應該定義以下部分。
下一步是將spring-cloud-streamartifact添加到項目依賴項中。我還建議您至少包括spring-cloud-sleuth 庫,以提供作為源請求進入order-service 的發送消息用的traceId。
Spring Cloud Stream 編程模型
為了使您的應用程序能夠連接到一個message broker,請在主類上使用@EnableBinding註解。 @EnableBinding註解將一個或多個接口作為參數。您可以在Spring Cloud Stream提供的三個接口之間進行選擇:
Sink:這是用來標記從入站通道接收消息的服務。
Source: 這是用來向出站通道發送消息的。
Processor:當你需要一個入站通道和一個出站通道時,它可以被使用,因為它繼承了Source and Sink接口。因為order-service發送消息,並接收它們,它的主類已經使用了@EnableBinding(Processor.class)註解。
下面是order-service項目中啟用了Spring Cloud Stream binding的主類。
@SpringBootApplication
@EnableBinding(Processor.class)
public class OrderApplication {
...
public static void main(String[] args) {
new SpringApplicationBuilder(OrderApplication.class).web(true).run(args);
}
...
}
增加 message broker
在Spring Cloud Stream術語中,負責與特定message broker集成的實現稱為binder。默認情況下,Spring Cloud Stream為 Kafka and RabbitMQ提供了binder實現。它能夠自動檢測和在類路徑上查找binder。任何特定於中間件的設置都可以通過Spring Boot支持的外部配置屬性來覆蓋,譬如應用程序參數、環境變量,或者僅僅是application.yml文件。為了包含對RabbitMQ的支持,RabbitMQ將這篇文章用作message broker,您應該向項目添加以下依賴項。
現在,我們的應用程序需要連接RabbitMQ broker的一個共享實例。這就是為什麽我使用RabbitMQ在默認的5672端口上運行Docker鏡像。它還可以在地址http://192.168.99.100:15672(http://192.168.99.100:15672/)下啟動web儀表板。
我們需要通過設置屬性 spring.rabbitmq.host為Docker機器IP 192.168.99.100 ,來覆蓋Spring Boot application的中的默認設置。
實現消息驅動的微服務
Spring Cloud Stream是在Spring Integration項目之上構建的。Spring Integration擴展了Spring編程模型,以支持眾所周知的企業集成模式(EIP)。EIP定義了許多在分布式系統中經常使用的經典組件。您可能已經聽說過諸如消息通道、路由器、聚合器或endpoints之類的模式。讓我們回到上面的例子。讓我們從order-service開始,它負責接收訂單,將它們發布在shared topic上,然後從下遊服務收集異步響應。下面是@service,它使用Sourcebean來構建消息並將其發布到遠程topic。
這個 @Service 是由controller調用,controller暴露提交新訂單和通過 id獲得訂單狀態的HTTP endpoints。
現在,讓我們更仔細地看看消費端。來自order-service的OrderSender bean所發送的消息是由 account-service和product-service接收。為了從 topic exchange中接收消息,我們只需要在入參為Order的方法上添加 @StreamListener註解。我們還必須為監聽器定義目標通道——在這種情況下,它是Processor.INPUT。譬如:
@StreamListener(Processor.INPUT)
public void receiveOrder(Order order) throws JsonProcessingException {
LOGGER.info("Order received: {}", mapper.writeValueAsString(order));
service.process(order);
}
接收訂單由AccountServicebean處理。account-service會根據客戶賬戶上是否有足夠的資金來實現訂單接受或拒絕訂單。驗收狀態的響應通過OrderSenderbean調用的輸出通道發回order-service 。
最後一步是配置。它是在 application.yml中提供的。我們必須正確地定義通道的destination。而order-service則將orders-outdestination分配給輸出通道,而orders-indestination則是輸入通道,account-service和 product-service則恰恰相反。這是合乎邏輯的,因為通過其輸出destination通過 order-service發送的消息是通過其輸入destination接收的服務接收的。但在shared broker’s exchange中,它仍然是相同的destination。下面是 order-service的配置設置。
spring:
application:
name: order-service
rabbitmq:
host: 192.168.99.100
port: 5672
cloud:
stream:
bindings:
output:
destination: orders-out
producer:
partitionKeyExpression: payload.customerId
partitionCount: 2
input:
destination: orders-in
rabbit:
bindings:
input:
consumer:
exchangeType: direct
這是為 account-service和product-service提供的配置。
最後,您可以運行上面示例中的微服務。現在,我們只需要運行每個微服務的單個實例。您可以通過運行JUnit測試類OrderControllerTest來輕松地生成一些測試請求,這是在我的源代碼庫中提供的 order-service中提供的。這種情況下很簡單。在下一篇文章中,我們將學習更高級的示例,其中包含多個正在運行的消費服務實例。
擴展
為了擴展我們的Spring Cloud Stream應用程序,我們只需要啟動每個微服務的附加實例。他們仍然會偵聽與當前正在運行的實例相同的 topic exchange 中的傳入消息。在添加了一個 account-service和 product-service的實例之後,我們可以發送一個測試訂單。這個測試的結果對我們來說是不令人滿意的… 為什麽?每個微服務運行的所有實例都接收到了這個訂單。這正是 topic exchanges 的工作方式——發送到topic的消息被所有的消費者接收,他們正在偵聽這個topic。幸運的是,Spring Cloud Stream能夠通過提供稱為 consumer group的解決方案來解決這個問題。它負責保證一個消息只被一個實例處理,如果它們被放置在一個相互競爭的消費者關系中。在運行多項服務實例時,對consumer group機制的轉換已經在下圖中可視化了。
stream-2
一個 consumer group 機制的配置不是很困難。我們只需要設定group參數,並給出給定destination的組名。下面是account-service的當前binding配置。orders-indestination地是一個為直接與order-service通信而創建的隊列,因此只有orders-out被分組使用spring.cloud.stream.bindings..group屬性。
Consumer group機制是Apache Kafka的一個概念,它也在Spring Cloud Stream中實現,也適用於RabbitMQ broker,它本身並不支持它。因此,我認為它在RabbitMQ上的配置非常有趣。如果您在destination運行兩個服務實例,而沒有在destination設置組名,那麽就會有兩個為單個交易所創建的bindings(每個實例一個bindings),如下圖所示。因為有兩個應用程序在這個exchange中監聽,總共有四個binding分配給那個exchange。
stream-3
如果您為選定的destination Spring Cloud Stream設置組名,則將為給定服務的所有運行實例創建單一binding。binding的名稱將以組名為後綴。
B08597_11_06
因為,我們已經在項目依賴項中包含了 spring-cloud-starter-sleuth ,在實現 order-service POST endpoint的單個請求時,在交換的所有異步請求之間發送相同的 traceId 頭部。由於這個原因,我們可以使用Elastic Stack (Kibana)輕松地將所有日誌關聯起來。
B08597_11_05
自動化測試
您可以輕松地測試您的微服務,而不需要連接到message broker。要實現它,您需要將 spring-cloud-stream-test-support包含到您的項目依賴項中。它包含 TestSupportBinderbean,它允許您與綁定通道進行交互,並檢查應用程序發送和接收的任何消息。
在測試類中,我們需要聲明 MessageCollectorbean,它負責接收由TestSupportBinder保留的消息。這是我的account-service測試類。使用Processorbean,我將測試訂單發送到輸入通道。然後,MessageCollector接收到通過輸出通道發送回 order-service 的消息。測試方法的 testAccepted創建了應該被帳戶服務接受的順序,而testRejected方法則設置了過高的訂單價格,從而導致拒絕訂單。
總結
當您不需要來自API的同步響應時,Message-driven的微服務是一個不錯的選擇。在本文中,我展示了在您的微服務之間的跨服務通信中發布/訂閱模型的示例用例。源代碼在GitHub上是常見的(https://github.com/helloworldtang/sample-message-driven-microservices.git【原文源碼maven不能運行,這個項目fork原代碼並修復了錯誤】)。對於使用Spring Cloud Stream庫、Apache Kafka的更有趣的例子,您可以參考我的書中第11章, Mastering Spring Cloud(https://www.packtpub.com/application-development/mastering-spring-cloud)。
關註社區公號,加入社區純技術微信群
譯:基於Spring Cloud Stream構建和測試 message-driven 微服務