1. 程式人生 > >kafka 客戶端 producer 配置參數

kafka 客戶端 producer 配置參數

導致 丟失 max 字符串 大於 重傳 完全 code epo

屬性描述類型默認值
bootstrap.servers 用於建立與kafka集群的連接,這個list僅僅影響用於初始化的hosts,來發現全部的servers。
格式:host1:port1,host2:port2,…,數量盡量不止一個,以防其中一個down了
list
acks Server完成 producer request 前需要確認的數量。
acks=0時,producer不會等待確認,直接添加到socket等待發送;
acks=1時,等待leader寫到local log就行;
acks=allacks=-1時,等待isr中所有副本確認
(註意:確認都是 broker 接收到消息放入內存就直接返回確認,不是需要等待數據寫入磁盤後才返回確認,這也是kafka快的原因)
string 1
buffer.memory Producer可以用來緩存數據的內存大小。該值實際為RecordAccumulator類中的BufferPool,即Producer所管理的最大內存。
如果數據產生速度大於向broker發送的速度,producer會阻塞max.block.ms,超時則拋出異常
long 33554432
compression.type Producer用於壓縮數據的壓縮類型,取值:none, gzip, snappy, or lz4
string none
batch.size Producer可以將發往同一個Partition的數據做成一個Produce Request發送請求,即Batch批處理,以減少請求次數,該值即為每次批處理的大小。
另外每個Request請求包含多個Batch,每個Batch對應一個Partition,且一個Request發送的目的Broker均為這些partition的leader副本。
若將該值設為0,則不會進行批處理
int 16384
linger.ms Producer默認會把兩次發送時間間隔內收集到的所有Requests進行一次聚合然後再發送,以此提高吞吐量,而linger.ms則更進一步,這個參數為每次發送增加一些delay,以此來聚合更多的Message。
官網解釋翻譯:producer會將request傳輸之間到達的所有records聚合到一個批請求。通常這個值發生在欠負載情況下,record到達速度快於發送。但是在某些場景下,client即使在正常負載下也期望減少請求數量。這個設置就是如此,通過人工添加少量時延,而不是立馬發送一個record,producer會等待所給的時延,以讓其他records發送出去,這樣就會被聚合在一起。這個類似於TCP的Nagle算法。該設置給了batch的時延上限:當我們獲得一個partition的batch.size大小的records,就會立即發送出去,而不管該設置;但是如果對於這個partition沒有累積到足夠的record,會linger指定的時間等待更多的records出現。該設置的默認值為0(無時延)。例如,設置linger.ms=5,會減少request發送的數量,但是在無負載下會增加5ms的發送時延。
long 0
max.request.size 請求的最大字節數。這也是對最大消息大小的有效限制。註意:server具有自己對消息大小的限制,這些大小和這個設置不同。此項設置將會限制producer每次批量發送請求的數目,以防發出巨量的請求。 int 1048576
receive.buffer.bytes TCP的接收緩存 SO_RCVBUF 空間大小,用於讀取數據 int 32768
request.timeout.ms client等待請求響應的最大時間,如果在這個時間內沒有收到響應,客戶端將重發請求,超過重試次數發送失敗 int 30000
send.buffer.bytes TCP的發送緩存 SO_SNDBUF 空間大小,用於發送數據 int 131072
timeout.ms 指定server等待來自followers的確認的最大時間,根據acks的設置,超時則返回error int 30000
max.in.flight.requests.per.connection 在block前一個connection上允許最大未確認的requests數量。
當設為1時,即是消息保證有序模式,註意:這裏的消息保證有序是指對於單個Partition的消息有順序,因此若要保證全局消息有序,可以只使用一個Partition,當然也會降低性能
int 5
metadata.fetch.timeout.ms 在第一次將數據發送到某topic時,需先fetch該topic的metadata,得知哪些服務器持有該topic的partition,該值為最長獲取metadata時間 long 60000
reconnect.backoff.ms 連接失敗時,當我們重新連接時的等待時間 long 50
retry.backoff.ms 在重試發送失敗的request前的等待時間,防止若目的Broker完全掛掉的情況下Producer一直陷入死循環發送,折中的方法 long 100

其余參數(註:以下均為默認值)


# metrics系統維護可配置的樣本數量,在一個可修正的window size
metrics.sample.window.ms=30000

# 用於維護metrics的樣本數
metrics.num.samples=2

# 類的列表,用於衡量指標。實現MetricReporter接口
metric.reporters=[]

# 強制刷新metadata的周期,即使leader沒有變化
metadata.max.age.ms=300000

# 與broker會話協議,取值:LAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL
security.protocol=PLAINTEXT

# 分區類,實現Partitioner接口
partitioner.class=class org.apache.kafka.clients.producer.internals.DefaultPartitioner

# 控制block的時長,當buffer空間不夠或者metadata丟失時產生block
max.block.ms=60000

# 關閉達到該時間的空閑連接
connections.max.idle.ms=540000

# 當向server發出請求時,這個字符串會發送給server,目的是能夠追蹤請求源
client.id=""

# 發生錯誤時,重傳次數。當開啟重傳時,需要將`max.in.flight.requests.per.connection`設置為1,否則可能導致失序
retries=0

# key 序列化方式,類型為class,需實現Serializer interface
key.serializer=

# value 序列化方式,類型為class,需實現Serializer interface
value.serializer=

kafka 客戶端 producer 配置參數