Kafka-配置&命令
阿新 • • 發佈:2018-12-15
《Kafka權威指南》學習筆記
kafka安裝配置
broker配置
broker.id
:每個broker都需要有一個識別符號,使用broker.id來表示。它的預設值是0,也可以被設定成其他任意整數。這個值在整個Kafka叢集裡必須是唯一的。potr
:預設9092,可以指定任意埠號。(如果需要1024以下埠號,需要root許可權)broker.rack
:機架資訊。broker所在機架。zookeeper.connect
:hostname:port/path,hostname2:port/path...
,path可以不指定,預設為根路徑/
。log.dirs
:Kafka把所有訊息都儲存在磁碟上,存放這些日誌片段的目錄是通過log.dirs指定的。它是一組用逗號分隔的本地檔案系統路徑。
如果指定了多個路徑,那麼broker會根據“最少使用”原則,把同一個分割槽的日誌片段儲存到同一個路徑下。要注意,
broker會往擁有最少數目分割槽的路徑新增分割槽,而不是往擁有最小磁碟空間的路徑新增分割槽。
num.recovery.threads.per.data.dir
:對於如下3種情況,Kafka會使用可配置的錢程池來處理日誌片段- 伺服器正常啟動,用於開啟每個分割槽的日誌片段
- 伺服器崩憤後重啟,用於檢查和截短每個分割槽的日誌片段:
- 伺服器正常關閉,用於關閉日誌片段。
預設情況下,每個日誌目錄只使用一個執行緒。因為這些執行緒只是在伺服器啟動和關閉時會用到,所以完全可以設定大量的執行緒來達到井行操作的目的。特別是對於包含大量分割槽的伺服器來說,一旦發生奔潰,在進行恢復時使用井行操作可能會省下數小時的時間。設定此引數時需要注意,所配置的數字對應的是log.dirs指定的單個日誌目錄。也就是說,如果設定為8,井且
log.dir
指定了3個路徑,那麼總共需要24個執行緒。
auto.create.topics.enable
:Kafka會在如下幾種情形下自動建立主題:- 當一個生產者開始往主題寫入訊息時。
- 當一個消費者開始從主題讀取訊息時。
- 當任意一個客戶端向主題傳送元資料請求時。
主題的預設配置
-
num.partitions
:引數指定了新建立的主題將包含多少個分割槽。 -
log.retention.ms
:根據時間來決定資料可以被保留多久。預設使用log.retention.hours
引數來配置時間,預設值為168小時,也就是一週。 還有其他兩個引數:log.retention.minutes
和log.retention.ms
log.retention.ms
,若三個配置存在多項,則取值最小的那個。 -
log.retention.bytes
:根據保留訊息位元組數來判斷訊息是否過期,作用在每一個分割槽上。。 -
log.segment.bytes
和log.segment.ms
:控制日誌片段大小或存活時間。 -
message.max.bytes
:引數來限制單個訊息的大小預設值是l000000,也就是lMB。
Kafka命令
啟動服務
> ./kafka-server-start.sh -daemon ../config/server.properties
進入zk,檢視啟動的代理
>ls /brokers/ids
主題
#建立topic
> ./kafka-topics.sh --create --zookeeper s159:2181,s162:2181,s163:2181 --replication-factor 1 --partitions 1 --topic test
#列出topic
> ./kafka-topics.sh --list --zookeeper s159:2181,s162:2181,s163:2181
#描述主題
> ./kafka-topics.sh --describe --zookeeper s159:2181,s162:2181,s163:2181
#刪除主題,
a.必須配置 delete.topic.enable=true
b.> ./kafka-topics.sh --delete --zookeeper s159:2181,s162:2181,s163:2181 --topic test
進入zk,
c. > rmr /brokers/topics/{topic_name}';
> rmr /config/topics/{topic_name}';
批量刪除topic指令:
./kafka-topics.sh --list --zookeeper s159:2181,s162:2181,s163:2181|grep 'test*' | xargs -n1 ./kafka-topics.sh --delete --zookeeper s159:2181,s162:2181,s163:2181 --topic
訊息
#傳送訊息(Provider)
./kafka-console-producer.sh --broker-list s159:9092 --topic test
---host:port 指的是具體kafka服務
#消費訊息(Consumer)
> ./kafka-console-consumer.sh --bootstrap-server s159:9092 --topic test --from-beginning