SpringCloud實戰9-Stream訊息驅動
官方定義 Spring Cloud Stream 是一個構建訊息驅動微服務的框架。
應用程式通過 inputs 或者 outputs 來與 Spring Cloud Stream 中binder 互動,通過我們配置來 binding ,而 Spring Cloud Stream 的 binder 負責與訊息中介軟體互動。所以,我們只需要搞清楚如何與 Spring Cloud Stream 互動就可以方便使用訊息驅動的方式。
通過使用Spring Integration來連線訊息代理中介軟體以實現訊息事件驅動。Spring Cloud Stream 為一些供應商的訊息中介軟體產品提供了個性化的自動化配置實現,引用了釋出-訂閱、消費組、分割槽的三個核心概念。目前僅支援RabbitMQ、Kafka。
這裡還要講解一下什麼是Spring Integration ? Integration 整合
企業應用整合(EAI)是整合應用之間資料和服務的一種應用技術。四種整合風格:
1.檔案傳輸:兩個系統生成檔案,檔案的有效負載就是由另一個系統處理的訊息。該類風格的例子之一是針對檔案輪詢目錄或FTP目錄,並處理該檔案。
2.共享資料庫:兩個系統查詢同一個資料庫以獲取要傳遞的資料。一個例子是你部署了兩個EAR應用,它們的實體類(JPA、Hibernate等)共用同一個表。
3.遠端過程呼叫:兩個系統都暴露另一個能呼叫的服務。該類例子有EJB服務,或SOAP和REST服務。
4.訊息:兩個系統連線到一個公用的訊息系統,互相交換資料,並利用訊息呼叫行為。該風格的例子就是眾所周知的中心輻射式的(hub-and-spoke)JMS架構。
為什麼需要SpringCloud Stream訊息驅動呢?
比方說我們用到了RabbitMQ和Kafka,由於這兩個訊息中介軟體的架構上的不同,像RabbitMQ有exchange,kafka有Topic,partitions分割槽,這些中介軟體的差異性導致我們實際專案開發給我們造成了一定的困擾,我們如果用了兩個訊息佇列的其中一種,
後面的業務需求,我想往另外一種訊息佇列進行遷移,這時候無疑就是一個災難性的,一大堆東西都要重新推倒重新做,因為它跟我們的系統耦合了,這時候springcloud Stream給我們提供了一種解耦合的方式。
如下是官方文件提供的架構圖所示:
Spring Cloud Stream由一箇中間件中立的核組成。應用通過Spring Cloud Stream插入的input(相當於消費者consumer,它是從佇列中接收訊息的)和output(相當於生產者producer,它是從佇列中傳送訊息的。)通道與外界交流。
通道通過指定中介軟體的Binder實現與外部代理連線。業務開發者不再關注具體訊息中介軟體,只需關注Binder對應用程式提供的抽象概念來使用訊息中介軟體實現業務即可。
Binder
通過定義繫結器作為中間層,實現了應用程式與訊息中介軟體(Middleware)細節之間的隔離。通過嚮應用程式暴露統一的Channel通過,使得應用程式不需要再考慮各種不同的訊息中介軟體的實現。當需要升級訊息中介軟體,或者是更換其他訊息中介軟體產品時,我們需要做的就是更換對應的Binder繫結器而不需要修改任何應用邏輯 。甚至可以任意的改變中介軟體的型別而不需要修改一行程式碼。目前只提供了RabbitMQ和Kafka的Binder實現。
Springcloud Stream還有個好處就是像Kafka一樣引入了一點分割槽的概念,像RabbitMQ不支援分割槽的佇列,你用了SpringCloud Stream技術,它就會幫RabbitMQ引入了分割槽的特性,SpringCloud Stream就是天然支援分割槽的,我們用起來還是很方便的。後面會詳細講解
接下來進行一個Demo進行演練。
首先我們要在先前的工程中新建三個子模組,分別是springcloud-stream,springcloud-stream1,springcloud-stream2 這三個模組,其中springcloud-stream作為生產者進行發訊息模組,springcloud-stream1,springcloud-stream2作為訊息接收模組。
如下圖所示:
分別在springcloud-stream,springcloud-stream1,springcloud-stream2 這三個模組引入如下依賴:
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka</artifactId>
<version>1.3.0.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
接著進行application.yml進行配置如下:
server:
port: 7888
spring:
application:
name: producer
cloud:
stream:
kafka:
binder:
#Kafka的訊息中介軟體伺服器
brockers: localhost:9092
#Zookeeper的節點,如果叢集,後面加,號分隔
zk-nodes: localhost:2181
#如果設定為false,就不會自動建立Topic 有可能你Topic還沒建立就直接呼叫了。
auto-create-topics: true
bindings:
#這裡用stream給我們提供的預設output,後面會講到自定義output
output:
#訊息發往的目的地
destination: stream-demo
#訊息傳送的格式,接收端不用指定格式,但是傳送端要
content-type: text/plain
接下來進行第一個springcloud-stream模組的程式碼編寫,在該模組下定義一個SendService,如下:
package hjc.producer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.support.MessageBuilder;
/**
* Created by cong on 2018/5/28.
*/
//這個註解給我們繫結訊息通道的,Source是Stream給我們提供的,可以點進去看原始碼,可以看到output和input,這和配置檔案中的output,input對應的。
@EnableBinding(Source.class)
public class SendService {
@Autowired
private Source source;
public void sendMsg(String msg){
source.output().send(MessageBuilder.withPayload(msg).build());
}
}
springcloud-stream 的controller層程式碼如下:
package hjc.producer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
* Created by cong 2018/5/28
*/
@RestController
public class ProducerController {
@Autowired
private SendService sendService;
@RequestMapping("/send/{msg}")
public void send(@PathVariable("msg") String msg){
sendService.sendMsg(msg);
}
}
接下來進行springcloud-stream1,springcloud-stream2兩個模組的程式碼編寫
首先需要引入的依賴,上面已經提到。
接著進行springcloud-stream1和springcloud-stream2模組application.yml的配置,如下:
springcloud-stream1配置如下:
server:
port: 7889
spring:
application:
name: consumer_1
cloud:
stream:
kafka:
binder:
brockers: localhost:9092
zk-nodes: localhost:2181
auto-create-topics: true
bindings:
#input是接收,注意這裡不能再像前面一樣寫output了
input:
destination: stream-demo
springcloud-stream2模組application.yml的配置如下:
server:
port: 7890
spring:
application:
name: consumer_2
cloud:
stream:
kafka:
binder:
brockers: localhost:9092
zk-nodes: localhost:2181
auto-create-topics: true
bindings:
input:
destination: stream-demo
好了接下來進行springcloud-stream1模組和springcloud-stream2模組的訊息接受程式碼的編寫,springcloud-stream1模組和springcloud-stream2模組的訊息接受程式碼都是一樣的,如下:
//訊息接受端,stream給我們提供了Sink,Sink原始碼裡面是繫結input的,要跟我們配置檔案的imput關聯的。
@EnableBinding(Sink.class)
public class RecieveService {
@StreamListener(Sink.INPUT)
public void recieve(Object payload){
System.out.println(payload);
}
}
好了接著我們首先要啟動上一篇隨筆所提到的zookeeper,和Kafka,如下:
接著分別現後啟動啟動springcloud-stream,springcloud-stream1,springcloud-stream2,模組執行結果如下:
首先進行springcloud-stream模組的訪問,如下:
回車後可以看到,Kafka CommitId,說明訊息傳送成功,再看一下,那兩個訊息接受模組的輸出,如下:
可以看到這兩訊息模組都接收到了訊息並且列印了出來。
好了到現在為止,我們進行了一個簡單的訊息傳送和接收,用的是Stream給我們提供的預設Source,Sink,接下來我們要自己進行自定義,這種方式在工作中還是用的比較多的,因為我們要往不同的訊息通道發訊息,
必然不能全都叫input,output的,那樣的話就亂套了,因此首先自定義一個介面,如下:
/**
* Created by cong on 2018/5/28.
*/
public interface MySource {
@Output("myOutput")
MessageChannel myOutput();
}
這裡要注意一下,可以看到上面的程式碼,其中myOutput是要和你的配置檔案的訊息傳送端配置對應的,因此修改springcloud-stream中application.yml配置,如下:
server:
port: 7888
spring:
application:
name: producer
cloud:
stream:
kafka:
binder:
#Kafka的訊息中介軟體伺服器
brockers: localhost:9092
#Zookeeper的節點,如果叢集,後面加,號分隔
zk-nodes: localhost:2181
#如果設定為false,就不會自動建立Topic 有可能你Topic還沒建立就直接呼叫了。
auto-create-topics: true
bindings:
#自定義output
myOutput:
#訊息發往的目的地
destination: stream-demo
#訊息傳送的格式,接收端不用指定格式,但是傳送端要
content-type: text/plain
這樣還不行,還必須改造springcloud-stream訊息傳送端的SendService這個類,程式碼如下:
package hjc.producer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.support.MessageBuilder;
/**
* Created by cong on 2018/5/28.
*/
@EnableBinding(MySource.class)
public class SendService {
@Autowired
private MySource source;
public void sendMsg(String msg){
source.myOutput().send(MessageBuilder.withPayload(msg).build());
}
}
接下來重新啟動那三個模組,執行結果如下:
可以看到兩個訊息接收端還是依然能接受訊息。
接收端的自定義接收也是類似的修改的,這裡就不演示了。
springcloud-stream還給我們提供了一個Processor介面,用於進行訊息處理後再進行傳送出去,相當於一個訊息中轉站。下面我們進行演示
首先我們需要改造springcloud-stream1模組,把它作為一個訊息中轉站。用於springcloud-stream1訊息處理後再進行傳送給springcloud-stream2模組
首先修改springcloud-stream1模組的配置,如下:
server:
port: 7889
spring:
application:
name: consumer_1
cloud:
stream:
kafka:
binder:
brockers: localhost:9092
zk-nodes: localhost:2181
auto-create-topics: true
bindings:
#input是接收,注意這裡不能再像前面一樣寫output了
input:
destination: stream-demo
#進行訊息中轉處理後,在進行轉發出去
output:
destination: stream-demo-trans
接著在新建一個訊息中轉類,程式碼如下:
package hjc.consumer;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Processor;
import org.springframework.integration.annotation.ServiceActivator;
/**
* Created by cong on 2018/5/28.
*/
@EnableBinding(Processor.class)
public class TransFormService {
@ServiceActivator(inputChannel = Processor.INPUT,outputChannel = Processor.OUTPUT)
public Object transform(Object payload){
System.out.println("訊息中轉站:"+payload);
return payload;
}
}
接著要修改訊息中轉站傳送訊息出去的接收端springcloud-stream2的配置,如下:
server:
port: 7890
spring:
application:
name: consumer_2
cloud:
stream:
kafka:
binder:
brockers: localhost:9092
zk-nodes: localhost:2181
auto-create-topics: true
bindings:
input:
destination: stream-demo-trans
這裡要強調一下,要把先前RecieveService類的繫結註解全都註釋掉,不然,會繫結衝突的,接下來分別重啟這三個模組,執行結果如下: 先進性springcloud-stream模組的訪問。
中轉站執行結果取下:
接下來,看中轉後的的接受端Springcloud-stream2的訊息,到底有沒有訊息過來,如下:
可以看到,中轉後訊息被接受到了。
我們還可能會遇到一個場景就是,我們接收到訊息後,給別人一個反饋ACK,SpringCloud stream 給我們提供了一個SendTo註解可以幫我們幹這些事情。
首先我們先實現一個介面SendToBinder去實現output和input,程式碼如下:
package hjc.consumer;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;
/**
* Created by cong on 2018/5/28.
*/
public interface SendToBinder {
@Output("output")
MessageChannel output();
@Input("input")
SubscribableChannel input();
}
接著再新建一個SendToService類來繫結自己的SendToBinder介面,然後監聽input,返回ACK表示中轉站收到訊息了,再轉發訊息出去,程式碼如下:
package hjc.consumer;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.messaging.handler.annotation.SendTo;
/**
* Created by cong on 2018/5/28.
*/
@EnableBinding(SendToBinder.class)
public class SendToService {
@StreamListener("input")
@SendTo("output")
public Object receiveFromInput(Object payload){
System.out.println("中轉訊息。。"+payload);
return "xxxxx";
}
}
這裡要注意一點就是,啟動前下那邊之前的用到的哪些繫結註解,先註釋掉,不然與這裡會發生衝突。
執行結果如下:
可以看到傳送端受到一個ACK
可以看到先前的例子,我們都是一端發訊息,兩個訊息接受者都接收到了,但是有時候有些業務場景我只想讓其中一個訊息接收者接收到訊息,那麼該怎麼辦呢?
這時候就涉及一個訊息分組(Consumer Groups)的概念了。
訊息分組(Consumer Groups)
“Group”,如果使用過 Kafka 的讀者並不會陌生。Spring Cloud Stream 的這個分組概念的意思基本和 Kafka 一致。微服務中動態的縮放同一個應用的數量以此來達到更高的處理能力是非常必須的。對於這種情況,同一個事件防止被重複消費,
只要把這些應用放置於同一個 “group” 中,就能夠保證訊息只會被其中一個應用消費一次。不同的組是可以消費的,同一個組內會發生競爭關係,只有其中一個可以消費。
首先修改該springcloud-stream1模組的配置,修改程式碼如下:
server:
port: 7889
spring:
application:
name: consumer_1
cloud:
stream:
kafka:
binder:
brockers: localhost:9092
zk-nodes: localhost:2181
auto-create-topics: true
bindings:
#input是接收,注意這裡不能再像前面一樣寫output了
input:
destination: stream-demo
#分組的組名
group: group
接著修改springcloud-stream2模組的配置,程式碼如下:
server:
port: 7890
spring:
application:
name: consumer_2
cloud:
stream:
kafka:
binder:
brockers: localhost:9092
zk-nodes: localhost:2181
auto-create-topics: true
bindings:
input:
destination: stream-demo-trans
group: group
可以看到springcloud-stream1和springcloud-stream2是屬於同一組的。springcloud-stream模組的發的訊息只能被springcloud-stream1或springcloud-stream2其中一個接收到,這樣避免了重複消費。
springcloud-stream1模組程式碼恢復成如下程式碼:
package hjc.consumer;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
/**
* Created by cong on 2018/5/28.
*/
//訊息接受端,stream給我們提供了Sink,Sink原始碼裡面是繫結input的,要跟我們配置檔案的imput關聯的。
@EnableBinding(Sink.class)
public class RecieveService {
@StreamListener(Sink.INPUT)
public void recieve(Object payload){
System.out.println(payload);
}
}
springcloud-stream2的接收端程式碼不變,依然跟上面程式碼一樣。
接著,執行結果如下:
控制檯如下:
可以看到只有其中一個受到訊息。避免了訊息重複消費。
有時候我們只想給特定的消費者消費訊息,那麼又該真麼做呢?
這是後又涉及到訊息分割槽的概念了。
訊息分割槽()
Spring Cloud Stream對給定應用的多個例項之間分隔資料予以支援。在分隔方案中,物理交流媒介(如:代理主題)被視為分隔成了多個片(partitions)。一個或者多個生產者應用例項給多個消費者應用例項傳送訊息並確保相同特徵的資料被同一消費者例項處理。
Spring Cloud Stream對分割的程序例項實現進行了抽象。使得Spring Cloud Stream 為不具備分割槽功能的訊息中介軟體(RabbitMQ)也增加了分割槽功能擴充套件。
那麼我們就要進行一些配置了,比如我只想要springcloud-stream2模組接收到訊息,
springcloud-stream2配置如下:
server:
port: 7890
spring:
application:
name: consumer_2
cloud:
stream:
kafka:
binder:
brockers: localhost:9092
zk-nodes: localhost:2181
auto-create-topics: true
bindings:
input:
destination: stream-demo-trans
group: group
consumer:
#開啟分割槽
partitioned: true
#分割槽數量
instance-count: 2
生產者端springcloud-stream模組配置如下:
server:
port: 7888
spring:
application:
name: producer
cloud:
stream:
kafka:
binder:
#Kafka的訊息中介軟體伺服器
brockers: localhost:9092
#Zookeeper的節點,如果叢集,後面加,號分隔
zk-nodes: localhost:2181
#如果設定為false,就不會自動建立Topic 有可能你Topic還沒建立就直接呼叫了。
auto-create-topics: true
bindings:
#自定義output
myOutput:
#訊息發往的目的地
destination: stream-demo
#訊息傳送的格式,接收端不用指定格式,但是傳送端要
content-type: text/plain
producer:
#分割槽的主鍵,根據什麼來分割槽,下面的payload.id只是一個物件的id用於做為Key,用來說明的。希望不要誤解
partitionKeyExpression: payload.id
#Key和分割槽數量進行取模去分配訊息,這裡分割槽數量配置為2
partitionCount: 2
其他的程式碼基本不變,這裡就不演示了。這裡要給大家說明一下,比如分割槽的Key是一個物件的id,比如說id=1,每次傳送訊息的物件的id為相同值1,則訊息只會被同一個消費者消費,比如說Key和分割槽數量取模計算的結果是分到stream2模組中,那麼下一次進行進行訊息傳送,
只要分組的key即id的值依然還是1的話,訊息永遠只會分配到stream2模組中。