
Kafka Producer (Part 1)

1. How the producer sends messages

1.1 How sending works

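Sending a message involves two threads: the main thread and the Sender thread. The main thread creates a RecordAccumulator, a buffer that keeps a double-ended queue (deque) of record batches for each partition. The main thread hands each message (after it has passed through the interceptors, serializers and partitioner) to the RecordAccumulator, and the Sender thread continuously pulls batches from the RecordAccumulator and sends them to the Kafka brokers.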
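The hand-off between the two threads can be sketched with a toy model. The classes `ToyAccumulator` and `ToyPipeline` below are hypothetical and only illustrate the idea; the real RecordAccumulator measures batches in bytes (`batch.size`), manages a memory pool and is considerably more involved. Here the caller plays the main thread and only appends records, while a separate thread plays the Sender: it drains batches once they are full and flushes whatever is left on shutdown, roughly as `close()` does.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical toy classes, NOT the kafka-clients internals.
class ToyAccumulator {
    private final int batchSize; // max records per batch (the real batch.size is in bytes)
    private final Map<Integer, Deque<List<String>>> batches = new HashMap<>();

    ToyAccumulator(int batchSize) {
        this.batchSize = batchSize;
    }

    // Main thread: append to the newest batch of the partition, or open a new batch if it is full.
    synchronized void append(int partition, String record) {
        Deque<List<String>> deque = batches.computeIfAbsent(partition, p -> new ArrayDeque<>());
        List<String> last = deque.peekLast();
        if (last == null || last.size() >= batchSize) {
            last = new ArrayList<>();
            deque.addLast(last);
        }
        last.add(record);
    }

    // Sender thread: take batches off the head of each deque; only full ones unless flushAll.
    synchronized List<List<String>> drain(boolean flushAll) {
        List<List<String>> ready = new ArrayList<>();
        for (Deque<List<String>> deque : batches.values()) {
            while (!deque.isEmpty() && (flushAll || deque.peekFirst().size() >= batchSize)) {
                ready.add(deque.pollFirst());
            }
        }
        return ready;
    }
}

class ToyPipeline {
    // The caller acts as the main thread; a second thread acts as the Sender. Returns the batches "sent".
    static List<List<String>> run(int records, int batchSize, int partitions) throws InterruptedException {
        ToyAccumulator accumulator = new ToyAccumulator(batchSize);
        List<List<String>> sent = new ArrayList<>();
        AtomicBoolean closing = new AtomicBoolean(false);
        Thread sender = new Thread(() -> {
            while (!closing.get()) {
                List<List<String>> ready = accumulator.drain(false); // ship full batches only
                synchronized (sent) { sent.addAll(ready); }
                Thread.onSpinWait();
            }
            List<List<String>> rest = accumulator.drain(true); // like close(): flush the remainder
            synchronized (sent) { sent.addAll(rest); }
        }, "toy-sender");
        sender.start();
        for (int i = 0; i < records; i++) {
            accumulator.append(i % partitions, i + " hello wdh01"); // the main thread only buffers
        }
        closing.set(true);
        sender.join();
        return sent;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run(5, 2, 2));
    }
}
```

With 5 records, 2 partitions and 2 records per batch, the records always end up in 3 batches (two full ones and one partial one flushed at shutdown), whatever the thread timing.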

1.2 Producer parameters
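Commonly tuned producer settings can be written as plain string keys, as in the sketch below. The helper class `ProducerSettingsSketch` is hypothetical; the values for `buffer.memory`, `batch.size`, `linger.ms` and `compression.type` are, to our knowledge, the defaults of the 3.0 client, while `acks` is simply set explicitly. Check the official producer configuration reference for your version before relying on any of them.

```java
import java.util.Properties;

// Hypothetical helper: commonly tuned producer settings as plain string keys.
class ProducerSettingsSketch {
    static Properties build() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "hadoop103:9092"); // Kafka brokers, not ZooKeeper
        props.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.setProperty("buffer.memory", "33554432"); // total RecordAccumulator memory: 32 MB
        props.setProperty("batch.size", "16384");       // max bytes per batch: 16 KB
        props.setProperty("linger.ms", "0");            // max wait for a batch to fill before sending
        props.setProperty("compression.type", "none");  // alternatives: gzip, snappy, lz4, zstd
        props.setProperty("acks", "all");               // wait for all in-sync replicas to acknowledge
        return props;
    }

    public static void main(String[] args) {
        build().forEach((k, v) -> System.out.println(k + " = " + v));
    }
}
```

The same keys can be written through the `ProducerConfig` constants used in the examples below, which avoids typos in the key names.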

2. Asynchronous send API

2.1 Plain asynchronous send

 Add the dependency

        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>3.0.0</version>
        </dependency>

Test code for asynchronous sending

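import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class CustomProducer {
    public static void main(String[] args) {
        Properties properties = new Properties();
        // Connect to the Kafka brokers (bootstrap servers), not ZooKeeper
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop103:9092");
        // Key and value serializers
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // 1. Create the producer
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
        // 2. Send asynchronously: send() only buffers the record and returns immediately
        for (int i = 0; i < 5; i++) {
            kafkaProducer.send(new ProducerRecord<>("first", i + " hello wdh01"));
        }
        // 3. Release resources; close() also flushes records that are still buffered
        kafkaProducer.close();
    }
}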

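Start a console consumer to read the data (start it before running the producer: without --from-beginning it only shows messages that arrive after it starts)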

[hui@hadoop103 kafka]$ bin/kafka-console-consumer.sh --bootstrap-server hadoop103:9092 --topic first
0  hello wdh01
1  hello wdh01
2  hello wdh01
3  hello wdh01
4  hello wdh01

2.2 Asynchronous send with a callback

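The callback is invoked asynchronously when the producer receives the ack. onCompletion takes two parameters, the record metadata (RecordMetadata) and the exception (Exception): if the exception is null, the message was sent successfully; if it is not null, the send failed.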

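 Note: the producer retries failed sends automatically for retriable errors, so there is no need to retry by hand in the callback; a non-null exception means the producer has already given up on that record.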

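import java.util.Properties;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;

public class CustomProducerCallBack {
    public static void main(String[] args) {
        Properties properties = new Properties();
        // Connect to the Kafka brokers (bootstrap servers), not ZooKeeper
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop103:9092");
        // Key and value serializers
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        // 1. Create the producer
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
        // 2. Send asynchronously with a callback
        for (int i = 0; i < 5; i++) {
            kafkaProducer.send(new ProducerRecord<>("first", i + "  hello wdh01"), new Callback() {
                // Invoked when the ack (or a final error) arrives
                @Override
                public void onCompletion(RecordMetadata metadata, Exception exception) {
                    if (exception == null) {
                        System.out.println("topic " + metadata.topic() + " partition " + metadata.partition());
                    } else {
                        exception.printStackTrace(); // retries are already exhausted at this point
                    }
                }
            });
        }
        // 3. Release resources
        kafkaProducer.close();
    }
}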

Consumed data

[hui@hadoop103 kafka]$ bin/kafka-console-consumer.sh --bootstrap-server hadoop103:9092 --topic first
0  hello wdh01
1  hello wdh01
2  hello wdh01
3  hello wdh01
4  hello wdh01

Callback output in the console

topic first partition 1
topic first partition 1
topic first partition 1
topic first partition 1
topic first partition 1
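The callback contract (the call arrives later, on another thread, and exactly one of metadata and exception is meaningful) can be modelled with a `CompletableFuture`. Everything below is a hypothetical stand-in, not the kafka-clients API: `Meta` replaces RecordMetadata, `toySend` replaces `KafkaProducer.send`, and the failure is simulated. Unlike the callbacks above, the sketch also handles the failure branch, which is usually worth logging.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;

// Hypothetical toy model of send(record, callback); not the kafka-clients API.
class ToyCallbackDemo {
    // Stand-in for RecordMetadata, reduced to topic and partition.
    static final class Meta {
        final String topic;
        final int partition;

        Meta(String topic, int partition) {
            this.topic = topic;
            this.partition = partition;
        }
    }

    // The "ack" completes on another thread; the callback gets either metadata or an exception.
    static CompletableFuture<Void> toySend(String topic, int partition, boolean fail,
                                           BiConsumer<Meta, Throwable> callback) {
        CompletableFuture<Meta> ack = new CompletableFuture<>();
        new Thread(() -> {
            if (fail) {
                ack.completeExceptionally(new IllegalStateException("simulated: retries exhausted"));
            } else {
                ack.complete(new Meta(topic, partition));
            }
        }).start();
        // The returned future finishes only after the callback has run, so callers can wait on it.
        return ack.handle((meta, error) -> {
            callback.accept(meta, error);
            return null;
        });
    }

    static List<String> run() {
        List<String> log = new ArrayList<>();
        BiConsumer<Meta, Throwable> callback = (meta, error) -> {
            String line = (error == null)
                    ? "topic " + meta.topic + " partition " + meta.partition
                    : "failed: " + error.getMessage(); // log it; do not re-send by hand
            synchronized (log) {
                log.add(line);
            }
        };
        CompletableFuture.allOf(
                toySend("first", 1, false, callback),
                toySend("first", 1, true, callback)).join();
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

The two lines can appear in either order, since the two simulated acks race each other, just as real callbacks for different partitions may.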

3. Synchronous send API

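 Synchronous sending only requires calling get() on the Future returned by the asynchronous send(): get() blocks until the broker acknowledges the record, or throws if the send fails.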

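import java.util.Properties;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class CustomProducerSync {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Properties properties = new Properties();
        // Connect to the Kafka brokers (bootstrap servers), not ZooKeeper
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop103:9092");
        // Key and value serializers
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // 1. Create the producer
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
        // 2. Send synchronously: get() blocks until each record is acknowledged
        for (int i = 0; i < 5; i++) {
            kafkaProducer.send(new ProducerRecord<>("first", i + "  hello wdh01")).get();
        }
        // 3. Release resources
        kafkaProducer.close();
    }
}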

Consumed data

[hui@hadoop103 kafka]$ bin/kafka-console-consumer.sh --bootstrap-server hadoop103:9092 --topic first
0  hello wdh01
1  hello wdh01
2  hello wdh01
3  hello wdh01
4  hello wdh01
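Why get() makes the send synchronous can be shown with plain `java.util.concurrent`. In this hypothetical sketch a single-thread executor stands in for the Sender thread and each submitted task stands in for one acknowledged record. Because the loop calls get() before sending the next record, every "acked" entry is immediately followed by the main thread's "continues" entry. With the real producer, `send(...).get()` returns the RecordMetadata and a failed send surfaces as an `ExecutionException`; `Future.get(timeout, unit)` bounds the wait.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Hypothetical toy: an executor plays the Sender thread, each task plays one acknowledged record.
class ToySyncDemo {
    static List<String> run(int records) throws Exception {
        ExecutorService sender = Executors.newSingleThreadExecutor();
        List<String> events = Collections.synchronizedList(new ArrayList<>());
        try {
            for (int i = 0; i < records; i++) {
                final int n = i;
                Future<Integer> ack = sender.submit(() -> {
                    events.add("acked " + n);
                    return n;
                });
                int acked = ack.get(5, TimeUnit.SECONDS); // blocks, like send(record).get()
                events.add("main continues after " + acked);
            }
        } finally {
            sender.shutdown();
        }
        return events;
    }

    public static void main(String[] args) throws Exception {
        run(3).forEach(System.out::println);
    }
}
```

The price of this strictly ordered behaviour is throughput: each record waits for a full round trip before the next one is sent, so batching in the RecordAccumulator gains little.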

4. Producer partitioning

4.1 Benefits of partitioning

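  1. Better use of storage: each partition is stored on a broker, so a huge data set can be cut into partitions and spread across many brokers. Distributing the partitions sensibly balances the load.
  2. Higher parallelism: producers can send data partition by partition, and consumers can consume data partition by partition.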

 4.2 Partitioning strategy

The partitioning strategy is documented in the Javadoc of DefaultPartitioner; in IntelliJ IDEA, press Ctrl + N and type DefaultPartitioner to open the class.

/**
 * The default partitioning strategy:
 * <ul>
 * <li>If a partition is specified in the record, use it
 * <li>If no partition is specified but a key is present choose a partition based on a hash of the key
 * <li>If no partition or key is present choose the sticky partition that changes when the batch is full.
 * 
 * See KIP-480 for details about sticky partitioning.
 */
public class DefaultPartitioner implements Partitioner { ... }
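The three rules in the Javadoc above are applied in order. The hypothetical method below sketches that order; the key hash function is passed in as a parameter (the 3.0 client uses murmur2 on the serialized key) and the current sticky partition is given as an argument. The sign bit is cleared with `& 0x7fffffff`, which is what Kafka's `Utils.toPositive` does, rather than `Math.abs`. In the real client, a record that already names a partition never reaches the partitioner: KafkaProducer uses the explicit value directly.

```java
import java.nio.charset.StandardCharsets;
import java.util.function.ToIntFunction;

// Hypothetical sketch of the order in which the three rules apply; not the real DefaultPartitioner.
class PartitionDecisionSketch {
    static int choose(Integer explicitPartition, byte[] keyBytes, int numPartitions,
                      ToIntFunction<byte[]> keyHash, int stickyPartition) {
        if (explicitPartition != null) {
            return explicitPartition;                                 // rule 1: partition given
        }
        if (keyBytes != null) {
            int positive = keyHash.applyAsInt(keyBytes) & 0x7fffffff; // clear the sign bit
            return positive % numPartitions;                          // rule 2: hash(key) % partitions
        }
        return stickyPartition;                                       // rule 3: no partition, no key
    }

    public static void main(String[] args) {
        byte[] key = "key1".getBytes(StandardCharsets.UTF_8);
        System.out.println(choose(null, key, 2, k -> 5, 0)); // hash 5, 2 partitions: prints 1
        System.out.println(choose(null, key, 2, k -> 6, 0)); // hash 6, 2 partitions: prints 0
    }
}
```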

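The following constructors all take an explicit partition; in that case the given value is used directly as the partition. For example, with partition = 0, all data is written to partition 0.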

 public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value, Iterable<Header> headers) {
 public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value) {
 public ProducerRecord(String topic, Integer partition, K key, V value, Iterable<Header> headers) {
 public ProducerRecord(String topic, Integer partition, K key, V value) {

Their implementations:

    public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value, Iterable<Header> headers) {
        if (topic == null)
            throw new IllegalArgumentException("Topic cannot be null.");
        if (timestamp != null && timestamp < 0)
            throw new IllegalArgumentException(
                    String.format("Invalid timestamp: %d. Timestamp should always be non-negative or null.", timestamp));
        if (partition != null && partition < 0)
            throw new IllegalArgumentException(
                    String.format("Invalid partition: %d. Partition number should always be non-negative or null.", partition));
        this.topic = topic;
        this.partition = partition;
        this.key = key;
        this.value = value;
        this.timestamp = timestamp;
        this.headers = new RecordHeaders(headers);
    }

    /**
     * Creates a record with a specified timestamp to be sent to a specified topic and partition
     *
     * @param topic The topic the record will be appended to
     * @param partition The partition to which the record should be sent
     * @param timestamp The timestamp of the record, in milliseconds since epoch. If null, the producer will assign the
     *                  timestamp using System.currentTimeMillis().
     * @param key The key that will be included in the record
     * @param value The record contents
     */
    public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value) {
        this(topic, partition, timestamp, key, value, null);
    }

    /**
     * Creates a record to be sent to a specified topic and partition
     *
     * @param topic The topic the record will be appended to
     * @param partition The partition to which the record should be sent
     * @param key The key that will be included in the record
     * @param value The record contents
     * @param headers The headers that will be included in the record
     */
    public ProducerRecord(String topic, Integer partition, K key, V value, Iterable<Header> headers) {
        this(topic, partition, null, key, value, headers);
    }
    
    /**
     * Creates a record to be sent to a specified topic and partition
     *
     * @param topic The topic the record will be appended to
     * @param partition The partition to which the record should be sent
     * @param key The key that will be included in the record
     * @param value The record contents
     */
    public ProducerRecord(String topic, Integer partition, K key, V value) {
        this(topic, partition, null, key, value, null);
    }

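The constructor below covers the case where no partition is given but a key is present: the partition is the key's hash modulo the number of partitions of the topic. For example, if key1 hashes to 5, key2 hashes to 6 and the topic has 2 partitions, value1 (with key1) is written to partition 1 (5 % 2 = 1) and value2 (with key2) to partition 0 (6 % 2 = 0).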

    public ProducerRecord(String topic, K key, V value) {
        this(topic, null, null, key, value, null);
    }
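In the 3.0 client the hash is not `String.hashCode()`: DefaultPartitioner computes `Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions` over the serialized key bytes. The class below is an illustrative port of that murmur2 routine (`KeyPartitionSketch` is a hypothetical name); compare it with `org.apache.kafka.common.utils.Utils` before relying on exact values. Assuming topic first has exactly 3 partitions and keys are serialized with StringSerializer (UTF-8), a faithful port should give the same placements for keys a, b and f as the callback output of test 2 later in this section.

```java
import java.nio.charset.StandardCharsets;

// Illustrative port of the murmur2 hash used for keyed records (hypothetical class name).
class KeyPartitionSketch {
    static int murmur2(byte[] data) {
        int length = data.length;
        int seed = 0x9747b28c;
        final int m = 0x5bd1e995; // mixing constants
        final int r = 24;
        int h = seed ^ length;
        int length4 = length / 4;
        for (int i = 0; i < length4; i++) {
            int i4 = i * 4;
            int k = (data[i4] & 0xff) + ((data[i4 + 1] & 0xff) << 8)
                    + ((data[i4 + 2] & 0xff) << 16) + ((data[i4 + 3] & 0xff) << 24);
            k *= m;
            k ^= k >>> r;
            k *= m;
            h *= m;
            h ^= k;
        }
        switch (length % 4) { // handle the last 1 to 3 bytes; fall-through is intentional
            case 3: h ^= (data[(length & ~3) + 2] & 0xff) << 16;
            case 2: h ^= (data[(length & ~3) + 1] & 0xff) << 8;
            case 1: h ^= data[length & ~3] & 0xff;
                    h *= m;
        }
        h ^= h >>> 13;
        h *= m;
        h ^= h >>> 15;
        return h;
    }

    static int partitionForKey(String key, int numPartitions) {
        byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8); // what StringSerializer produces by default
        return (murmur2(keyBytes) & 0x7fffffff) % numPartitions; // toPositive(murmur2(...)) % numPartitions
    }

    public static void main(String[] args) {
        for (String key : new String[] {"a", "b", "f"}) {
            System.out.println(key + " -> partition " + partitionForKey(key, 3));
        }
    }
}
```

Because the result depends on the partition count, adding partitions to a topic changes where existing keys land, so per-key ordering is only guaranteed while the count stays fixed.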

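Finally, when neither a partition nor a key is given, Kafka uses the sticky partitioner: it picks a partition at random and keeps using it for as long as possible; once that partition's batch is full or has been sent, Kafka picks another partition at random (different from the previous one). For example, if partition 0 is picked first, then once its current batch is full (batch.size, 16 KB by default) or linger.ms has elapsed, Kafka picks a partition at random again (drawing again if it gets 0).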

    public ProducerRecord(String topic, V value) {
        this(topic, null, null, null, value, null);
    }
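A simplified simulation of this sticky behaviour is sketched below. `StickySketch` is hypothetical: a fixed record count per batch stands in for "batch full or linger.ms elapsed", and a seeded `Random` replaces the client's random choice among available partitions. The invariant to notice is that records stick to one partition for a whole batch, and consecutive batches never land on the same partition when more than one exists.

```java
import java.util.Random;

// Hypothetical simulation of sticky partitioning for records with no key and no partition.
class StickySketch {
    private final int numPartitions;
    private final int recordsPerBatch; // stand-in for "batch full (batch.size) or linger.ms elapsed"
    private final Random random;
    private int current = -1;          // no partition chosen yet
    private int inBatch = 0;           // records placed in the current batch

    StickySketch(int numPartitions, int recordsPerBatch, long seed) {
        this.numPartitions = numPartitions;
        this.recordsPerBatch = recordsPerBatch;
        this.random = new Random(seed);
    }

    int nextPartition() {
        if (current < 0 || inBatch == recordsPerBatch) {
            int candidate;
            do {
                candidate = random.nextInt(numPartitions);
            } while (numPartitions > 1 && candidate == current); // draw again if it repeats
            current = candidate;
            inBatch = 0;
        }
        inBatch++;
        return current;
    }

    public static void main(String[] args) {
        StickySketch sticky = new StickySketch(3, 4, 42L);
        StringBuilder out = new StringBuilder();
        for (int i = 0; i < 12; i++) {
            out.append(sticky.nextPartition()).append(' ');
        }
        System.out.println(out.toString().trim());
    }
}
```

Compared with round-robin per record, sticking to one partition fills batches faster, which is the motivation given in KIP-480.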

Test 1: send data to a specified partition, for example all data to partition 1.

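import java.util.Properties;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;

public class CustomProducerCallBackPartitions {
    public static void main(String[] args) {
        Properties properties = new Properties();
        // Connect to the Kafka brokers (bootstrap servers), not ZooKeeper
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop103:9092");
        // Key and value serializers
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        // 1. Create the producer
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
        // 2. Send asynchronously; every record names partition 1 explicitly (key is "")
        for (int i = 0; i < 5; i++) {
            kafkaProducer.send(new ProducerRecord<>("first", 1, "", i + "  hello wdh01"), new Callback() {
                // Invoked when the ack (or a final error) arrives
                @Override
                public void onCompletion(RecordMetadata metadata, Exception exception) {
                    if (exception == null) {
                        System.out.println("topic " + metadata.topic() + " partition " + metadata.partition());
                    }
                }
            });
        }
        // 3. Release resources
        kafkaProducer.close();
    }
}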

Callback output after running

topic first partition 1
topic first partition 1
topic first partition 1
topic first partition 1
topic first partition 1

Consumed data

[hui@hadoop103 kafka]$ bin/kafka-console-consumer.sh --bootstrap-server hadoop103:9092 --topic first
0  hello wdh01
1  hello wdh01
2  hello wdh01
3  hello wdh01
4  hello wdh01

 Test 2: no partition is specified but a key is present, so the partition is the key's hash modulo the number of partitions of the topic.

        // Fragment of main(); the properties are set up as in test 1.
        // main() must declare "throws InterruptedException" because of Thread.sleep.
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
        // 2. Send asynchronously with keys "a", "b" and "f"; no partition is specified
        for (String key : new String[] {"a", "b", "f"}) {
            for (int i = 0; i < 5; i++) {
                kafkaProducer.send(new ProducerRecord<>("first", key, i + "  hello wdh01"), new Callback() {
                    // Invoked when the ack (or a final error) arrives
                    @Override
                    public void onCompletion(RecordMetadata metadata, Exception exception) {
                        if (exception == null) {
                            System.out.println(key + " topic " + metadata.topic() + " partition " + metadata.partition());
                        }
                    }
                });
            }
            // Pause between keys so each key's records are sent and acknowledged in a separate round
            Thread.sleep(1000);
        }
        // 3. Release resources
        kafkaProducer.close();

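Callback output

a topic first partition 1
a topic first partition 1
a topic first partition 1
a topic first partition 1
a topic first partition 1
b topic first partition 2
b topic first partition 2
b topic first partition 2
b topic first partition 2
b topic first partition 2
f topic first partition 0
f topic first partition 0
f topic first partition 0
f topic first partition 0
f topic first partition 0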

4.3 Custom partitioner

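Kafka supports custom partitioning: implement the Partitioner interface (partition, close and configure) and register the class with ProducerConfig.PARTITIONER_CLASS_CONFIG (partitioner.class).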

Example

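package org.wdh01.kk;

import java.util.Map;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

public class MyPartitioner implements Partitioner {
    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        // Route by content: values containing "wdh01" go to partition 0, everything else to partition 1
        // (the topic must have at least 2 partitions)
        String msg = value == null ? "" : value.toString();
        return msg.contains("wdh01") ? 0 : 1;
    }

    @Override
    public void close() {
        // no resources to release
    }

    @Override
    public void configure(Map<String, ?> configs) {
        // no configuration needed
    }
}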

Testing the custom partitioner

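import java.util.Properties;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;

public class CustomProducerCallBackPartitionsCustom {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop103:9092");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // Register the custom partitioner
        properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "org.wdh01.kk.MyPartitioner");
        // 1. Create the producer
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
        // 2. Send asynchronously; these values do not contain "wdh01", so MyPartitioner picks partition 1
        for (int i = 0; i < 5; i++) {
            kafkaProducer.send(new ProducerRecord<>("first", i + "  hello wdh1"), new Callback() {
                // Invoked when the ack (or a final error) arrives
                @Override
                public void onCompletion(RecordMetadata metadata, Exception exception) {
                    if (exception == null) {
                        System.out.println("topic " + metadata.topic() + " partition " + metadata.partition());
                    }
                }
            });
        }
        // 3. Release resources
        kafkaProducer.close();
    }
}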

Callback output

topic first partition 1
topic first partition 1
topic first partition 1
topic first partition 1
topic first partition 1
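Every record lands in partition 1 because the test values say "hello wdh1", which does not contain "wdh01". Keeping the routing rule in a plain static method makes that easy to check without a broker. `ContentRouting` below is a hypothetical helper that mirrors MyPartitioner's rule (null-safe), with an assumed fallback to partition 0 when the topic has fewer than 2 partitions. Inside `MyPartitioner.partition` the partition count could be read with `cluster.partitionCountForTopic(topic)` and passed in.

```java
// Hypothetical helper: MyPartitioner's routing rule as a pure, broker-free function.
class ContentRouting {
    static int route(Object value, int numPartitions) {
        if (numPartitions < 2) {
            return 0; // assumed fallback: partition 1 does not exist
        }
        String msg = value == null ? "" : value.toString();
        return msg.contains("wdh01") ? 0 : 1;
    }

    public static void main(String[] args) {
        System.out.println(route("0  hello wdh1", 3));  // prints 1
        System.out.println(route("0  hello wdh01", 3)); // prints 0
    }
}
```

MyPartitioner.partition can then simply delegate to such a method, and the rule can be unit-tested on its own.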