SpringCloud學習筆記(五)-SpringCloudStream整合kafka
Spring Cloud Stream是構建訊息驅動的微服務應用程式框架。提供統一的接收發送管道以連線到訊息代理。通過@EnableBinding註解開啟SpringCloudStream的支援。通過@StreamListener註解,使其接收流處理的時間。
SpringCloudStream應用模型
一、引入依賴包
<dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-stream</artifactId> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-stream-binder-kafka</artifactId> </dependency>
二、自定義資訊通道
官方提供了Sink(輸入通道)、Source(輸出通道)、Processor(整合Sink和Source通道),我們也可以自定義我們自己的資訊通道。
@Input註解標識一個輸入通道
@Output註解標識一個輸出通道
通道名稱作為引數,如果未提供引數,預設使用方法名稱作為通道名稱。
如下我們自定義資訊通道EsChannel
/** * 自定義資訊通道 * @author dbq * @date 2019/9/26 14:54 */ public interface EsChannel { /** * 預設傳送訊息通道名稱 */ String ES_DEFAULT_OUTPUT = "es_default_output"; /** * 預設接收訊息通道名稱 */ String ES_DEFAULT_INPUT = "es_default_input"; /** * 告警傳送訊息通道名稱 */ String ES_ALARM_OUTPUT = "es_alarm_output"; /** * 告警接收訊息通道名稱 */ String ES_ALARM_INPUT = "es_alarm_input"; /** * 預設傳送訊息通道 * @return channel 返回預設資訊傳送通道 */ @Output(ES_DEFAULT_OUTPUT) MessageChannel sendEsDefaultMessage(); /** * 告警傳送訊息通道 * @return channel 返回告警資訊傳送通道 */ @Output(ES_ALARM_OUTPUT) MessageChannel sendEsAlarmMessage(); /** * 預設接收訊息通道 * @return channel 返回預設資訊接收通道 */ @Input(ES_DEFAULT_INPUT) MessageChannel recieveEsDefaultMessage(); /** * 告警接收訊息通道 * @return channel 返回告警資訊接收通道 */ @Input(ES_ALARM_INPUT) MessageChannel recieveEsAlarmMessage(); }
三、@EnableBinding使應用程式連線到訊息代理
@EnableDiscoveryClient @SpringBootApplication @EnableFeignClients @EnableHystrix @MapperScan(basePackages = "com.es.mapper") @EnableBinding(EsChannel.class) public class EsOnenetApplication { public static void main(String[] args) { SpringApplication.run(EsOnenetApplication.class, args); } }
四、SpringCloudStream及kafka配置
#==============================================================
#spring-cloud-stream-Kafka配置 開始
#==============================================================
#是否開啟kafka(非spring-cloud-stream配置)
spring.kafka.enabled=false
#預設的輸入、輸出通道
spring.cloud.stream.bindings.es_default_input.destination=es_default_topic
spring.cloud.stream.bindings.es_default_input.binder=kafka
spring.cloud.stream.bindings.es_default_input.group=es_default_group
spring.cloud.stream.bindings.es_default_output.destination=es_default_topic
spring.cloud.stream.bindings.es_default_output.binder=kafka
#入站消費者的併發性
spring.cloud.stream.bindings.es_default_input.consumer.concurrency=2
#告警的輸入、輸出通道(多主題、分組測試用,實際開發中根據業務需求定義)
spring.cloud.stream.bindings.es_alarm_input.destination=es_alarm_topic
spring.cloud.stream.bindings.es_alarm_input.binder=kafka
spring.cloud.stream.bindings.es_alarm_input.group=es_alarm_group
spring.cloud.stream.bindings.es_alarm_output.destination=es_alarm_topic
spring.cloud.stream.bindings.es_alarm_output.binder=kafka
#kafka配置
spring.cloud.stream.kafka.binder.brokers=172.*.*.6:9092,172.*.*.7:9092,172.*.*.8:9092
spring.cloud.stream.kafka.binder.zkNodes=172.*.*.6:2181,172.*.*.7:2181,172.*.*.8:2181
spring.cloud.stream.kafka.binder.requiredAcks=1
#==============================================================
#spring-cloud-stream-Kafka配置 結束
#==============================================================
從上面配置可以看出
1、定義了通道名稱及分組,binder代表繫結實現的標識名稱(如kafka或者rabbit),與3中的定義名稱相對應。
2、定義了入站消費者的併發性,指在一個例項內的併發性,不同例項之間本身就是併發的,預設值為1
spring.cloud.stream.bindings.<channelName>.consumer.concurrency=2
3、定義了kafka連線資訊
如果未配置autoCommitOffset,預設自動提交偏移量
詳細引數配置可參考https://docs.spring.io/spring-cloud-stream-binder-kafka/docs/3.0.10.RELEASE/reference/html/spring-cloud-stream-binder-kafka.html#_apache_kafka_binder
五、傳送訊息到輸出通道
/**
* kafka訊息傳送器
* @author dbq
* @date 2019/9/26 17:50
*/
@Component
public class EsKafkaMessageSender {
@Autowired
private EsChannel channel;
/**
* 訊息傳送到預設通道:預設通道對應預設主題
* @param message
*/
public void sendToDefaultChannel(String message){
channel.sendEsDefaultMessage().send(MessageBuilder.withPayload(message).build());
}
/**
* 訊息傳送到告警通道:告警通道對應告警主題
* @param message
*/
public void sendToAlarmChannel(String message){
channel.sendEsAlarmMessage().send(MessageBuilder.withPayload(message).build());
}
}
注入先前定義的通道EsChannel,sendToDefaultChannel、sendToAlarmChannel分別為我們自定義的兩個傳送方法,可將訊息傳送到不同的通道中,每個通道對應一個kafka的主題。
六、從輸入通道訂閱訊息
@EnableBinding(value = EsChannel.class)
public class EsStreamListener {
/**
* 從預設通道接收訊息
* @param message
*/
@StreamListener(EsChannel.ES_DEFAULT_INPUT)
public void receive(Message<String> message){
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd hh:mm:ss");
System.out.println(sdf.format(new Date())+"------start--------安全用電預設訊息:" + message);
try {
Thread.sleep(1000*10);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(sdf.format(new Date())+"------end--------安全用電預設訊息");
}
/**
* 從告警通道接收訊息
* @param message
*/
@StreamListener(EsChannel.ES_ALARM_INPUT)
public void receiveAlarm(Message<String> message){
System.out.println("訂閱告警訊息:" + message);
}
}
從不同的通道實現訊息的訂閱。
七、這樣完整的訊息系統就搭建好了,定義Controller傳送訊息測試
@ApiOperation(value = "test1", httpMethod = "POST")
@PostMapping(value = "/test1", produces = "application/json;charset=UTF-8")
public void test1(String message, HttpServletRequest request,
HttpServletResponse response) {
sender.sendToDefaultChannel(message);
sender.sendToDefaultChannel(message);
sender.sendToDefaultChannel(message);
sender.sendToDefaultChannel(message);
}
@ApiOperation(value = "test", httpMethod = "POST")
@PostMapping(value = "/test2", produces = "application/json;charset=UTF-8")
public void test2(String message, HttpServletRequest request,
HttpServletResponse response) {
sender.sendToAlarmChannel(message);
}
test1:傳送訊息的預設訊息通道
test2:傳送訊息到告警訊息通道
八、併發性測試
如七中所示,一次傳送4條訊息到預設訊息通道中,並啟動兩個例項(即兩個微服務組成一個小型叢集),在併發性配置為1的情況下,即spring.cloud.stream.bindings.es_default_input.consumer.concurrency=1
例項1
2019-09-30 11:13:14------start--------預設訊息...
2019-09-30 11:13:24------end--------預設訊息
例項2
2019-09-30 11:13:14------start--------預設訊息:...
2019-09-30 11:13:24------end--------預設訊息
2019-09-30 11:13:24------start--------預設訊息:...
2019-09-30 11:13:34------end--------預設訊息
2019-09-30 11:13:34------start--------預設訊息:...
2019-09-30 11:13:44------end--------預設訊息
通過列印日誌(日誌做了簡化處理)可以看出,兩個例項之間是做到了併發消費,但是在1個例項內部,並沒有併發消費。
如果將concurrency修改為2.
日誌如下
例項1
2019-09-30 11:31:13------start--------:...
2019-09-30 11:31:13------start--------預設訊息:...
2019-09-30 11:31:23------end--------預設訊息
2019-09-30 11:31:23------end--------預設訊息
2019-09-30 11:31:23------start--------預設訊息:...
2019-09-30 11:31:33------end--------預設訊息
例項2
2019-09-30 11:31:13------start--------預設訊息:...
2019-09-30 11:31:23------end--------
從日誌可以看出,例項1中實現了兩個執行緒的併發消費。