Kafka: 0.10 Producer 新增timestamp 以及使用配置
轉自:https://segmentfault.com/a/1190000008674900
本文目錄結構:
-
Producer API入門
-
非同步傳送流程
-
Producer設計說明
-
Producer Configuration
1. Producer API入門:
KafkaProducer是一個傳送record到Kafka Cluster的客戶端API。這個類執行緒安全的。在應用程式中,通常的作法是:所有發往一個Kafka Cluster的執行緒使用同一個Producer物件.。如果你的程式要給多個Cluster傳送訊息,則需要使用多個Producer。
ProducerRecord說明
從上面程式碼裡可以看出,代表要傳送的訊息記錄類是ProducerRecord:
一條record通常包括5個欄位:
·topic:指定該record發往哪個topic下。[Required]
·partition:指定該record發到哪個partition中。[Optional]
·key:一個key。[Optional]
·value:記錄人內容。[Required]
·timestamp:時間戳。[Optional]
預設情況下:
如果使用者指定了partition,那麼就發往使用者指定的partition。如果使用者沒有指定partition,那麼就會根據key來決定放到哪個partition,如果key也沒有指定,則由producer隨機選取一個partition。
在Producer端,如果使用者指定了timestamp,則record使用使用者指定的時間,如果使用者沒有指定,則會使用producer端的當前時間。在broker端,如果配置了時間戳採用createtime方式,則使用producer傳給Broker的record中的timestramp時間,如果指定為logappendtime,則在broker寫入到Log檔案時會重寫該時間。
2. 非同步傳送流程
2.1、使用者執行緒呼叫send方法將record放到BufferPool中
可能在之前的kafka-client版本中,還支援同步方式傳送訊息記錄。不過在我看的版本(0.10.0.0)中,已經不再支援同步方式傳送了。當用戶使用KafkaProducer#send()傳送record時,執行流程是:
1、由interceptor chain對ProducerRecord做傳送前的處理
攔截器介面是:ProducerInterceport,使用者可以自定義自己的攔截器實現。
該攔截器鏈,在Producer物件初始化時初始化,之後不會再變了。所以呢,攔截器鏈中的攔截器都是公用的,如果要自定義攔截器的話,這個是需要注意的。
ProducerInterceptor有兩個方法:
·onSend: KafkaProducer#send 呼叫時就會執行此方法。
·onAcknowledgement:傳送失敗,或者傳送成功(broker 通知producer代表傳送成功)時都會呼叫該方法。
此階段執行的就是onSend方法。
2、阻塞方式獲取到broker cluster 上broker cluster的資訊
採用RPC方式獲取到的broker資訊,由一個MetaData類封裝。它包括了broker cluster的必要資訊,譬如有:所有的broker資訊(idhostport等)、所有的topic名稱、每一個topic對於的partition情況(id、leader node、replica nodes、ISR nodes等)。
雖然該過程是阻塞的,但並不是每傳送一個record都會通過RPC方式來獲取的。Metadata會在Producer端快取,只有在record中指定的topic不存在時、或者MetaData輪詢週期到時才會執行。
3、對record中key、value進行序列化
這個沒有什麼可說的。內建了基於String、Integer、Long、Double、Bytes、ByteBuffer、ByteArray的序列化工具。
4、為record設定partition屬性
前面說過,建立ProducerRecord時,partition是Optional的。所以如果使用者建立record時,沒有指定partition屬性。則由partition計算工具(Partitioner 介面)來計算出partition。這個計算方式可以自定義。Kafka Producer 提供了內建的實現:
·如果提供了Key值,會根據key序列化後的位元組陣列的hashcode進行取模運算。
·如果沒有提供key,則採用迭代方式(其實取到的值並非完美的迭代,而是類似於隨機數)。
5、校驗record的長度是否超出閾值
MAX_REQUEST_SIZE_CONFIG=”max.request.size”
BUFFER_MEMORY_CONFIG=”buffer.memory”
超出任何一項就會丟擲異常。
6、為record設定timestamp
如果使用者建立ProducerRecord時沒有指定timestamp,此處為止設定為producer的當前時間。
其實在java client中,設計了一個Time介面,專門用於設定這個時間的。內建了一個實現SystemTime,這裡將record timestamp設定為當前時間,就是由SystemTime來完成的。所以如果希望在kafka producer java client中使用其它的時間,可以自定義Time的實現。
7、將該record壓縮後放到BufferPool中
這一步是由RecordAccumulator來完成的。RecordAccumulator中為每一個topic維護了一個雙端佇列Deque<RecordBatch>,佇列中的元素是RecordBatch(RecordBatch則由多個record壓縮而成)。RecordAccumulator要做的就是將record壓縮後放到與之topic關聯的那個Deque的最後面。
關於record的壓縮方式,kafka producer在支援了幾種方式:
·NONE:就是不壓縮。
·GZIP:壓縮率為50%
·SNAPPY:壓縮率為50%
·LZ4:壓縮率為50%
在將record放到Deque中最後一個RecordBatch中的過程如下:如果最後一個recordbatch可以放的下就放,放不下就新建一個RecordBatch。
RecordBatch實際上是儲存於BufferPool中,所以這個過程實際上是把record放在BufferPool中。在建立BufferPool之初,會指定BufferPool的總大小,BufferPool中每一個RecordBatch的大小等等配置。
8、喚醒傳送模組
執行到上一步時,KafkaProducer#sender的處理基本算是完畢。這個一步的目的就是喚醒NIO Selector。
此外,在上述步驟2~8,不論哪一步出現問題,都會丟擲異常。而丟擲異常時,就會被KafkaProducer捕獲到,然後交由Sensor(感測器)進行處理。而Sensor通常會呼叫第1步中提到的interceptor chain 執行onAcknowledgement告知使用者。
public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
// intercept the record, which can be potentially modified; this method does not throw exceptions
ProducerRecord<K, V> interceptedRecord = this.interceptors == null ? record : this.interceptors.onSend(record);
return doSend(interceptedRecord, callback);
}
/**
* Implementation of asynchronously send a record to a topic. Equivalent to <code>send(record, null)</code>.
* See {@link #send(ProducerRecord, Callback)} for details.
*/
private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
TopicPartition tp = null;
try {
// first make sure the metadata for the topic is available
long waitedOnMetadataMs = waitOnMetadata(record.topic(), this.maxBlockTimeMs);
long remainingWaitMs = Math.max(0, this.maxBlockTimeMs - waitedOnMetadataMs);
byte[] serializedKey;
try {
serializedKey = keySerializer.serialize(record.topic(), record.key());
} catch (ClassCastException cce) {
throw new SerializationException("Can't convert key of class " + record.key().getClass().getName() +
" to class " + producerConfig.getClass(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG).getName() +
" specified in key.serializer");
}
byte[] serializedValue;
try {
serializedValue = valueSerializer.serialize(record.topic(), record.value());
} catch (ClassCastException cce) {
throw new SerializationException("Can't convert value of class " + record.value().getClass().getName() +
" to class " + producerConfig.getClass(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).getName() +
" specified in value.serializer");
}
int partition = partition(record, serializedKey, serializedValue, metadata.fetch());
int serializedSize = Records.LOG_OVERHEAD + Record.recordSize(serializedKey, serializedValue);
ensureValidRecordSize(serializedSize);
tp = new TopicPartition(record.topic(), partition);
long timestamp = record.timestamp() == null ? time.milliseconds() : record.timestamp();
log.trace("Sending record {} with callback {} to topic {} partition {}", record, callback, record.topic(), partition);
// producer callback will make sure to call both 'callback' and interceptor callback
Callback interceptCallback = this.interceptors == null ? callback : new InterceptorCallback<>(callback, this.interceptors, tp);
RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey, serializedValue, interceptCallback, remainingWaitMs);
if (result.batchIsFull || result.newBatchCreated) {
log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
this.sender.wakeup();
}
return result.future;
// handling exceptions and record the errors;
// for API exceptions return them in the future,
// for other exceptions throw directly
} catch (ApiException e) {
log.debug("Exception occurred during message send:", e);
if (callback != null)
callback.onCompletion(null, e);
this.errors.record();
if (this.interceptors != nullthis.interceptors.onSendError(record, tp, e);
return new FutureFailure(e);
} catch (InterruptedException e) {
this.errors.record();
if (this.interceptors != nullthis.interceptors.onSendError(record, tp, e);
throw new InterruptException(e);
} catch (BufferExhaustedException e) {
this.errors.record();
this.metrics.sensor("buffer-exhausted-records").record();
if (this.interceptors != nullthis.interceptors.onSendError(record, tp, e);
throw e;
} catch (KafkaException e) {
this.errors.record();
if (this.interceptors != nullthis.interceptors.onSendError(record, tp, e);
throw e;
} catch (Exception e) {
// we notify interceptor about all exceptions, since onSend is called before anything else in this method
if (this.interceptors != nullthis.interceptors.onSendError(record, tp, e);
throw e;
}
}
2.2、傳送排程
KafkaProducer#sender只是將record放到BufferPool中,並沒有將record發出去,而傳送排程,則是由另外一個執行緒(Sender)來完成的。
Sender的執行過程如下:
1、 取出就緒的record
這一步是檢查要傳送的record是否就緒:根據KafkaProducer維護的Metadata檢查要每一個record要發往的Leader node是否存在。如果有不存在的,就設定為需要更新,並且這樣的record認為還未就緒。以保證可以發到相關partition的leader node。
2、 取出RecordBatch,並過濾掉過期的RecordBatch
對於過期的RecordBatch,會通過Sensor通知Interceptor傳送失敗。
3、為要傳送的RecordBatch建立請求
一個RecordBatch一個ClientRequest。
4、保留請求併發送
把請求物件保留到一個inFlightRequest 集合中。這個集合中存放的是正在傳送的請求,是一個topic到Deque的Map。當傳送成功,或者失敗都會移除。
5、處理髮送結果
如果傳送失敗,會嘗試retry。並由Sensor排程Interceptor。
如果傳送成功,會由Sensor排程Interceptor。
3. Producer實現說明
從上述處理流程中,可以看到在java client中的一些設計:
1、Interceptor Chain:可以做為用於自定義外掛的介面。
2、MetaData:producer 不按需以及定期的傳送請求獲取最新的Cluster狀態資訊。Producer根據這個資訊可以直接將record batch傳送到相關partition的Leader中。也就是在客戶端完成Load balance。
3、Partitioner:分割槽選擇工具,選擇傳送到哪些分割槽,結合Metadata,完成Load balance。
4、RecordBatch:在客戶端對record壓縮排RecordBatch,然後一個RecordBatch發一次。這樣可以減少IO操作的次數,提高效能。
5、非同步方式傳送:提高使用者應用效能。
4. Producer Configuration
在文章開始的地方說明了,使用Kafka Producer Java Client時,只需要建立一個KafkaProducer就可以了。而它在執行過程中,會使用到很多配置項,這些配置項都是在KafkaProducer初始化時完成的。
下面就來看看java client中要求的配置項:
·bootstrap.servers
用於配置cluster中borker的host/port對。可以配置一項或者多項,不需要將cluster中所有例項都配置上。因為它後自動發現所有的broker。
如果要配置多項,格式是:host1:port1,host2:port2,host3:port3….
·key.serializer、value.serializer
配置序列化類名。指定 的這些類都要實現Serializer介面。
·acks
為了確保message record被broker成功接收。Kafka Producer會要求Borker確認請求(傳送RecordBatch的請求)完成情況。
對於message接收情況的確認,Kafka Broker支援了三種情形:
1、不需要確認;
2)leader接收到就確認;
3)等所有可用的follower複製完畢進行確認。
可以看出,這三種情況代表不同的確認粒度。在Java Producer Client中,對三種情形都做了支援,上述三種情形分別對應了三個配置項:0、1、-1。其實還有一個值是all,它其實就是-1。
Kafka Producer Java Client 是如何支援這三種確認呢?
1、 在為RecordBatch建立請求時,acks的值會被封裝為請求頭的一部分。
2、 傳送請求後(接收到Broker響應前),立即判斷是否需要確認該請求是否完成(即該RecordBatch是否被Broker成功接收),判斷依據是acks的值是否是0。如果是0,即不需要進行確認。那麼就認定該請求成功完成。既然認定是成功,那麼就不會進行retry了。
如果值不是0,就要等待Broker的響應了。根據響應情況,來判斷請求是否成功完成。
該配置項預設值是1,即leader接收後就響應。
·buffer.memory
BufferPool Size,也就是等待發送的Record的空間大小。預設值是:33554432,即32MB。
配置項的單位是byte,範圍是:[0,….]
·compression.type
Kafka提供了多種壓縮型別,可選值有4個: none, gzip, snappy, lz4。預設值是none。
·retries
當一個RecordBatch傳送失敗時,就會重新改善以確保資料完成交付。該配置設定了重試次數,值範圍[0, Integer.Max]。如果是0,即便失敗,也不會進行重發。
如果允許重試(即retries>0),但max.in.flight.requests.per.connection 沒有設定成1。這種情況下,就可能會出現records的順序改變的現象。例如:一個prodcuder client的sender執行緒在一次輪詢中,如果有兩個recordbatch都要傳送到同步一個partition中,此時它們肯定是發往同一個broker的,並且是用的同一個TCP connection。如果出現RecordBatch1先發,但是傳送失敗,RecordBatch2緊接著RecordBatch1傳送,它是傳送成功的。然後RecordBatch1會進行重發。這樣一來,就出現了broker接收到的順序是RecordBatch2先於RecordBatch1的情況。
·ssl.key.password
Keystore 檔案中私鑰的密碼。可選的。
·ssl.keystore.location
Keystore檔案的位置。可選的。
·ssl.keystore.password
Keystore 檔案的密碼。可選的。
·ssl.truststore.location
Trust store 檔案的位置。可選的。
·ssl.truststore.password
Trust store檔案的密碼。可選的。
·batch.size
RecordBatch的最大容量。預設值是16384(16KB)。
·client.id
邏輯名,client給broker發請求是會用到。預設值是:””。
·connections.max.idle.ms
Connection的最大空閒時間。預設值是540000 (9 min)
·linger.ms
Socket :solinger。延遲。預設值:0,即不延遲。
·max.block.ms
當需要的metadata未到達之前(例如要傳送的record的topic,在Client中還沒有相關記錄時),執行KafkaProducer#send時,內部處理會等待MetaData的到達。這是個阻塞的操作。為了防止無限等待,設定這個阻塞時間是必要的。範圍:[0, Long.MAX]
·max.request.size
最大請求長度,在將record壓縮到RecordBatch之前會進行校驗。超過這個大小會丟擲異常。
·partitioner.class
用於自定義partitioner演算法。預設值是:
org.apache.kafka.clients.producer.internals.DefaultPartitioner
·receive.buffer.byte
TCP receiver buffer的大小。取值範圍:[-1, …]。這個配置項的預設值是32768(即 32KB)。
如果設定為-1,則會採用作業系統的預設值。
·request.timeout.ms
最大請求時長。因為發起請求後,會等待broker的響應,如果超過這個時間就認為請求失敗。
·timeout.ms
這個時間配置的是follower到leader的ack超時時間。這個時間和 producer傳送的請求的網路無關。
·block.on.buffer.full
當bufferPool用完後,如果client還在使用KafkaProducer傳送record,要麼是BufferPool拒絕接收,要麼是丟擲異常。
這個配置是預設值是flase,也就是當bufferpool滿時,不會丟擲BufferExhaustException,而是根據max.block.ms進行阻塞,如果超時丟擲TimeoutExcpetion。
如果這個屬性值是true,則會把max.block.ms值設定為Long.MAX。另外該配置為true時,metadata.fetch.time.ms將不會生效了。
·interceptor.class
自定義攔截器類。預設情況下沒有指定任何的interceptor。
·max.in.flight.requests.per.connection
每個連線中處於傳送狀態的請求數的最大值。預設值是5。範圍是[1, Integer.MAX]
·metric.reporters
MetricReporter的實現類。預設情況下,會自動的註冊JmxReporter。
·metrics.num.samples
計算metric時的取樣數。預設值是2。範圍:[1,Integer.MAX]
·metrics.sample.window.ms
取樣的時間視窗。預設值是30000(30s)。範圍:[0, Long.MAX]