1. 程式人生 > 其它 >kafka搭建命令與使用

kafka搭建命令與使用

安裝前的環境準備

由於Kafka是用Scala語言開發的,執行在JVM上,因此在安裝Kafka之前需要先安裝JDK。

# yum install java-1.8.0-openjdk* -y

kafka依賴zookeeper,所以需要先安裝zookeeper

# wget http://mirror.bit.edu.cn/apache/zookeeper/stable/zookeeper-3.4.12.tar.gz
# tar -zxvf zookeeper-3.4.12.tar.gz
# cd zookeeper-3.4.12
# cp conf/zoo_sample.cfg conf/zoo.cfg
啟動zookeeper
# bin/zkServer.sh start
# bin/zkCli.sh 
# ls /			#檢視zk的根目錄相關節點

第一步:下載安裝包

下載1.1.0 release版本,並解壓:

# wget https://archive.apache.org/dist/kafka/1.1.0/kafka_2.11-1.1.0.tgz
# tar -xzf kafka_2.11-1.1.0.tgz
# cd kafka_2.11-1.1.0

第二步:啟動服務

現在來啟動kafka服務:
啟動指令碼語法:

kafka-server-start.sh [-daemon] server.properties

可以看到,server.properties的配置路徑是一個強制的引數,-daemon表示以後臺程序執行,否則ssh客戶端退出後,就會停止服務。(注意,在啟動kafka時會使用linux主機名關聯的ip地址,所以需要把主機名和linux的ip對映配置到本地host裡,用vim /etc/hosts)

# bin/kafka-server-start.sh -daemon config/server.properties

我們進入zookeeper目錄通過zookeeper客戶端檢視下zookeeper的目錄樹

# bin/zkCli.sh 
# ls /			#檢視zk的根目錄kafka相關節點
# ls /brokers/ids	#檢視kafka節點

第三步:建立主題

現在我們來建立一個名字為“test”的Topic,這個topic只有一個partition,並且備份因子也設定為1:

# bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

現在我們可以通過以下命令來檢視kafka中目前存在的topic

# bin/kafka-topics.sh --list --zookeeper localhost:2181

除了我們通過手工的方式建立Topic,我們可以配置broker,當producer釋出一個訊息某個指定的Topic,但是這個Topic並不存在時,就自動建立。

第四步:傳送訊息

kafka自帶了一個producer命令客戶端,可以從本地檔案中讀取內容,或者我們也可以以命令列中直接輸入內容,並將這些內容以訊息的形式傳送到kafka叢集中。在預設情況下,每一個行會被當做成一個獨立的訊息。
首先我們要執行釋出訊息的指令碼,然後在命令中輸入要傳送的訊息的內容:

# bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
>this is a msg
>this is a another msg

第五步:消費訊息

對於consumer,kafka同樣也攜帶了一個命令列客戶端,會將獲取到內容在命令中進行輸出:

# bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test   --from-beginning #老版本
# bin/kafka-console-consumer.sh --bootstrap-server localhost:9092  --consumer-property group.id=testGroup --consumer-property client.id=consumer-1  --topic test    #新版本

如果你是通過不同的終端視窗來執行以上的命令,你將會看到在producer終端輸入的內容,很快就會在consumer的終端視窗上顯示出來。
以上所有的命令都有一些附加的選項;當我們不攜帶任何引數執行命令的時候,將會顯示出這個命令的詳細用法。

其他常用指令

檢視組名

#  bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list --new-consumer

檢視消費者的消費偏移量

# bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group testGroup

消費多主題

# bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --whitelist "test|test-2"

單播消費

一條訊息只能被某一個消費者消費的模式,類似queue模式,只需讓所有消費者在同一個消費組裡即可
分別在兩個客戶端執行如下消費命令,然後往主題裡傳送訊息,結果只有一個客戶端能收到訊息

# bin/kafka-console-consumer.sh --bootstrap-server localhost:9092  --consumer-property group.id=testGroup --topic test

多播消費

一條訊息能被多個消費者消費的模式,類似publish-subscribe模式費,針對Kafka同一條訊息只能被同一個消費組下的某一個消費者消費的特性,要實現多播只要保證這些消費者屬於不同的消費組即可。我們再增加一個消費者,該消費者屬於testGroup-2消費組,結果兩個客戶端都能收到訊息

# bin/kafka-console-consumer.sh --bootstrap-server localhost:9092  --consumer-property group.id=testGroup-2 --topic test

kafka叢集配置

到目前為止,我們都是在一個單節點上執行broker,這並沒有什麼意思。對於kafka來說,一個單獨的broker意味著kafka叢集中只有一個接點。要想增加kafka叢集中的節點數量,只需要多啟動幾個broker例項即可。為了有更好的理解,現在我們在一臺機器上同時啟動三個broker例項。
首先,我們需要建立好其他2個broker的配置檔案:

# cp config/server.properties config/server-1.properties
# cp config/server.properties config/server-2.properties

配置檔案的內容分別如下:

config/server-1.properties:
broker.id=1
listeners=PLAINTEXT://:9093
log.dir=/tmp/kafka-logs-1

config/server-2.properties:
broker.id=2
listeners=PLAINTEXT://:9094
log.dir=/tmp/kafka-logs-2

broker.id屬性在kafka叢集中必須要是唯一的。我們需要重新指定port和log目錄,因為我們是在同一臺機器上執行多個例項。如果不進行修改的話,consumer只能獲取到一個instance例項的資訊,或者是相互之間的資料會被影響。
目前我們已經有一個zookeeper例項和一個broker例項在運行了,現在我們只需要在啟動2個broker例項即可:

# bin/kafka-server-start.sh -daemon config/server-1.properties
# bin/kafka-server-start.sh -daemon config/server-2.properties
### 設定備份因子

現在我們建立一個新的topic,備份因子設定為3:

# bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic

檢視topic資訊

現在我們已經有了叢集,並且建立了一個3個備份因子的topic,但是到底是哪一個broker在為這個topic提供服務呢(因為我們只有一個分割槽,所以肯定同時只有一個broker在處理這個topic

# bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic

topic資訊解析

以下是輸出內容的解釋

第一行是所有分割槽的概要資訊,之後的每一行表示每一個partition的資訊。因為目前我們只有一個partition,因此關於partition的資訊只有一行。

  • leader節點負責給定partition的所有讀寫請求。
  • replicas 表示某個partition在哪幾個broker上存在備份。不管這個幾點是不是”leader“,甚至這個節點掛了,也會列出。
  • isr 是replicas的一個子集,它只列出當前還存活著的,並且備份了該partition的節點。
    現在我們的案例中,0號節點是leader,即使用server.properties啟動的那個程序。
    我們可以執行相同的命令檢視之前建立的名稱為”test“的topic
# bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test

之前設定了topic的partition數量為1,備份因子為1,因此顯示就如上所示了。
現在我們向新建的topic中傳送一些message:

# bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-replicated-topic

my test msg 1
my test msg 2
現在開始消費:

# bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic my-replicated-topic

my test msg 1
my test msg 2

現在我們來測試我們容錯性,因為broker0目前是leader,所以我們要將其kill

# ps -ef | grep server.properties
# kill -9 1177

現在再執行命令:

# bin/kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic my-replicated-topic

我們可以看到,leader節點已經變成了broker 2

要注意的是,在Isr中,已經沒有了0號節點。leader的選舉也是從ISR(in-sync replica)中進行的。
此時,我們依然可以 消費新訊息:

# bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic my-replicated-topic
my test msg 1
my test msg 2

檢視主題分割槽對應的leader資訊: