Kafka Producer生產資料時資料丟失分析
今天在測試 Storm 程式過程中,想通過執行在 idea 的 Kafka Producer 生產一條資料來驗證一下 Storm 程式,發現居然沒有成功將資料生產到 Kafka 叢集中,於是進行了一番測試,最終找到了原因!
注:下面程式測試中使用的 kafka 的版本為 0.10.2.0,zookeeper 的版本為 3.4.5
一、情景再現
在 linux 中執行如下命令來監控是否有資料生產到 kafka 中:
kafka-console-consumer --zookeeper localhost:2181 --topic test
生產一條資料到Kafka中,樣例程式碼如下:
package kafka;
import kafka.admin.AdminUtils;
import kafka.admin.RackAwareMode;
import kafka.utils.ZkUtils;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.security.JaasUtils;
import utils.PropertiesUtil;
import java.util.Properties;
/**
* Kafka客戶端操作工具類
* 0.10.2版本
* @author lwj
* @date 2018/5/10
*/
public class KafkaProducerUtil {
private static KafkaProducer<String, String> producer = null;
private static Properties props = null;
static {
props = new Properties();
props.put("bootstrap.servers" , "XXX.XXX.XXX.XXX:XX");
props.put("acks", "all");
props.put("retries", "0");
props.put("batch.size", "16384");
// props.put("linger.ms", "1");
props.put("buffer.memory", "33554432");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
producer = new KafkaProducer(props);
}
/**
* 將訊息生產到kafka叢集
* @param topic
* @param message
*/
public static void produce(String topic, String message){
producer.send(new ProducerRecord<String, String>(topic, message));
}
public static void main(String[] args) {
//生產多少條資料到Kafka(***)
int num = 1
for (int i = 0; i < num; i++) {
KafkaProducerUtil.produce("test", "test"+i);
// try {
// Thread.sleep(1000);
// } catch (InterruptedException e) {
//
// }
}
}
}
發現並沒有資料生產到 broker 中!!
二、原因分析
如果將上面程式碼的 num
的值設定成 1000 的話,發現數據有生產到 Kafka 叢集中!!
如果解開上面 Thread.sleep()
程式碼的話,發現也有資料有生產到 Kafka 叢集中!!
所以,造成上面生產不成功的原因就是雖然呼叫了 producer.send()
,但是資料還沒來得及生產到 Kafka 叢集 主程式就掛掉了,於是資料就沒有生產到 Kafka 叢集中了~~
也就是說雖然 Kafka官網給出的文件中 Produer 的 linger.ms
引數 預設是 0,但是真實情況中 producer.send()
方法 “生效” 是有些許延遲的!
三、解決方法
如果對效能要求不高的話,可以再 producer.send()
方法呼叫後再呼叫 producer.flush()
方法,該方法會將資料全部生產到Kafka,否則就會阻塞。對於 producer.flush()
方法,原始碼原話如下:
"Flush any accumulated records form the producer. Blocks until all sends are complete."
但是這個方法有一點侷限性,就是對效能的影響有點大,這個是要注意的地方~
如果對效能要求比較高,同時也想把資料確切的生產到叢集的話,推薦將 linger.ms
引數設定一個比 0
大的值(預設是 0
),batch.size
也可以設定一下(預設是16384),同時用 producer.send(ProducerRecord<K,V>, Callback)
來將資料生產到叢集中,其中 Callback 匿名內部類中的 onCompletion()
方法用來處理 “確認生產到叢集” 的邏輯~~