Kafka(二): Kafka 叢集部署與使用
一、Kafka 叢集部署
Kafka是一種分散式的釋出(producer)/訂閱(consumer)的訊息系統,並支援實時和離線的資料處理、可擴充套件、持久的。上一次已經對kafka做了介紹,今天我們介紹如何部署、建立主題併發布訊息和訂閱訊息。
1、兩個虛擬機器:192.168.2.200、192.168.2.201
3、對兩個虛擬機器分別解壓
> tar -xzf kafka_2.12-0.10.2.1.tgz > cd kafka_2.12-0.10.2.1.tgz
4、配置 config/server.properties 檔案
broker.id=200 / broker.id=201
zookeeper.connect=192.168.2.200:2181,192.168.2.201:2181
broker.id:每一個broker在叢集中的唯一表示,要求是正數。
zookeeper.connect:zookeeper叢集的地址,可以是多個,多個之間用逗號分割。
5、Kafka依賴zookeeper,kafka的叢集的meta資訊儲存在zookeeper上,zookeeper管理consumer的
6、zookeeper叢集先啟動,然後啟動192.168.2.200、192.168.2.201虛擬機器上的kafka的server
>bin/kafka-server-start.sh config/server.properties &
建立主題併發布訊息和訂閱訊息
二、建立主題
建立了主題名my-test-topic,2個分割槽,一個副本
>bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 2 --topic my-test-topic
主題資訊
1、檢視哪些主題列表
>bin/kafka-topics.sh --list --zookeeper localhost:2181
2、具體檢視某個主題的描述資訊
>bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-test-topic
第一行描述了my-test-topic主題的資訊、正如我們之前的建立的兩個分割槽、一個副本。接下去每一個描述對應的各個分割槽的情況,有多少個分割槽就有對應幾行。
Leader:負責處理訊息的讀和寫,Leader是從所有節點中隨機選擇的.
Replicas:分割槽對應的副本。
Isr:副本所在的server 節點。
三、釋出訊息和訂閱訊息
1、釋出訊息
>bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-test-topic
1)檢視釋出者
>./bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic my-test-topic
my-test-topic:1:1
my-test-topic:0:1
2、訂閱訊息
>bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic my-test-topic --from-beginning
1)檢視訂閱描述資訊
>bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --consumer.config group.id:test-consumer-group --topic my-test-topic --from-beginning
[2017-05-20 08:35:26,396] WARN WARNING: ConsumerOffsetChecker is deprecated and will be dropped in releases following 0.9.0. Use ConsumerGroupCommand instead. (kafka.tools.ConsumerOffsetChecker$)
Exiting due to: org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = NoNode for /consumers/test-consumer-group/offsets/my-test-topic/0.
在0.9.0.版本後對kafka.tools.ConsumerOffsetChecker命令就不在支援了。
2)檢視消費組
>bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list
3)檢視具體的消費者資訊
>bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server localhost:9092 --describe --group console-consumer-44096
4)我們在消費時指定消費組,config/consumer.properties配置的group.id
>bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --consumer.config config/consumer.properties --topic my-test-topic --from-beginning
五、刪除主題
1、刪除Broker
>jps
>Kill -9 程序ID
2、刪除主題
>bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic my-test-topic