1. 程式人生 > >Kafka 單節點單Kafka Broker叢集

Kafka 單節點單Kafka Broker叢集

下載與安裝

$ tar zxvf kafka_2.12-0.10.2.1.tgz
$ cd kafka_2.12-0.10.2.1

執行

啟動 zookeeper 服務

$ bin/zookeeper-server-start.sh config/zookeeper.properties

啟動 kafka Broker 服務

$ bin/kafka-server-start.sh config/server.properties

測試

首先,建立一個單分割槽單副本的 topic: mytopic

$ bin/kafka-topics.sh --create -
-zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic mytopic Created topic "mytopic".

然後,可以通過執行 list 命令來檢視已經存在的 topic,比如:

$ bin/kafka-topics.sh --list --zookeeper localhost:2181
mytopic

也可以使用 describe 命令來檢視。由於我們現在是單分割槽單副本的case,所以 Leader 和 Isr (複製節點集合)都只在Broker 0上。

bin/kafka-topics
.sh --describe --zookeeper localhost:2181 --topic mytopic Topic:mytopic PartitionCount:1 ReplicationFactor:1 Configs: Topic: mytopic Partition: 0 Leader: 0 Replicas: 0 Isr: 0

現在,我們通過 Kafka 自帶命令列客戶端向這個 topic 傳送訊息。

$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic mytopic
aaa
bbb
ccc

另開一個終端,然後使用 Kafka 自帶的命令列工具來消費這些訊息。

$ bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic mytopic --from-beginning
...
aaa
bbb
ccc

此時可以在 producer 命令列視窗繼續輸入訊息,然後觀察 consumer 終端視窗,可以看到它們被消費打印出來。

使用 Kafka Connect 匯入匯出資料

下面使用 Kafka 的 Connect 演示從一個變化的檔案中讀取增量資料然後輸出到另外一個檔案中。

首先執行下面的指令碼,此指令碼會每隔一秒會向 test.txt 檔案中追加一個數字。

$ for i in {1..300};do echo $i >> test.txt; sleep 1; done

然後執行下面的指令碼,此時生產者從 test.txt 檔案中讀取檔案內容並作為訊息傳送到topic中,然後消費者從topic中消費訊息並輸出到 test.sink.txt 檔案中。命令列使用的配置檔案中定義了輸入輸出的檔名和使用的topic名。

$ bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties

執行後,另開一個終端來觀察 test.sink.txt 檔案中的內容,可以看到檔案中的內容會不停的增加。

刪除 Topic

要刪除 topic,可以使用下面的命令

$ bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic mytopic

但是執行命令後,topic並沒有被刪除,使用 “bin/kafka-topics.sh –list –zookeeper localhost:2181” 仍然可以查到。此時我們需要修改config/server.properties檔案中的 “delete.topic.enable=true” 來開啟這個功能。此時我們再執行上面的 –delete 操作,即可刪除topic了。