kafka文件(4)---- 0.8.2-Configuration-配置選項翻譯
來源:http://kafka.apache.org/documentation.html#configuration
3. Configuration
Kafka在配置檔案中使用key-value方式進行屬性配置。這些values可以通過檔案或者程式設計方式提供。
3.1 Broker Configs
基本配置如下:
-broker.id
-log.dirs
-zookeeper.connect
Topic-level配置以及其預設值將在下面討論。
Property | Default | Description |
broker.id | 每個broker都可以用一個唯一的非負整數id進行標識;這個id可以作為broker的“名字”,並且它的存在使得broker無須混淆consumers就可以遷移到不同的host/port上。你可以選擇任意你喜歡的數字作為id,只要id是唯一的即可。 | |
log.dirs | /tmp/kafka-logs | kafka存放資料的路徑。這個路徑並不是唯一的,可以是多個,路徑之間只需要使用逗號分隔即可;每當建立新partition時,都會選擇在包含最少partitions的路徑下進行。 |
port | 6667 | server接受客戶端連線的埠 |
zookeeper.connect | null | ZooKeeper連線字串的格式為:hostname:port,此處hostname和port分別是ZooKeeper叢集中某個節點的host和port;為了當某個host宕掉之後你能通過其他ZooKeeper節點進行連線,你可以按照一下方式制定多個hosts: hostname1:port1, hostname2:port2, hostname3:port3. ZooKeeper 允許你增加一個“chroot”路徑,將叢集中所有kafka資料存放在特定的路徑下。當多個Kafka叢集或者其他應用使用相同ZooKeeper叢集時,可以使用這個方式設定資料存放路徑。這種方式的實現可以通過這樣設定連線字串格式,如下所示: hostname1:port1,hostname2:port2,hostname3:port3/chroot/path 這樣設定就將所有kafka叢集資料存放在/chroot/path路徑下。注意,在你啟動broker之前,你必須建立這個路徑,並且consumers必須使用相同的連線格式。 |
message.max.bytes | 1000000 | server可以接收的訊息最大尺寸。重要的是,consumer和producer有關這個屬性的設定必須同步,否則producer釋出的訊息對consumer來說太大。 |
num.network.threads | 3 | server用來處理網路請求的網路執行緒數目;一般你不需要更改這個屬性。 |
num.io.threads | 8 | server用來處理請求的I/O執行緒的數目;這個執行緒數目至少要等於硬碟的個數。 |
background.threads | 4 | 用於後臺處理的執行緒數目,例如檔案刪除;你不需要更改這個屬性。 |
queued.max.requests | 500 | 在網路執行緒停止讀取新請求之前,可以排隊等待I/O執行緒處理的最大請求個數。 |
host.name | null | broker的hostname;如果hostname已經設定的話,broker將只會繫結到這個地址上;如果沒有設定,它將繫結到所有介面,併發布一份到ZK |
advertised.host.name | null | 如果設定,則就作為broker 的hostname發往producer、consumers以及其他brokers |
advertised.port | null | 此埠將給與producers、consumers、以及其他brokers,它會在建立連線時用到; 它僅在實際埠和server需要繫結的埠不一樣時才需要設定。 |
socket.send.buffer.bytes | 100 * 1024 | SO_SNDBUFF 快取大小,server進行socket 連線所用 |
socket.receive.buffer.bytes | 100 * 1024 | SO_RCVBUFF快取大小,server進行socket連線時所用 |
socket.request.max.bytes | 100 * 1024 * 1024 | server允許的最大請求尺寸; 這將避免server溢位,它應該小於Java heap size |
num.partitions | 1 | 如果建立topic時沒有給出劃分partitions個數,這個數字將是topic下partitions數目的預設數值。 |
log.segment.bytes | 1014*1024*1024 | topic partition的日誌存放在某個目錄下諸多檔案中,這些檔案將partition的日誌切分成一段一段的;這個屬性就是每個檔案的最大尺寸;當尺寸達到這個數值時,就會建立新檔案。此設定可以由每個topic基礎設定時進行覆蓋。 檢視 the per-topic configuration section |
log.roll.hours | 24 * 7 | 即使檔案沒有到達log.segment.bytes,只要檔案建立時間到達此屬性,就會建立新檔案。這個設定也可以有topic層面的設定進行覆蓋; 檢視the per-topic configuration section |
log.cleanup.policy | delete | |
log.retention.minutes和log.retention.hours | 7 days | 每個日誌檔案刪除之前儲存的時間。預設資料儲存時間對所有topic都一樣。 log.retention.minutes 和 log.retention.bytes 都是用來設定刪除日誌檔案的,無論哪個屬性已經溢位。 這個屬性設定可以在topic基本設定時進行覆蓋。 檢視the per-topic configuration section |
log.retention.bytes | -1 | 每個topic下每個partition儲存資料的總量;注意,這是每個partitions的上限,因此這個數值乘以partitions的個數就是每個topic儲存的資料總量。同時注意:如果log.retention.hours和log.retention.bytes都設定了,則超過了任何一個限制都會造成刪除一個段檔案。 注意,這項設定可以由每個topic設定時進行覆蓋。 檢視the per-topic configuration section |
log.retention.check.interval.ms | 5 minutes | 檢查日誌分段檔案的間隔時間,以確定是否檔案屬性是否到達刪除要求。 |
log.cleaner.enable | false | 當這個屬性設定為false時,一旦日誌的儲存時間或者大小達到上限時,就會被刪除;如果設定為true,則當儲存屬性達到上限時,就會進行log compaction。 |
log.cleaner.threads | 1 | 進行日誌壓縮的執行緒數 |
log.cleaner.io.max.bytes.per.second | None | 進行log compaction時,log cleaner可以擁有的最大I/O數目。這項設定限制了cleaner,以避免干擾活動的請求服務。 |
log.cleaner.io.buffer.size | 500*1024*1024 | log cleaner清除過程中針對日誌進行索引化以及精簡化所用到的快取大小。最好設定大點,以提供充足的記憶體。 |
log.cleaner.io.buffer.load.factor | 512*1024 | 進行log cleaning時所需要的I/O chunk尺寸。你不需要更改這項設定。 |
log.cleaner.io.buffer.load.factor | 0.9 | log cleaning中所使用的hash表的負載因子;你不需要更改這個選項。 |
log.cleaner.backoff.ms | 15000 | 進行日誌是否清理檢查的時間間隔 |
log.cleaner.min.cleanable.ratio | 0.5 | 這項配置控制log compactor試圖清理日誌的頻率(假定log compaction是開啟的)。預設避免清理壓縮超過50%的日誌。這個比率綁定了備份日誌所消耗的最大空間(50%的日誌備份時壓縮率為50%)。更高的比率則意味著浪費消耗更少,也就可以更有效的清理更多的空間。這項設定在每個topic設定中可以覆蓋。 檢視the per-topic configuration section。 |
log.cleaner.delete.retention.ms | 1day | 儲存時間;儲存壓縮日誌的最長時間;也是客戶端消費訊息的最長時間,榮log.retention.minutes的區別在於一個控制未壓縮資料,一個控制壓縮後的資料;會被topic建立時的指定時間覆蓋。 |
log.index.size.max.bytes | 10*1024*1024 | 每個log segment的最大尺寸。注意,如果log尺寸達到這個數值,即使尺寸沒有超過log.segment.bytes限制,也需要產生新的log segment。 |
log.index.interval.bytes | 4096 | 當執行一次fetch後,需要一定的空間掃描最近的offset,設定的越大越好,一般使用預設值就可以 |
log.flush.interval.messages | Long.MaxValue | log檔案“sync”到磁碟之前累積的訊息條數。因為磁碟IO操作是一個慢操作,但又是一個“資料可靠性”的必要手段,所以檢查是否需要固化到硬碟的時間間隔。需要在“資料可靠性”與“效能”之間做必要的權衡,如果此值過大,將會導致每次“發sync”的時間過長(IO阻塞),如果此值過小,將會導致“fsync”的時間較長(IO阻塞),如果此值過小,將會導致”發sync“的次數較多,這也就意味著整體的client請求有一定的延遲,物理server故障,將會導致沒有fsync的訊息丟失。 |
log.flush.scheduler.interval.ms | Long.MaxValue | 檢查是否需要fsync的時間間隔 |
log.flush.interval.ms | Long.MaxValue | 僅僅通過interval來控制訊息的磁碟寫入時機,是不足的,這個數用來控制”fsync“的時間間隔,如果訊息量始終沒有達到固化到磁碟的訊息數,但是離上次磁碟同步的時間間隔達到閾值,也將觸發磁碟同步。 |
log.delete.delay.ms | 60000 | 檔案在索引中清除後的保留時間,一般不需要修改 |
auto.create.topics.enable | true | 是否允許自動建立topic。如果是真的,則produce或者fetch 不存在的topic時,會自動建立這個topic。否則需要使用命令列建立topic |
controller.socket.timeout.ms | 30000 | partition管理控制器進行備份時,socket的超時時間。 |
controller.message.queue.size | Int.MaxValue | controller-to-broker-channles的buffer 尺寸 |
default.replication.factor | 1 | 預設備份份數,僅指自動建立的topics |
replica.lag.time.max.ms | 10000 | 如果一個follower在這個時間內沒有傳送fetch請求,leader將從ISR重移除這個follower,並認為這個follower已經掛了 |
replica.lag.max.messages | 4000 | 如果一個replica沒有備份的條數超過這個數值,則leader將移除這個follower,並認為這個follower已經掛了 |
replica.socket.timeout.ms | 30*1000 | leader 備份資料時的socket網路請求的超時時間 |
replica.socket.receive.buffer.bytes | 64*1024 | 備份時向leader傳送網路請求時的socket receive buffer |
replica.fetch.max.bytes | 1024*1024 | 備份時每次fetch的最大值 |
replica.fetch.min.bytes | 500 | leader發出備份請求時,資料到達leader的最長等待時間 |
replica.fetch.min.bytes | 1 | 備份時每次fetch之後迴應的最小尺寸 |
num.replica.fetchers | 1 | 從leader備份資料的執行緒數 |
replica.high.watermark.checkpoint.interval.ms | 5000 | 每個replica檢查是否將最高水位進行固化的頻率 |
fetch.purgatory.purge.interval.requests | 1000 | fetch 請求清除時的清除間隔 |
producer.purgatory.purge.interval.requests | 1000 | producer請求清除時的清除間隔 |
zookeeper.session.timeout.ms | 6000 | zookeeper會話超時時間。 |
zookeeper.connection.timeout.ms | 6000 | 客戶端等待和zookeeper建立連線的最大時間 |
zookeeper.sync.time.ms | 2000 | zk follower落後於zk leader的最長時間 |
controlled.shutdown.enable | true | 是否能夠控制broker的關閉。如果能夠,broker將可以移動所有leaders到其他的broker上,在關閉之前。這減少了不可用性在關機過程中。 |
controlled.shutdown.max.retries | 3 | 在執行不徹底的關機之前,可以成功執行關機的命令數。 |
controlled.shutdown.retry.backoff.ms | 5000 | 在關機之間的backoff時間 |
auto.leader.rebalance.enable | true | 如果這是true,控制者將會自動平衡brokers對於partitions的leadership |
leader.imbalance.per.broker.percentage | 10 | 每個broker所允許的leader最大不平衡比率 |
leader.imbalance.check.interval.seconds | 300 | 檢查leader不平衡的頻率 |
offset.metadata.max.bytes | 4096 | 允許客戶端儲存他們offsets的最大個數 |
max.connections.per.ip | Int.MaxValue | 每個ip地址上每個broker可以被連線的最大數目 |
max.connections.per.ip.overrides | 每個ip或者hostname預設的連線的最大覆蓋 | |
connections.max.idle.ms | 600000 | 空連線的超時限制 |
log.roll.jitter.{ms,hours} | 0 | 從logRollTimeMillis抽離的jitter最大數目 |
num.recovery.threads.per.data.dir | 1 | 每個資料目錄用來日誌恢復的執行緒數目 |
unclean.leader.election.enable | true | 指明瞭是否能夠使不在ISR中replicas設定用來作為leader |
delete.topic.enable | false | 能夠刪除topic |
offsets.topic.num.partitions | 50 | The number of partitions for the offset commit topic. Since changing this after deployment is currently unsupported, we recommend using a higher setting for production (e.g., 100-200). |
offsets.topic.retention.minutes | 1440 | 存在時間超過這個時間限制的offsets都將被標記為待刪除 |
offsets.retention.check.interval.ms | 600000 | offset管理器檢查陳舊offsets的頻率 |
offsets.topic.replication.factor | 3 | topic的offset的備份份數。建議設定更高的數字保證更高的可用性 |
offset.topic.segment.bytes | 104857600 | offsets topic的segment尺寸。 |
offsets.load.buffer.size | 5242880 | 這項設定與批量尺寸相關,當從offsets segment中讀取時使用。 |
offsets.commit.required.acks | -1 | 在offset commit可以接受之前,需要設定確認的數目,一般不需要更改 |
offsets.commit.timeout.ms 5000 offset commit的延遲時間,這和producer request的超時時間相似。
更多細節可以在scala 類 kafka.server.KafkaConfig中找到。
topic-level的配置
有關topics的配置既有全域性的又有每個topic獨有的配置。如果沒有給定特定topic設定,則應用預設的全域性設定。這些覆蓋會在每次建立topic發生。下面的例子:建立一個topic,命名為my-topic,自定義最大訊息尺寸以及重新整理比率為:
> bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic my-topic --partitions 1
--replication-factor 1 --config max.message.bytes=64000 --config flush.messages=1
需要刪除重寫時,可以按照以下來做:
> bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic my-topic
--deleteConfig max.message.bytes
以下是topic-level的配置選項。server的預設配置在Server Default Property列下給出了,設定這些預設值不會改變原有的設定
Property | Default | Server Default Property | Description |
cleanup.policy | delete | log.cleanup.policy | 要麼是”delete“要麼是”compact“; 這個字串指明瞭針對舊日誌部分的利用方式;預設方式("delete")將會丟棄舊的部分當他們的回收時間或者尺寸限制到達時。”compact“將會進行日誌壓縮 |
delete.retention.ms | 86400000 (24 hours) | log.cleaner.delete.retention.ms | 對於壓縮日誌保留的最長時間,也是客戶端消費訊息的最長時間,通log.retention.minutes的區別在於一個控制未壓縮資料,一個控制壓縮後的資料。此項配置可以在topic建立時的置頂引數覆蓋 |
flush.messages | None | log.flush.interval.messages | 此項配置指定時間間隔:強制進行fsync日誌。例如,如果這個選項設定為1,那麼每條訊息之後都需要進行fsync,如果設定為5,則每5條訊息就需要進行一次fsync。一般來說,建議你不要設定這個值。此引數的設定,需要在"資料可靠性"與"效能"之間做必要的權衡.如果此值過大,將會導致每次"fsync"的時間較長(IO阻塞),如果此值過小,將會導致"fsync"的次數較多,這也意味著整體的client請求有一定的延遲.物理server故障,將會導致沒有fsync的訊息丟失. |
flush.ms | None | log.flush.interval.ms | 此項配置用來置頂強制進行fsync日誌到磁碟的時間間隔;例如,如果設定為1000,那麼每1000ms就需要進行一次fsync。一般不建議使用這個選項 |
index.interval.bytes | 4096 | log.index.interval.bytes | 預設設定保證了我們每4096個位元組就對訊息新增一個索引,更多的索引使得閱讀的訊息更加靠近,但是索引規模卻會由此增大;一般不需要改變這個選項 |
max.message.bytes | 1000000 | max.message.bytes | kafka追加訊息的最大尺寸。注意如果你增大這個尺寸,你也必須增大你consumer的fetch 尺寸,這樣consumer才能fetch到這些最大尺寸的訊息。 |
min.cleanable.dirty.ratio | 0.5 | min.cleanable.dirty.ratio | 此項配置控制log壓縮器試圖進行清除日誌的頻率。預設情況下,將避免清除壓縮率超過50%的日誌。這個比率避免了最大的空間浪費 |
min.insync.replicas | 1 | min.insync.replicas | 當producer設定request.required.acks為-1時,min.insync.replicas指定replicas的最小數目(必須確認每一個repica的寫資料都是成功的),如果這個數目沒有達到,producer會產生異常。 |
retention.bytes | None | log.retention.bytes | 如果使用“delete”的retention 策略,這項配置就是指在刪除日誌之前,日誌所能達到的最大尺寸。預設情況下,沒有尺寸限制而只有時間限制 |
retention.ms | 7 days | log.retention.minutes | 如果使用“delete”的retention策略,這項配置就是指刪除日誌前日誌儲存的時間。 |
segment.bytes | 1GB | log.segment.bytes | kafka中log日誌是分成一塊塊儲存的,此配置是指log日誌劃分成塊的大小 |
segment.index.bytes | 10MB | log.index.size.max.bytes | 此配置是有關offsets和檔案位置之間對映的索引檔案的大小;一般不需要修改這個配置 |
segment.ms | 7 days | log.roll.hours | 即使log的分塊檔案沒有達到需要刪除、壓縮的大小,一旦log 的時間達到這個上限,就會強制新建一個log分塊檔案 |
segment.jitter.ms | 0 | log.roll.jitter.{ms,hours} | The maximum jitter to subtract from logRollTimeMillis. |
3.2 Consumer Configs
consumer基本配置如下:
group.id
zookeeper.connect
Property | Default | Description |
group.id | 用來唯一標識consumer程序所在組的字串,如果設定同樣的group id,表示這些processes都是屬於同一個consumer group | |
zookeeper.connect | 指定zookeeper的連線的字串,格式是hostname:port,此處host和port都是zookeeper server的host和port,為避免某個zookeeper 機器宕機之後失聯,你可以指定多個hostname:port,使用逗號作為分隔: hostname1:port1,hostname2:port2,hostname3:port3 可以在zookeeper連線字串中加入zookeeper的chroot路徑,此路徑用於存放他自己的資料,方式: hostname1:port1,hostname2:port2,hostname3:port3/chroot/path |
|
consumer.id | null | 不需要設定,一般自動產生 |
socket.timeout.ms | 30*100 | 網路請求的超時限制。真實的超時限制是 max.fetch.wait+socket.timeout.ms |
socket.receive.buffer.bytes | 64*1024 | socket用於接收網路請求的快取大小 |
fetch.message.max.bytes | 1024*1024 | 每次fetch請求中,針對每次fetch訊息的最大位元組數。這些位元組將會督導用於每個partition的記憶體中,因此,此設定將會控制consumer所使用的memory大小。這個fetch請求尺寸必須至少和server允許的最大訊息尺寸相等,否則,producer可能傳送的訊息尺寸大於consumer所能消耗的尺寸。 |
num.consumer.fetchers | 1 | 用於fetch資料的fetcher執行緒數 |
auto.commit.enable | true | 如果為真,consumer所fetch的訊息的offset將會自動的同步到zookeeper。這項提交的offset將在程序掛掉時,由新的consumer使用 |
auto.commit.interval.ms | 60*1000 | consumer向zookeeper提交offset的頻率,單位是秒 |
queued.max.message.chunks | 2 | 用於快取訊息的最大數目,以供consumption。每個chunk必須和fetch.message.max.bytes相同 |
rebalance.max.retries | 4 | 當新的consumer加入到consumer group時,consumers集合試圖重新平衡分配到每個consumer的partitions數目。如果consumers集合改變了,當分配正在執行時,這個重新平衡會失敗並重入 |
fetch.min.bytes | 1 | 每次fetch請求時,server應該返回的最小位元組數。如果沒有足夠的資料返回,請求會等待,直到足夠的資料才會返回。 |
fetch.wait.max.ms | 100 | 如果沒有足夠的資料能夠滿足fetch.min.bytes,則此項配置是指在應答fetch請求之前,server會阻塞的最大時間。 |
rebalance.backoff.ms | 2000 | 在重試reblance之前backoff時間 |
refresh.leader.backoff.ms | 200 | 在試圖確定某個partition的leader是否失去他的leader地位之前,需要等待的backoff時間 |
auto.offset.reset | largest | zookeeper中沒有初始化的offset時,如果offset是以下值的迴應: smallest:自動復位offset為smallest的offset largest:自動復位offset為largest的offset anything else:向consumer丟擲異常 |
consumer.timeout.ms | -1 | 如果沒有訊息可用,即使等待特定的時間之後也沒有,則丟擲超時異常 |
exclude.internal.topics | true | 是否將內部topics的訊息暴露給consumer |
paritition.assignment.strategy | range | 選擇向consumer 流分配partitions的策略,可選值:range,roundrobin |
client.id | group id value | 是使用者特定的字串,用來在每次請求中幫助跟蹤呼叫。它應該可以邏輯上確認產生這個請求的應用 |
zookeeper.session.timeout.ms | 6000 | zookeeper 會話的超時限制。如果consumer在這段時間內沒有向zookeeper傳送心跳資訊,則它會被認為掛掉了,並且reblance將會產生 |
zookeeper.connection.timeout.ms | 6000 | 客戶端在建立通zookeeper連線中的最大等待時間 |
zookeeper.sync.time.ms | 2000 | ZK follower可以落後ZK leader的最大時間 |
offsets.storage | zookeeper | 用於存放offsets的地點: zookeeper或者kafka |
offset.channel.backoff.ms | 1000 | 重新連線offsets channel或者是重試失敗的offset的fetch/commit請求的backoff時間 |
offsets.channel.socket.timeout.ms | 10000 | 當讀取offset的fetch/commit請求迴應的socket 超時限制。此超時限制是被consumerMetadata請求用來請求offset管理 |
offsets.commit.max.retries | 5 | 重試offset commit的次數。這個重試只應用於offset commits在shut-down之間。他 |
dual.commit.enabled | true | 如果使用“kafka”作為offsets.storage,你可以二次提交offset到zookeeper(還有一次是提交到kafka)。在zookeeper-based的offset storage到kafka-based的offset storage遷移時,這是必須的。對任意給定的consumer group來說,比較安全的建議是當完成遷移之後就關閉這個選項 |
partition.assignment.strategy | range | 在“range”和“roundrobin”策略之間選擇一種作為分配partitions給consumer 資料流的策略; 迴圈的partition分配器分配所有可用的partitions以及所有可用consumer 執行緒。它會將partition迴圈的分配到consumer執行緒上。如果所有consumer例項的訂閱都是確定的,則partitions的劃分是確定的分佈。迴圈分配策略只有在以下條件滿足時才可以:(1)每個topic在每個consumer實力上都有同樣數量的資料流。(2)訂閱的topic的集合對於consumer group中每個consumer例項來說都是確定的。 |
更多細節可以檢視 scala類: kafka.consumer.ConsumerConfig
3.3 Producer Configs
producer基本的配置屬性包含:
(1) metadata.broker.list
(2)request.required.acks
(3)producer.type
(4)serializer.class
Property | Default | Description |
metadata.broker.list | 服務於bootstrapping。producer僅用來獲取metadata(topics,partitions,replicas)。傳送實際資料的socket連線將基於返回的metadata資料資訊而建立。格式是: host1:port1,host2:port2 這個列表可以是brokers的子列表或者是一個指向brokers的VIP |
|
request.required.acks | 0 | 此配置是表明當一次produce請求被認為完成時的確認值。特別是,多少個其他brokers必須已經提交了資料到他們的log並且向他們的leader確認了這些資訊。典型的值包括: 0: 表示producer從來不等待來自broker的確認資訊(和0.7一樣的行為)。這個選擇提供了最小的時延但同時風險最大(因為當server宕機時,資料將會丟失)。 1:表示獲得leader replica已經接收了資料的確認資訊。這個選擇時延較小同時確保了server確認接收成功。 -1:producer會獲得所有同步replicas都收到資料的確認。同時時延最大,然而,這種方式並沒有完全消除丟失訊息的風險,因為同步replicas的數量可能是1.如果你想確保某些replicas接收到資料,那麼你應該在topic-level設定中選項min.insync.replicas設定一下。請閱讀一下設計文件,可以獲得更深入的討論。 |
request.timeout.ms | 10000 | broker盡力實現request.required.acks需求時的等待時間,否則會發送錯誤到客戶端 |
producer.type | sync | 此選項置頂了訊息是否在後臺執行緒中非同步傳送。正確的值: (1) async: 非同步傳送 (2) sync: 同步傳送 通過將producer設定為非同步,我們可以批量處理請求(有利於提高吞吐率)但是這也就造成了客戶端機器丟掉未傳送資料的可能性 |
serializer.class | kafka.serializer.DefaultEncoder | 訊息的序列化類別。預設編碼器輸入一個位元組byte[],然後返回相同的位元組byte[] |
key.serializer.class | 關鍵字的序列化類。如果沒給與這項,預設情況是和訊息一致 | |
partitioner.class | kafka.producer.DefaultPartitioner | partitioner 類,用於在subtopics之間劃分訊息。預設partitioner基於key的hash表 |
compression.codec | none | 此項引數可以設定壓縮資料的codec,可選codec為:“none”, “gzip”, “snappy” |
compressed.topics | null | 此項引數可以設定某些特定的topics是否進行壓縮。如果壓縮codec是NoCompressCodec之外的codec,則對指定的topics資料應用這些codec。如果壓縮topics列表是空,則將特定的壓縮codec應用於所有topics。如果壓縮的codec是NoCompressionCodec,壓縮對所有topics軍不可用。 |
message.send.max.retries | 3 | 此項引數將使producer自動重試失敗的傳送請求。此項引數將置頂重試的次數。注意:設定非0值將導致重複某些網路錯誤:引起一條傳送並引起確認丟失 |
retry.backoff.ms | 100 | 在每次重試之前,producer會更新相關topic的metadata,以此進行檢視新的leader是否分配好了。因為leader的選擇需要一點時間,此選項指定更新metadata之前producer需要等待的時間。 |
topic.metadata.refresh.interval.ms | 600*1000 | producer一般會在某些失敗的情況下(partition missing,leader不可用等)更新topic的metadata。他將會規律的迴圈。如果你設定為負值,metadata只有在失敗的情況下才更新。如果設定為0,metadata會在每次訊息傳送後就會更新(不建議這種選擇,系統消耗太大)。重要提示: 更新是有在訊息傳送後才會發生,因此,如果producer從來不傳送訊息,則metadata從來也不會更新。 |
queue.buffering.max.ms | 5000 | 當應用async模式時,使用者快取資料的最大時間間隔。例如,設定為100時,將會批量處理100ms之內訊息。這將改善吞吐率,但是會增加由於快取產生的延遲。 |
queue.buffering.max.messages | 10000 | 當使用async模式時,在在producer必須被阻塞或者資料必須丟失之前,可以快取到佇列中的未傳送的最大訊息條數 |
batch.num.messages | 200 | 使用async模式時,可以批量處理訊息的最大條數。或者訊息數目已到達這個上線或者是queue.buffer.max.ms到達,producer才會處理 |
send.buffer.bytes | 100*1024 | socket 寫快取尺寸 |
client.id | “” | 這個client id是使用者特定的字串,在每次請求中包含用來追蹤呼叫,他應該邏輯上可以確認是那個應用發出了這個請求。 |
更多細節需要檢視 scala類
kafka.producer.ProducerConfig
3、4 New Producer Configs
我們正在努力替換現有的producer。程式碼在trunk中是可用的,可以認為beta版本。下面是新producer的配置
Name | Type | Default | Importance | Description |
boostrap.servers | list | high | 用於建立與kafka叢集連線的host/port組。資料將會在所有servers上均衡載入,不管哪些server是指定用於bootstrapping。這個列表僅僅影響初始化的hosts(用於發現全部的servers)。這個列表格式: host1:port1,host2:port2,... 因為這些server僅僅是用於初始化的連線,以發現叢集所有成員關係(可能會動態的變化),這個列表不需要包含所有的servers(你可能想要不止一個server,儘管這樣,可能某個server宕機了)。如果沒有server在這個列表出現,則傳送資料會一直失敗,直到列表可用。 |
|
acks | string | 1 | high | producer需要server接收到資料之後發出的確認接收的訊號,此項配置就是指procuder需要多少個這樣的確認訊號。此配置實際上代表了資料備份的可用性。以下設定為常用選項: (1)acks=0: 設定為0表示producer不需要等待任何確認收到的資訊。副本將立即加到socket buffer並認為已經發送。沒有任何保障可以保證此種情況下server已經成功接收資料,同時重試配置不會發生作用(因為客戶端不知道是否失敗)回饋的offset會總是設定為-1; (2)acks=1: 這意味著至少要等待leader已經成功將資料寫入本地log,但是並沒有等待所有follower是否成功寫入。這種情況下,如果follower沒有成功備份資料,而此時leader又掛掉,則訊息會丟失。 (3)acks=all: 這意味著leader需要等待所有備份都成功寫入日誌,這種策略會保證只要有一個備份存活就不會丟失資料。這是最強的保證。 (4)其他的設定,例如acks=2也是可以的,這將需要給定的acks數量,但是這種策略一般很少用。 |
buffer.memory | long | 33554432 | high | producer可以用來快取資料的記憶體大小。如果資料產生速度大於向broker傳送的速度,producer會阻塞或者丟擲異常,以“block.on.buffer.full”來表明。 這項設定將和producer能夠使用的總記憶體相關,但並不是一個硬性的限制,因為不是producer使用的所有記憶體都是用於快取。一些額外的記憶體會用於壓縮(如果引入壓縮機制),同樣還有一些用於維護請求。 |
compression.type | string | none | high | producer用於壓縮資料的壓縮型別。預設是無壓縮。正確的選項值是none、gzip、snappy。 壓縮最好用於批量處理,批量處理訊息越多,壓縮效能越好。 |
retries | int | 0 | high | 設定大於0的值將使客戶端重新發送任何資料,一旦這些資料傳送失敗。注意,這些重試與客戶端接收到傳送錯誤時的重試沒有什麼不同。允許重試將潛在的改變資料的順序,如果這兩個訊息記錄都是傳送到同一個partition,則第一個訊息失敗第二個傳送成功,則第二條訊息會比第一條訊息出現要早。 |
batch.size | int | 16384 | medium | producer將試圖批處理訊息記錄,以減少請求次數。這將改善client與server之間的效能。這項配置控制預設的批量處理訊息位元組數。 不會試圖處理大於這個位元組數的訊息位元組數。 傳送到brokers的請求將包含多個批量處理,其中會包含對每個partition的一個請求。 較小的批量處理數值比較少用,並且可能降低吞吐量(0則會僅用批量處理)。較大的批量處理數值將會浪費更多記憶體空間,這樣就需要分配特定批量處理數值的記憶體大小。 |
client.id | string | medium | 當向server發出請求時,這個字串會發送給server。目的是能夠追蹤請求源頭,以此來允許ip/port許可列表之外的一些應用可以傳送資訊。這項應用可以設定任意字串,因為沒有任何功能性的目的,除了記錄和跟蹤 | |
linger.ms | long | 0 | medium | producer組將會彙總任何在請求與傳送之間到達的訊息記錄一個單獨批量的請求。通常來說,這隻有在記錄產生速度大於傳送速度的時候才能發生。然而,在某些條件下,客戶端將希望降低請求的數量,甚至降低到中等負載一下。這項設定將通過增加小的延遲來完成--即,不是立即傳送一條記錄,producer將會等待給定的延遲時間以允許其他訊息記錄傳送,這些訊息記錄可以批量處理。這可以認為是TCP種Nagle的演算法類似。這項設定設定了批量處理的更高的延遲邊界:一旦我們獲得某個partition的batch.size,他將會立即傳送而不顧這項設定,然而如果我們獲得訊息位元組數比這項設定要小的多,我們需要“linger”特定的時間以獲取更多的訊息。 這個設定預設為0,即沒有延遲。設定linger.ms=5,例如,將會減少請求數目,但是同時會增加5ms的延遲。 |
max.request.size | int | 1028576 | medium | 請求的最大位元組數。這也是對最大記錄尺寸的有效覆蓋。注意:server具有自己對訊息記錄尺寸的覆蓋,這些尺寸和這個設定不同。此項設定將會限制producer每次批量傳送請求的數目,以防發出巨量的請求。 |
receive.buffer.bytes | int | 32768 | medium | TCP receive快取大小,當閱讀資料時使用 |
send.buffer.bytes | int | 131072 | medium | TCP send快取大小,當傳送資料時使用 |
timeout.ms | int | 30000 | medium | 此配置選項控制server等待來自followers的確認的最大時間。如果確認的請求數目在此時間內沒有實現,則會返回一個錯誤。這個超時限制是以server端度量的,沒有包含請求的網路延遲 |
block.on.buffer.full | boolean | true | low | 當我們記憶體快取用盡時,必須停止接收新訊息記錄或者丟擲錯誤。預設情況下,這個設定為真,然而某些阻塞可能不值得期待,因此立即丟擲錯誤更好。設定為false則會這樣:producer會丟擲一個異常錯誤:BufferExhaustedException, 如果記錄已經發送同時快取已滿 |
metadata.fetch.timeout.ms | long | 60000 | low | 是指我們所獲取的一些元素據的第一個時間資料。元素據包含:topic,host,partitions。此項配置是指當等待元素據fetch成功完成所需要的時間,否則會跑出異常給客戶端。 |
metadata.max.age.ms | long | 300000 | low | 以微秒為單位的時間,是在我們強制更新metadata的時間間隔。即使我們沒有看到任何partition leadership改變。 |
metric.reporters | list | [] | low | 類的列表,用於衡量指標。實現MetricReporter介面,將允許增加一些類,這些類在新的衡量指標產生時就會改變。JmxReporter總會包含用於註冊JMX統計 |
metrics.num.samples | int | 2 | low | 用於維護metrics的樣本數 |
metrics.sample.window.ms | long | 30000 | low | metrics系統維護可配置的樣本數量,在一個可修正的window size。這項配置配置了視窗大小,例如。我們可能在30s的期間維護兩個樣本。當一個視窗推出後,我們會擦除並重寫最老的視窗 |
recoonect.backoff.ms | long | 10 | low | 連線失敗時,當我們重新連線時的等待時間。這避免了客戶端反覆重連 |
retry.backoff.ms | long | 100 | low | 在試圖重試失敗的produce請求之前的等待時間。避免陷入傳送-失敗的死迴圈中。 |