1. 程式人生 > >SpringCloudBus使用Kafka實現訊息匯流排

SpringCloudBus使用Kafka實現訊息匯流排

Kafka是分散式釋出-訂閱訊息系統,最初由LinkedIn公司開發,之後成為之後成為Apache基金會的一部分,由ScalaJava編寫。Kafka是一種快速、可擴充套件的、設計內在就是分散式的,分割槽的和可複製的提交日誌服務。

在開始本文前,需要搭建kafka的環境,如果是在CentOS環境下,可以看看我前面的文章:CentOS7下Kafka的安裝介紹 。其他平臺下可以自行百度或Google。

在之前的環境中,需要修改server.properties檔案,開啟9092埠的監聽:

listeners=PLAINTEXT://your.host.name:9092

SpringBoot簡單整合Kafka

因為SpringCloud是基於SpringBoot的,所以在使用SpringCloudBus整合之前先用SpringBoot整合並記錄下來。

建立專案

這裡建立一個名為kafka-hello的SpringBoot專案,並新增以下依賴:

<dependencies>
  <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
  </dependency
>
<dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <optional>true</optional> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId
>
<scope>test</scope> </dependency> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> <version>1.1.1.RELEASE</version> </dependency> <dependency> <groupId>com.google.code.gson</groupId> <artifactId>gson</artifactId> <version>2.8.2</version> </dependency> </dependencies>

訊息實體類

@Data
public class Message {
    private Long id;//id
    private String msg; //訊息
    private Date sendTime; //傳送時間
}

訊息產生者

在該類中建立一個訊息傳送的方法,使用KafkaTemplate.send()傳送訊息,wqh是Kafka裡的Topic。

@Component
@Slf4j
public class KafkaSender {

    @Autowired
    private KafkaTemplate<String,String> kafkaTemplate;

    private Gson gson = new GsonBuilder().create();

    public void send(Long i){
        Message message = new Message();
        message.setId(i);
        message.setMsg(UUID.randomUUID().toString());
        message.setSendTime(new Date());
        log.info("========傳送訊息  "+i+" >>>>{}<<<<<==========",gson.toJson(message));
        kafkaTemplate.send("wqh",gson.toJson(message));
    }
}

訊息接收類,

在這個類中,建立consumer方法,並使用@KafkaListener註解監聽指定的topic,如這裡是監聽wanqh和wqh兩個topic。

@Component
@Slf4j
public class KafkaConsumer {

    @KafkaListener(topics = {"wanqh","wqh"})
    public void consumer(ConsumerRecord<?,?> consumerRecord){
        //判斷是否為null
        Optional<?> kafkaMessage = Optional.ofNullable(consumerRecord.value());
        log.info(">>>>>>>>>> record =" + kafkaMessage);
        if(kafkaMessage.isPresent()){
            //得到Optional例項中的值
            Object message = kafkaMessage.get();
            log.info(">>>>>>>>接收訊息message =" + message);
        }
    }
}

修改啟動類

@SpringBootApplication
public class KafkaApplication {

    @Autowired
    private KafkaSender kafkaSender;

    @PostConstruct
    public void init(){
      for (int i = 0; i < 10; i++) {
        //呼叫訊息傳送類中的訊息傳送方法
        kafkaSender.send((long) i);
      }
    }
    public static void main(String[] args) {
       SpringApplication.run(KafkaApplication.class, args);
    }
}

配置檔案

spring.application.name=kafka-hello
server.port=8080
#============== kafka ===================
# 指定kafka 代理地址,可以多個
spring.kafka.bootstrap-servers=192.168.18.136:9092

#=============== provider  =======================
spring.kafka.producer.retries=0
# 每次批量傳送訊息的數量
spring.kafka.producer.batch-size=16384
spring.kafka.producer.buffer-memory=33554432

# 指定訊息key和訊息體的編解碼方式
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer

#=============== consumer  =======================
# 指定預設消費者group id
spring.kafka.consumer.group-id=test-consumer-group

spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.enable-auto-commit=true
spring.kafka.consumer.auto-commit-interval=100

# 指定訊息key和訊息體的編解碼方式
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer

測試

直接啟動該專案:

SpringCloudBus整合Kafka

前面介紹使用RabbitMQ整合SpringCloudBus實現了訊息匯流排,並且測試了動態重新整理配置檔案。RabbitMQ是通過引入spring-cloud-starter-bus-amqp模組來實現訊息匯流排。若使用Kafka實現訊息匯流排,我們可以直接將之前新增的spring-cloud-starter-bus-amqp替換成spring-cloud-starter-bus-kafka

這裡我將前面的config-client複製一份,改名config-client-kafka。傳送門:SpingCloudBus整合RabbitMQ

  • 所新增的依賴:
<dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-config</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.retry</groupId>
            <artifactId>spring-retry</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-aop</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-actuator</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-eureka</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-bus-kafka</artifactId>
        </dependency>
    </dependencies>
  • 新增kafka的配置資訊
#Kafka的服務端列表,預設localhost
spring.cloud.stream.kafka.binder.brokers=192.168.18.136:9092
#Kafka服務端的預設埠,當brokers屬性中沒有配置埠資訊時,就會使用這個預設埠,預設9092
spring.cloud.stream.kafka.binder.defaultBrokerPort=9092
#Kafka服務端連線的ZooKeeper節點列表,預設localhost
spring.cloud.stream.kafka.binder.zkNodes=192.168.18.136:2181
#ZooKeeper節點的預設埠,當zkNodes屬性中沒有配置埠資訊時,就會使用這個預設埠,預設2181
spring.cloud.stream.kafka.binder.defaultZkPort=2181

測試方法與前一篇一樣,不介紹了。

參考:

專案地址:

原文[地址: