1. 程式人生 > 實用技巧 >SpringCloudStream整合kafka

SpringCloudStream整合kafka

原文連結:https://www.jianshu.com/p/a94c67f02c16

Spring Cloud Stream是構建訊息驅動的微服務應用程式框架。提供統一的接收發送管道以連線到訊息代理。通過@EnableBinding註解開啟SpringCloudStream的支援。通過@StreamListener註解,使其接收流處理的時間。


SpringCloudStream應用模型

一、引入依賴包

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>

二、自定義資訊通道

官方提供了Sink(輸入通道)、Source(輸出通道)、Processor(整合Sink和Source通道),我們也可以自定義我們自己的資訊通道。
@Input註解標識一個輸入通道
@Output註解標識一個輸出通道
通道名稱作為引數,如果未提供引數,預設使用方法名稱作為通道名稱。
如下我們自定義資訊通道EsChannel

/**
 * 自定義資訊通道
 * @author dbq
 * @date 2019/9/26 14:54
 */
public interface EsChannel {
    /**
     * 預設傳送訊息通道名稱
     */
    String ES_DEFAULT_OUTPUT = "es_default_output";

    /**
     * 預設接收訊息通道名稱
     */
    String ES_DEFAULT_INPUT = "es_default_input";

    /**
     * 告警傳送訊息通道名稱
     */
    String ES_ALARM_OUTPUT = "es_alarm_output";

    /**
     * 告警接收訊息通道名稱
     */
    String ES_ALARM_INPUT = "es_alarm_input";

    /**
     * 預設傳送訊息通道
     * @return channel 返回預設資訊傳送通道
     */
    

三、@EnableBinding使應用程式連線到訊息代理

@EnableDiscoveryClient
@SpringBootApplication
@EnableFeignClients
@EnableHystrix
@MapperScan(basePackages = "com.es.mapper")
@EnableBinding(EsChannel.class)
public class EsOnenetApplication {

    public static void main(String[] args) {
        SpringApplication.run(EsOnenetApplication.class, args);
    }
}

四、SpringCloudStream及kafka配置

#==============================================================
#spring-cloud-stream-Kafka配置 開始
#==============================================================
#是否開啟kafka(非spring-cloud-stream配置)
spring.kafka.enabled=false
#預設的輸入、輸出通道
spring.cloud.stream.bindings.es_default_input.destination=es_default_topic
spring.cloud.stream.bindings.es_default_input.binder=kafka
spring.cloud.stream.bindings.es_default_input.group=es_default_group

spring.cloud.stream.bindings.es_default_output.destination=es_default_topic
spring.cloud.stream.bindings.es_default_output.binder=kafka

#入站消費者的併發性
spring.cloud.stream.bindings.es_default_input.consumer.concurrency=2

#告警的輸入、輸出通道(多主題、分組測試用,實際開發中根據業務需求定義)
spring.cloud.stream.bindings.es_alarm_input.destination=es_alarm_topic
spring.cloud.stream.bindings.es_alarm_input.binder=kafka
spring.cloud.stream.bindings.es_alarm_input.group=es_alarm_group

spring.cloud.stream.bindings.es_alarm_output.destination=es_alarm_topic
spring.cloud.stream.bindings.es_alarm_output.binder=kafka

#kafka配置
spring.cloud.stream.kafka.binder.brokers=172.*.*.6:9092,172.*.*.7:9092,172.*.*.8:9092
spring.cloud.stream.kafka.binder.zkNodes=172.*.*.6:2181,172.*.*.7:2181,172.*.*.8:2181
spring.cloud.stream.kafka.binder.requiredAcks=1
#==============================================================
#spring-cloud-stream-Kafka配置 結束
#==============================================================

從上面配置可以看出
1、定義了通道名稱及分組,binder代表繫結實現的標識名稱(如kafka或者rabbit),與3中的定義名稱相對應。
2、定義了入站消費者的併發性,指在一個例項內的併發性,不同例項之間本身就是併發的,預設值為1
spring.cloud.stream.bindings.<channelName>.consumer.concurrency=2
3、定義了kafka連線資訊
如果未配置autoCommitOffset,預設自動提交偏移量
詳細引數配置可參考官網

五、傳送訊息到輸出通道

/**
 * kafka訊息傳送器
 * @author dbq
 * @date 2019/9/26 17:50
 */
@Component
public class EsKafkaMessageSender {
    @Autowired
    private EsChannel channel;

    /**
     * 訊息傳送到預設通道:預設通道對應預設主題
     * @param message
     */
    public void sendToDefaultChannel(String message){
        channel.sendEsDefaultMessage().send(MessageBuilder.withPayload(message).build());
    }

    /**
     * 訊息傳送到告警通道:告警通道對應告警主題
     * @param message
     */
    public void sendToAlarmChannel(String message){
        channel.sendEsAlarmMessage().send(MessageBuilder.withPayload(message).build());
    }
}

注入先前定義的通道EsChannel,sendToDefaultChannel、sendToAlarmChannel分別為我們自定義的兩個傳送方法,可將訊息傳送到不同的通道中,每個通道對應一個kafka的主題。

六、從輸入通道訂閱訊息

@EnableBinding(value = EsChannel.class)
public class EsStreamListener {

    /**
     * 從預設通道接收訊息
     * @param message
     */
    @StreamListener(EsChannel.ES_DEFAULT_INPUT)
    public void receive(Message<String> message){
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd hh:mm:ss");
        System.out.println(sdf.format(new Date())+"------start--------安全用電預設訊息:" + message);
        try {
            Thread.sleep(1000*10);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(sdf.format(new Date())+"------end--------安全用電預設訊息");
    }

    /**
     * 從告警通道接收訊息
     * @param message
     */
    @StreamListener(EsChannel.ES_ALARM_INPUT)
    public void receiveAlarm(Message<String> message){
        System.out.println("訂閱告警訊息:" + message);
    }
}

從不同的通道實現訊息的訂閱。

七、這樣完整的訊息系統就搭建好了,定義Controller傳送訊息測試

@ApiOperation(value = "test1", httpMethod = "POST")
    @PostMapping(value = "/test1", produces = "application/json;charset=UTF-8")
    public void test1(String message, HttpServletRequest request,
                             HttpServletResponse response) {
        sender.sendToDefaultChannel(message);
        sender.sendToDefaultChannel(message);
        sender.sendToDefaultChannel(message);
        sender.sendToDefaultChannel(message);
    }

    @ApiOperation(value = "test", httpMethod = "POST")
    @PostMapping(value = "/test2", produces = "application/json;charset=UTF-8")
    public void test2(String message, HttpServletRequest request,
                      HttpServletResponse response) {
        sender.sendToAlarmChannel(message);
    }

test1:傳送訊息的預設訊息通道
test2:傳送訊息到告警訊息通道

八、併發性測試

如七中所示,一次傳送4條訊息到預設訊息通道中,並啟動兩個例項(即兩個微服務組成一個小型叢集),在併發性配置為1的情況下,即spring.cloud.stream.bindings.es_default_input.consumer.concurrency=1

例項1

2019-09-30 11:13:14------start--------預設訊息...
2019-09-30 11:13:24------end--------預設訊息

例項2

2019-09-30 11:13:14------start--------預設訊息:...
2019-09-30 11:13:24------end--------預設訊息

2019-09-30 11:13:24------start--------預設訊息:...
2019-09-30 11:13:34------end--------預設訊息

2019-09-30 11:13:34------start--------預設訊息:...
2019-09-30 11:13:44------end--------預設訊息

通過列印日誌(日誌做了簡化處理)可以看出,兩個例項之間是做到了併發消費,但是在1個例項內部,並沒有併發消費。
如果將concurrency修改為2.
日誌如下
例項1

2019-09-30 11:31:13------start--------:...
2019-09-30 11:31:13------start--------預設訊息:...
2019-09-30 11:31:23------end--------預設訊息
2019-09-30 11:31:23------end--------預設訊息
2019-09-30 11:31:23------start--------預設訊息:...
2019-09-30 11:31:33------end--------預設訊息

例項2

2019-09-30 11:31:13------start--------預設訊息:...
2019-09-30 11:31:23------end--------

從日誌可以看出,例項1中實現了兩個執行緒的併發消費。