1. 程式人生 > 實用技巧 >Springboot2整合kafka的兩種使用方式

Springboot2整合kafka的兩種使用方式

Springboot2整合kafka

kafka是一個分散式訊息佇列。在專案中應用十分廣泛,具有高效能、持久化、多副本備份、橫向擴充套件能力。


kafka

  • 在多臺機器上分別部署Kafka,即Kafka叢集。每臺機器執行的Kafka服務稱為broker。
  • 一個Topic主題可以被分為若干個分割槽(partition),每個分割槽在儲存層面是append log檔案。
  • 分割槽(Partition )為Kafka提供了可伸縮性,水平擴充套件功能。
  • 多副本機制(Partition Replica)提高了kafka的資料可靠性和容災能力。

    圖片來源《深入理解kafka核心設計和實踐原理》

docker上安裝環境

1.安裝zookeeper 和 安裝kafka

  • 這裡使用了wurstmeister/kafka和wurstmeister/zookeeper這兩個版本的映象

2.執行映象

  • 整個啟動過程遇到了8個左右報錯,一個個解決,最後執行成功,簡單列幾個
    • Please define KAFKA_LISTENERS / (deprecated) KAFKA_ADVERTISED_HOST_NAME
    • WARN Session 0x0 for server zookeeper:2181, unexpected error, closing socket
    • java.nio.channels.UnresolvedAddressException
    • could not be established. Broker may not be available
    • Give up sending metadata request since no node is available
  • 總結下最後的啟動命令,依此啟動zookeeper和kafka
docker run --name zk01 -p 2181:2181 --restart always -d zookeeper
  • 1
docker run --name kafka01 -e HOST_IP=localhost -e KAFKA_ADVERTISED_PORT=9092 -e  KAFKA_ADVERTISED_HOST_NAME=localhost -e KAFKA_ZOOKEEPER_CONNECT="192.168.0.111:2181" -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.0.111:9092 -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092 -e KAFKA_BROKER_ID=1 -e ZK=zk -p 9092 --link zk01:zk -t wurstmeister/kafka
  • 1


Springboot2引入kafka

 <!--引入Kafka-->
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency> 
  • 1
  • 2
  • 3
  • 4
  • 5
  • application.properties配置

#kafka配置
spring.kafka.bootstrap-servers=192.168.0.111:9092
#=============== provider  =======================
spring.kafka.producer.retries=0
# 每次批量傳送訊息的數量
spring.kafka.producer.batch-size=16384
spring.kafka.producer.buffer-memory=33554432
# 指定訊息key和訊息體的編解碼方式
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
#=============== consumer  =======================
# 指定預設消費者group id
spring.kafka.consumer.group-id=test-consumer-group
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.enable-auto-commit=true
spring.kafka.consumer.auto-commit-interval=100
# 指定訊息key和訊息體的編解碼方式
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20

基於註解

生產

@Component
public class KafkaProducer {

    private static final String MY_TOPIC = "TOPIC_LIN_LIANG";

    @Autowired
    KafkaTemplate kafkaTemplate;

    public void produce(){
        Message message = new Message();
        message.setId(12L);
        message.setMsg("hello jack");
        message.setTime(new Date());
        kafkaTemplate.send(MY_TOPIC,message);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16

消費
@KafkaListener可以接受的引數有:

  • data : 對於data值的型別其實並沒有限定,根據KafkaTemplate所定義的型別來決定。 data為List集合的則是用作批量消費。
  • ConsumerRecord:具體消費資料類,包含Headers資訊、分割槽資訊、時間戳等
  • Acknowledgment:用作Ack機制的介面
  • Consumer:消費者類,使用該類我們可以手動提交偏移量、控制消費速率等功能
   public void listen1(String data) 

    public void listen2(ConsumerRecord<K,V> data) 

    public void listen3(ConsumerRecord<K,V> data, Acknowledgment acknowledgment) 

    public void listen4(ConsumerRecord<K,V> data, Acknowledgment acknowledgment, Consumer<K,V> consumer) 

    public void listen5(List<String> data) 

    public void listen6(List<ConsumerRecord<K,V>> data) 

    public void listen7(List<ConsumerRecord<K,V>> data, Acknowledgment acknowledgment) 

    public void listen8(List<ConsumerRecord<K,V>> data, Acknowledgment acknowledgment, Consumer<K,V> consumer) 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15

使用示例

   @KafkaListener(topics = {MY_TOPIC})
    public void consume(String message){
        log.info("receive msg "+ message);
    }
  • 1
  • 2
  • 3
  • 4

基於客戶端

0.9x版本後的kafka客戶端使用java語言編寫,本人更傾向於這種開發方式。
在配置中註釋了基本意思,具體參考了朱忠華的《深入理解kafka:核心設計和實現原理》,學kafka感覺這一本就夠了。

/**
 * linliang
 */
@Configuration
public class Kafka_Config implements InitializingBean {

    @Value("${kafka.broker.list}")
    public String brokerList;

    public static final String topic = "TOPIC_LIN_LIANG";

    public final String groupId = "group.01";

    public Properties customerConfigs() {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);//自動位移提交
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 100);//自動位移提交間隔時間
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10000);//消費組失效超時時間
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");//位移丟失和位移越界後的恢復起始位置
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                StringDeserializer.class.getName());

        return props;
    }

    public Properties producerConfigs() {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 20000000);//20M 訊息快取
        //生產者空間不足時,send()被阻塞的時間,預設60s
        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 6000);
        //生產者重試次數
        props.put(ProducerConfig.RETRIES_CONFIG, 0);
        //指定ProducerBatch(訊息累加器中BufferPool中的)可複用大小
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        //生產者會在ProducerBatch被填滿或者等待超過LINGER_MS_CONFIG時傳送
        props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.CLIENT_ID_CONFIG, "producer.client.id.demo");
        return props;
    }


    @Bean
    public Producer<Integer, Object> getKafkaProducer() {
        //KafkaProducer是執行緒安全的,可以在多個執行緒中共享單個例項
        return new KafkaProducer<Integer, Object>(producerConfigs());
    }

    @Override
    public void afterPropertiesSet() throws Exception {
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60

生產

@Component
public class Kafka_Producer  {

    public String topic = Kafka_Config.topic;

    @Autowired
    Producer producer;

    public void producer() throws Exception {

        ProducerRecord<String, String> record = new ProducerRecord<>(topic, "hello, Kafka!");
        try {
            producer.send(record, new Callback() {
                @Override
                public void onCompletion(RecordMetadata metadata, Exception exception) {
                    if (exception == null) {
                        System.out.println(metadata.partition() + ":" + metadata.offset());
                    }
                }
            });
        } catch (Exception e) {
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24

消費

@Component
public class Kafka_Consumer implements InitializingBean {
    public String topic = Kafka_Config.topic;
    @Autowired
    Kafka_Config kafka_config;

    @Override
    public void afterPropertiesSet() throws Exception {
        //每個執行緒一個KafkaConsumer例項,且執行緒數設定成分割槽數,最大化提高消費能力
        int consumerThreadNum = 2;//執行緒數設定成分割槽數,最大化提高消費能力
        for (int i = 0; i < consumerThreadNum; i++) {
            new KafkaConsumerThread(kafka_config.customerConfigs(), topic).start();
        }
    }

    public class KafkaConsumerThread extends Thread {
        private KafkaConsumer<String, String> kafkaConsumer;

        public KafkaConsumerThread(Properties props, String topic) {
            this.kafkaConsumer = new org.apache.kafka.clients.consumer.KafkaConsumer<>(props);
            this.kafkaConsumer.subscribe(Arrays.asList(topic));
        }

        @Override
        public void run() {
            try {
                while (true) {
                    ConsumerRecords<String, String> records =
                            kafkaConsumer.poll(Duration.ofMillis(100));
                    for (ConsumerRecord<String, String> record : records) {
                        System.out.println("message------------ "+record.value());
                    }
                }
            } catch (Exception e) {
            } finally {
                kafkaConsumer.close();
            }
        }
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41

歡迎關注公眾號fbzl95