1. 程式人生 > >kafka訊息生產者--新舊api

kafka訊息生產者--新舊api

新版本的kafka為訊息生產者提供了新的api,相對於舊版的api,效能有了大幅的提升:

舊版API:

package com.ztesoft.iot.bill.spark.kafka;

import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

import com.ztesoft.iot.core.async.DisruptorInst;
import com.ztesoft.iot.core.async.execute.AsyncExecute;
import com.ztesoft.iot.core.async.execute.BusinessHandler;
import com.ztesoft.iot.core.async.execute.IExecute;
import com.ztesoft.iot.core.utils.TimeUtil;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

/**
 * Throughput demo for the legacy Scala-client producer
 * ({@code kafka.javaapi.producer.Producer}).
 *
 * <p>{@link #batchInsert()} publishes {@code thread_num} asynchronous tasks through
 * {@code DisruptorInst}; each task sends {@code insert_size} string messages to
 * {@link #TOPIC} and then prints the accumulated statistics.
 */
public class KafkaProducerBak {

	/** Topic every test message is written to. */
	public final static String TOPIC = "test_yucg";
	final int COUNT = 10;
	/** Number of messages sent by one asynchronous task. */
	private final static long insert_size = 1000;
	/** Number of asynchronous tasks published by {@link #batchInsert()}. */
	private final static int thread_num = 3;
	/** Number of buffered messages that triggers an intermediate send inside a task. */
	private final static int FLUSH_THRESHOLD = 2000;

	// Statistics shared by all tasks.
	// NOTE(review): these statics are read and written without synchronisation. If
	// DisruptorInst runs the published tasks on more than one thread the totals can be
	// lost or torn - confirm the consumer threading model before trusting the numbers.
	private static long start = 0;
	private static String start_local_time = "";
	private static long end = 0;
	private static String end_local_time = "";
	private static long msg_num = 0;

	private final Producer<String, String> producer;

	/**
	 * Builds the legacy producer against a local broker.
	 *
	 * <p>Only keys understood by the legacy {@code kafka.producer.ProducerConfig} are
	 * set. The new-client keys ({@code acks}, {@code retries}, {@code batch.size},
	 * {@code linger.ms}, {@code buffer.memory}, {@code key.serializer},
	 * {@code value.serializer}) and {@code zk.connect} are not read by this client, so
	 * they are no longer passed. Their legacy counterparts are
	 * {@code request.required.acks}, {@code message.send.max.retries},
	 * {@code batch.num.messages} and {@code queue.buffering.max.ms}; they are left at
	 * their defaults here, exactly as before.
	 */
	private KafkaProducerBak() {
		Properties properties = new Properties();
		// Broker address list (host:port) used to bootstrap topic metadata.
		properties.put("metadata.broker.list", "127.0.0.1:9092");
		// Encoder used for message values (and for keys unless key.serializer.class is set).
		properties.put("serializer.class", "kafka.serializer.StringEncoder");
		// To route messages with custom logic instead of the default key-hash
		// partitioning, set "partitioner.class" to a custom partitioner implementation.
		producer = new Producer<String, String>(new ProducerConfig(properties));
	}

	/**
	 * Publishes one asynchronous task that sends {@code insert_size} messages and then
	 * updates and prints the shared statistics.
	 *
	 * <p>Only messages whose send call returned without an exception are added to the
	 * running total.
	 */
	public void insert() {
		IExecute execute = new AsyncExecute(new BusinessHandler() {
			@Override
			public void execute(Object... args) {
				try {
					// The first task to run records the overall start time.
					if (start == 0) {
						start = System.currentTimeMillis();
					}
					if ("".equals(start_local_time)) {
						start_local_time = TimeUtil.getDefaultTime();
					}

					long sent = 0;
					List<KeyedMessage<String, String>> batch = new ArrayList<KeyedMessage<String, String>>();
					// Start at 0 so that exactly insert_size messages are produced
					// (the previous loop started at 1 and produced one fewer).
					for (long messageNo = 0; messageNo < insert_size; messageNo++) {
						String data = "hello kafka message " + messageNo;
						batch.add(new KeyedMessage<String, String>(TOPIC, data));
						if (batch.size() >= FLUSH_THRESHOLD) {
							sent += sendBatch(batch);
							System.out.println("已推送資料:" + (msg_num + sent));
						}
					}
					// Send whatever is left below the threshold.
					sent += sendBatch(batch);

					msg_num = msg_num + sent;
					long end_time = System.currentTimeMillis();
					// Keep the latest finish time seen across all tasks.
					if (end == 0 || end_time > end) {
						end = end_time;
						end_local_time = TimeUtil.getDefaultTime();
					}
					System.out.println("*******************************************");
					System.out.println("開始時間:"+start_local_time +"  "+"結束時間:"+end_local_time+"  "+"   插入資料:"+msg_num+"   用時:"+((end-start)/1000)+"s");
					System.out.println("*******************************************");
				} catch (Exception e) {
					e.printStackTrace();
				}
			}
		});
		DisruptorInst.getInst().publish(execute);
	}

	/**
	 * Sends the buffered messages in one call and empties the buffer.
	 *
	 * <p>A failed batch is reported on stderr, dropped and not counted.
	 *
	 * @param batch messages to send; cleared before returning
	 * @return number of messages handed to the producer without an exception
	 */
	private int sendBatch(List<KeyedMessage<String, String>> batch) {
		if (batch.isEmpty()) {
			return 0;
		}
		int size = batch.size();
		try {
			producer.send(batch);
			return size;
		} catch (Exception e) {
			e.printStackTrace();
			return 0;
		} finally {
			batch.clear();
		}
	}

	/** Publishes {@code thread_num} send tasks, one {@link #insert()} call each. */
	public void batchInsert() {
		for (int task = 0; task < thread_num; task++) {
			this.insert();
		}
	}

	/**
	 * Entry point: creates the producer and starts the send tasks.
	 *
	 * @param args unused
	 */
	public static void main(String[] args) {
		KafkaProducerBak kafkaProducerBak = new KafkaProducerBak();
		kafkaProducerBak.batchInsert();
	}
}
新版API,直接上已封裝好的介面程式碼:
package com.ztesoft.iot.core.kafka;

import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

import com.ztesoft.iot.core.msg.IMsgProducer;
import com.ztesoft.iot.core.utils.PropertiesUtils;
import com.ztesoft.iot.core.utils.StringUtil;

/**
 * {@link IMsgProducer} implementation backed by the new Kafka client
 * ({@code org.apache.kafka.clients.producer.KafkaProducer}).
 *
 * <p>All settings are read from the {@code PropertiesUtils.PROP_KAFKA} property set.
 * Settings named {@code kafka.<key>.*} are specific to the logical producer identified
 * by the constructor argument; the remaining ones are shared.
 *
 * @param <T> type of the message value; must match the configured value serializer
 */
public class KafkaMsgProducer<T> implements IMsgProducer<T> {

	private final KafkaProducer<String, T> producer;

	private final String topic;

	/**
	 * Creates a producer for the logical name {@code key}.
	 *
	 * <p>Optional settings (acks, retries, batch size, linger time, buffer memory,
	 * partitioner) are applied only when configured; otherwise the Kafka client
	 * defaults are used. Missing required settings (broker list, serializers) are
	 * left unset so that {@code KafkaProducer} itself reports them.
	 *
	 * @param key logical producer name used to build the {@code kafka.<key>.*} property names
	 * @throws NumberFormatException if a configured numeric setting is not a valid integer
	 */
	public KafkaMsgProducer(String key) {
		final String prefix = "kafka." + key + ".";
		topic = PropertiesUtils.getString(PropertiesUtils.PROP_KAFKA, prefix + "topic");

		// Per-producer retry count, falling back to the shared "retries" setting.
		String retries = PropertiesUtils.getString(PropertiesUtils.PROP_KAFKA, prefix + "produceRetries");
		if (StringUtil.isEmpty(retries)) {
			retries = PropertiesUtils.getString(PropertiesUtils.PROP_KAFKA, "retries");
		}
		// Per-producer value serializer, falling back to the shared "value.serializer".
		String valueSerializer = PropertiesUtils.getString(PropertiesUtils.PROP_KAFKA, prefix + "valueSerializer");
		if (StringUtil.isEmpty(valueSerializer)) {
			valueSerializer = PropertiesUtils.getString(PropertiesUtils.PROP_KAFKA, "value.serializer");
		}

		Properties properties = new Properties();
		putIfPresent(properties, ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
				PropertiesUtils.getString(PropertiesUtils.PROP_KAFKA, "metadata.broker.list"));
		putIfPresent(properties, ProducerConfig.ACKS_CONFIG,
				PropertiesUtils.getString(PropertiesUtils.PROP_KAFKA, "acks"));
		// Number of retries for a failed send request.
		putIntIfPresent(properties, ProducerConfig.RETRIES_CONFIG, retries);
		putIntIfPresent(properties, ProducerConfig.BATCH_SIZE_CONFIG,
				PropertiesUtils.getString(PropertiesUtils.PROP_KAFKA, prefix + "produceBatchSize"));
		// Time a record may wait in the buffer for more records, so that fewer,
		// larger requests are sent.
		putIntIfPresent(properties, ProducerConfig.LINGER_MS_CONFIG,
				PropertiesUtils.getString(PropertiesUtils.PROP_KAFKA, prefix + "produceLingerMs"));
		// Total size of the in-memory send buffer.
		putIntIfPresent(properties, ProducerConfig.BUFFER_MEMORY_CONFIG,
				PropertiesUtils.getString(PropertiesUtils.PROP_KAFKA, prefix + "produceBufferMemory"));
		putIfPresent(properties, ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
				PropertiesUtils.getString(PropertiesUtils.PROP_KAFKA, "key.serializer"));
		putIfPresent(properties, ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializer);
		// Partitioning strategy; optional.
		putIfPresent(properties, ProducerConfig.PARTITIONER_CLASS_CONFIG,
				PropertiesUtils.getString(PropertiesUtils.PROP_KAFKA, prefix + "producePartitioner"));

		producer = new KafkaProducer<String, T>(properties);
	}

	/** Stores {@code value} under {@code name} unless it is null or empty. */
	private static void putIfPresent(Properties target, String name, String value) {
		if (value != null && !StringUtil.isEmpty(value)) {
			target.put(name, value);
		}
	}

	/** Stores {@code value} parsed as an int under {@code name} unless it is null or empty. */
	private static void putIntIfPresent(Properties target, String name, String value) {
		if (value != null && !StringUtil.isEmpty(value)) {
			target.put(name, Integer.parseInt(value.trim()));
		}
	}

	/**
	 * Hands one message to the producer for the configured topic.
	 *
	 * <p>The returned future is not inspected, so a failure that happens after the
	 * record has been accepted is not reported to the caller.
	 *
	 * @param msg message value to send
	 * @throws Exception if the producer rejects the record
	 */
	@Override
	public void send(T msg) throws Exception {
		producer.send(new ProducerRecord<String, T>(topic, msg));
	}

	/**
	 * Hands every message in {@code msgs} to the producer, in list order.
	 *
	 * @param msgs messages to send; {@code null} or an empty list sends nothing
	 * @throws Exception if the producer rejects a record; later messages are not sent
	 */
	@Override
	public void batchSend(List<T> msgs) throws Exception {
		if (msgs == null || msgs.isEmpty()) {
			return;
		}
		for (T msg : msgs) {
			producer.send(new ProducerRecord<String, T>(topic, msg));
		}
	}

	/**
	 * Closes the underlying Kafka producer. Call once when this instance is no
	 * longer needed; it must not be used for sending afterwards.
	 */
	public void close() {
		producer.close();
	}
}
新舊版本的API在訊息傳送效率上的差異非常明顯,讀者可以自己嘗試壓測。PS:樓主使用的kafka版本是kafka_2.11-1.1.0