Kafka使用詳解-Producer API(同步傳送資料)
阿新 • • 發佈:2021-01-09
同步傳送
意思:一條訊息傳送後,會阻塞當前執行緒,直至返回ack。send返回的結果時future物件
package com.ln.kafka.producer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache. kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.ArrayList;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
/**
* @ProjectName: kafka
* @Package: com.ln.kafka.producer
* @Name:SyncProducer
* @Author:linianest
* @CreateTime:2021/1/8 9:16
* @version:1.0
* @Description TODO:kafka同步傳送訊息
*/
public class SyncProducer {
public static void main(String[] args) throws ExecutionException, InterruptedException {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop101:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.ACKS_CONFIG, "all");
// 每批次傳送數量大小
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
// 等待時間
props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
// 攔截器
ArrayList<String> interceptors = new ArrayList<>();
interceptors.add("com.ln.kafka.interceptor.TimeInterceptor");
interceptors.add("com.ln.kafka.interceptor.CounterInterceptor");
props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors);
// 建立物件
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
// 傳送資料
for (int i = 0; i < 1000; i++) {
/**
* // 等待時間,如果batch.size沒達到,靠LINGER_MS_CONFIG傳送
* props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
*/
RecordMetadata metadata = producer.send(new ProducerRecord<String, String>("first", i + "", "message-" + i)).get();
System.out.println("offset=" + metadata.offset());
}
// 關閉資料流
producer.close();
}
}