1. 程式人生 > >Kafka生產者屬性配置

Kafka生產者屬性配置

多樣的使用場景意味著多樣的需求:是否每個訊息都很重要?是否允許丟失一小部分訊息?偶爾出現重複訊息是否可以接受?是否有嚴格的延遲和吞吐量要求?

不同的場景對生產者API的使用和配置會有直接影響。

必選屬性有3個:

bootstrap.servers:該屬性指定broker的地址清單,地址的格式為host:port。清單裡不需要包含所有的broker地址,生產者會從給定的broker裡查詢其他broker的資訊。不過最少提供2個broker的資訊,一旦其中一個宕機,生產者仍能連線到叢集上。

key.serializer:生產者介面允許使用引數化型別,可以把Java物件作為鍵和值傳broker,但是broker希望收到的訊息的鍵和值都是位元組陣列,所以,必須提供將物件序列化成位元組陣列的序列化器。key.serializer必須設定為實現org.apache.kafka.common.serialization.Serializer的介面類,預設為

org.apache.kafka.common.serialization.StringSerializer,也可以實現自定義的序列化器。

value.serializer:同上。

可選引數:

acks:指定了必須要有多少個分割槽副本收到訊息,生產者才會認為寫入訊息是成功的,這個引數對訊息丟失的可能性有重大影響。

acks=0:生產者在寫入訊息之前不會等待任何來自伺服器的響應,容易丟訊息,但是吞吐量高。

acks=1:只要叢集的首領節點收到訊息,生產者會收到來自伺服器的成功響應。如果訊息無法到達首領節點(比如首領節點崩潰,新首領沒有選舉出來),生產者會收到一個錯誤響應,為了避免資料丟失,生產者會重發訊息。不過,如果一個沒有收到訊息的節點成為新首領,訊息還是會丟失。預設使用這個配置。

acks=all:只有當所有參與複製的節點都收到訊息,生產者才會收到一個來自伺服器的成功響應。延遲高。

buffer.memory:設定生產者記憶體緩衝區的大小,生產者用它緩衝要傳送到伺服器的訊息。

max.block.ms:指定了在呼叫send()方法或者使用partitionsFor()方法獲取元資料時生產者的阻塞時間。當生產者的傳送緩衝區已滿,或者沒有可用的元資料時,這些方法就會阻塞。在阻塞時間達到max.block.ms時,生產者會丟擲超時異常。

batch.size:當多個訊息被髮送同一個分割槽時,生產者會把它們放在同一個批次裡。該引數指定了一個批次可以使用的記憶體大小,按照位元組數計算。當批次記憶體被填滿後,批次裡的所有訊息會被髮送出去。

retries:指定生產者可以重發訊息的次數。

receive.buffer.bytes和send.buffer.bytes:指定TCP socket接受和傳送資料包的快取區大小。如果它們被設定為-1,則使用作業系統的預設值。如果生產者或消費者處在不同的資料中心,那麼可以適當增大這些值,因為跨資料中心的網路一般都有比較高的延遲和比較低的頻寬。

linger.ms:指定了生產者在傳送批次前等待更多訊息加入批次的時間。