1. 程式人生 > >kafka原始碼解析之十六生產者流程(客戶端如何向topic傳送資料)

kafka原始碼解析之十六生產者流程(客戶端如何向topic傳送資料)

客戶端向topic傳送資料分為兩種方式:1. 非同步,2. 同步。具體採用哪一種由配置項producer.type決定:如果為sync,則是同步傳送;如果為async,則是非同步傳送。

客戶端程式碼如下:

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
import java.util.*;
/**
 * Small wrapper around the Kafka {@code kafka.javaapi.producer.Producer} that is
 * configured from the {@code KafkaProducer.properties} classpath resource and offers
 * single-message and batch send helpers.
 */
public class KafkaProducer {
    /** Name of the classpath resource that holds the producer configuration. */
    private static final String CONFIG_RESOURCE = "KafkaProducer.properties";

    /** The wrapped Kafka producer that all calls are delegated to. */
    private final Producer<String,String> inner;

    /**
     * Loads the producer configuration from the classpath and creates the wrapped producer.
     *
     * Sample content of KafkaProducer.properties:
     * <pre>
     * metadata.broker.list=172.23.9.134:9092,172.23.9.135:9092,172.23.9.136:9092
     * producer.type=sync
     * compression.codec=0
     * serializer.class=kafka.serializer.StringEncoder
     * request.required.acks=1
     * </pre>
     *
     * @throws Exception if the configuration resource is missing or cannot be read,
     *                   or if the producer cannot be created from it
     */
    public KafkaProducer() throws Exception{
        Properties properties = new Properties();
        java.io.InputStream in = ClassLoader.getSystemResourceAsStream(CONFIG_RESOURCE);
        if (in == null) {
            // Fail with a clear message instead of a NullPointerException inside Properties.load.
            throw new java.io.FileNotFoundException(
                    CONFIG_RESOURCE + " was not found on the classpath");
        }
        try {
            properties.load(in);
        } finally {
            // Properties.load does not close the stream, so release it here.
            in.close();
        }
        // Build the producer configuration from the loaded properties.
        ProducerConfig config = new ProducerConfig(properties);
        inner = new Producer<String, String>(config);
    }

    /**
     * Sends a single message.
     * Does nothing when either argument is {@code null}.
     *
     * @param topicName topic to send to
     * @param message   message payload
     */
    public void send(String topicName, String message) {
        if (topicName == null || message == null) {
            return;
        }
        inner.send(new KeyedMessage<String, String>(topicName, message));
    }

    /**
     * Sends a batch of messages to one topic in a single producer call.
     * Does nothing when either argument is {@code null} or the collection is empty.
     *
     * @param topicName topic to send to
     * @param messages  message payloads
     */
    public void send(String topicName, Collection<String> messages) {
        if (topicName == null || messages == null || messages.isEmpty()) {
            return;
        }
        List<KeyedMessage<String, String>> kms =
                new ArrayList<KeyedMessage<String, String>>(messages.size());
        for (String entry : messages) {
            kms.add(new KeyedMessage<String, String>(topicName, entry));
        }
        inner.send(kms);
    }

    /** Closes the wrapped producer. */
    public void close(){
        inner.close();
    }

    /**
     * Demo entry point: sends the same message in an endless loop until an exception
     * is thrown, then closes the producer.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        KafkaProducer producer = null;
        try {
            producer = new KafkaProducer();
            // The payload never changes, so build it once outside the loop.
            String message = "content";
            while (true) {
                // The first argument is the topic name (see send(String, String)),
                // so this publishes to a topic literally named "key".
                producer.send("key", message);
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (producer != null) {
                producer.close();
            }
        }
    }
}

那麼接下去會發生什麼呢?客戶端的send最終會呼叫到kafka.javaapi.producer.Producer這個類:

package kafka.javaapi.producer

import kafka.producer.ProducerConfig
import kafka.producer.KeyedMessage
import scala.collection.mutable
/**
 * Java-facing producer facade. Every call is delegated to the wrapped
 * kafka.producer.Producer, which is what actually implements the producer logic.
 */
class Producer[K,V](private val underlying: kafka.producer.Producer[K,V])
{
  /** Creates the wrapped kafka.producer.Producer from the given configuration. */
  def this(config: ProducerConfig) = this(new kafka.producer.Producer[K,V](config))

  /** Forwards a single message to the wrapped producer. */
  def send(message: KeyedMessage[K,V]): Unit = {
    underlying.send(message)
  }

  /** Forwards a Java list of messages to the wrapped producer as varargs. */
  def send(messages: java.util.List[KeyedMessage[K,V]]): Unit = {
    import collection.JavaConversions._
    // The type ascription triggers the implicit java.util.List -> mutable.Buffer conversion.
    val scalaMessages: mutable.Buffer[KeyedMessage[K,V]] = messages
    underlying.send(scalaMessages.toSeq: _*)
  }

  /** Closes the wrapped producer. */
  def close = underlying.close
}

其具體的傳送邏輯如下: