kafka訊息生產者--新舊api
阿新 • • 發佈:2019-01-26
新版本的 Kafka 為訊息生產者提供了新的 API;相較於舊版 API,傳送效能有大幅提升:
舊版API:
新版API,直接上已封裝好的介面程式碼:
package com.ztesoft.iot.bill.spark.kafka;

import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

import com.ztesoft.iot.core.async.DisruptorInst;
import com.ztesoft.iot.core.async.execute.AsyncExecute;
import com.ztesoft.iot.core.async.execute.BusinessHandler;
import com.ztesoft.iot.core.async.execute.IExecute;
import com.ztesoft.iot.core.utils.TimeUtil;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

/**
 * Throughput test driver that pushes string messages to Kafka through the legacy client
 * classes {@code kafka.javaapi.producer.Producer} and {@code kafka.producer.KeyedMessage}.
 *
 * <p>{@link #batchInsert()} publishes {@code thread_num} tasks to the shared
 * {@code DisruptorInst}; every task sends {@code insert_size} messages to {@link #TOPIC}
 * and then prints the accumulated totals.
 *
 * <p>NOTE(review): the producer is never closed anywhere in this class.
 */
public class KafkaProducerBak {

    /** Topic every test message is written to. */
    public final static String TOPIC = "test_yucg";

    /** Number of messages each published task sends. */
    private final static long insert_size = 1000;

    /** Number of tasks published by {@link #batchInsert()}. */
    private final static int thread_num = 3;

    /** Buffered messages are handed to the producer once this many have accumulated. */
    private final static int FLUSH_THRESHOLD = 2000;

    // Run statistics shared by all tasks; only touched through the synchronized helpers
    // below because the tasks may execute on different worker threads
    // (DisruptorInst's threading is not visible here).
    private static long start = 0;
    private static String start_local_time = "";
    private static long end = 0;
    private static String end_local_time = "";
    private static long msg_num = 0;

    private final Producer<String, String> producer;

    // Not referenced anywhere in this class.
    private final boolean ifLocal = true;
    final int COUNT = 10;

    private KafkaProducerBak() {
        Properties properties = new Properties();
        // Kafka broker address (host:port).
        properties.put("metadata.broker.list", "127.0.0.1:9092");
        properties.put("zk.connect", "127.0.0.1:2181");
        // NOTE(review): "acks", "retries", "batch.size", "linger.ms", "buffer.memory",
        // "key.serializer" and "value.serializer" are setting names of the new
        // org.apache.kafka.clients producer; presumably the legacy ProducerConfig ignores
        // them (and the non-String values). Left untouched so the benchmark is unchanged.
        properties.put("acks", "all");
        properties.put("retries", 0); // retry count for failed send requests
        properties.put("batch.size", 4000);
        // Time a message may wait in the buffer so that more messages can join the same
        // request, reducing the number of requests.
        properties.put("linger.ms", 1);
        properties.put("buffer.memory", 8388608); // total size of the in-memory buffer
        // To route messages to specific partitions without the default
        // Utils.abs(key.hashCode) % numPartitions scheme, plug in a custom partitioner:
        // properties.put("partitioner.class", "com.ztesoft.iot.bill.spark.kafka.SimplePartitioner");
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // Serializer class for message values.
        properties.put("serializer.class", "kafka.serializer.StringEncoder");
        producer = new Producer<String, String>(new ProducerConfig(properties));
    }

    /**
     * Publishes one asynchronous task that sends {@code insert_size} messages to
     * {@link #TOPIC} and afterwards prints the running totals.
     */
    public void insert() {
        IExecute execute = new AsyncExecute(new BusinessHandler() {
            @Override
            public void execute(Object... args) {
                try {
                    markStarted();
                    List<KeyedMessage<String, String>> pending =
                            new ArrayList<KeyedMessage<String, String>>();
                    // Exactly insert_size iterations, so the number of messages sent matches
                    // the amount credited to msg_num (the previous loop sent one fewer).
                    for (int messageNo = 0; messageNo < insert_size; messageNo++) {
                        String data = "hello kafka message " + messageNo;
                        pending.add(new KeyedMessage<String, String>(TOPIC, data));
                        if (pending.size() >= FLUSH_THRESHOLD
                                && KafkaProducerBak.this.sendAndClear(pending)) {
                            System.out.println("已推送資料:" + msg_num);
                        }
                    }
                    if (pending.size() > 0) {
                        KafkaProducerBak.this.sendAndClear(pending);
                    }
                    recordCompleted(insert_size);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        });
        DisruptorInst.getInst().publish(execute);
    }

    /** Publishes {@code thread_num} send tasks via {@link #insert()}. */
    public void batchInsert() {
        for (int task = 0; task < thread_num; task++) {
            this.insert();
        }
    }

    public static void main(String[] args) {
        KafkaProducerBak kafkaProducerBak = new KafkaProducerBak();
        kafkaProducerBak.batchInsert();
    }

    /**
     * Sends the buffered messages and empties the buffer.
     *
     * @param pending messages to send; left untouched when the send fails
     * @return {@code true} if the send completed without an exception
     */
    private boolean sendAndClear(List<KeyedMessage<String, String>> pending) {
        try {
            producer.send(pending);
            pending.clear();
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }

    /** Records the start time of the run the first time any task begins. */
    private static synchronized void markStarted() {
        if (start == 0) {
            start = System.currentTimeMillis();
        }
        if ("".equals(start_local_time)) {
            start_local_time = TimeUtil.getDefaultTime();
        }
    }

    /**
     * Adds a finished task's message count to the totals, advances the end time and
     * prints the summary banner.
     *
     * @param count number of messages the finished task is credited with
     */
    private static synchronized void recordCompleted(long count) {
        msg_num = msg_num + count;
        long finishedAt = System.currentTimeMillis();
        if (end == 0 || finishedAt > end) {
            end = finishedAt;
            end_local_time = TimeUtil.getDefaultTime();
        }
        System.out.println("*******************************************");
        System.out.println("開始時間:" + start_local_time + " " + "結束時間:" + end_local_time + " "
                + " 插入資料:" + msg_num + " 用時:" + ((end - start) / 1000) + "s");
        System.out.println("*******************************************");
    }
}
新舊版本的API在訊息傳送的效率上的差異是非常明顯,讀者可以自己去嘗試壓測;ps : 樓主使用kafka版本是kafka_2.11-1.1.0
package com.ztesoft.iot.core.kafka;

import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

import com.ztesoft.iot.core.msg.IMsgProducer;
import com.ztesoft.iot.core.utils.PropertiesUtils;
import com.ztesoft.iot.core.utils.StringUtil;

/**
 * {@link IMsgProducer} backed by the new Java client
 * ({@code org.apache.kafka.clients.producer.KafkaProducer}).
 *
 * <p>All settings are read from the {@code PropertiesUtils.PROP_KAFKA} property source.
 * Per-producer settings live under {@code kafka.<key>.*}; for the retry count and the value
 * serializer a global setting is used when the per-producer one is empty.
 *
 * @param <T> type of the message values
 */
public class KafkaMsgProducer<T> implements IMsgProducer<T> {

    private final KafkaProducer<String, T> producer;
    private final String topic;

    /**
     * Builds a producer from the settings stored for {@code key}.
     *
     * <p>Settings that are empty are not passed on to Kafka, so the client's own defaults
     * and its own validation of required settings apply.
     *
     * @param key name of the producer configuration, used in {@code kafka.<key>.*}
     * @throws NumberFormatException if a numeric setting is present but not an integer
     */
    public KafkaMsgProducer(String key) {
        String prefix = "kafka." + key + ".";
        topic = lookup(prefix + "topic");

        Properties properties = new Properties();
        putIfPresent(properties, ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
                lookup("metadata.broker.list"));
        putIfPresent(properties, ProducerConfig.ACKS_CONFIG, lookup("acks"));
        // Retry count for failed send requests.
        putIntIfPresent(properties, ProducerConfig.RETRIES_CONFIG,
                lookupWithFallback(prefix + "produceRetries", "retries"));
        putIntIfPresent(properties, ProducerConfig.BATCH_SIZE_CONFIG,
                lookup(prefix + "produceBatchSize"));
        // Time a message may wait in the buffer so that more messages can join the same
        // request, reducing the number of requests.
        putIntIfPresent(properties, ProducerConfig.LINGER_MS_CONFIG,
                lookup(prefix + "produceLingerMs"));
        // Total size of the in-memory buffer.
        putIntIfPresent(properties, ProducerConfig.BUFFER_MEMORY_CONFIG,
                lookup(prefix + "produceBufferMemory"));
        putIfPresent(properties, ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                lookup("key.serializer"));
        putIfPresent(properties, ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                lookupWithFallback(prefix + "valueSerializer", "value.serializer"));
        // Partitioning strategy; may be left unset.
        putIfPresent(properties, ProducerConfig.PARTITIONER_CLASS_CONFIG,
                lookup(prefix + "producePartitioner"));

        producer = new KafkaProducer<String, T>(properties);
    }

    /**
     * Hands one message to the producer for the configured topic.
     *
     * <p>The result returned by {@code KafkaProducer.send} is not inspected, so this method
     * does not report whether the message was delivered.
     *
     * @param msg message value to send
     */
    @Override
    public void send(T msg) throws Exception {
        producer.send(new ProducerRecord<String, T>(topic, msg));
    }

    /**
     * Hands every message in {@code msgs} to the producer, in list order.
     *
     * @param msgs message values to send; {@code null} is treated as an empty batch
     */
    @Override
    public void batchSend(List<T> msgs) throws Exception {
        if (msgs == null) {
            return;
        }
        for (T msg : msgs) {
            producer.send(new ProducerRecord<String, T>(topic, msg));
        }
    }

    /** Closes the underlying Kafka producer; call once this instance is no longer needed. */
    public void close() {
        producer.close();
    }

    /** Reads one setting from the Kafka property source. */
    private static String lookup(String name) {
        return PropertiesUtils.getString(PropertiesUtils.PROP_KAFKA, name);
    }

    /** Reads {@code specificName}; if it is empty, reads {@code globalName} instead. */
    private static String lookupWithFallback(String specificName, String globalName) {
        String value = lookup(specificName);
        if (StringUtil.isEmpty(value)) {
            value = lookup(globalName);
        }
        return value;
    }

    /** Stores {@code value} under {@code name} unless it is empty. */
    private static void putIfPresent(Properties target, String name, String value) {
        if (!StringUtil.isEmpty(value)) {
            target.put(name, value);
        }
    }

    /** Stores {@code value} as an {@code Integer} under {@code name} unless it is empty. */
    private static void putIntIfPresent(Properties target, String name, String value) {
        if (!StringUtil.isEmpty(value)) {
            target.put(name, Integer.parseInt(value));
        }
    }
}