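Kafka Producer (Part 1)
1. Producer message sending flow
1.1 How sending works
Sending a message involves two threads: the main thread and the Sender thread. The main thread creates a RecordAccumulator, a buffer that holds a double-ended queue (Deque) of batches for each partition. Before a record reaches the RecordAccumulator, the main thread serializes it and assigns it a partition. The main thread then appends it to the RecordAccumulator, and the Sender thread continuously pulls batches from the RecordAccumulator and sends them to the Kafka brokers.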
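The two-thread hand-off can be sketched without a broker. This is a simplified, pure-JDK model, not Kafka's actual RecordAccumulator: the class AccumulatorSketch, its per-partition ArrayDeque of plain strings (Kafka stores batches, not single records), and the drain loop are all hypothetical. The main thread only appends to a deque and returns, while a separate sender thread keeps draining every deque from the head.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;

public class AccumulatorSketch {
    // One deque per partition, loosely modeled on RecordAccumulator (hypothetical simplification).
    private final Map<Integer, Deque<String>> queues = new ConcurrentHashMap<>();
    // Stands in for the broker: records the sender thread has "sent".
    private final List<String> delivered = Collections.synchronizedList(new ArrayList<>());
    private final AtomicBoolean closed = new AtomicBoolean(false);
    private final Thread sender = new Thread(this::runSender, "sender");

    public void start() {
        sender.start();
    }

    // Main thread: append a record to the tail of its partition's deque and return immediately.
    public void append(int partition, String record) {
        Deque<String> dq = queues.computeIfAbsent(partition, p -> new ArrayDeque<>());
        synchronized (dq) {
            dq.addLast(record);
        }
    }

    // Sender thread: keep draining every deque from the head; exit once closed and drained.
    private void runSender() {
        while (true) {
            boolean wasClosed = closed.get(); // read before draining so no earlier append is missed
            for (Deque<String> dq : queues.values()) {
                synchronized (dq) {
                    String r;
                    while ((r = dq.pollFirst()) != null) {
                        delivered.add(r);
                    }
                }
            }
            if (wasClosed) {
                return;
            }
            Thread.yield();
        }
    }

    // Stop the sender after it has drained everything appended so far.
    public void close() {
        closed.set(true);
        try {
            sender.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public List<String> delivered() {
        synchronized (delivered) {
            return new ArrayList<>(delivered);
        }
    }

    public static void main(String[] args) {
        AccumulatorSketch acc = new AccumulatorSketch();
        acc.start();
        for (int i = 0; i < 5; i++) {
            acc.append(i % 2, i + " hello wdh01");
        }
        acc.close();
        System.out.println(acc.delivered().size()); // prints 5
    }
}
```

Because each partition has its own FIFO deque, records for the same partition leave in the order they were appended, which mirrors why Kafka preserves ordering per partition rather than per topic.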
1.2 Producer parameter list
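A few commonly used producer parameters can be collected in a plain java.util.Properties, as in the sketch below. It is not the full parameter list: it uses the string keys that the ProducerConfig constants stand for (for example ProducerConfig.BATCH_SIZE_CONFIG is "batch.size"), the helper class ProducerPropsSketch is hypothetical, and the values are illustrative choices rather than recommendations. The defaults noted in the comments are those documented for kafka-clients 3.0; check them against the version you use.

```java
import java.util.Properties;

public class ProducerPropsSketch {
    // Hypothetical helper: producer settings with string keys equivalent to ProducerConfig constants.
    public static Properties build() {
        Properties p = new Properties();
        // Brokers used for the initial connection (Kafka brokers, not ZooKeeper).
        p.setProperty("bootstrap.servers", "hadoop103:9092");
        // Serializers for record keys and values.
        p.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        p.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // Total memory for the RecordAccumulator buffer (default 33554432 bytes = 32 MB).
        p.setProperty("buffer.memory", "33554432");
        // Upper bound on the size of one batch per partition (default 16384 bytes = 16 KB).
        p.setProperty("batch.size", "16384");
        // Extra time to wait for more records before sending a batch (default 0 ms).
        p.setProperty("linger.ms", "5");
        // Acknowledgement level: "0", "1", or "all".
        p.setProperty("acks", "all");
        // Batch compression codec: none, gzip, snappy, lz4, or zstd.
        p.setProperty("compression.type", "snappy");
        return p;
    }

    public static void main(String[] args) {
        build().forEach((k, v) -> System.out.println(k + " = " + v));
    }
}
```

Passing these Properties to new KafkaProducer<>(...) is equivalent to setting the same keys through the ProducerConfig constants used in the examples that follow.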
2. Asynchronous send API
2.1 Plain asynchronous send
Add the dependency
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>3.0.0</version>
</dependency>
Asynchronous send test code
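import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class CustomProducer {
    public static void main(String[] args) {
        Properties properties = new Properties();
        // Connect to the Kafka cluster (brokers, not ZooKeeper)
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop103:9092");
        // Key and value serializers
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        // 1. Create the producer
        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);

        // 2. Send data asynchronously: send() returns without waiting for the broker
        for (int i = 0; i < 5; i++) {
            producer.send(new ProducerRecord<>("first", i + " hello wdh01"));
        }

        // 3. Close the producer (blocks until previously sent records complete)
        producer.close();
    }
}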
Start a Kafka console consumer to read the data
[hui@hadoop103 kafka]$ bin/kafka-console-consumer.sh --bootstrap-server hadoop103:9092 --topic first
0 hello wdh01
1 hello wdh01
2 hello wdh01
3 hello wdh01
4 hello wdh01
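2.2 Asynchronous send with a callback
The callback is invoked asynchronously when the producer receives the ack for a record. It takes two parameters: the record metadata (RecordMetadata) and an exception (Exception). If the exception is null, the message was sent successfully; if it is not null, the send failed.
Note: the producer automatically retries retriable send failures (controlled by the retries setting), so there is no need to retry manually in the callback.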
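import java.util.Properties;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;

public class CustomProducerCallBack {
    public static void main(String[] args) {
        Properties properties = new Properties();
        // Connect to the Kafka cluster (brokers, not ZooKeeper)
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop103:9092");
        // Key and value serializers
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        // 1. Create the producer
        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);

        // 2. Send data asynchronously with a callback
        for (int i = 0; i < 5; i++) {
            producer.send(new ProducerRecord<>("first", i + " hello wdh01"), new Callback() {
                // Invoked asynchronously once the record is acknowledged or the send fails
                @Override
                public void onCompletion(RecordMetadata metadata, Exception exception) {
                    if (exception == null) {
                        System.out.println("topic " + metadata.topic() + " partition " + metadata.partition());
                    } else {
                        exception.printStackTrace();
                    }
                }
            });
        }

        // 3. Close the producer
        producer.close();
    }
}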
Consumed data
[hui@hadoop103 kafka]$ bin/kafka-console-consumer.sh --bootstrap-server hadoop103:9092 --topic first
0 hello wdh01
1 hello wdh01
2 hello wdh01
3 hello wdh01
4 hello wdh01
Callback output on the console
topic first partition 1
topic first partition 1
topic first partition 1
topic first partition 1
topic first partition 1
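3. Synchronous send API
Take the asynchronous code and call get() on the Future returned by send(). get() blocks until the broker acknowledges the record, or throws an ExecutionException if the send fails.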
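import java.util.Properties;
import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class CustomProducerSync {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Properties properties = new Properties();
        // Connect to the Kafka cluster (brokers, not ZooKeeper)
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop103:9092");
        // Key and value serializers
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        // 1. Create the producer
        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);

        // 2. Send data synchronously: get() blocks until each record is acknowledged
        for (int i = 0; i < 5; i++) {
            producer.send(new ProducerRecord<>("first", i + " hello wdh01")).get();
        }

        // 3. Close the producer
        producer.close();
    }
}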
Consumed data
[hui@hadoop103 kafka]$ bin/kafka-console-consumer.sh --bootstrap-server hadoop103:9092 --topic first
0 hello wdh01
1 hello wdh01
2 hello wdh01
3 hello wdh01
4 hello wdh01
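The blocking behaviour of get() can also be seen without a broker. The sketch below is a pure-JDK analogy, not Kafka code: the class SyncSendSketch and its method fakeSend are hypothetical stand-ins for KafkaProducer.send(), returning a Future that completes on a background thread. Waiting on each Future before issuing the next send forces the records to complete one at a time, in order.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class SyncSendSketch {
    // Order in which the simulated "broker" acknowledged records.
    private final List<Integer> acked = Collections.synchronizedList(new ArrayList<>());
    // Daemon worker threads play the role of the producer's background I/O.
    private final ExecutorService io = Executors.newFixedThreadPool(4, r -> {
        Thread t = new Thread(r);
        t.setDaemon(true);
        return t;
    });

    // Hypothetical stand-in for KafkaProducer.send(): returns a Future immediately.
    public Future<Integer> fakeSend(int record) {
        return io.submit(() -> {
            // Later records get shorter delays, so without get() they could be acknowledged out of order.
            Thread.sleep(10L * (5 - record % 5));
            acked.add(record);
            return record;
        });
    }

    // Synchronous style: block on get() after every send before issuing the next one.
    public List<Integer> sendSync(int n) {
        try {
            for (int i = 0; i < n; i++) {
                fakeSend(i).get(); // waits until record i has been acknowledged
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        }
        synchronized (acked) {
            return new ArrayList<>(acked);
        }
    }

    public void shutdown() {
        io.shutdown();
    }

    public static void main(String[] args) {
        SyncSendSketch s = new SyncSendSketch();
        System.out.println(s.sendSync(5)); // prints [0, 1, 2, 3, 4]
        s.shutdown();
    }
}
```

The trade-off is the same as with the real producer: synchronous sends give a strict per-record confirmation at the cost of waiting a full round trip for every record.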
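4. Producer partitioning
4.1 Benefits of partitioning
- Better use of storage resources: each partition is stored on a single broker, so a massive amount of data can be split by partition into chunks stored across multiple brokers. Distributing partitions sensibly across brokers balances the load.
- Higher parallelism: producers can send data partition by partition, and consumers can consume data partition by partition.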
4.2 Partitioning strategy
The partitioning strategy is described in the Javadoc of DefaultPartitioner. In IntelliJ IDEA, press Ctrl+N and enter DefaultPartitioner to open it:
/**
 * The default partitioning strategy:
 * <ul>
 * <li>If a partition is specified in the record, use it
 * <li>If no partition is specified but a key is present choose a partition based on a hash of the key
 * <li>If no partition or key is present choose the sticky partition that changes when the batch is full.
 *
 * See KIP-480 for details about sticky partitioning.
 */
public class DefaultPartitioner implements Partitioner {
    // ... (class body omitted)
}
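Each of the following constructors takes an explicit partition. When a partition is specified, that value is used directly; for example, with partition=0 all records are written to partition 0.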
public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value, Iterable<Header> headers)
public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value)
public ProducerRecord(String topic, Integer partition, K key, V value, Iterable<Header> headers)
public ProducerRecord(String topic, Integer partition, K key, V value)
The constructor bodies:
public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value, Iterable<Header> headers) {
    if (topic == null)
        throw new IllegalArgumentException("Topic cannot be null.");
    if (timestamp != null && timestamp < 0)
        throw new IllegalArgumentException(
                String.format("Invalid timestamp: %d. Timestamp should always be non-negative or null.", timestamp));
    if (partition != null && partition < 0)
        throw new IllegalArgumentException(
                String.format("Invalid partition: %d. Partition number should always be non-negative or null.", partition));
    this.topic = topic;
    this.partition = partition;
    this.key = key;
    this.value = value;
    this.timestamp = timestamp;
    this.headers = new RecordHeaders(headers);
}

/**
 * Creates a record with a specified timestamp to be sent to a specified topic and partition
 *
 * @param topic The topic the record will be appended to
 * @param partition The partition to which the record should be sent
 * @param timestamp The timestamp of the record, in milliseconds since epoch. If null, the producer will assign the
 *                  timestamp using System.currentTimeMillis().
 * @param key The key that will be included in the record
 * @param value The record contents
 */
public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value) {
    this(topic, partition, timestamp, key, value, null);
}

/**
 * Creates a record to be sent to a specified topic and partition
 *
 * @param topic The topic the record will be appended to
 * @param partition The partition to which the record should be sent
 * @param key The key that will be included in the record
 * @param value The record contents
 * @param headers The headers that will be included in the record
 */
public ProducerRecord(String topic, Integer partition, K key, V value, Iterable<Header> headers) {
    this(topic, partition, null, key, value, headers);
}

/**
 * Creates a record to be sent to a specified topic and partition
 *
 * @param topic The topic the record will be appended to
 * @param partition The partition to which the record should be sent
 * @param key The key that will be included in the record
 * @param value The record contents
 */
public ProducerRecord(String topic, Integer partition, K key, V value) {
    this(topic, partition, null, key, value, null);
}
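The constructor below sets no partition but does set a key. In that case the partition is the key's hash modulo the number of partitions in the topic (DefaultPartitioner computes the hash with murmur2 over the serialized key bytes). For example, if key1 hashes to 5, key2 hashes to 6, and the topic has 2 partitions, then value1 (with key1) is written to partition 1 and value2 (with key2) to partition 0.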
public ProducerRecord(String topic, K key, V value) {
    this(topic, null, null, key, value, null);
}
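The key-hash rule can be computed outside Kafka. In the sketch below, murmur2 and toPositive are re-implementations modeled on org.apache.kafka.common.utils.Utils.murmur2 and Utils.toPositive, which DefaultPartitioner in kafka-clients 3.0 combines as toPositive(murmur2(keyBytes)) % numPartitions; treat the equivalence as an assumption to verify against the library. The class KeyHashPartitionSketch and its helpers partitionForHash and partitionForKey are hypothetical; partitionForHash reproduces the simplified arithmetic from the example above.

```java
import java.nio.charset.StandardCharsets;

public class KeyHashPartitionSketch {
    // Re-implementation modeled on Kafka's Utils.murmur2 (assumed equivalent; verify before relying on it).
    public static int murmur2(byte[] data) {
        int length = data.length;
        int seed = 0x9747b28c;
        final int m = 0x5bd1e995;
        final int r = 24;
        int h = seed ^ length;
        int length4 = length / 4;
        for (int i = 0; i < length4; i++) {
            int i4 = i * 4;
            int k = (data[i4] & 0xff) + ((data[i4 + 1] & 0xff) << 8)
                    + ((data[i4 + 2] & 0xff) << 16) + ((data[i4 + 3] & 0xff) << 24);
            k *= m;
            k ^= k >>> r;
            k *= m;
            h *= m;
            h ^= k;
        }
        // Mix in the remaining 1 to 3 bytes (intentional fall-through).
        switch (length % 4) {
            case 3:
                h ^= (data[(length & ~3) + 2] & 0xff) << 16;
            case 2:
                h ^= (data[(length & ~3) + 1] & 0xff) << 8;
            case 1:
                h ^= data[length & ~3] & 0xff;
                h *= m;
        }
        h ^= h >>> 13;
        h *= m;
        h ^= h >>> 15;
        return h;
    }

    // Clear the sign bit so the modulo result is never negative.
    public static int toPositive(int number) {
        return number & 0x7fffffff;
    }

    // The simplified rule from the text: partition = hash % numPartitions.
    public static int partitionForHash(int hash, int numPartitions) {
        return toPositive(hash) % numPartitions;
    }

    // Key-based choice: murmur2 over the UTF-8 key bytes (StringSerializer's default encoding).
    public static int partitionForKey(String key, int numPartitions) {
        return partitionForHash(murmur2(key.getBytes(StandardCharsets.UTF_8)), numPartitions);
    }

    public static void main(String[] args) {
        System.out.println(partitionForHash(5, 2)); // prints 1
        System.out.println(partitionForHash(6, 2)); // prints 0
        for (String key : new String[] {"a", "b", "f"}) {
            System.out.println(key + " -> partition " + partitionForKey(key, 3));
        }
    }
}
```

The important property is determinism: the same key always maps to the same partition as long as the partition count does not change, which is why records sharing a key stay in order relative to each other.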
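Finally, when neither a partition nor a key is given, Kafka uses the sticky partitioner (see KIP-480): it picks a partition at random and keeps using it for as long as possible. Once that partition's current batch is full or has been sent, Kafka randomly picks another partition, different from the previous one. For example, if partition 0 is chosen first, Kafka keeps sending to partition 0 until its current batch is full (batch.size, 16 KB by default) or linger.ms elapses, and then randomly picks another partition (drawing again if it gets 0).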
public ProducerRecord(String topic, V value) {
    this(topic, null, null, null, value, null);
}
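The sticky behaviour for keyless records can be simulated with a small model. This is a simplified sketch, not Kafka's StickyPartitionCache: the class StickySketch, the fixed record sizes, and the byte-count "batch is full" check are hypothetical, and linger.ms expiry is not modeled. It keeps returning one partition until the next record would overflow the current batch, then switches to a randomly drawn partition, re-drawing if it gets the previous one.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

public class StickySketch {
    private final int numPartitions;
    private final int batchSize;   // bytes per batch, e.g. 16384 (the batch.size default)
    private final Random random;
    private int current;           // the current "sticky" partition
    private int bytesInBatch = 0;  // bytes accumulated in the current batch

    public StickySketch(int numPartitions, int batchSize, long seed) {
        this.numPartitions = numPartitions;
        this.batchSize = batchSize;
        this.random = new Random(seed);
        this.current = random.nextInt(numPartitions);
    }

    // Returns the partition for a keyless record of the given size.
    public int partition(int recordBytes) {
        if (bytesInBatch > 0 && bytesInBatch + recordBytes > batchSize) {
            // The current batch is full: move to a different random partition.
            current = pickDifferent(current);
            bytesInBatch = 0;
        }
        bytesInBatch += recordBytes;
        return current;
    }

    private int pickDifferent(int previous) {
        if (numPartitions == 1) {
            return previous;
        }
        int next;
        do {
            next = random.nextInt(numPartitions);
        } while (next == previous); // draw again if the same partition comes up
        return next;
    }

    public static void main(String[] args) {
        StickySketch s = new StickySketch(3, 16384, 42L);
        List<Integer> chosen = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            chosen.add(s.partition(4096)); // 4 KB records: four fill one 16 KB batch
        }
        System.out.println(chosen);
    }
}
```

Sticking to one partition until a batch fills produces fewer, larger batches than spreading every keyless record round-robin, which is the latency and throughput motivation behind KIP-480.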
Test 1: send data to a specified partition; here, all records go to partition 1.
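import java.util.Properties;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;

public class CustomProducerCallBackPartitions {
    public static void main(String[] args) {
        Properties properties = new Properties();
        // Connect to the Kafka cluster (brokers, not ZooKeeper)
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop103:9092");
        // Key and value serializers
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        // 1. Create the producer
        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);

        // 2. Send asynchronously to an explicit partition: ProducerRecord(topic, partition, key, value)
        for (int i = 0; i < 5; i++) {
            producer.send(new ProducerRecord<>("first", 1, "", i + " hello wdh01"), new Callback() {
                // Callback: print where each record landed
                @Override
                public void onCompletion(RecordMetadata metadata, Exception exception) {
                    if (exception == null) {
                        System.out.println("topic " + metadata.topic() + " partition " + metadata.partition());
                    }
                }
            });
        }

        // 3. Close the producer
        producer.close();
    }
}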
Callback output after running
topic first partition 1
topic first partition 1
topic first partition 1
topic first partition 1
topic first partition 1
Consumed data
[hui@hadoop103 kafka]$ bin/kafka-console-consumer.sh --bootstrap-server hadoop103:9092 --topic first
0 hello wdh01
1 hello wdh01
2 hello wdh01
3 hello wdh01
4 hello wdh01
Test 2: no partition, but a key is given; the partition is the key's hash modulo the number of partitions in the topic.
// Fragment: assumes `properties` is configured as in the previous examples and that the
// enclosing method declares `throws InterruptedException` (for Thread.sleep).
// 1. Create the producer
KafkaProducer<String, String> producer = new KafkaProducer<>(properties);

// 2. Send five records for each key asynchronously; records with the same key land in the same partition
for (String key : new String[] {"a", "b", "f"}) {
    for (int i = 0; i < 5; i++) {
        producer.send(new ProducerRecord<>("first", key, i + " hello wdh01"), (metadata, exception) -> {
            if (exception == null) {
                System.out.println(key + " topic " + metadata.topic() + " partition " + metadata.partition());
            }
        });
    }
    Thread.sleep(1000); // pause between keys
}

// 3. Close the producer
producer.close();
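Callback results
a topic first partition 1
a topic first partition 1
a topic first partition 1
a topic first partition 1
a topic first partition 1
b topic first partition 2
b topic first partition 2
b topic first partition 2
b topic first partition 2
b topic first partition 2
f topic first partition 0
f topic first partition 0
f topic first partition 0
f topic first partition 0
f topic first partition 0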
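4.3 Custom partitioner
Kafka supports custom partitioning: implement the org.apache.kafka.clients.producer.Partitioner interface and register the class through the partitioner.class setting (ProducerConfig.PARTITIONER_CLASS_CONFIG).
Example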
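package org.wdh01.kk;

import java.util.Map;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

public class MyPartitioner implements Partitioner {
    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        // Route by message content: values containing "wdh01" go to partition 0, everything else to partition 1
        int partition;
        String msg = value.toString();
        if (msg.contains("wdh01")) {
            partition = 0;
        } else {
            partition = 1;
        }
        return partition;
    }

    @Override
    public void close() {
    }

    @Override
    public void configure(Map<String, ?> configs) {
    }
}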
Testing the custom partitioner
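import java.util.Properties;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;

public class CustomProducerCallBackPartitionsCustom {
    public static void main(String[] args) {
        Properties properties = new Properties();
        // Connect to the Kafka cluster (brokers, not ZooKeeper)
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop103:9092");
        // Key and value serializers
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // Register the custom partitioner by its fully qualified class name
        properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "org.wdh01.kk.MyPartitioner");

        // 1. Create the producer
        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);

        // 2. Send asynchronously; note the values contain "wdh1", not "wdh01"
        for (int i = 0; i < 5; i++) {
            producer.send(new ProducerRecord<>("first", i + " hello wdh1"), new Callback() {
                // Callback: print where each record landed
                @Override
                public void onCompletion(RecordMetadata metadata, Exception exception) {
                    if (exception == null) {
                        System.out.println("topic " + metadata.topic() + " partition " + metadata.partition());
                    }
                }
            });
        }

        // 3. Close the producer
        producer.close();
    }
}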
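Callback results
topic first partition 1
topic first partition 1
topic first partition 1
topic first partition 1
topic first partition 1
All five records land in partition 1 because their values contain "wdh1" rather than "wdh01", so MyPartitioner's contains("wdh01") check fails; values containing "wdh01" would be routed to partition 0.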