kafka 1.0 中文文件(三)--Broker的配置
阿新 • • 發佈:2019-01-08
3.1 Broker Configs
基本配置如下:
1. broker.id
2. log.dirs
3. zookeeper.connect
下面將更詳細地討論主題級別的配置和預設設定。
名稱 | 描述 | 型別 | 預設 | 重要性 |
---|---|---|---|---|
zookeeper.connect Zookeeper | 主機地址 | string | high | |
advertised.host.name | DEPRECATED:僅在未設定“advertised.listeners”或“listeners”時使用。用advertised.listeners |
string | null | high |
advertised.listeners | 監聽器釋出到ZooKeeper供客戶使用,如果與上面的監聽器不同。 在IaaS環境中,這可能需要與代理繫結的介面不同。 如果沒有設定,將使用listeners |
string | null | high |
advertised.port | DEPRECATED:僅在未設定“advertised.listeners”或“listeners”時使用。 改用advertised.listeners 替代。 釋出到ZooKeeper供客戶端使用的埠。 在IaaS環境中,這可能需要與代理繫結的埠不同。 如果沒有設定,它將使用broker繫結的相同埠。 |
int | null | high |
auto.create.topics.enable | 是否允許自動建立topic。如果設為true,那麼produce,consume或者fetch metadata一個不存在的topic時,就會自動建立一個預設replication factor和partition number的topic。 | boolean | true | high |
auto.leader.rebalance.enable | 如果設為true,複製控制器會週期性的自動嘗試,為所有的broker的每個partition平衡leadership,為更優先(preferred)的replica分配leadership。 | boolean | true | high |
background.threads | 一些後臺任務處理的執行緒數,例如過期訊息檔案的刪除等,一般情況下不需要去做修改 | int | 10 | high |
broker.id | 每一個broker在叢集中的唯一表示,要求是正數。當該伺服器的IP地址發生改變時,broker.id沒有變化,則不會影響consumers的訊息情況。如果未設定,則會生成唯一的代理標識。為避免zookeeper生成的代理標識與使用者配置的代理標識之間的衝突,生成的代理標識從reserved.broker.max.id + 1開始。 | int | -1 | high |
compression.type | 為主題指定一個壓縮型別,此配置接受標準壓縮編碼(’gzip’, ‘snappy’, lz4),另外接受’uncompressed‘相當於不壓縮, ‘producer’ 意味著壓縮型別由producer指定。 | string | producer | high |
delete.topic.enable | 啟用刪除主題。 如果此配置已關閉,則通過管理工具刪除主題將不起作用。刪除topic是影響註冊在/admin/delete_topics的監聽 | boolean | false | high |
host.name | DEPRECATED:僅在未設定“listeners”時使用。 使用listeners 來代替。 broker的主機名。 如果這個設定,它只會繫結到這個地址。 如果沒有設定,它將繫結到所有interface。並將其中之一發送到ZK,但是傳送到zk的不一定是正確的地址,導致消費端消費不到訊息,所以這裡必須要設定 |
String | “” | High |
leader.imbalance.check.interval.seconds | 分割槽rebalance檢查的頻率,由控制器觸發 | long | 300 | high |
leader.imbalance.per.broker.percentage | 每個broker允許的不平衡的leader的百分比。如果每個broker超過了這個百分比,複製控制器會對分割槽進行重新的平衡。該值以百分比形式指定。 | int | 10 | high |
listeners | 監聽器列表 - 逗號分隔的我們將監聽的URI列表和監聽器名稱。 如果偵聽器名稱不是安全協議,則還必須設定listener.security.protocol.map。 指定主機名為0.0.0.0以繫結到所有介面。 保留主機名為空以繫結到預設介面。 合法偵聽器列表的示例:PLAINTEXT:// myhost:9092,SSL://:9091 CLIENT://0.0.0.0:9092,REPLICATION:// localhost:9093 | string | null | high |
log.dir | 儲存日誌資料的目錄(對log.dirs屬性的補充) | string | /tmp/kafka-logs | high |
log.dirs | 一個用逗號分隔的目錄列表,可以有多個,用來為Kafka儲存資料。每當需要為一個新的分割槽分配一個目錄時,會選擇當前的儲存分割槽最少的目錄來儲存。如果沒有配置,則使用log.dir配置的值。 | string | null | high |
log.flush.interval.messages | 在將訊息重新整理到磁碟之前,在日誌分割槽上累積的訊息數量。強制fsync一個分割槽的log檔案之前暫存的訊息數量。因為磁碟IO操作是一個慢操作,但又是一個“資料可靠性”的必要手段,所以檢查是否需要固化到硬碟的時間間隔。需要在“資料可靠性”與“效能”之間做必要的權衡,如果此值過大,將會導致每次“fsync”的時間過長(IO阻塞),如果此值過小,將會導致”fsync“的次數較多,這也就意味著整體的client請求有一定的延遲,物理server故障,將會導致沒有fsync的訊息丟失。通常建議使用replication來確保永續性,而不是依靠單機上的fsync | long | 9223372036854775807 | high |
log.flush.interval.ms | 任何主題中的訊息在重新整理到磁碟之前都保留在記憶體中的最長時間(以毫秒為單位)。 如果未設定,則使用log.flush.scheduler.interval.ms中的值 | long | null | high |
log.flush.scheduler.interval.ms | 日誌重新整理器檢查是否需要將任何日誌重新整理到磁碟的頻率(以毫秒為單位)檢查是否需要fsync的時間間隔 | long | 9223372036854775807 | high |
log.flush.offset.checkpoint.interval.ms | 記錄上次把日誌刷到磁碟的時間點的頻率,用來日後的恢復。通常不需要改變。 | int | 60000 | high |
log.flush.start.offset.checkpoint.interval.ms | 我們更新記錄起始偏移量的持續記錄的頻率 | int | 60000 | high |
log.retention.bytes | 日誌達到刪除大小的閾值。每個topic下每個分割槽儲存資料的最大檔案大小;注意,這是每個分割槽的上限,因此這個數值乘以分割槽的個數就是每個topic儲存的資料總量。同時注意:如果log.retention.hours和log.retention.bytes都設定了,則超過了任何一個限制都會造成刪除一個段檔案。注意,這項設定可以由每個topic設定時進行覆蓋。-1為不限制。 | long | -1 | high |
log.retention.hours | 每個日誌檔案刪除之前儲存的時間,單位小時。預設資料儲存時間對所有topic都一樣。log.retention.minutes 和 log.retention.bytes 都是用來設定刪除日誌檔案的,如果達到任意一個條件的限制,都會進行刪除。這個屬性設定可以在topic基本設定時進行覆蓋。 | int | 168 | high |
log.retention.minutes | 在刪除日誌檔案之前保留日誌的分鐘數(以分鐘為單位),次要log.retention.ms屬性。 如果未設定,則使用log.retention.hours中的值 | int | null | high |
log.retention.ms | 保留日誌檔案的毫秒數(以毫秒為單位),如果未設定,則使用log.retention.minutes中的值 | long | null | high |
log.roll.hours | 這個設定會強制Kafka去新建一個新的log segment檔案,即使當前使用的segment檔案的大小還沒有超過log.segment.bytes。此配置可以被覆蓋 | int | 168 | high |
log.roll.jitter.hours | 從logRollTimeMillis減去的最大抖動(以小時為單位),次要log.roll.jitter.ms屬性 | int | 0 | high |
log.roll.jitter.ms | 同上,如果沒有設定則使用log.roll.jitter.hours | long | null | high |
log.roll.ms | 同log.roll.hours,單位ms | long | null | high |
log.segment.bytes | topic 分割槽的日誌存放在某個目錄下諸多檔案中,這些檔案將partition的日誌切分成一段一段的,這就是段檔案(segment file);一個topic的一個分割槽對應的所有segment檔案稱為log。這個設定控制著一個segment檔案的最大的大小,如果超過了此大小,就會生成一個新的segment檔案。此配置可以被覆蓋。 int | 1073741824 | high | |
log.segment.delete.delay.ms | 在log檔案被移出索引後(刪除),log檔案的保留時間。在這段時間內執行的任意正在進行的讀操作完成操作,不用去打斷它。通常不需要改變。 | long | 60000 | high |
message.max.bytes | kafka允許的最大的一個批次的訊息大小。 如果這個數字增加,並且有0.10.2版本以下的消費者,那麼消費者的提取大小也必須增加,以便他們可以取得這麼大的記錄批次。在最新的訊息格式版本中,記錄總是被組合到一個批次以提高效率。 在以前的訊息格式版本中,未壓縮的記錄不會分組到批次中,並且此限制僅適用於該情況下的單個記錄。可以使用主題級別max.message.bytes來設定每個主題。 | int | 1000012 | high |
min.insync.replicas | 當生產者將ack設定為“全部”(或“-1”)時,min.insync.replicas指定必須確認寫入被認為成功的最小副本數(必須確認每一個repica的寫資料都是成功的)。 如果這個最小值不能滿足,那麼生產者將會引發一個異常(NotEnoughReplicas或NotEnoughReplicasAfterAppend)。當一起使用時,min.insync.replicas和acks允許您強制更大的耐久性保證。 一個典型的情況是建立一個複製因子為3的主題,將min.insync.replicas設定為2,並且生產者使用“all”選項。 這將確保如果大多數副本沒有寫入生產者則丟擲異常。 | int | 1 | high |
num.io.threads | server端處理請求時的I/O執行緒的數量,不要小於磁碟的數量。 | int | 8 | high |
num.network.threads | 伺服器用於接收來自網路的請求並向網路傳送響應的執行緒數 | int | 3 | high |
num.recovery.threads.per.data.dir | 每個資料目錄的執行緒數,用於啟動時的日誌恢復和關閉時的重新整理 | int | 1 | high |
num.replica.fetchers | 用來從leader複製訊息的執行緒數量,增大這個值可以增加follow的I/O並行度。 | int | 1 | high |
offset.metadata.max.bytes | 允許client(消費者)儲存它們元資料(offset)的最大的資料量。 | int | 4096 | high |
offsets.commit.required.acks | 在offset commit可以接受之前,需要設定acks的數目,一般不需要更改 | short | -1 | high |
offsets.commit.timeout.ms | offsets提交將被延遲,直到偏移量topic的所有副本都收到提交或達到此超時。 這與生產者請求超時類似。 | int | 5000 | high |
offsets.load.buffer.size | 每次從offset段檔案往快取載入offsets資料時的讀取的資料大小。 | int | 5242880 | high |
offsets.retention.check.interval.ms | 檢查失效offset的頻率 | long | 600000 | high |
offsets.retention.minutes | 如果一個group在這個時間沒沒有提交offsets,則會清理這個group的offset資料 | int | 1440 | high |
offsets.topic.compression.codec | 用於offsets主題的壓縮編解碼器 - 壓縮可用於實現“原子”提交 | int | 0 | high |
offsets.topic.num.partitions | Offsets topic的分割槽數量(部署後不應更改) | int | 50 | high |
offsets.topic.replication.factor | Offsets topic的複製因子(設定得更高以確保可用性)。 內部主題建立將失敗,直到群集大小滿足此複製因素要求。 | short | 3 | high |
offsets.topic.segment.bytes | 為了便於更快的日誌壓縮和快取載入,偏移量的主題段位元組應保持相對較小 | int | 104857600 | high |
port | DEPRECATED:僅在未設定“listeners”時使用。 使用listeners 來代替。 這個埠來監聽和接受連線 |
int | 9092 | high |
queued.max.requests | I/O執行緒等待佇列中的最大的請求數,超過這個數量,network執行緒就不會再接收一個新的請求。這個引數是指定用於快取網路請求的佇列的最大容量,這個佇列達到上限之後將不再接收新請求。一般不會成為瓶頸點,除非I/O效能太差,這時需要配合num.io.threads等配置一同進行調整。 int | 500 | high | |
quota.consumer.default | DEPRECATED:只有在動態預設配額沒有配置或者為Zookeeper時才使用。 如果一個消費者每秒消費的資料量大於此值,則暫時不會再允許消費。0.9版本新加。 | long | 9223372036854775807 | high |
quota.producer.default | DEPRECATED:只有在動態預設配額沒有配置或者為Zookeeper時才使用。如果一個生產者每秒產生的資料大於此值,則暫時會推遲接受生產者資料。 | long | 9223372036854775807 | high |
replica.fetch.min.bytes | 複製資料過程中,replica收到的每個fetch響應,期望的最小的位元組數,如果沒有收到足夠的位元組數,就會等待期望更多的資料,直到達到replica.fetch.wait.max.ms。 | int | 1 | high |
replica.fetch.wait.max.ms | Replicas follow同leader之間通訊的最大等待時間,失敗了會重試。 此值始終應始終小於replica.lag.time.max.ms,以防止針對低吞吐量主題頻繁收縮ISR | int 500 | high | |
replica.lag.time.max.ms | 如果一個follower在這個時間內沒有傳送fetch請求,leader將從ISR重移除這個follower,並認為這個follower已經掛了 | long | 10000 | high |
replica.high.watermark.checkpoint.interval.ms | 每一個replica follower儲存自己的high watermark到磁碟的頻率,用來日後的recovery。 | long | 5000 | high |
replica.socket.receive.buffer.bytes | 複製資料過程中,follower傳送網路請求給leader的socket receiver buffer的大小。 | int | 65536 | high |
replica.socket.timeout.ms | 複製資料過程中,replica傳送給leader的網路請求的socket超時時間。它的值應該至少是replica.fetch.wait.max.ms | int | 30000 | high |
request.timeout.ms | 在向producer傳送ack之前,broker允許等待的最大時間,如果超時,broker將會向producer傳送一個error ACK.意味著上一次訊息因為某種原因未能成功(比如follower未能同步成功) ,客戶端將在必要時重新發送請求,或者如果重試耗盡,則請求失敗。 | int | 30000 | high |
socket.receive.buffer.bytes | server端用來處理socket連線的SO_RCVBUFF緩衝大小。如果值為-1,則將使用作業系統預設值。 | int | 102400 | high |
socket.request.max.bytes | server能接受的請求的最大的大小,這是為了防止server跑光記憶體,不能大於Java堆的大小。 | int | 104857600 | high |
socket.send.buffer.bytes | server端用來處理socket連線的SO_SNDBUFF緩衝大小。如果值為-1,則將使用作業系統預設值。 | int | 102400 | high |
transaction.max.timeout.ms | 事務的最大允許超時時間。 如果客戶請求的事務時間超過這個時間,那麼broker將在InitProducerIdRequest中返回一個錯誤。 這樣可以防止客戶超時時間過長,從而阻礙消費者讀取事務中包含的主題。 | int | 900000 | high |
transaction.state.log.load.buffer.size | 將生產者ID和事務載入到快取中時,從事務日誌段(the transaction log segments)讀取的批量大小。 | int | 5242880 | high |
transaction.state.log.min.isr | 覆蓋事務主題的min.insync.replicas配置。 | int | 2 | high |
transaction.state.log.num.partitions | 事務主題的分割槽數量(部署後不應更改)。 | int | 50 | high |
transaction.state.log.replication.factor | 事務主題的複製因子(設定更高以確保可用性)。 內部主題建立將失敗,直到群集大小滿足此複製因素要求。 | short | 3 | high |
transaction.state.log.segment.bytes | 事務主題段位元組應保持相對較小,以便於更快的日誌壓縮和快取負載 | int | 104857600 | high |
transactional.id.expiration.ms | 事務協調器在未收到任何事務狀態更新之前,主動設定生產者的事務標識為過期之前將等待的最長時間(以毫秒為單位)。 | int | 604800000 | high |
unclean.leader.election.enable | 指明瞭是否能夠使不在ISR中replicas follower設定用來作為leader | boolean | false | high |
zookeeper.connection.timeout.ms | 連線到ZK server的超時時間,沒有配置就使用zookeeper.session.timeout.ms | int | null | high |
zookeeper.session.timeout.ms | ZooKeeper的session的超時時間,如果在這段時間內沒有收到ZK的心跳,則會被認為該Kafka server掛掉了。如果把這個值設定得過低可能被誤認為掛掉,如果設定得過高,如果真的掛了,則需要很長時間才能被server得知。 | int | 6000 | high |
zookeeper.set.acl | 連線zookeeper是否使用 ACLs安全驗證 | boolean | false | high |
broker.id.generation.enable | 伺服器是否允許自動生成broker.id;如果允許則產生的值會交由reserved.broker.max.id稽核 | boolean | true | medium |
broker.rack | broker的機架位置。 這將在機架感知複製分配中用於容錯。 例如:RACK1 ,us-east-1d |
string | null | medium |
connections.max.idle.ms | 空閒連線超時:伺服器套接字處理器執行緒關閉閒置超過此的連線 | long | 600000 | medium |
controlled.shutdown.enable | 是否允許控制器關閉broker ,若是設定為true,會關閉在這個broker上所有分割槽的leader,並轉移到其他broker,這會降低在關閉期間不可用的時間。 | boolean | true | medium |
controlled.shutdown.max.retries | 控制器在關閉時可能有多種原因導致失敗,可以重新關閉的次數。 | int | 3 | medium |
controlled.shutdown.retry.backoff.ms | 在每次重新關閉前,系統需要一定的時間去恢復發生錯誤之前的狀態,這個就是在重試期間的回退(backoff)時間。 | long | 5000 | medium |
controller.socket.timeout.ms | 控制器到broker通道的socket超時時間 | int | 30000 | medium |
default.replication.factor | 自動建立topic時的預設副本的個數 | int | 1 | medium |
delete.records.purgatory.purge.interval.requests | 詳見註解 | int | 1 | medium |
fetch.purgatory.purge.interval.requests | 提取清除請求的清除間隔(請求數)詳見註解 | int | 1000 | medium |
producer.purgatory.purge.interval.requests | The purge interval (in number of requests) of the producer request purgatory詳見註解 | int | 1000 | medium |
group.initial.rebalance.delay.ms | 在執行第一次再平衡之前,group協調員將等待更多消費者加入group的時間。 延遲時間越長意味著重新平衡的可能性越小,但是等待處理開始的時間增加。 | int | 3000 | medium |
group.max.session.timeout.ms | 消費者向組內註冊時允許的最大超時時間,超過這個時間表示註冊失敗。更長的超時使消費者有更多的時間來處理心跳之間的訊息,代價是檢測失敗的時間更長。 | int | 300000 | medium |
group.min.session.timeout.ms | 消費者向組內註冊時允許的最小超時時間,更短的超時以更頻繁的消費者心跳為代價但有更快速的故障檢測,這可能影響broker資源。 | int | 6000 | medium |
inter.broker.listener.name | 用於經紀人之間溝通的監聽者名稱。如果未設定,則偵聽器名稱由security.inter.broker.protocol定義。 同時設定此和security.inter.broker.protocol屬性是錯誤的。 | string | null | medium |
inter.broker.protocol.version | 指定將使用哪個版本的 inter-broker 協議。 在所有經紀人升級到新版本之後,這通常會受到衝擊。升級時要設定 | string | 0.11.0-IV2 | medium |
log.cleaner.backoff.ms | 檢查log是否需要clean的時間間隔。 | long | 15000 | medium |
log.cleaner.dedupe.buffer.size | 日誌壓縮去重時候的快取空間,在空間允許的情況下,越大越好。 | long | 134217728 | medium |
log.cleaner.delete.retention.ms | 對於壓縮的日誌保留的最長時間,也是客戶端消費訊息的最長時間,同log.retention.minutes的區別在於一個控制未壓縮資料,一個控制壓縮後的資料。 | long | 86400000 | medium |
log.cleaner.enable | 啟用日誌清理器程序在伺服器上執行。使用了cleanup.policy = compact的主題,包括內部offsets主題,都應該啟動該選項。如果被禁用的話,這些話題將不會被壓縮,並且會不斷增長。 | boolean | true | medium |
log.cleaner.io.buffer.load.factor | 日誌清理中hash表的擴大因子,一般不需要修改。 | double | 0.9 | medium |
log.cleaner.io.buffer.size | 日誌清理時候用到的I/O塊(chunk)大小,一般不需要修改。 | int | 524288 | medium |
log.cleaner.io.max.bytes.per.second | 在執行log compaction的過程中,限制了cleaner每秒鐘I/O的資料量,以免cleaner影響正在執行的請求。 | double | 1. | medium |
log.cleaner.min.cleanable.ratio | 控制了log compactor進行clean操作的頻率。預設情況下,當log的50%以上已被clean時,就不用繼續clean了。此配置可以被覆蓋。 | double | 0.5 | medium |
log.cleaner.min.compaction.lag.ms | 訊息在日誌中保持未壓縮的最短時間。 僅適用於正在壓縮的日誌。 | long | 0 | medium |
log.cleaner.threads | 用於日誌清理的後臺執行緒的數量 | int | 1 | medium |
log.cleanup.policy | 此配置可以設定成delete或compact。如果設定為delete,當log segment檔案的大小達到上限,或者roll時間達到上限,檔案將會被刪除。如果設定成compact,則此檔案會被清理,標記成已過時狀態,詳見 4.8 。此配置可以被覆蓋。 | list | delete | medium |
log.index.interval.bytes | 當執行一個fetch操作後,需要一定的空間來掃描最近的offset大小,設定越大,代表掃描速度越快,但是也更耗記憶體,一般情況下不需要改變這個引數。 | int | 4096 | medium |
log.index.size.max.bytes | 每個log segment的最大尺寸。注意,如果log尺寸達到這個數值,即使尺寸沒有超過log.segment.bytes限制,也需要產生新的log segment。 | int | 10485760 | medium |
log.message.format.version | 指定broker將用於將訊息新增到日誌檔案的訊息格式版本。 該值應該是有效的ApiVersion。 一些例子是:0.8.2,0.9.0.0,0.10.0。 通過設定特定的訊息格式版本,使用者保證磁碟上的所有現有訊息都小於或等於指定的版本。 不正確地設定這個值將導致使用舊版本的使用者出錯,因為他們將接收到他們不理解的格式的訊息。 | string | 0.11.0-IV2 | medium |
log.message.timestamp.difference.max.ms | broker收到訊息時的時間戳和訊息中指定的時間戳之間允許的最大差異。 如果log.message.timestamp.type = CreateTime,如果時間戳的差值超過此閾值,則會拒絕接受這條訊息。 如果log.message.timestamp.type = LogAppendTime,則忽略此配置。允許的最大時間戳差異不應大於log.retention.ms,以避免不必要地頻繁進行日誌滾動。 | long | 9223372036854775807 | medium |
log.message.timestamp.type | 定義訊息中的時間戳是訊息建立時間還是日誌追加時間。 該值應該是“CreateTime”或“LogAppendTime” | string | CreateTime | medium |
log.preallocate | 是否預建立新的段檔案,windows推薦使用 | boolean | false | medium |
log.retention.check.interval.ms | 檢查日誌段檔案的間隔時間,以確定是否檔案屬性是否到達刪除要求。 | long | 300000 | medium |
max.connections.per.ip | broker上每個IP允許最大的連線數 | int | 2147483647 | medium |
max.connections.per.ip.overrides | 每個ip或者hostname預設的連線的最大覆蓋 | String | “” | medium |
num.partitions | 新建Topic時預設的分割槽數 | int | 1 | medium |
principal.builder.class | The fully qualified name of a class that implements the PrincipalBuilder interface, which is currently used to build the Principal for connections with the SSL SecurityProtocol. | class | org.apache. kafka.common. security.auth. DefaultPrincipalBuilder |
medium |
replica.fetch.backoff.ms | 複製資料時失敗等待時間 | int 1000 | medium | |
replica.fetch.max.bytes | 為每個分割槽設定獲取的訊息的位元組數。 這不是絕對最大值,如果第一個非空分割槽中的第一個record batch大於此值,那麼record batch仍將被返回以確保可以進行。 代理接受的最大記錄批量大小通過message.max.bytes(broker config)或max.message.bytes(topic config)進行定義。 | int | 1048576 | medium |