1. 程式人生 > 其它 >Kafka使用詳解-Producer API(同步傳送資料)

Kafka使用詳解-Producer API(同步傳送資料)

技術標籤:kafka生產者kafkakafka

同步傳送

意思:一條訊息傳送後,會阻塞當前執行緒,直至返回ack。send返回的結果時future物件

package com.ln.kafka.producer;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.
kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.serialization.StringSerializer; import java.util.ArrayList; import java.util.Properties; import java.util.concurrent.ExecutionException; /** * @ProjectName: kafka * @Package: com.ln.kafka.producer * @Name:SyncProducer * @Author:linianest * @CreateTime:2021/1/8 9:16 * @version:1.0 * @Description TODO:kafka同步傳送訊息 */
public class SyncProducer { public static void main(String[] args) throws ExecutionException, InterruptedException { Properties props = new Properties(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop101:9092"); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
StringSerializer.class.getName()); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); props.put(ProducerConfig.ACKS_CONFIG, "all"); // 每批次傳送數量大小 props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384); // 等待時間 props.put(ProducerConfig.LINGER_MS_CONFIG, 1); // 攔截器 ArrayList<String> interceptors = new ArrayList<>(); interceptors.add("com.ln.kafka.interceptor.TimeInterceptor"); interceptors.add("com.ln.kafka.interceptor.CounterInterceptor"); props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors); // 建立物件 KafkaProducer<String, String> producer = new KafkaProducer<>(props); // 傳送資料 for (int i = 0; i < 1000; i++) { /** * // 等待時間,如果batch.size沒達到,靠LINGER_MS_CONFIG傳送 * props.put(ProducerConfig.LINGER_MS_CONFIG, 1); */ RecordMetadata metadata = producer.send(new ProducerRecord<String, String>("first", i + "", "message-" + i)).get(); System.out.println("offset=" + metadata.offset()); } // 關閉資料流 producer.close(); } }