1. 程式人生 > 實用技巧 >kafka急速入門與核心API解析

kafka急速入門與核心API解析

kafka環境安裝

上一節課我們已經對kafka的基本概念、核心思想有了一定的瞭解和認知,並且掌握了kafka在實際工作中的一些主要的應用場景。那麼接下來,我們就一起快速進入kafka的安裝吧。

  • kafka下載地址:http://kafka.apache.org/downloads.html

  • kafka安裝環境介紹:

    節點名稱節點作用節點備註
    hostname:192.168.11.111 zookeeper節點 kafka註冊、配置中心
    hostname:192.168.11.112 zookeeper節點 kafka註冊、配置中心
    hostname:192.168.11.113
    zookeeper節點 kafka註冊、配置中心
    hostname:192.168.11.51 kafka節點 此節點為kafka broker
  • kafka安裝步驟:首先kafka安裝需要依賴與zookeeper,所以小夥伴們先準備好zookeeper環境(三個節點即可),然後我們來一起構建kafka broker。

    ## 解壓命令:
    tar -zxvf kafka_2.12-2.1.0.tgz -C /usr/local/
    ## 改名命令:
    mv kafka_2.12-2.1.0/ kafka_2.12
    ## 進入解壓後的目錄,修改server.properties檔案:
    vim /usr/local/kafka_2.12/config/server.properties
    ## 修改配置:
    broker.id=0
    port=9092
    host.name=192.168.11.51
    advertised.host.name=192.168.11.51
    log.dirs=/usr/local/kafka_2.12/kafka-logs
    num.partitions=2
    zookeeper.connect=192.168.11.111:2181,192.168.11.112:2181,192.168.11.113:2181
    
    ## 建立日誌資料夾:
    mkdir /usr/local/kafka_2.12/kafka-logs
    
    ##啟動kafka:
    /usr/local/kafka_2.12/bin/kafka-server-start.sh /usr/local/kafka_2.12/config/server.properties &
    

kafka常用命令


我們接下來一起了解幾個非常重要的命令,通過這些命令我們對kafka topic partition 進行檢視和操作。

  • 常用命令:

    ## 簡單操作:
    #(1)建立topic主題命令:(建立名為test的topic, 1個分割槽分別存放資料,資料備份總共1份)
    kafka-topics.sh --zookeeper 192.168.11.111:2181 --create --topic topic1 --partitions 1  --replication-factor 1  
    ## --zookeeper 為zookeeper服務列表地址配置項,這裡任意指定zookeeper其中一個服務列表地址即可
    ## --create 命令後 --topic 為建立topic 並指定 topic name
    ## --partitions 為指定分割槽數量配置項
    ## --replication-factor 為指定副本集數量配置項
    
    #(2)檢視topic列表命令:
    kafka-topics.sh --zookeeper 192.168.11.111:2181 --list
    
    #(3)kafka命令傳送資料:(然後我們就可以編寫資料傳送出去了)
    kafka-console-producer.sh --broker-list 192.168.11.51:9092 --topic topic1
    ## --brokerlist kafka服務的broker節點列表
    
    #(4)kafka命令接受資料:(然後我們就可以看到消費的資訊了)
    kafka-console-consumer.sh --bootstrap-server 192.168.11.51:9092 --topic topic1 --from-beginning
    
    
    #(5)刪除topic命令:
    kafka-topics.sh --zookeeper 192.168.11.111:2181 --delete --topic topic1
    
    #(6)kafka檢視消費進度:(當我們需要檢視一個消費者組的消費進度時,則使用下面的命令)
    kafka-consumer-groups.sh --bootstrap-server 192.168.11.51:9092 --describe --group group1
    ## --describe --group 為訂閱組, 後面指定 group name
    

急速入門


下面我們一起使用kafka最基本的API來對kafka進行操作!

  • kafka依賴包:

    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_2.12</artifactId>
    </dependency> 
    
  • kafka生產者:

    package com.bfxy.mix.kafka;
    
    import java.util.Properties;
    import org.apache.kafka.clients.producer.Callback;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;
    import com.alibaba.fastjson.JSON;
    
    public class CollectKafkaProducer {
    
        // 建立一個kafka生產者
    	private final KafkaProducer<String, String> producer;
    	// 定義一個成員變數為topic
    	private final String topic;
        
         // 初始化kafka的配置檔案和例項:Properties & KafkaProducer
    	public CollectKafkaProducer(String topic) { 
    		Properties props = new Properties(); 
             // 配置broker地址
    		props.put("bootstrap.servers", "192.168.11.51:9092"); 
             // 定義一個 client.id
    		props.put("client.id", "demo-producer-test"); 
    		
             // 其他配置項:
            
    //		props.put("batch.size", 16384);			//16KB -> 滿足16KB傳送批量訊息
    //		props.put("linger.ms", 10); 			//10ms -> 滿足10ms時間間隔傳送批量訊息
    //		props.put("buffer.memory", 33554432);	 //32M -> 快取提效能
    		
             // kafka 序列化配置:
    		props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); 
    		props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); 
             
             // 建立 KafkaProducer 與 接收 topic
    		this.producer = new KafkaProducer<>(props);
    		this.topic = topic; 
    	}
    
        // 傳送訊息 (同步或者非同步)
    	public void send(Object message, boolean syncSend) throws InterruptedException { 
    		try { 
                 // 同步傳送
    			if(syncSend) {
    				producer.send(new ProducerRecord<>(topic, JSON.toJSONString(message)));		
    			} 
                 // 非同步傳送(callback實現回撥監聽)
                 else {
    				producer.send(new ProducerRecord<>(topic, 
                                  JSON.toJSONString(message)), 
                                  new Callback() {
    					@Override
    					public void onCompletion(RecordMetadata recordMetadata, Exception e) {
    	                    if (e != null) {
    	                        System.err.println("Unable to write to Kafka in CollectKafkaProducer [" + topic + "] exception: " + e);
    	                    }
    					}
    				});				
    			}
    		} catch (Exception e) {
    			e.printStackTrace(); 
    		} 
    	} 
    	
        // 關閉producer
    	public void close() {
    		producer.close();
    	}
    
        // 測試函式
    	public static void main(String[] args) throws InterruptedException {
    		String topic = "topic1";
    		CollectKafkaProducer collectKafkaProducer = new CollectKafkaProducer(topic);
    
    		for(int i = 0 ; i < 10; i ++) {
    			User user = new User();
    			user.setId(i+"");
    			user.setName("張三");
    			collectKafkaProducer.send(user, true);
    		}
            
    		Thread.sleep(Integer.MAX_VALUE);
    	
    	}
    	
    }
    
    
  • kafka消費者:

    package com.bfxy.mix.kafka;
    
    import java.util.ArrayList;
    import java.util.Collection;
    import java.util.Collections;
    import java.util.List;
    import java.util.Properties;
    
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.TopicPartition;
    
    import kafka.consumer.Consumer;
    import lombok.extern.slf4j.Slf4j;
    
    @Slf4j
    public class CollectKafkaConsumer {
    	
        // 定義消費者例項
    	private final KafkaConsumer<String, String> consumer;
    	// 定義消費主題
    	private final String topic;
    
    
         // 消費者初始化
    	public CollectKafkaConsumer(String topic) { 
    		Properties props = new Properties();
             // 消費者的zookeeper 地址配置
    		props.put("zookeeper.connect", "192.168.11.111:2181"); 
             // 消費者的broker 地址配置
    		props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.11.51:9092");
    		// 消費者組定義
             props.put(ConsumerConfig.GROUP_ID_CONFIG, "demo-group-id"); 
             // 是否自動提交(auto commit,一般生產環境均設定為false,則為手工確認)
    		props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
             // 自動提交配置項
    //		props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
             // 消費進度(位置 offset)重要設定: latest,earliest 
    		props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
             // 超時時間配置
    		props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000"); 
    		// kafka序列化配置
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); 
    		props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); 
            
             // 建立consumer物件 & 賦值topic
    		consumer = new KafkaConsumer<>(props); 
    		this.topic = topic; 
             // 訂閱消費主題
    		consumer.subscribe(Collections.singletonList(topic));
    		
    	} 
    	
    	// 迴圈拉取訊息並進行消費,手工ACK方式
        private void receive(KafkaConsumer<String, String> consumer) {
            while (true) {
                // 	拉取結果集(拉取超時時間為1秒)
            	ConsumerRecords<String, String> records = consumer.poll(1000);
                //  拉取結果集後獲取具體訊息的主題名稱 分割槽位置 訊息數量
                for (TopicPartition partition : records.partitions()) {
                    List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
                    String topic = partition.topic();
                    int size = partitionRecords.size();
                    log.info("獲取topic:{},分割槽位置:{},訊息數為:{}", topic, partition.partition(), size);
                	// 分別對每個partition進行處理
                    for (int i = 0; i< size; i++) {
                    	System.err.println("-----> value: " + partitionRecords.get(i).value());
                        long offset = partitionRecords.get(i).offset() + 1;
                    	// consumer.commitSync(); // 這種提交會自動獲取partition 和 offset 
                         // 這種是顯示提交partition 和 offset 進度
    				   consumer.commitSync(Collections.singletonMap(partition, 
                                                                new OffsetAndMetadata(offset)));
                        log.info("同步成功, topic: {}, 提交的 offset: {} ", topic, offset);
                    }
    
                }
            }
        }
    	
        // 測試函式
    	public static void main(String[] args) {
    		String topic = "topic1";
    		CollectKafkaConsumer collectKafkaConsumer = new CollectKafkaConsumer(topic);
    		collectKafkaConsumer.receive(collectKafkaConsumer.consumer);
    	}
    }