1. 程式人生 > >KAFKA 1.0 文件(五):生產者配置

KAFKA 1.0 文件(五):生產者配置

從0.11.0.0開始生產者配置block.on.buffer.full,metadata.fetch.timeout.ms和timeout.ms已被刪除。
以下是Java生產者的配置:

name description default import
bootstrap.servers 用於建立與kafka叢集連線的host/port組。
資料將會在所有servers上均衡載入,不管哪些server是指定用於bootstrapping。
這個列表僅僅影響初始化的hosts(用於發現全部的servers)
這個列表格式:host1:port1,host2:port2,…
key.serializer 關鍵字的序列化類。如果沒給與這項,預設情況是和訊息一致
value.serializer 值的序列化類
acks 此配置是表明當一次傳送訊息請求被認為完成時的確認值,就是指procuder需要多少個broker返回的確認訊號。此配置實際上代表了資料備份的可用性。以下設定為常用選項:
(1)acks=0: 設定為0表示producer不需要等待任何確認收到的資訊。副本將立即加到socket buffer並認為已經發送。沒有任何保障可以保證此種情況下server已經成功接收資料,同時重試配置不會發生作用(因為客戶端不知道是否失敗)回饋的offset會總是設定為-1;
(2)acks=1: 這意味著至少要等待leader已經成功將資料寫入本地log,但是並沒有等待所有follower是否成功寫入。這種情況下,如果follower沒有成功備份資料,而此時leader又掛掉,則訊息會丟失。
(3)acks=all: 這意味著leader需要等待所有備份都成功寫入日誌,這種策略會保證只要有一個備份存活就不會丟失資料。這是最強的保證。
(4)其他的設定,例如acks=2也是可以的,這將需要給定的acks數量,但是這種策略一般很少用。
1
buffer.memory 生產者可用於緩衝等待發送到伺服器的記錄的總位元組數。 如果記錄的傳送速度比傳送到伺服器的速度快,那麼生產者將會阻塞max.block.ms,之後它將丟擲一個異常。
這項設定將和producer能夠使用的總記憶體相關,但並不是一個硬性的限制,因為不是producer使用的所有記憶體都是用於快取。一些額外的記憶體會用於壓縮(如果引入壓縮機制),同樣還有一些用於維護請求。
33554432
compression.type producer用於壓縮資料的壓縮型別。預設是無壓縮。正確的選項值是none、gzip、snappy和lz4。壓縮最好用於批量處理,批量處理訊息越多,壓縮效能越好。推薦配置一種適合的壓縮演算法,可以大幅度的減緩網路壓力和Broker的儲存壓力。 string none high
retries 如果設定de值大於0,客戶端將重新發送之前傳送失敗的資料。注意,這些重試與客戶端接收到傳送錯誤時的重試沒有什麼不同。在不將max.in.flight.requests.per.connection設定為1的情況下允許重試將潛在的改變資料的順序,如果這兩個訊息記錄都是傳送到同一個partition,則第一個訊息失敗第二個傳送成功,則第二條訊息會比第一條訊息出現要早。 0 high
ssl.key.password 祕鑰儲存檔案中的私有密碼,可選項 null
ssl.keystore.location 金鑰儲存檔案的位置。這個可選的配置可用於為客戶端提供雙向認證。 null
ssl.keystore.password 金鑰儲存檔案的儲存密碼,和ssl.keystore.location配合使用 null
ssl.truststore.location 信任檔案的儲存位置 null
ssl.truststore.password 信任檔案的密碼 null
batch.size 只要有多個記錄被髮送到同一個分割槽,生產者就會嘗試將記錄組合成一個batch的請求。 這有助於客戶端和伺服器的效能。 該配置以位元組為單位控制預設的批量大小。不要嘗試大於次值的批量記錄。傳送給broker的請求將包含多個批次。太小的批量會降低吞吐量(批量為0講完全禁用批處理),太大的批量會浪費一些記憶體,因為這批量的大小是預分配的。 16384 medium
client.id 當向server發出請求時,這個字串會發送給server。 使用者隨意指定,但是不能重複,主要用於跟蹤記錄訊息 “”
connections.max.idle.ms 空閒連線的超時關閉時間 540000
linger.ms Producer預設會把兩次傳送時間間隔內收集到的所有請求進行一次彙總然後再發送,以此提高吞吐量,而linger.ms則更進一步,這個引數為每次傳送增加一些延遲,以此來聚合更多的Message。通常來說,這隻有在記錄產生速度大於傳送速度的時候才能發生。一旦彙集的訊息資料達到了某個partition的batch.size,他將會立即傳送而不顧這項設定,然而如果我們獲得訊息位元組數比batch.size設定要小的多,我們需要“linger”特定的時間以獲取更多的訊息。 這個設定預設為0,即沒有延遲。設定linger.ms=5,例如,將會減少請求數目,但是同時會增加5ms的延遲。 0
max.block.ms 該配置控制KafkaProducer.send()和KafkaProducer.partitionsFor()將被阻塞多長時間。這些方法可能因為緩衝區已滿或元資料不可用而被阻塞。使用者提供的序列化程式或分割槽程式中的阻塞將不計入此超時。 60000
max.request.size 請求的最大位元組數。這也是對最大記錄數的有效覆蓋。注意:server具有自己的記錄量,或許和這個不同。此項設定將會限制producer每次批量傳送請求的數目,以防發出超量的請求。 1048576
partitioner.class 指定計算分割槽的類,實現org.apache.kafka.clients.producer.Partitioner介面
receive.buffer.bytes TCP 接受快取大小,當讀取資料時使用
request.timeout.ms 生產者傳送資料到broker,等待響應的時間。如果超時,則表示此次請求失敗,必要時將重新發送請求。這個值應該比replica.lag.time.max.ms(broker配置)大,以減少由於不必要的生產者重試造成的訊息重複的可能性。 30000
send.buffer.bytes TCP傳送快取大小,當傳送資料時使用 131072
enable.idempotence 如果為true,producter保證每個訊息的副本正好一次被寫入流中,如果為false,producter會在broker發生錯誤時重試,可能在重試過程中同一訊息傳送了多次。預設為false。請注意,啟用冪等性需要將max.in.flight.requests.per.connection設定為1,並且重試次數不能為零。另外acks必須設定為’all’。 如果這些值保留預設值,我們將覆蓋預設值為合適的值。 如果這些值被設定為與冪等生產者不相容的東西,將會丟擲一個ConfigException異常。 false low
max.in.flight.requests.
per.connection
在阻塞之前,客戶端在單個連線上傳送的未確認請求的最大數量。
請注意,如果此設定設定為大於1,並且傳送失敗,則由於重試(即,如果重試被啟用),存在重新排序訊息的風險。(i.e., if retries are enabled).
5 low
interceptor.classes 用作攔截器的類的列表。 通過實現ProducerInterceptor介面,您可以在生產者釋出到Kafka叢集之前攔截(甚至可能會改變)生產者接收到的記錄。 預設情況下,沒有攔截器 null low
metadata.max.age.ms 即使沒有發現任何分割槽的leadership有變化任然在設定的時間內強制重新整理元資料以主動發現新的broker或分割槽。單位毫秒 300000 low
metric.reporters 使用者衡量指標的一組類的集合,實現org.apache.kafka.common.metrics.MetricsReporter介面,允許以外掛方式加入類中,並將通知新的指標建立,總會包含一個JmxReporter用於註冊JMX的統計 “” low
metrics.num.samples 用於維護metrics的樣本數 2 low
metrics.recording.level metrics的最高記錄級別。 INFO low
metrics.sample.window.ms 計算指標樣本的時間視窗。 30000 low
reconnect.backoff.max.ms 1000 low
reconnect.backoff.ms 嘗試重新連線到給定主機之前等待的基本時間。 這避免了在一個緊密的迴圈中重複連線到主機。 該退避適用於客戶端向經紀人的所有連線嘗試。 50 low
retry.backoff.ms 嘗試重試對給定主題分割槽的失敗請求之前等待的時間量。 這樣可以避免在某些故障情況下重複傳送請求。 100 low
transaction.timeout.ms 主動中止正在進行的事務之前,事務協調器將等待來自生產者的事務狀態更新的最長時間(以毫秒為單位)。如果此值大於代理中的max.transaction.timeout.ms設定,則請求將 失敗並出現InvalidTransactionTimeout錯誤。 60000 low
transactional.id 用於事務傳輸的TransactionalId。 這使跨越多個生產者會話的可靠性語義成為可能,因為它允許客戶保證在開始任何新的事務之前使用相同的TransactionalId的事務已經完成。 如果沒有提供TransactionalId,則生產者被限制為冪等傳送。 請注意,如果配置了TransactionalId,則必須啟用enable.idempotence。 預設值為空,這意味著事務無法使用。 null low
sasl.kerberos.service.name Kafka執行的Kerberos主體名稱。 這可以在Kafka的JAAS配置或Kafka的配置中定義。 null medium
sasl.jaas.config 用於JAAS配置檔案使用的格式的SASL連線的JAAS登入上下文引數。 值的格式是:’(=)*;’ null medium
sasl.mechanism 用於客戶端連線的SASL機制。 這可能是安全提供者可用的任何機制。 GSSAPI是預設的機制。 GSSAPI medium
security.protocol 用於與broker溝通的協議。 有效值為:PLAINTEXT,SSL,SASL_PLAINTEXT,SASL_SSL。 PLAINTEXT medium
ssl.enabled.protocols 對SSL連線啟用的協議列表 [TLSv1.2, TLSv1.1, TLSv1]
ssl.keystore.type 祕鑰儲存檔案的格式。可選 JKS
ssl.protocol 用於生成SSLContext的SLL協議,預設的TLS使用於大多數情況。在最新的JVM中允許的值有TLS, TLSv1.1 和TLSv1.2。或許在老的JVM中也支援SSL, SSLv2 和SSLv3,但是因為有已知的安全漏洞不鼓勵使用。 TLS
ssl.provider 用於SSL連線的安全提供商的名稱。預設是JVM的預設安全提供商。 null
ssl.truststore.type truststore檔案的格式。 JKS
sasl.kerberos.kinit.cmd Kerberos kinit command path. /usr/bin/kinit low
sasl.kerberos.min.
time.before.relogin
Login thread sleep time between refresh attempts. 60000 low
sasl.kerberos.ticket.renew.jitter Percentage of random jitter added to the renewal time. 0.05 low
sasl.kerberos.ticket.
renew.window.factor
Login thread will sleep until the specified window factor of time from last refresh to ticket’s expiry has been reached, at which time it will try to renew the ticket. 0.8 low
ssl.cipher.suites A list of cipher suites. This is a named combination of authentication, encryption, MAC and key exchange algorithm used to negotiate the security settings for a network connection using TLS or SSL network protocol. By default all the available cipher suites are supported. null low
ssl.endpoint.
identification.algorithm
The endpoint identification algorithm to validate server hostname using server certificate. null low
ssl.keymanager.algorithm The algorithm used by key manager factory for SSL connections. Default value is the key manager factory algorithm configured for the Java Virtual Machine. SunX509 low
ssl.secure.random.
implementation
The SecureRandom PRNG implementation to use for SSL cryptography operations. null low
ssl.trustmanager.algorithm The algorithm used by trust manager factory for SSL connections. Default value is the trust manager factory algorithm configured for the Java Virtual Machine. PKIX low