kafka原始碼解析之十六生產者流程(客戶端如何向topic傳送資料)
阿新 • • 發佈:2019-01-01
客戶端向topic傳送資料分為兩種方式:1.非同步,2同步。其配置為producer.type,如果為sync,則是同步傳送;如果為async,則是非同步傳送。
客戶端程式碼如下:
import kafka.javaapi.producer.Producer; import kafka.producer.KeyedMessage; import kafka.producer.ProducerConfig; import java.util.*; public class KafkaProducer { private Producer<String,String> inner; public KafkaProducer() throws Exception{ Properties properties = new Properties(); properties.load(ClassLoader.getSystemResourceAsStream("KafkaProducer.properties")); /* KafkaProducer.properties內容如下 *metadata.broker.list=172.23.9.134:9092,172.23.9.135:9092,172.23.9.136:9092 * producer.type=sync * compression.codec=0 * serializer.class=kafka.serializer.StringEncoder * request.required.acks=1 */ //根據KfkaProducer.properties載入配置資訊 ProducerConfig config = new ProducerConfig(properties); inner = new Producer<String, String>(config); } //一條一條傳送 public void send(String topicName, String message) { if(topicName == null || message == null){ return; } KeyedMessage<String, String> km = new KeyedMessage<String, String>(topicName,message); inner.send(km); } //批量傳送 public void send(String topicName, Collection<String> messages) { if(topicName == null || messages == null){ return; } if(messages.isEmpty()){ return; } List<KeyedMessage<String, String>> kms = new ArrayList<KeyedMessage<String, String>>(); for(String entry : messages){ KeyedMessage<String, String> km = new KeyedMessage<String, String>(topicName,entry); kms.add(km); } inner.send(kms); } public void close(){ inner.close(); } public static void main(String[] args) { KafkaProducer producer = null; try{ producer = new KafkaProducer(); int i=0; while(true){ StringBuffer sbMsg = new StringBuffer(); sbMsg.append("content"); //以KV對的形式傳送 producer.send("key", sbMsg.toString()); } }catch(Exception e){ e.printStackTrace(); }finally{ if(producer != null){ producer.close(); } } } }
那麼接下去會發生什麼呢?最終會呼叫到kafka.javaapi.producer
package kafka.javaapi.producer import kafka.producer.ProducerConfig import kafka.producer.KeyedMessage import scala.collection.mutable class Producer[K,V](private val underlying: kafka.producer.Producer[K,V]) { //最終實現生成者功能的是kafka.producer.Producer def this(config: ProducerConfig) = this(new kafka.producer.Producer[K,V](config)) def send(message: KeyedMessage[K,V]) { underlying.send(message) } def send(messages: java.util.List[KeyedMessage[K,V]]) { import collection.JavaConversions._ underlying.send((messages: mutable.Buffer[KeyedMessage[K,V]]).toSeq: _*) } def close = underlying.close }
其具體的傳送邏輯如下: