1. 程式人生 > >譯:基於Spring Cloud Stream構建和測試 message-driven 微服務

譯:基於Spring Cloud Stream構建和測試 message-driven 微服務

sage rabbit 下一步 總結 sting partition for 入參 tde

原文鏈接:https://piotrminkowski.wordpress.com/2018/06/15/building-and-testing-message-driven-microservices-using-spring-cloud-stream/ 作者: Piotr Mińkowski 譯者: helloworldtang img Spring Boot和Spring Cloud為您提供了一個利用不同的通信方式快速構建微服務的解決方案。您可以基於Spring Cloud Netflix庫創建同步REST微服務,正如我在之前的一篇文章中所展示的那樣 使用Spring Boot 2.0, Eureka and Spring Cloud快速搭建微服務指南。您可以使用Spring WebFlux項目在Netty上創建異步的、響應式的微服務,並將其與一些Spring Cloud庫相結合,如我的文章所示 使用Spring WebFlux and Spring Cloud搭建響應式微服務。最後,您可以使用Spring Cloud Stream和類似Apache Kafka或RabbitMQ這樣的broker來實現基於發布/訂閱模型的message-driven微服務。構建微服務的最後一種方法是本文的主要主題。我將向您展示如何在RabbitMQ broker的基礎上有效地構建、擴展、運行和測試消息傳遞微服務。 體系結構 為了演示Spring Cloud Stream的特性,我們將設計一個示例系統,該系統使用發布/訂閱模型進行跨服務通信。我們有三個微服務:order-service、product-service和account-service。應用程序order-service暴露了負責處理發送到我們系統的訂單的HTTP endpoint。所有傳入的訂單都是異步處理的——order-service準備並發送消息到RabbitMQ exchange,然後就對調用的客戶端進行響應,不需要等到消息被消費後再響應。應用程序的account-service和product-service正在偵聽進入該RabbitMQ exchange的訂單消息。微服務account-service負責檢查客戶賬戶是否有足夠的資金來支付該訂單需要的金額,如果有就從該賬戶扣款。微服務product-service檢查是否有足夠的庫存,並在處理訂單後改變可用產品的數量。account-service 和 product-service 都通過RabbitMQ exchange(這一次是使用direct exchange的一對一通信)發送帶有操作狀態的異步響應。微服務 order-service根據接收到的響應消息來更新訂單狀態,並通過REST endpoint GET /order/{id}提供給外部客戶端。 如果您覺得我們的示例描述有點難以理解,這裏有一個用於澄清的架構圖。 stream-1 啟用 Spring Cloud Stream 在項目中使用Spring Cloud Stream的推薦方法是使用依賴管理系統。Spring Cloud Stream有一個與整個Spring Cloud framework相關,並且獨立發布的依賴管理。然而,如果我們已經在Elmhurst.RELEASE版本的dependencyManagement部分聲明了spring-cloud-dependencies,就不需要在pom.xml中聲明任何其他內容。如果您喜歡只使用Spring Cloud Stream項目,那麽您應該定義以下部分。 下一步是將spring-cloud-streamartifact添加到項目依賴項中。我還建議您至少包括spring-cloud-sleuth 庫,以提供作為源請求進入order-service 的發送消息用的traceId。 Spring Cloud Stream 編程模型 為了使您的應用程序能夠連接到一個message broker,請在主類上使用@EnableBinding註解。 @EnableBinding註解將一個或多個接口作為參數。您可以在Spring Cloud Stream提供的三個接口之間進行選擇: Sink:這是用來標記從入站通道接收消息的服務。 Source: 這是用來向出站通道發送消息的。 Processor:當你需要一個入站通道和一個出站通道時,它可以被使用,因為它繼承了Source and Sink接口。因為order-service發送消息,並接收它們,它的主類已經使用了@EnableBinding(Processor.class)註解。 下面是order-service項目中啟用了Spring Cloud Stream binding的主類。 @SpringBootApplication @EnableBinding(Processor.class) public class OrderApplication { ... public static void main(String[] args) { new SpringApplicationBuilder(OrderApplication.class).web(true).run(args); } ... } 增加 message broker 在Spring Cloud Stream術語中,負責與特定message broker集成的實現稱為binder。默認情況下,Spring Cloud Stream為 Kafka and RabbitMQ提供了binder實現。它能夠自動檢測和在類路徑上查找binder。任何特定於中間件的設置都可以通過Spring Boot支持的外部配置屬性來覆蓋,譬如應用程序參數、環境變量,或者僅僅是application.yml文件。為了包含對RabbitMQ的支持,RabbitMQ將這篇文章用作message broker,您應該向項目添加以下依賴項。 現在,我們的應用程序需要連接RabbitMQ broker的一個共享實例。這就是為什麽我使用RabbitMQ在默認的5672端口上運行Docker鏡像。它還可以在地址http://192.168.99.100:15672(http://192.168.99.100:15672/)下啟動web儀表板。 我們需要通過設置屬性 spring.rabbitmq.host為Docker機器IP 192.168.99.100 ,來覆蓋Spring Boot application的中的默認設置。 實現消息驅動的微服務 Spring Cloud Stream是在Spring Integration項目之上構建的。Spring Integration擴展了Spring編程模型,以支持眾所周知的企業集成模式(EIP)。EIP定義了許多在分布式系統中經常使用的經典組件。您可能已經聽說過諸如消息通道、路由器、聚合器或endpoints之類的模式。讓我們回到上面的例子。讓我們從order-service開始,它負責接收訂單,將它們發布在shared topic上,然後從下遊服務收集異步響應。下面是@service,它使用Sourcebean來構建消息並將其發布到遠程topic。 這個 @Service 是由controller調用,controller暴露提交新訂單和通過 id獲得訂單狀態的HTTP endpoints。 現在,讓我們更仔細地看看消費端。來自order-service的OrderSender bean所發送的消息是由 account-service和product-service接收。為了從 topic exchange中接收消息,我們只需要在入參為Order的方法上添加 @StreamListener註解。我們還必須為監聽器定義目標通道——在這種情況下,它是Processor.INPUT。譬如: @StreamListener(Processor.INPUT) public void receiveOrder(Order order) throws JsonProcessingException { LOGGER.info("Order received: {}", mapper.writeValueAsString(order)); service.process(order); } 接收訂單由AccountServicebean處理。account-service會根據客戶賬戶上是否有足夠的資金來實現訂單接受或拒絕訂單。驗收狀態的響應通過OrderSenderbean調用的輸出通道發回order-service 。 最後一步是配置。它是在 application.yml中提供的。我們必須正確地定義通道的destination。而order-service則將orders-outdestination分配給輸出通道,而orders-indestination則是輸入通道,account-service和 product-service則恰恰相反。這是合乎邏輯的,因為通過其輸出destination通過 order-service發送的消息是通過其輸入destination接收的服務接收的。但在shared broker’s exchange中,它仍然是相同的destination。下面是 order-service的配置設置。 spring: application: name: order-service rabbitmq: host: 192.168.99.100 port: 5672 cloud: stream: bindings: output: destination: orders-out producer: partitionKeyExpression: payload.customerId partitionCount: 2 input: destination: orders-in rabbit: bindings: input: consumer: exchangeType: direct 這是為 account-service和product-service提供的配置。 最後,您可以運行上面示例中的微服務。現在,我們只需要運行每個微服務的單個實例。您可以通過運行JUnit測試類OrderControllerTest來輕松地生成一些測試請求,這是在我的源代碼庫中提供的 order-service中提供的。這種情況下很簡單。在下一篇文章中,我們將學習更高級的示例,其中包含多個正在運行的消費服務實例。 擴展 為了擴展我們的Spring Cloud Stream應用程序,我們只需要啟動每個微服務的附加實例。他們仍然會偵聽與當前正在運行的實例相同的 topic exchange 中的傳入消息。在添加了一個 account-service和 product-service的實例之後,我們可以發送一個測試訂單。這個測試的結果對我們來說是不令人滿意的… 為什麽?每個微服務運行的所有實例都接收到了這個訂單。這正是 topic exchanges 的工作方式——發送到topic的消息被所有的消費者接收,他們正在偵聽這個topic。幸運的是,Spring Cloud Stream能夠通過提供稱為 consumer group的解決方案來解決這個問題。它負責保證一個消息只被一個實例處理,如果它們被放置在一個相互競爭的消費者關系中。在運行多項服務實例時,對consumer group機制的轉換已經在下圖中可視化了。 stream-2 一個 consumer group 機制的配置不是很困難。我們只需要設定group參數,並給出給定destination的組名。下面是account-service的當前binding配置。orders-indestination地是一個為直接與order-service通信而創建的隊列,因此只有orders-out被分組使用spring.cloud.stream.bindings..group屬性。 Consumer group機制是Apache Kafka的一個概念,它也在Spring Cloud Stream中實現,也適用於RabbitMQ broker,它本身並不支持它。因此,我認為它在RabbitMQ上的配置非常有趣。如果您在destination運行兩個服務實例,而沒有在destination設置組名,那麽就會有兩個為單個交易所創建的bindings(每個實例一個bindings),如下圖所示。因為有兩個應用程序在這個exchange中監聽,總共有四個binding分配給那個exchange。 stream-3 如果您為選定的destination Spring Cloud Stream設置組名,則將為給定服務的所有運行實例創建單一binding。binding的名稱將以組名為後綴。 B08597_11_06 因為,我們已經在項目依賴項中包含了 spring-cloud-starter-sleuth ,在實現 order-service POST endpoint的單個請求時,在交換的所有異步請求之間發送相同的 traceId 頭部。由於這個原因,我們可以使用Elastic Stack (Kibana)輕松地將所有日誌關聯起來。 B08597_11_05 自動化測試 您可以輕松地測試您的微服務,而不需要連接到message broker。要實現它,您需要將 spring-cloud-stream-test-support包含到您的項目依賴項中。它包含 TestSupportBinderbean,它允許您與綁定通道進行交互,並檢查應用程序發送和接收的任何消息。 在測試類中,我們需要聲明 MessageCollectorbean,它負責接收由TestSupportBinder保留的消息。這是我的account-service測試類。使用Processorbean,我將測試訂單發送到輸入通道。然後,MessageCollector接收到通過輸出通道發送回 order-service 的消息。測試方法的 testAccepted創建了應該被帳戶服務接受的順序,而testRejected方法則設置了過高的訂單價格,從而導致拒絕訂單。 總結 當您不需要來自API的同步響應時,Message-driven的微服務是一個不錯的選擇。在本文中,我展示了在您的微服務之間的跨服務通信中發布/訂閱模型的示例用例。源代碼在GitHub上是常見的(https://github.com/helloworldtang/sample-message-driven-microservices.git【原文源碼maven不能運行,這個項目fork原代碼並修復了錯誤】)。對於使用Spring Cloud Stream庫、Apache Kafka的更有趣的例子,您可以參考我的書中第11章, Mastering Spring Cloud(https://www.packtpub.com/application-development/mastering-spring-cloud)。 關註社區公號,加入社區純技術微信群

譯:基於Spring Cloud Stream構建和測試 message-driven 微服務