1. 程式人生 > >kafka broker的常用配置

kafka broker的常用配置

Broker的一般配置

有很多引數在部署叢集模式時需要引起重視,這些引數都是broker最基本的配置,很多引數都需要依據叢集的broker情況而變化。

broker.id

每個kafka的broker都需要有一個整型的唯一標識,這個標識通過broker.id來設定。預設的情況下,這個數字是0,但是它可以設定成任何值。需要注意的是,需要保證叢集中這個id是唯一的。這個值是可以任意填寫的,並且可以在必要的時候從broker叢集中刪除。比較好的做法是使用主機名相關的標識來做為id,比如,你的主機名當中有數字相關的資訊,如hosts1.example.com,host2.example.com,那麼這個數字就可以用來作為broker.id的值。

port

預設啟動kafka時,監聽的是TCP的9092埠,埠號可以被任意修改。如果埠號設定為小於1024,那麼kafka需要以root身份啟動。但是並不推薦以root身份啟動。

zookeeper.connect

這個引數指定了Zookeeper所在的地址,它儲存了broker的元資訊。在前一章節的例子 中,Zookeeper是執行在本機的2181埠上,因此這個值被設定成localhost:2181。這個值可以通過分號設定多個值,每個值的格式都是hostname:port/path,其中每個部分的含義如下:

  • hostname是zookeeper伺服器的主機名或者ip地址
  • port是伺服器監聽連線的埠號
  • /path是kafka在zookeeper上的根目錄。如果預設,會使用根目錄。

如果設定了chroot,但是它又不存在,那麼broker會在啟動的時候直接建立。

PS:為什麼使用Chroot路徑

一個普遍認同的最佳實踐就是kafka叢集使用chroot路徑,這樣zookeeper可以與其他應用共享使用,而不會有任何衝突。指定多個zookeeper伺服器的地址也是比較好的做法,這樣當zookeeper叢集中有節點失敗的時候,還可以正常連線其他的節點。

log.dirs

這個引數用於配置Kafka儲存資料的位置,Kafka中所有的訊息都會存在這個目錄下。可以通過逗號來指定多個目錄,kafka會根據最少被使用的原則選擇目錄分配新的parition。注意kafka在分配parition的時候選擇的規則不是按照磁碟的空間大小來定的,而是分配的parition的個數多小。

num.recovery.thread.per.data.dir

kafka可以配置一個執行緒池,執行緒池的使用場景如下:

  • 當正常啟動的時候,開啟每個parition的文件塊segment
  • 當失敗後重啟時,檢查parition的文件塊
  • 當關閉kafka的時候,清除關閉文件塊

預設,每個目錄只有一個執行緒。最好是設定多個執行緒數,這樣在伺服器啟動或者關閉的時候,都可以並行的進行操作。尤其是當非正常停機後,重啟時,如果有大量的分割槽數,那麼啟動broker將會花費大量的時間。注意,這個引數是針對每個目錄的。比如,num.recovery.threads.per.data.dir設定為8,如果有3個log.dirs路徑,那麼一共會有24個執行緒。

auto.create.topics.enable

在下面場景中,按照預設的配置,如果還沒有建立topic,kafka會在broker上自動建立topic:

  • 當producer向一個topic中寫入訊息時
  • 當cosumer開始從某個topic中讀取資料時
  • 當任何的客戶端請求某個topic的資訊時

在很多場景下,這都會引發莫名其妙的問題。尤其是沒有什麼辦法判斷某個topic是否存在,因為任何請求都會建立該topic。如果你想嚴格的控制topic的建立,那麼可以設定auto.create.topics.enable為false。

預設的主題配置Topic Defaults

kafka叢集在建立topic的時候會設定一些預設的配置,這些引數包括分割槽的個數、訊息的容錯機制,這些資訊都可以通過管理員工具以topic為單位進行配置。kafka為我們提供的預設配置,基本也能滿足大多數的應用場景了。

PS:使用per.topic進行引數的覆蓋

在之前的版本中,可以通過log.retention.hours.per.topic,log.retention.bytes.per.topic, log.segment.bytes.per.topic等覆蓋預設的配置。現在的版本不能這麼用了,必須通過管理員工具進行設定。

num.partitions

這個引數用於配置新建立的topic有多少個分割槽,預設是1個。注意partition的個數只可以被增加,不能被減少。這就意味著如果想要減少主題的分割槽數,那麼就需要重新建立topic。
在第一章中介紹過,kafka通過分割槽來對topic進行擴充套件,因此需要使用分割槽的個數來做負載均衡,如果新增了broker,那麼就會引發重新負載分配。這並不意味著所有的主題的分割槽數都需要大於broker的數量,因為kafka是支援多個主題的,其他的主題會使用其餘的broker。需要注意的是,如果訊息的吞吐量很高,那麼可以通過設定一個比較大的分割槽數,來分攤壓力。

log.retention.ms

這個引數用於配置kafka中訊息儲存的時間,也可以使用log.retention.hours,預設這個引數是168個小時,即一週。另外,還支援log.retention.minutes和log.retention.ms。這三個引數都會控制刪除過期資料的時間,推薦還是使用log.retention.ms。如果多個同時設定,那麼會選擇最小的那個。

PS:過期時間和最後修改時間

過期時間是通過每個log檔案的最後修改時間來定的。在正常的叢集操作中,這個時間其實就是log段檔案關閉的時間,它代表了最後一條訊息進入這個檔案的時間。然而,如果通過管理員工具,在brokers之間移動了分割槽,那麼這個時候會被重新整理,就不準確了。這就會導致本該過期刪除的檔案,被繼續保留了。

log.retention.bytes

這個引數也是用來配置訊息過期的,它會應用到每個分割槽,比如,你有一個主題,有8個分割槽,並且設定了log.retention.bytes為1G,那麼這個主題總共可以保留8G的資料。注意,所有的過期配置都會應用到patition粒度,而不是主題粒度。這也意味著,如果增加了主題的分割槽數,那麼主題所能保留的資料也就隨之增加了。

PS:通過大小和時間配置資料過期

如果設定了log.retention.bytes和log.retention.ms(或者其他過期時間的配置),只要滿足一個條件,訊息就會被刪除。比如,設定log.retention.ms是86400000(一天),並且log.retention.bytes是1000000000(1G),那麼只要修改時間大於一天或者資料量大於1G,這部分資料都會被刪除。

log.segment.bytes

這個引數用來控制log段檔案的大小,而不是訊息的大小。在kafka中,所有的訊息都會進入broker,然後以追加的方式追加到分割槽當前最新的segment段檔案中。一旦這個段檔案到達了log.segment.bytes設定的大小,比如預設的1G,這個段檔案就會被關閉,然後建立一個新的。一旦這個檔案被關閉,就可以理解成這個檔案已經過期了。這個引數設定的越小,那麼關閉檔案建立檔案的操作就會越頻繁,這樣也會造成大量的磁碟讀寫的開銷。
通過生產者傳送過來的訊息的情況可以判斷這個值的大小。比如,主題每天接收100M的訊息,並且log.segment.bytes為預設設定,那麼10天后,這個段檔案才會被填滿。由於段檔案在沒有關閉的時候,是不能刪除的,log.retention.ms又是預設的設定,那麼這個訊息將會在17天后,才過期刪除。因為10天后,段檔案才關閉。再過7天,這個檔案才算真正過期,才能被清除。

PS:根據時間戳追蹤offset

段檔案的大小也會影響到訊息消費offset的操作,因為讀取某一時間的offset時,kafka會尋找關閉時間晚於offset時間的那個段檔案。然後返回offset所在的段檔案的第一個訊息的offset,然後按照偏移值查詢目標的訊息。因此越小的段檔案,通過時間戳消費offset的時候就會越精確。

log.segment.ms

這個引數也可以控制段檔案關閉的時間,它定義了經過多長時間段檔案會被關閉。跟log.retention.bytes和log.retention.ms類似,log.segment.ms和log.segment.bytes也不是互斥的。kafka會在任何一個條件滿足時,關閉段檔案。預設情況下,是不會設定Log.segment.ms的,也就意味著只會通過段檔案的大小來關閉檔案。

PS:基於時間關閉段檔案的磁碟效能需求

當時使用基於時間的段檔案限制,對磁碟的要求會很高。這是因為,一般情況下如果段檔案大小這個條件不滿足,會按照時間限制來關閉檔案,此時如果分割槽數很多,主題很多,將會有大量的段檔案同時關閉,同時建立。

message.max.bytes

這個引數用於限制生產者訊息的大小,預設是1000000,也就是1M。生產者在傳送訊息給broker的時候,如果出錯,會嘗試重發;但是如果是因為大小的原因,那生產者是不會重發的。另外,broker上的訊息可以進行壓縮,這個引數可以使壓縮後的大小,這樣能多儲存很多訊息。
需要注意的是,允許傳送更大的訊息會對效能有很大影響。更大的訊息,就意味著broker在處理網路連線的時候需要更長的時間,它也會增加磁碟的寫操作壓力,影響IO吞吐量。

PS:配合訊息大小的設定

訊息大小的引數message.max.bytes一般都和消費者的引數fetch.message.max.bytes搭配使用。如果fetch.message.max.bytes小於message.max.bytes,那麼當消費者遇到很大的訊息時,將會無法消費這些訊息。同理,在配置cluster時replica.fetch.max.bytes也是一樣的道理。