1. 程式人生 > 其它 >kafka快速開始

kafka快速開始

技術標籤:kafka

一、建立一個topic

建立一個名為“test1”的topic,它有一個分割槽和一個副本,進入windows路徑下,執行

kafka-topics.bat --create --zookeeper 192.168.175.104:2181 --replication-factor 1 --partitions 1 --topic test1

執行list(列表)命令來檢視這個topic

kafka-topics.bat --list --zookeeper 192.168.175.104:2181

使用kafka自帶的生產者和消費者客戶端測試時,不能使用localhost,須明確指定本機域名

在 server.properties,新增如下配置:

host.name = 192.168.111.130 # 本機ip地址

二、傳送一些訊息(生產者)

Kafka自帶一個命令列客戶端,它從檔案或標準輸入中獲取輸入,並將其作為message(訊息)傳送到Kafka叢集。預設情況下,每行將作為單獨的message傳送。

執行 producer,然後在控制檯輸入一些訊息以傳送到伺服器。

kafka-console-producer.bat --broker-list 192.168.175.104:9092 --topic test

三、接收訊息(消費者)

Kafka 還有一個命令列consumer(消費者),將訊息轉儲到標準輸出。

kafka-console-consumer.bat --bootstrap-server 192.168.175.104:9092 --topic test --from-beginning

將訊息輸入到生產者終端中,它們將在消費終端中顯示出來。生產者進行輸入,消費者中直接輸出。

四、設定多代理叢集

到目前為止,我們一直在使用單個代理,這並不好玩。對Kafka來說,單個代理只是一個大小為一的叢集,除了啟動更多的代理例項外,沒有什麼變化。為了深入瞭解它,讓我們把叢集擴充套件到三個節點(仍然在本地機器上)。

首先,為每個代理建立一個配置檔案:

E:\kafka_2.12-1.0.0\config>copy server.properties server-1.properties
已複製         1 個檔案。

E:\kafka_2.12-1.0.0\config>copy server.properties server-2.properties
已複製         1 個檔案。

現在編輯這些新檔案並設定如下屬性:

config/server-1.properties:
    broker.id=1
    listeners=PLAINTEXT://:9093
    log.dir=/tmp/kafka-logs-1
 
config/server-2.properties:
    broker.id=2
    listeners=PLAINTEXT://:9094
    log.dir=/tmp/kafka-logs-2

broker.id屬性是叢集中每個節點的名稱,這一名稱是唯一且永久的。我們必須重寫埠和日誌目錄,因為我們在同一臺機器上執行這些,我們不希望所有的代理嘗試在同一個埠註冊,或者覆蓋彼此的資料。

我們已經建立Zookeeper和一個單節點了,現在我們只需要啟動兩個新的節點:

E:\kafka_2.12-1.0.0>bin\windows\kafka-server-start.bat config\server-1.properties
E:\kafka_2.12-1.0.0>bin\windows\kafka-server-start.bat config\server-2.properties

現在建立一個副本為3的新topic:

E:\kafka_2.12-1.0.0\bin\windows>kafka-topics.bat --create --zookeeper 192.168.175.104:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic

Good,現在我們有一個叢集,但是我們怎麼才能知道那些代理在做什麼呢?執行"describe topics"命令來檢視:

E:\kafka_2.12-1.0.0\bin\windows>kafka-topics.bat --describe --zookeeper 192.168.175.104:2181 --topic my-replicated-topic

以下是對輸出資訊的解釋。第一行給出了所有分割槽的摘要,下面的每行都給出了一個分割槽的資訊。因為我們只有一個分割槽,所以只有一行。

  • “leader”是負責給定分割槽所有讀寫操作的節點。每個節點都是隨機選擇的部分分割槽的領導者。

  • “replicas”是複製分割槽日誌的節點列表,不管這些節點是leader還是僅僅活著。

  • “isr”是一組“同步”replicas,是replicas列表的子集,它活著並被指到leader。

請注意,在示例中,節點1是該主題中唯一分割槽的領導者。

我們可以在已建立的原始主題上執行相同的命令來檢視它的位置:

E:\kafka_2.12-1.0.0\bin\windows>kafka-topics.bat --describe --zookeeper 192.168.175.104:2181 --topic test

這沒什麼大不了,原來的主題沒有副本且在伺服器0上。我們建立叢集時,這是唯一的伺服器。

讓我們發表一些資訊給我們的新topic:

kafka-console-producer.bat --broker-list 192.168.175.104:9092 --topic my-replicated-topic

現在我們來消費這些訊息:

kafka-console-consumer.bat --bootstrap-server 192.168.175.104:9092 --from-beginning --topic my-replicated-topic

讓我們來測試一下容錯性。 Broker 1 現在是 leader,讓我們來殺了它:

> wmic process where "caption = 'java.exe' and commandline like '%server-2.properties%'" get processid
ProcessId
6016
> taskkill /pid 6016 /f

領導權已經切換到一個從屬節點,而且節點1也不在同步副本集中了:

> bin/kafka-topics.bat --describe --zookeeper 192.168.175.104:2181 --topic my-replicated-topic
Topic:my-replicated-topic   PartitionCount:1    ReplicationFactor:3 Configs:
    Topic: my-replicated-topic  Partition: 0    Leader: 1   Replicas: 1,2,0 Isr: 1,0

不過,即便原先寫入訊息的leader已經不在,這些訊息仍可用於消費

五、使用Kafka Connect來匯入/匯出資料

從控制檯讀出資料並將其寫回是十分方便操作的,但你可能需要使用其他來源的資料或將資料從Kafka匯出到其他系統。針對這些系統, 你可以使用Kafka Connect來匯入或匯出資料,而不是寫自定義的整合程式碼。

Kafka Connect是Kafka的一個工具,它可以將資料匯入和匯出到Kafka。它是一種可擴充套件工具,通過執行connectors(聯結器), 使用自定義邏輯來實現與外部系統的互動。 在本文中,我們將看到如何使用簡單的connectors來執行Kafka Connect,這些connectors 將檔案中的資料匯入到Kafka topic中,並從中匯出資料到一個檔案。

首先,我們將建立一些種子資料來進行測試:

> echo foo> test.txt #新建test檔案並在檔案中寫入一行foo
> echo bar>> test.txt #test檔案並在檔案中寫入一行bar

接下來,我們將啟動兩個standalone(獨立)執行的聯結器,這意味著它們各自執行在一個單獨的本地專用 程序上。 我們提供三個配置檔案。首先是Kafka Connect的配置檔案,包含常用的配置,如Kafka brokers連線方式和資料的序列化格式。 其餘的配置檔案均指定一個要建立的聯結器。這些檔案包括聯結器的唯一名稱,類的例項,以及其他聯結器所需的配置。

bin\windows\connect-standalone.bat config\connect-standalone.properties config\connect-file-source.properties config\connect-file-sink.properties

這些包含在Kafka中的示例配置檔案使用您之前啟動的預設本地群集配置,並建立兩個聯結器: 第一個是源聯結器,用於從輸入檔案讀取行,並將其輸入到 Kafka topic。 第二個是接收器聯結器,它從Kafka topic中讀取訊息,並在輸出檔案中生成一行。

在啟動過程中,你會看到一些日誌訊息,包括一些聯結器正在例項化的指示。 一旦Kafka Connect程序啟動,源聯結器就開始從test.txt讀取行並且 將它們生產到主題connect-test中,同時接收器聯結器也開始從主題connect-test中讀取訊息, 並將它們寫入檔案test.sink.txt中。我們可以通過檢查輸出檔案的內容來驗證資料是否已通過整個pipeline進行交付。

資料儲存在Kafka topic` connect-test `中,因此我們也可以執行一個console consumer(控制檯消費者)來檢視 topic 中的資料(或使用custom consumer(自定義消費者)程式碼進行處理):

bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic connect-test --from-beginning

聯結器一直在處理資料,所以我們可以將資料新增到檔案中,並看到它在pipeline 中移動。您應該可以看到這一行出現在控制檯使用者輸出和接收器檔案中。

> echo testAnother line>> test.txt #test檔案並在檔案中寫入一行testAnother line

控制檯輸出: