1. 程式人生 > 實用技巧 >kafka-環境搭建與簡單入門

kafka-環境搭建與簡單入門

Kafka環境搭建與簡單入門 步驟一:下載程式碼 下載Kafka安裝包,並解壓。 這裡下載的版本為kafka_2.11-1.0.0.tgz。

> tar -xzf kafka_2.11-1.0.0.tgz
> cd kafka_2.11-1.0.0

步驟二:啟動服務 Kafka使用Zookeeper儲存元資料,如果你還沒有Zookeeper伺服器,你需要先啟動一個Zookeeper伺服器。

可以使用與Kafka打包在一起的便捷指令碼來快速簡單地建立一個單節點的Zookeeper例項

> bin/zookeeper-server-start.sh config/zookeeper.properties

啟動Zookeeper服務後,現在啟動Kafka服務。

> bin/kafka-server-start.sh config/server.properties

注:如果要使用後臺啟動,可以使用:

> bin/kafka-server-start.sh -daemon config/server.properties

步驟三:建立Topic 下面我們建立一個名為"test"的Topic,它只有一個分割槽和一個副本:

> bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

現在我們可以使用–list引數來檢視這個Topic:

> bin/kafka-topics.sh --list --zookeeper localhost:2181
test

或者,您也可以將Kafka配置為:在讀取或寫入訊息時,如果Topic不存在,自動建立Topic,而不是手動建立。 步驟四:傳送一些訊息 Kafka自帶一個命令列客戶端,可以從檔案或標準輸入中獲取輸入,並將其作為message傳送到Kafka叢集。

預設情況下,每行將作為單獨的message傳送。

執行Producer,然後在控制檯輸入一些訊息已傳送到服務端。

> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
This is a message
This is another message

步驟五:啟動一個Consumer Kafka還提供各類命令列的Consumer,將Topic中的訊息輸出到控制檯。

> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
This is a message
This is another message

建議您使用兩個不同的終端分別執行Producer和Consumer程式,這樣您在Producer端輸入的訊息,會立即在Consumer端顯示出來。

所有的命令列工具都有其他選項,執行不帶任何引數的命令將顯示更加資訊的使用資訊。

步驟六:加入更多節點 到目前為止,我們的Kafka叢集只有一個節點,為了深入瞭解它,我們把叢集擴充套件到三個節點(仍然在本地機器上,只不過是啟動更多的例項而已)。

首先,為Broker例項建立配置檔案:

> cp config/server.properties config/server-1.properties
> cp config/server.properties config/server-2.properties

然後編輯這些檔案並設定如下屬性:

config/server-1.properties:
    broker.id=1
    listeners=PLAINTEXT://:9093
    log.dir=/tmp/kafka-logs-1
 
config/server-2.properties:
    broker.id=2
    listeners=PLAINTEXT://:9094
    log.dir=/tmp/kafka-logs-2

broker.id屬性是叢集中每個節點的編號,必須保證唯一。此外,我們必須指定其他例項的埠和日誌目錄,因為我們在同一臺機器上執行,否則會出現埠衝突或資料覆蓋的問題。

我們前面已經啟動了一個例項,現在將兩個新的例項進行啟動。

> bin/kafka-server-start.sh -daemon config/server-1.properties
> bin/kafka-server-start.sh -daemon config/server-2.properties

現在建立一個副本為3的新Topic:

> bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic

現在我們檢視這個Topic的詳情資訊:

> bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
Topic:my-replicated-topic   PartitionCount:1    ReplicationFactor:3 Configs:
    Topic: my-replicated-topic  Partition: 0    Leader: 1   Replicas: 1,2,0 Isr: 1,2,0

以下是對輸出資訊的解釋。第一行給出了所有分割槽的摘要,下面的每行都給出了一個分割槽的資訊。因為我們只有一個分割槽,所以只有一行。

"leader"是負責給定分割槽所有讀寫操作的節點。每個節點都是部分分割槽的領導者。 "replicas"是複製分割槽日誌的節點列表,不管這些是leader還是還是其他節點。 "isr"是一組"同步"的replicas,是replicas列表的子集,是指該分割槽對應的正在同步中的節點。 注意,在示例中,節點1是該Topic中唯一的分割槽leader。

我們可以檢視最開始建立的名為test的Topic詳情:

> bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test
Topic:test  PartitionCount:1    ReplicationFactor:1 Configs:
    Topic: test Partition: 0    Leader: 0   Replicas: 0 Isr: 0

和預想中的一樣,原來的Topic只有1個副本並且在編號為0的節點上,因為我們建立該Topic時,叢集中當時只有一個節點。

現在先新建立的Topic中傳送一些資訊:

> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-replicated-topic
...
my test message 1
my test message 2
^C

消費該Topic中的訊息:

> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic my-replicated-topic
...
my test message 1
my test message 2
^C

讓我們來測試一下容錯性。Broker 1是my-replicated-topic的Leader節點,讓我們來結束它:

> ps aux | grep server-1.properties
7564 ttys002    0:15.91 /System/Library/Frameworks/JavaVM.framework/Versions/1.8/Home/bin/java...
> kill -9 7564

再次檢視my-replicated-topic的詳情,發現leader自動切換為另一個節點,並且節點1也不在ISR列表中。

> bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
Topic:my-replicated-topic   PartitionCount:1    ReplicationFactor:3 Configs:
    Topic: my-replicated-topic  Partition: 0    Leader: 2   Replicas: 1,2,0 Isr: 2,0