kafka 客戶端 producer 配置參數
屬性 | 描述 | 類型 | 默認值 |
---|---|---|---|
bootstrap.servers | 用於建立與kafka集群的連接,這個list僅僅影響用於初始化的hosts,來發現全部的servers。 格式:host1:port1,host2:port2,…,數量盡量不止一個,以防其中一個down了 |
list | |
acks | Server完成 producer request 前需要確認的數量。acks=0 時,producer不會等待確認,直接添加到socket等待發送;acks=1 時,等待leader寫到local log就行;acks=all 或acks=-1 時,等待isr中所有副本確認 |
string | 1 |
buffer.memory | Producer可以用來緩存數據的內存大小。該值實際為RecordAccumulator類中的BufferPool,即Producer所管理的最大內存。 如果數據產生速度大於向broker發送的速度,producer會阻塞 max.block.ms ,超時則拋出異常 |
long | 33554432 |
compression.type | Producer用於壓縮數據的壓縮類型,取值:none, gzip, snappy, or lz4 |
string | none |
batch.size | Producer可以將發往同一個Partition的數據做成一個Produce Request發送請求,即Batch批處理,以減少請求次數,該值即為每次批處理的大小。 另外每個Request請求包含多個Batch,每個Batch對應一個Partition,且一個Request發送的目的Broker均為這些partition的leader副本。 若將該值設為0,則不會進行批處理 |
int | 16384 |
linger.ms | Producer默認會把兩次發送時間間隔內收集到的所有Requests進行一次聚合然後再發送,以此提高吞吐量,而linger.ms則更進一步,這個參數為每次發送增加一些delay,以此來聚合更多的Message。batch.size 大小的records,就會立即發送出去,而不管該設置;但是如果對於這個partition沒有累積到足夠的record,會linger 指定的時間等待更多的records出現。該設置的默認值為0(無時延)。例如,設置linger.ms=5 ,會減少request發送的數量,但是在無負載下會增加5ms的發送時延。 |
long | 0 |
max.request.size | 請求的最大字節數。這也是對最大消息大小的有效限制。註意:server具有自己對消息大小的限制,這些大小和這個設置不同。此項設置將會限制producer每次批量發送請求的數目,以防發出巨量的請求。 | int | 1048576 |
receive.buffer.bytes | TCP的接收緩存 SO_RCVBUF 空間大小,用於讀取數據 | int | 32768 |
request.timeout.ms | client等待請求響應的最大時間,如果在這個時間內沒有收到響應,客戶端將重發請求,超過重試次數發送失敗 | int | 30000 |
send.buffer.bytes | TCP的發送緩存 SO_SNDBUF 空間大小,用於發送數據 | int | 131072 |
timeout.ms | 指定server等待來自followers的確認的最大時間,根據acks 的設置,超時則返回error |
int | 30000 |
max.in.flight.requests.per.connection | 在block前一個connection上允許最大未確認的requests數量。 當設為1時,即是消息保證有序模式,註意:這裏的消息保證有序是指對於單個Partition的消息有順序,因此若要保證全局消息有序,可以只使用一個Partition,當然也會降低性能 |
int | 5 |
metadata.fetch.timeout.ms | 在第一次將數據發送到某topic時,需先fetch該topic的metadata,得知哪些服務器持有該topic的partition,該值為最長獲取metadata時間 | long | 60000 |
reconnect.backoff.ms | 連接失敗時,當我們重新連接時的等待時間 | long | 50 |
retry.backoff.ms | 在重試發送失敗的request前的等待時間,防止若目的Broker完全掛掉的情況下Producer一直陷入死循環發送,折中的方法 | long | 100 |
其余參數(註:以下均為默認值)
# metrics系統維護可配置的樣本數量,在一個可修正的window size
metrics.sample.window.ms=30000
# 用於維護metrics的樣本數
metrics.num.samples=2
# 類的列表,用於衡量指標。實現MetricReporter接口
metric.reporters=[]
# 強制刷新metadata的周期,即使leader沒有變化
metadata.max.age.ms=300000
# 與broker會話協議,取值:LAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL
security.protocol=PLAINTEXT
# 分區類,實現Partitioner接口
partitioner.class=class org.apache.kafka.clients.producer.internals.DefaultPartitioner
# 控制block的時長,當buffer空間不夠或者metadata丟失時產生block
max.block.ms=60000
# 關閉達到該時間的空閑連接
connections.max.idle.ms=540000
# 當向server發出請求時,這個字符串會發送給server,目的是能夠追蹤請求源
client.id=""
# 發生錯誤時,重傳次數。當開啟重傳時,需要將`max.in.flight.requests.per.connection`設置為1,否則可能導致失序
retries=0
# key 序列化方式,類型為class,需實現Serializer interface
key.serializer=
# value 序列化方式,類型為class,需實現Serializer interface
value.serializer=
kafka 客戶端 producer 配置參數