Spring Cloud學習筆記28——訊息驅動的微服務:Spring Cloud Stream
Spring Cloud Stream
是一個用來為微服務應用構建訊息驅動能力的框架。它可以基於Spring Boot
來建立獨立的、可用於生產的Spring
應用程式。它通過使用Spring Integration
來連線訊息代理中介軟體以實現訊息事件驅動。Spring Cloud Stream
為一些供應商的訊息中介軟體產品提供了個性化的自動化配置實現,並且引入了釋出-訂閱、消費組以及分割槽這三個核心概念。
簡單地說,Spring Cloud Stream
本質上就是整合了Spring Boot
和Spring Integration
,實現了一套輕量級的訊息驅動的微服務框架。通過使用Spring Cloud Stream
由於Spring Cloud Stream
基於Spring Boot
實現,所以它秉承了Spring Boot
的優點,自動化配置的功能可幫助我們快速上手使用,但是到目前為止,Spring Cloud Stream
只支援RabbitMQ
和Kafka
這兩個著名的訊息中介軟體的自動化配置。
核心概念
Spring Cloud Stream
構建的應用程式與訊息中介軟體之間時通過繫結器Binder
相關聯的,繫結器對於應用程式而言起到了隔離作用,它使得不同訊息中介軟體的實現細節對應用程式來說是透明的。所以對於每一個Spring Cloud Stream
Binder
對應程式提供的抽象概念來使用訊息中介軟體來實現業務邏輯即可,而這個抽象概念就是訊息通道Channel
。
繫結器
Binder
繫結器是Spring Cloud Stream
中一個非常重要的概念。在沒有繫結器這個概念的情況下,我們的Spring Boot
應用要直接與訊息中介軟體進行資訊互動的時候,由於各訊息中介軟體構建的初衷不同,它們的實現細節上會有較大的差異性,這使得我們實現的訊息互動邏輯就會非常笨重,因為對具體的中介軟體實現細節有太重的依賴,當中間件有較大的變動升級、或是更換中介軟體的時候,我們就需要付出非常大的代價來實施。
通過定義繫結器作為中間層,完美地實現了應用程式與訊息中介軟體細節之間的隔離。通過嚮應用程式暴露統一的Channel
通道,使得應用程式不需要再考慮各種不同的訊息中介軟體實現。當我們需要升級訊息中介軟體,或是更換其他訊息中介軟體產品時,我們要做的就是更換它們對應的Binder
繫結器而不需要修改任何Spring Boot
的應用邏輯。
釋出-訂閱模式
在Spring Cloud Stream
中的訊息通訊方式遵循了釋出-訂閱模式,當一條訊息被投遞到訊息中介軟體之後,它會通過共享的Topic
主題進行廣播,訊息消費者在訂閱的主題中收到它並觸發自身的業務邏輯處理。這裡所提到的Topic
主題是Spring Cloud Stream
中的一個抽象概念,用來代表釋出共享訊息給消費者的地方。在不同的訊息中介軟體中,Topic
可能對應著不同的概念,比如:在RabbitMQ
中的它對應了Exchange
、而在Kafka
中則對應了Kafka
中的Topic
。
相對於點對點佇列實現的訊息通訊來說,Spring Cloud Stream
採用的釋出-訂閱模式可以有效的降低訊息生產者與消費者之間的耦合,當我們需要對同一類訊息增加一種處理方式時,只需要增加一個應用程式並將輸入通道繫結到既有的Topic
中就可以實現功能的擴充套件,而不需要改變原來已經實現的任何內容。
消費組
雖然Spring Cloud Stream
通過釋出-訂閱模式將訊息生產者與消費者做了很好的解耦,基於相同主題的消費者可以輕鬆的進行擴充套件,但是這些擴充套件都是針對不同的應用例項而言的,在現實的微服務架構中,我們每一個微服務應用為了實現高可用和負載均衡,實際上都會部署多個例項。很多情況下,訊息生產者傳送訊息給某個具體微服務時,只希望被消費一次,雖然有時兩個例項同屬一個應用,但是這個訊息出現了被重複消費兩次的情況。為了解決這個問題,在Spring Cloud Stream
中提供了消費組的概念。
如果在同一個主題上的應用需要啟動多個例項的時候,我們可以通過spring.cloud.stream.bindings.input.group
屬性為應用指定一個組名,這樣這個應用的多個例項在接收到訊息的時候,只會有一個成員真正的收到訊息並進行處理。如下圖所示,我們為Service-A
和Service-B
分別啟動了兩個例項,並且根據服務名進行了分組,這樣當訊息進入主題之後,Group-A
和Group-B
都會收到訊息的副本,但是在兩個組中都只會有一個例項對其進行消費。
預設情況下,當我們沒有為應用指定消費組的時候,Spring Cloud Stream
會為其分配一個獨立的匿名消費組。所以,如果同一主題下所有的應用都沒有指定消費組的時候,當有訊息被髮布之後,所有的應用都會對其進行消費,因為它們各自都屬於一個獨立的組中。大部分情況下,我們在建立Spring Cloud Stream
應用的時候,建議最好為其指定一個消費組,以防止對訊息的重複處理,除非該行為需要這樣做(比如:重新整理所有例項的配置等)。
訊息分割槽
通過引入消費組的概念,我們已經能夠在多例項的情況下,保障每個訊息只被組內一個例項進行消費。通過上面對消費組引數設定後的實驗,我們可以觀察到,消費組並無法控制訊息具體被哪個例項消費。也就是說,對於同一條訊息,它多次到達之後可能是由不同的例項進行消費的。但是對於一些業務場景,就需要對於一些具有相同特徵的訊息每次都可以被同一個消費例項處理,比如:一些用於監控服務,為了統計某段時間內訊息生產者傳送的報告內容,監控服務需要在自身內容聚合這些資料,那麼訊息生產者可以為訊息增加一個固有的特徵ID
來進行分割槽,使得擁有這些ID
的訊息每次都能被髮送到一個特定的例項上實現累計統計的效果,否則這些資料就會分散到各個不同的節點導致監控結果不一致的情況。而分割槽概念的引入就是為了解決這樣的問題:當生產者將訊息資料傳送給多個消費者例項時,保證擁有共同特徵的訊息資料始終是由同一個消費者例項接收和處理。
Spring Cloud Stream
為分割槽提供了通用的抽象實現,用來在訊息中介軟體的上層實現分割槽處理,所以它對於訊息中介軟體自身是否實現了訊息分割槽並不關心,這使得Spring Cloud Stream
為不具備分割槽功能的訊息中介軟體也增加了分割槽功能擴充套件。
訊息型別
Spring Cloud Stream
為了讓開發者能夠在訊息中宣告它的內容型別,在輸出訊息中定義了一個預設的頭資訊:contentType
。對於那些不直接支援頭資訊的訊息中介軟體,Spring Cloud Stream
提供了自己的實現機制,它會在訊息發出前自動將訊息包裝進它自定義的訊息封裝格式中,並加入頭資訊。而對於那些自身就支援頭資訊的訊息中介軟體,Spring Cloud Stream
構建的服務可以接收並處理來自非Spring Cloud Stream
構建但包含符合規範頭資訊的應用程式發出的訊息。
Spring Cloud Stream
允許使用spring.cloud.stream.bindings.<channelName>.cotent-type
屬性以宣告式的配置方式為繫結的輸入和輸出通道設定訊息內容的型別。此外,原生的訊息型別轉換器依然可以輕鬆地用於我們的應用程式。目前,Spring Cloud Stream
中自帶支援了以下幾種常用的訊息型別轉換:
JSON
與POJO
的互相轉換。JSON
與org.springframework.tuple.Tuple
的互相轉換。Object
與byte[]
的互相轉換。為了實現遠端傳輸序列化的原始位元組,應用程式需要傳送byte
型別的資料,或是通過實現Java
的序列化介面來轉換為位元組(Object
物件必須可序列化)。String
與byte[]
的互相轉換。Object
向純文字的轉換:Object
需要實現toString()
方法。
上面所指的JSON
型別可以表現為一個byte
型別的陣列,也可以是一個包含有效JSON
內容的字串。另外,Object
物件可以由JSON
、byte
陣列或者字串轉換而來,但是在轉換為JSON
的時候總是以字串的形式返回。
MIME型別
在Spring Cloud Stream
中定義的content-type
屬性採用了Media Type
,即Internet Media Type
(網際網路媒體型別),也被稱為MIME
型別,常見的有application/json
、text/plain;charset=UTF-8
。
MIME
型別對於標示如何轉換為String
或byte[]
非常有用。並且,我們還可以使用MIME
型別格式來表示Java
型別,只需要使用帶有型別引數的一般型別:application/x-java-object
。
比如,我們可以使用application/x-java-object;type=java.util.Map
來表示傳輸的是一個java.util.Map
物件,或是使用application/x-java-object;type=com.study.springcloud.vo.User
來表示傳輸的是一個com.study.springcloud.vo.User
物件;除此之外,更重要的是,它還提供了自定義的MIME
型別,比如通過application/x-spring-tuple
來指定Spring
的Tuple
型別。
在Spring Cloud Stream
中預設提供了一些可以開箱即用的型別轉換器,具體如下所示:
源內容型別 | 目標內容型別 | content-type頭 | content-type | 註釋 |
---|---|---|---|---|
POJO | JSON String | ignored | application/json | |
Tuple | JSON String | ignored | application/json | JSON is tailored for Tuple |
POJO | String (toString()) | ignored | text/plain,java.lang.String | |
POJO | byte[] (java.io.serialized) | ignored | application/x-java-serialized-object | |
JSON byte[] 或 String | POJO | application/json(or none) | application/x-java-object | |
byte[] 或 String | Serializable | application/x-java-serialized-object | application/x-java-object | |
JSON byte[] 或 String | Tuple | application/json(or none) | application/x-spring-tuple | |
byte[] | String | any | text/plain,java.lang.String | will apply any Charset specified in the content-type header |
String | byte[] | any | application/octet-stream | will apply any Charset specified in the content-type header |
訊息型別的轉換行為只會在需要進行轉換時才被執行,比如,當服務模組產生了一個頭資訊為application/json
的XML
字串訊息,Spring Cloud Stream
是不會將該XML
字串轉換為JSON
的,這是因為該模組的輸出內容已經是一個字串型別了,所以它並不會將其做進一步的轉換。
另外需要注意的是,Spring Cloud Stream
雖然同時支援輸入通道和輸出通道的訊息型別轉換,但還是推薦開發者儘量在輸出通道中做訊息轉換。因為對於輸入通道的消費者來說,當目標是一個POJO
的時候,使用@StreamListener
註解使能夠支援自動對其進行轉換的。
Spring Cloud Stream
除了提供上面這些開箱即用的轉換器之外,還支援開發者自定義的訊息轉換器。這使得我們可以使用任意格式(包括二進位制)的資料進行傳送和接收,並且將這些資料與特定的contentType
相關聯。在應用啟用的時候,Spring Cloud Stream
會將所有org.springframework.messaging.converter.MessageConverter
介面實現的自定義轉換器以及預設實現的那些轉換器都載入到訊息轉換工廠中,以提供給訊息處理時使用。