1. 程式人生 > >Kafka Broker配置

Kafka Broker配置

  Kafka發行包裡自帶的配置樣本可以用來安裝單機服務,但並不能滿足大多數安裝場景的要求。kafka有很多配置選項,Kafka有很多配置選項,涉及安裝和調優的方方面面。不過大多數調優選項可以使用預設配置,除非你對調優有特別的要求。

常規配置和主題配置(服務端的配置檔案)

  kafka安裝包裡自帶的config目錄下有一個名字叫做server.properties的配置檔案,這裡面的配置項對應的就是標題裡說的常規配置和主題配置了。在我的試驗環境下,這個檔案的絕對路徑是:/usr/local/kafka/kafka_2.11-2.0.0/config/server.properties。

#每一個broker都需要一個識別符號,使用broker.id來表示。它的預設值是0,也可以被設定成任意其它整數。這個值在整個kafka叢集裡必須是唯一的。這個值可以任意選定。建議把他們設定成語機器名具有相關性的整數,這樣在維護時,將ID號對映到機器名就沒有那麼麻煩了。
broker.id
=0 #用來監聽連線的埠,producer或consumer將在此埠建立連線,修改port可以配置引數可以把它設定成其他任意可用的埠,如果要使用1024以下的埠,需要使用root許可權啟動kafka,不推薦這麼做 port=9092

#用於儲存broker元資料的Zookeeper地址是通過zookeeper.connect來指定的,localhost:2181表示這個Zookeeper是執行在本地的2181埠上。該引數配置是用逗號(原書寫的是冒號,但是kafka自帶的配置檔案是用逗號分隔做的示例)分隔的一組hostname:port/path列表,
#每一部分的含義如下:hostname是Zookeeper伺服器的機器名或IP地址;
#port是Zookeeper的客戶端連線埠
#/path是可選的Zookeeper路徑,作為Kafka叢集的chroot環境,如果不指定,預設使用根目錄,如果指定的chroot路徑不存在,broker會在啟動的時候建立它。
#chroot在kafka叢集裡使用chroot路徑是一種最佳實踐。Zookeeper群組可以共享給其它應用程式,即使還有其它kafka叢集存在,也不會產生衝突。最好是在配置檔案裡指定一組Zookeeper伺服器
#用逗號(原書《kafka權威指南》寫的是分號)把他們隔開,一旦有一個Zookeeper伺服器宕機,broker可以連線到Zookeeper群組的另一個結點上。

zookeeper.connect=zk01:2181,zk02:2181,zk03:2181

#kafka把所有的訊息都儲存在磁碟上,存放這些日誌片段的目錄是通過log.dirs指定的。它是一組用逗號分隔的本地檔案系統路徑。如果指定了多個路徑,那麼Broker會根據
#“最少使用”原則,把同一個分割槽的日誌片段儲存到同一個路徑下。要注意,broker會往擁有最好數目分割槽的路徑新增分割槽,而不是往用於最小磁碟空間的路徑新增分割槽。 log.dirs=/export/servers/logs/kafka

#我們知道segment檔案預設會被保留7天的時間,超時的話就
#會被清理,那麼清理這件事情就需要有一些執行緒來做。這裡就是
#用來設定恢復和清理data下資料的執行緒數量
#對以下3種情況,kafka會使用可配置的執行緒池來處理日誌片段
#伺服器正常啟動,用於開啟每個分割槽的日誌片段;
#伺服器崩潰後重啟,用於檢查和截短每個分割槽的日誌片段;
#伺服器正常關閉,用於關閉日誌片段
#預設情況下,每個日誌目錄只使用一個執行緒,因為這些執行緒只是在伺服器啟動和關閉時會用到,所以完全可以設定大量的執行緒來達到並行操作的目的。特別是對於包含大量分割槽的伺服器來說
#一旦發生崩潰,在進行恢復時使用並行操作可能會省下數小時的時間。設定此引數時需要注意,所配置的數字對應的是log.dirs指定的單個目錄,也就是說該項配置為8,並且log.dirs被
#指定了3個路徑,那麼總共需要24個執行緒 num.recovery.threads.per.data.dir=1
#預設情況下,kafka會在如下情況下自動建立主題:
#當一個生產者開始往主題寫入訊息時
#當一個消費者開始從主題讀取訊息時
#當任意一個客戶端向主題傳送元資料請求時
#很多時候,這些行為都是非預期的,而且,根據kafka協議,如果一個主題不先被建立,根本無法知道它是否已經存在。如果顯示的建立主題,不管是手動建立還是通過其它配置系統來建立
#都可以把auto.create.topics.enable設為false
auto.create.topics.enable=true

#以上為常規配置

#num.partitions引數指定了新建立的主題將包含多少個分割槽。如果啟用了主題自動建立功能(該功能是預設啟用的),主題分割槽的個數就是該引數指定的值。
#該引數的預設值是1。注意,我們可以增加主題分割槽的個數,但不能減少分割槽的個數。所以,如果要讓一個主題的分割槽數個數少於num.partitions指定的值,需要手動建立該主題。
#kafka叢集通過分割槽對主題進行橫向擴充套件,所以當有新的Broker加入叢集時,可以通過分割槽的個數來實現叢集負載的均衡。為了能讓分割槽分佈到所有broker上,主題分割槽的個數必須要大於Broker的個數
#擁有大量訊息的主題如果要進行負載均衡,就需要大量的分割槽。分割槽的大小限制在25GB以內可以得到比較理想的效果
#分割槽數量的估算可以通過主題每秒鐘的吞吐量/消費者每秒鐘的吞吐量來進行估算 num.partitions=2
#segment檔案保留的最長時間,預設保留7天(168小時),
#超時將被刪除,也就是說7天之前的資料將被清理掉。
#Kafka可以根據時間來決定資料可以被保留多久。預設使用log.retention.hours引數來配置時間,預設是168小時。
#除此之外還有其它兩個引數log.retention.minutes和log.retention.ms。使用log.retention.ms為宜
#如果指定了不知一個引數,Kafka會優先使用具有最小值的那個引數
#根據時間保留資料是通過檢查磁碟上日誌片段檔案的最後修改時間來實現的。一般來說,最後修改時間指的就是日誌片段關閉的時間,也就是檔案裡最後一個訊息的時間戳。
#不過如果使用管理工具在伺服器間移動分割槽,最後修改時間就不準確了,時間誤差可能會導致這些分割槽過多的保留資料
log.retention.hours=168
#log.retention.bytes,這是另一種決定日誌片段是否被刪除的方式。(這兩種方式都是作用在日誌片段上的,而不是作用在單個訊息上)
#通過保留的訊息的位元組數來判斷是否過期。它的值通過log.retention.bytes來指定,作用在每一個分割槽上,就是說這個值被設定為了1024,一個主題有8個分割槽,那麼這個主題一共可以儲存8K資料
#如果同時指定了log.retention.bytes和log.retention.ms,只要任意一個條件滿足,訊息就會被刪除。
log.retention.bytes=1024

#日誌檔案中每個segment的大小,預設為1G
#當訊息到達broker時,它們被追加到分割槽的當前日誌片段上,當日志片段大小達到log.segment.bytes指定的上限時,當前日誌片段就會被關閉,一個新的日誌片段被開啟。
#如果一個新的日誌片段被關閉,就開始等待過期。這個引數的值越小,就會越頻繁的關閉和分配新檔案,從而降低磁碟寫入的整體效率。
#如果主題的訊息量不大,那麼怎麼調整這個引數就非常重要了。
#使用時間戳獲取偏移量:日誌片段大小會影響使用時間戳獲取便宜浪。在使用時間戳獲取日誌偏移量時,kafka會檢查分割槽裡最後修改時間大於指定時間戳的日誌片段(已經被關閉而定),
#該日誌片段的前一個檔案的最後修改時間小於指定時間戳。然後kafka返回該日誌片段(也就是檔名)開頭的偏移量。對於使用時間戳獲取偏移量的操作來說,日誌片段越小,結果越準確。 log.segment.bytes=1073741824
#控制日誌片段關閉的另一個引數是:log.segment.ms,它指定了多長時間之後日誌片段就會被關閉。log.segment.bytes和log.segment.bytes兩個引數,哪個條件先達到就使用哪個條件關閉日誌片段
#預設情況下log.segment.ms沒有設定值,所以只根據大小來關閉日誌片段。如果使用基於時間的日誌片段,要著重考慮並行關閉多個日誌片段對磁碟效能的影響。

#message.max.bytes,broker通過設定message.max.bytes引數來限制單個訊息的大小,預設值是1000000,也就是1MB,如果生產者嘗試傳送的訊息超過這個大小,不僅訊息不會被接收,
#還會收到broker返回的錯誤資訊。跟其它與位元組相關的配置引數一樣,該引數指的是壓縮後的訊息大小,也就是說,只要壓縮後訊息大小小於message.max.bytes指定的值,訊息的實際大小
#可以遠大於這個值。這個值對效能有著顯著的影響,值越大,那麼複雜處理網路連線和請求得到執行緒就需要花越多的時間處理這些請求。它還會增加磁碟寫入塊的大小,從而影響IO吞吐量。
#在服務端和客戶端之間協調訊息大小的配置
#消費者客戶端設定的fetch.message.max.bytes必須與伺服器端設定的訊息大小進行協調。如果這個值比message.max.bytes小,那麼消費者就無法讀取比較大的訊息,導致出現消費者被
#阻塞的情況。在為叢集裡的broker配置replica.fetch.max.bytes引數時也需要遵守同樣的規則。
message.max.bytes=1000000

#以上為主題的預設配置
#處理網路請求的執行緒數量,也就是接收訊息的執行緒數。
#接收執行緒會將接收到的訊息放到記憶體中,然後再從記憶體中寫入磁碟。
num.network.threads=3

#訊息從記憶體中寫入磁碟是時候使用的執行緒數量。
#用來處理磁碟IO的執行緒數量
num.io.threads=8

#傳送套接字的緩衝區大小
socket.send.buffer.bytes=102400

#接受套接字的緩衝區大小
socket.receive.buffer.bytes=102400

#請求套接字的緩衝區大小
socket.request.max.bytes=104857600

#滾動生成新的segment檔案的最大時間
log.roll.hours=168

#上面的引數設定了每一個segment檔案的大小是1G,那麼
#就需要有一個東西去定期檢查segment檔案有沒有達到1G,
#多長時間去檢查一次,就需要設定一個週期性檢查檔案大小
#的時間(單位是毫秒)。
log.retention.check.interval.ms=300000

#日誌清理是否開啟
log.cleaner.enable=true
#zookeeper連結超時時間
zookeeper.connection.timeout.ms=6000

#上面我們說過接收執行緒會將接收到的訊息放到記憶體中,然後再從記憶體
#寫到磁碟上,那麼什麼時候將訊息從記憶體中寫入磁碟,就有一個
#時間限制(時間閾值)和一個數量限制(數量閾值),這裡設定的是
#數量閾值,下一個引數設定的則是時間閾值。
#partion buffer中,訊息的條數達到閾值,將觸發flush到磁碟。
log.flush.interval.messages=10000

#訊息buffer的時間,達到閾值,將觸發將訊息從記憶體flush到磁碟,
#單位是毫秒。
log.flush.interval.ms=3000

#刪除topic需要server.properties中設定delete.topic.enable=true否則只是標記刪除
delete.topic.enable=true

#此處的host.name為本機IP(重要),如果不改,則客戶端會丟擲:
#Producer connection to localhost:9092 unsuccessful 錯誤!
host.name=kafka01

advertised.host.name=192.168.239.128