1. 程式人生 > >Kafka-配置&命令

Kafka-配置&命令

《Kafka權威指南》學習筆記

kafka安裝配置

broker配置

  • broker.id:每個broker都需要有一個識別符號,使用broker.id來表示。它的預設值是0,也可以被設定成其他任意整數。這個值在整個Kafka叢集裡必須是唯一的。
  • potr:預設9092,可以指定任意埠號。(如果需要1024以下埠號,需要root許可權)
  • broker.rack:機架資訊。broker所在機架。
  • zookeeper.connecthostname:port/path,hostname2:port/path...,path可以不指定,預設為根路徑/
  • log.dirs:Kafka把所有訊息都儲存在磁碟上,存放這些日誌片段的目錄是通過log.dirs指定的。它是一組用逗號分隔的本地檔案系統路徑。

如果指定了多個路徑,那麼broker會根據“最少使用”原則,把同一個分割槽的日誌片段儲存到同一個路徑下。要注意,broker會往擁有最少數目分割槽的路徑新增分割槽,而不是往擁有最小磁碟空間的路徑新增分割槽。

  • num.recovery.threads.per.data.dir:對於如下3種情況,Kafka會使用可配置的錢程池來處理日誌片段
    • 伺服器正常啟動,用於開啟每個分割槽的日誌片段
    • 伺服器崩憤後重啟,用於檢查和截短每個分割槽的日誌片段:
    • 伺服器正常關閉,用於關閉日誌片段。

預設情況下,每個日誌目錄只使用一個執行緒。因為這些執行緒只是在伺服器啟動和關閉時會用到,所以完全可以設定大量的執行緒來達到井行操作的目的。特別是對於包含大量分割槽的伺服器來說,一旦發生奔潰,在進行恢復時使用井行操作可能會省下數小時的時間。設定此引數時需要注意,所配置的數字對應的是log.dirs指定的單個日誌目錄。也就是說,如果設定為8,井且log.dir

指定了3個路徑,那麼總共需要24個執行緒。

  • auto.create.topics.enable:Kafka會在如下幾種情形下自動建立主題:
    • 當一個生產者開始往主題寫入訊息時。
    • 當一個消費者開始從主題讀取訊息時。
    • 當任意一個客戶端向主題傳送元資料請求時。

主題的預設配置

  • num.partitions:引數指定了新建立的主題將包含多少個分割槽。

  • log.retention.ms:根據時間來決定資料可以被保留多久。預設使用log.retention.hours引數來配置時間,預設值為168小時,也就是一週。 還有其他兩個引數:log.retention.minuteslog.retention.ms

    ,推薦使用log.retention.ms,若三個配置存在多項,則取值最小的那個。

  • log.retention.bytes:根據保留訊息位元組數來判斷訊息是否過期,作用在每一個分割槽上。。

  • log.segment.byteslog.segment.ms:控制日誌片段大小或存活時間。

  • message.max.bytes:引數來限制單個訊息的大小預設值是l000000,也就是lMB。

Kafka命令

啟動服務

> ./kafka-server-start.sh  -daemon ../config/server.properties
進入zk,檢視啟動的代理
>ls /brokers/ids 

主題

#建立topic
> ./kafka-topics.sh --create --zookeeper s159:2181,s162:2181,s163:2181 --replication-factor 1 --partitions 1 --topic test

#列出topic
> ./kafka-topics.sh --list --zookeeper s159:2181,s162:2181,s163:2181

#描述主題
> ./kafka-topics.sh --describe --zookeeper s159:2181,s162:2181,s163:2181

#刪除主題,
a.必須配置 delete.topic.enable=true
b.> ./kafka-topics.sh --delete --zookeeper s159:2181,s162:2181,s163:2181 --topic test
進入zk, 
c. > rmr /brokers/topics/{topic_name}';
    > rmr /config/topics/{topic_name}';

批量刪除topic指令:

./kafka-topics.sh --list --zookeeper s159:2181,s162:2181,s163:2181|grep 'test*' | xargs -n1  ./kafka-topics.sh --delete --zookeeper s159:2181,s162:2181,s163:2181 --topic

訊息

#傳送訊息(Provider)
./kafka-console-producer.sh --broker-list s159:9092 --topic test
---host:port 指的是具體kafka服務

#消費訊息(Consumer)
> ./kafka-console-consumer.sh --bootstrap-server s159:9092 --topic test --from-beginning