1. 程式人生 > >kafka原始碼分析之producer

kafka原始碼分析之producer

Producerclient

示例程式碼

Properties props = new Properties();props.put("bootstrap.servers""localhost:9092");props.put("client.id""DemoProducer");props.put("key.serializer""org.apache.kafka.common.serialization.ByteArraySerializer");props.put("value.serializer""org.apache.kafka.common.serialization.

ByteArraySerializer");producer new KafkaProducer<IntegerString>(props);this.topic = topic;this.isAsync = isAsync;

String messageStr = "Message_";long startTime = System.currentTimeMillis();if (isAsync) {

非同步處理,這個過程需要定義一個回撥函式來監聽傳送的訊息的響應結果

// Send asynchronouslyproducer.send(new ProducerRecord<byte[]

byte[]>(topic,messageNo.getBytes()/*key*/,messageNo.getBytes()/*value*/),

      /*非同步處理,回撥函式*/

new DemoCallBack(startTimemessageNomessageStr));else 

同步處理,傳送完成後,等待發送的響應結果。

// Send synchronouslytry {producer.send(new ProducerRecord<IntegerString>(topic,messageNo.getBytes()/*key*/,messageNo.getBytes()/*value*/)).get()

;System.out.println("Sent message: (" + messageNo + ", " + messageStr + ")");catch (InterruptedException e) {    e.printStackTrace();catch (ExecutionException e) {    e.printStackTrace();}}

關於非同步處理的回撥函式定義:

這個回撥函式實現需要實現org.apache.kafka.clients.producer.Callback介面。

class DemoCallBack implements Callback 

並實現介面中的函式:

public void onCompletion(RecordMetadata metadataException exception) {

這裡的startTime是傳送這條訊息時,生成回撥函式時傳入的訊息傳送的開始時間,

計算出來了這次傳送這條訊息共花的時間long elapsedTime = System.currentTimeMillis() - startTime;if (metadata != null) {

如果metadata資訊不為空,表示訊息新增成功,可以得到當前新增成功的訊息的offset.    System.out.println("message(" key ", " message ") sent to partition(" 

+ metadata.partition() +"), " +"offset(" + metadata.offset() + ") in " + elapsedTime + " ms");else {

這種情況下,表示exception有值,也就是新增訊息失敗了,可以直接列印這個失敗的訊息的內容。    exception.printStackTrace();}}

Client端的生成與處理流程

生成KafkaProducer例項

1,首先看看KafkaProducer例項生成:

根據傳入的properties配置資訊,生成用於Producer的config例項。

this(new ProducerConfig(properties)nullnull);

2,解析必要的配置項:

2,1,配置項client.id,用於標記client端的一個編碼值,預設值為producer-1。在同一個程序內,多個client端時,如果沒有指定,預設根據1這個值向後增加。

2,2,配置項partitioner.class,配置用於producer寫入資料時用於計算這條資料對應的partition的分配運算元例項,這個例項必須是的Partitioner實現。例項初始化時會呼叫configure函式把配置檔案傳入進去,用於例項生成時使用,預設情況下分割槽運算元是DefaultPartitioner。這個預設運算元根據當前的key值進行murmur2 hash並與對應的topic的個數於模,如果key為null時,根據一個自增的integer的值與partition的個數取模.

2,3,配置項retry.backoff.ms,用於在向broker傳送資料失敗後的重試間隔時間,預設值為100ms

2,4,配置項metadata.max.age.ms,用於配置每個producer端快取topic的metadata的過期時間,預設值為5分鐘。配置上面的2,3,與2,4的配置,生成一個Metadata例項。

2,5,配置項max.request.size,用於配置每次producer請求的最大的位元組數,預設值為1MB。

2,6,配置項buffer.memory,用於配置producer端等待向server傳送的資料的緩衝區的大小,預設值為32MB。

2,7,配置項compression.type,預設值none,用於配置資料的壓縮演算法,預設為不壓縮,可配置的值為none,gzip,snappy,lz4

2,8,配置項max.block.ms,用於配置send資料或partitionFor函式得到對應的leader時,最大的等待時間,預設值為60秒。

2,9,配置項request.timeout.ms,用於配置socket請求的最大超時時間,預設值為30秒。

3,生成record的累加器,這是一個用於對producer要傳送的資料進行緩衝的例項:

this.accumulator new RecordAccumulator(

        config.getInt(ProducerConfig.BATCH_SIZE_CONFIG),this.totalMemorySize,this.compressionType,config.getLong(ProducerConfig.LINGER_MS_CONFIG),retryBackoffMs,metrics,time,metricTags);

3,1,RecordAccumulator例項需要的配置:

3,1,1配置項batch.size,用於批量提交的batch位元組大小,預設值為16384。

3,1,2配置項linger.ms,這個配置與3,1,1配合使用,用於配置資料快取的最大延遲時間,預設值0.

3,1,3依賴的其它配置項:2,6  2,7 2,3。

4,根據配置項bootstrap.servers,多個配置使用逗號分開,

生成用於socket請求的InetSocketAddress例項集合。

4,1並根據配置的broker的連線地址集合,生成Cluster的例項。把cluster例項更新到metadata的例項中。

5,生成NetworkClient例項,這個例項用於與各個broker進行socket通訊,生成用於進行資料傳送的Sender例項,並生成用於資料傳送的KafkaThread執行緒並啟動。

6,根據配置項key.serializer/value.serializer,生成key與value的序列化例項,這例項必須是Serializer的實現。

KafkaThread執行緒初始化

生成NetworkClient例項需要的配置項:

1,配置項connections.max.idle.ms,預設值為9分鐘,用於設定連線最大的空閒時間,

2,配置項max.in.flight.requests.per.connection,預設值5,用於設定每個連線最大的請求個數

3,配置項reconnect.backoff.ms,預設值50ms,用於設定重新嘗試連線的等待時間。

4,配置項send.buffer.bytes,預設值128kb,用於設定socket的傳送緩衝區SO_SNDBUF的大小。

5,配置項receive.buffer.bytes,預設值32kb,用於設定socket的接收響應的緩衝區SO_RCVBUF的大小。

6,配置項request.timeout.ms,用於配置socket請求的最大超時時間,預設值為30秒。

NetworkClient client = new NetworkClient(new Selector(

            config.getLong(ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG)

this.metricstime"producer"metricTagschannelBuilder),this.metadata,clientId,config.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION),config.getLong(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG),config.getInt(ProducerConfig.SEND_BUFFER_CONFIG),config.getInt(ProducerConfig.RECEIVE_BUFFER_CONFIG),this.requestTimeoutMstime);

Sender是一個用於傳送資料的執行緒。

需要的配置項:

1,配置項max.request.size,用於配置每次producer請求的最大的位元組數,預設值為1MB。

2,配置項acks,預設值1,用於配置請求的ack的型別,-1,0,1三種。

3,配置項retries,預設值0,用於配置傳送失敗的重試次數。this.sender new Sender(client,this.metadata,this.accumulator,config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG),(shortparseAcks(config.getString(ProducerConfig.ACKS_CONFIG)),config.getInt(ProducerConfig.RETRIES_CONFIG),this.metrics,new SystemTime(),clientId,this.requestTimeoutMs);String ioThreadName = "kafka-producer-network-thread" 

+ (clientId.length() > " | " clientId "");

這裡用於啟動用於對producer中的資料進行傳送的執行緒Sender例項。this.ioThread new KafkaThread(ioThreadNamethis.sendertrue);this.ioThread.start();

通過producer傳送資料

Producersend函式

public Future<RecordMetadata> send(ProducerRecord<KV> record) {return send(recordnull);}

如果需要考慮資料傳送成功的回撥處理時,需要實現Callback。public Future<RecordMetadata> send(ProducerRecord<KV> record

Callback callback) {try {

這裡根據請求的記錄的topic的名稱,得到這個topic對應的metadata資訊,這裡通過Metadata例項來得到。函式返回值是讀取topic的metadata資訊的讀取時間。

1,從metadata例項中的topics集合中檢查這個topic是否存在,如果不存在,把這個topic新增到集合中,

2,從metadata對應的Cluster例項(這裡儲存有每個broker的連線資訊)中的partitionsByTopic集合中根據topic得到topic對應的partition資訊的集合,如果partitionsByTopic中已經存在有對應的partitions的記錄,說明這個topic的metadata資訊已經被加載出來,函式直接返回0。

3,如果當前的topic在metadata中沒有對應的partitions的資訊,根據max.block.ms配置的最大等待時間,通過每個broker的連線,隨機取出一個broker的連線,如果broker的連線不存在時,會建立這個連線並向broker發起一個TopicMetadataRequest請求得到這個topic對應的metadata資訊。// first make sure the metadata for the topic is availablelong waitedOnMetadataMs = waitOnMetadata(record.topic()

this.maxBlockTimeMs);

這裡得到總的等待時間除去得到metadata資訊用去的時間後還可以用於等待新增資料到傳送佇列處理的等待時間。long remainingWaitMs = Math.max(0this.maxBlockTimeMs 

               waitedOnMetadataMs);

對傳入的key與value進行序列化操作,並得到序列化後的byte array的key與value.byte[] serializedKey;try {            serializedKey = keySerializer.serialize(record.topic()record.key());catch (ClassCastException cce) {throw new SerializationException("Can't convert key of class " 

+ record.key().getClass().getName() +" to class " producerConfig.getClass(

                        ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG).getName() +" specified in key.serializer");}byte[] serializedValue;try {            serializedValue = valueSerializer.serialize(record.topic()

record.value());catch (ClassCastException cce) {throw new SerializationException("Can't convert value of class " 

                    record.value().getClass().getName() +" to class " producerConfig.getClass(

                        ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).getName() +" specified in value.serializer");}

得到這條記錄對應的partition,並根據這個partition生成TopicPartition,

在得到對應的partition時,如果傳入引數中包含有partition的id時,判斷這個partition的值是否在指定的範圍內,必須在指定的範圍內,如果partition沒有傳入時,通過指定的partitioner的例項,根據record的kv資訊,生成一個partition的id值。int partition = partition(recordserializedKeyserializedValue

metadata.fetch());

得到一條記錄的長度,這個記錄的長度為size(4),offset(8),crc(4),magic(1),attr(1),

            Keysize(4),key,valuesize(4),valueint serializedSize = Records.LOG_OVERHEAD + Record.recordSize(serializedKey

serializedValue);ensureValidRecordSize(serializedSize);TopicPartition tp = new TopicPartition(record.topic()partition);

log.trace("Sending record {} with callback {} to topic {} partition {}",

recordcallbackrecord.topic()partition);

向client端的訊息緩衝區內寫入這條訊息。RecordAccumulator.RecordAppendResult result = accumulator.append(tp

serializedKeyserializedValuecallbackremainingWaitMs);

if (result.batchIsFull || result.newBatchCreated) {

如果當前的緩衝區的batch的大小已經滿了,或者說這個緩衝區中重新生成了一個batch時,喚醒sender的執行緒,讓sender的run函式繼續執行,完成對資料的傳送操作。log.trace("Waking up the sender since topic {} partition {} is either full 

                  or getting a new batch"record.topic()partition);

this.sender.wakeup();}return result.future;// handling exceptions and record the errors;        // for API exceptions return them in the future,        // for other exceptions throw directlycatch (ApiException e) {log.debug("Exception occurred during message send:"e);if (callback != null)            callback.onCompletion(nulle);this.errors.record();return new FutureFailure(e);catch (InterruptedException e) {this.errors.record();throw new InterruptException(e);catch (BufferExhaustedException e) {this.errors.record();this.metrics.sensor("buffer-exhausted-records").record();throw e;catch (KafkaException e) {this