1. 程式人生 > >Kafka Producer生產資料時資料丟失分析

Kafka Producer生產資料時資料丟失分析

今天在測試 Storm 程式過程中,想通過執行在 idea 的 Kafka Producer 生產一條資料來驗證一下 Storm 程式,發現居然沒有成功將資料生產到 Kafka 叢集中,於是進行了一番測試,最終找到了原因!
注:下面程式測試中使用的 kafka 的版本為 0.10.2.0,zookeeper 的版本為 3.4.5

一、情景再現

在 linux 中執行如下命令來監控是否有資料生產到 kafka 中:

kafka-console-consumer --zookeeper localhost:2181 --topic test

生產一條資料到Kafka中,樣例程式碼如下:

package kafka;

import kafka.admin.AdminUtils;
import kafka.admin.RackAwareMode;
import kafka.utils.ZkUtils;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.security.JaasUtils;
import utils.PropertiesUtil;

import
java.util.Properties; /** * Kafka客戶端操作工具類 * 0.10.2版本 * @author lwj * @date 2018/5/10 */ public class KafkaProducerUtil { private static KafkaProducer<String, String> producer = null; private static Properties props = null; static { props = new Properties(); props.put("bootstrap.servers"
, "XXX.XXX.XXX.XXX:XX"); props.put("acks", "all"); props.put("retries", "0"); props.put("batch.size", "16384"); // props.put("linger.ms", "1"); props.put("buffer.memory", "33554432"); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); producer = new KafkaProducer(props); } /** * 將訊息生產到kafka叢集 * @param topic * @param message */ public static void produce(String topic, String message){ producer.send(new ProducerRecord<String, String>(topic, message)); } public static void main(String[] args) { //生產多少條資料到Kafka(***) int num = 1 for (int i = 0; i < num; i++) { KafkaProducerUtil.produce("test", "test"+i); // try { // Thread.sleep(1000); // } catch (InterruptedException e) { // // } } } }

發現並沒有資料生產到 broker 中!!


二、原因分析

如果將上面程式碼的 num 的值設定成 1000 的話,發現數據有生產到 Kafka 叢集中!!
如果解開上面 Thread.sleep() 程式碼的話,發現也有資料有生產到 Kafka 叢集中!!

所以,造成上面生產不成功的原因就是雖然呼叫了 producer.send() ,但是資料還沒來得及生產到 Kafka 叢集 主程式就掛掉了,於是資料就沒有生產到 Kafka 叢集中了~~

也就是說雖然 Kafka官網給出的文件中 Produer 的 linger.ms 引數 預設是 0,但是真實情況中 producer.send() 方法 “生效” 是有些許延遲的!


三、解決方法

如果對效能要求不高的話,可以再 producer.send() 方法呼叫後再呼叫 producer.flush() 方法,該方法會將資料全部生產到Kafka,否則就會阻塞。對於 producer.flush() 方法,原始碼原話如下:

"Flush any accumulated records form the producer. Blocks until all sends are complete."

但是這個方法有一點侷限性,就是對效能的影響有點大,這個是要注意的地方~

如果對效能要求比較高,同時也想把資料確切的生產到叢集的話,推薦將 linger.ms 引數設定一個比 0 大的值(預設是 0),batch.size 也可以設定一下(預設是16384),同時用 producer.send(ProducerRecord<K,V>, Callback) 來將資料生產到叢集中,其中 Callback 匿名內部類中的 onCompletion() 方法用來處理 “確認生產到叢集” 的邏輯~~