kafka快速開始
技術標籤:kafka
一、建立一個topic
建立一個名為“test1”的topic,它有一個分割槽和一個副本,進入windows路徑下,執行
kafka-topics.bat --create --zookeeper 192.168.175.104:2181 --replication-factor 1 --partitions 1 --topic test1
執行list(列表)命令來檢視這個topic
kafka-topics.bat --list --zookeeper 192.168.175.104:2181
使用kafka自帶的生產者和消費者客戶端測試時,不能使用localhost,須明確指定本機域名
在 server.properties,新增如下配置:
host.name = 192.168.111.130 # 本機ip地址
二、傳送一些訊息(生產者)
Kafka自帶一個命令列客戶端,它從檔案或標準輸入中獲取輸入,並將其作為message(訊息)傳送到Kafka叢集。預設情況下,每行將作為單獨的message傳送。
執行 producer,然後在控制檯輸入一些訊息以傳送到伺服器。
kafka-console-producer.bat --broker-list 192.168.175.104:9092 --topic test
三、接收訊息(消費者)
Kafka 還有一個命令列consumer(消費者),將訊息轉儲到標準輸出。
kafka-console-consumer.bat --bootstrap-server 192.168.175.104:9092 --topic test --from-beginning
將訊息輸入到生產者終端中,它們將在消費終端中顯示出來。生產者進行輸入,消費者中直接輸出。
四、設定多代理叢集
到目前為止,我們一直在使用單個代理,這並不好玩。對Kafka來說,單個代理只是一個大小為一的叢集,除了啟動更多的代理例項外,沒有什麼變化。為了深入瞭解它,讓我們把叢集擴充套件到三個節點(仍然在本地機器上)。
首先,為每個代理建立一個配置檔案:
E:\kafka_2.12-1.0.0\config>copy server.properties server-1.properties 已複製 1 個檔案。 E:\kafka_2.12-1.0.0\config>copy server.properties server-2.properties 已複製 1 個檔案。
現在編輯這些新檔案並設定如下屬性:
config/server-1.properties:
broker.id=1
listeners=PLAINTEXT://:9093
log.dir=/tmp/kafka-logs-1
config/server-2.properties:
broker.id=2
listeners=PLAINTEXT://:9094
log.dir=/tmp/kafka-logs-2
broker.id
屬性是叢集中每個節點的名稱,這一名稱是唯一且永久的。我們必須重寫埠和日誌目錄,因為我們在同一臺機器上執行這些,我們不希望所有的代理嘗試在同一個埠註冊,或者覆蓋彼此的資料。
我們已經建立Zookeeper和一個單節點了,現在我們只需要啟動兩個新的節點:
E:\kafka_2.12-1.0.0>bin\windows\kafka-server-start.bat config\server-1.properties
E:\kafka_2.12-1.0.0>bin\windows\kafka-server-start.bat config\server-2.properties
現在建立一個副本為3的新topic:
E:\kafka_2.12-1.0.0\bin\windows>kafka-topics.bat --create --zookeeper 192.168.175.104:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic
Good,現在我們有一個叢集,但是我們怎麼才能知道那些代理在做什麼呢?執行"describe topics"命令來檢視:
E:\kafka_2.12-1.0.0\bin\windows>kafka-topics.bat --describe --zookeeper 192.168.175.104:2181 --topic my-replicated-topic
以下是對輸出資訊的解釋。第一行給出了所有分割槽的摘要,下面的每行都給出了一個分割槽的資訊。因為我們只有一個分割槽,所以只有一行。
-
“leader”是負責給定分割槽所有讀寫操作的節點。每個節點都是隨機選擇的部分分割槽的領導者。
-
“replicas”是複製分割槽日誌的節點列表,不管這些節點是leader還是僅僅活著。
-
“isr”是一組“同步”replicas,是replicas列表的子集,它活著並被指到leader。
請注意,在示例中,節點1是該主題中唯一分割槽的領導者。
我們可以在已建立的原始主題上執行相同的命令來檢視它的位置:
E:\kafka_2.12-1.0.0\bin\windows>kafka-topics.bat --describe --zookeeper 192.168.175.104:2181 --topic test
這沒什麼大不了,原來的主題沒有副本且在伺服器0上。我們建立叢集時,這是唯一的伺服器。
讓我們發表一些資訊給我們的新topic:
kafka-console-producer.bat --broker-list 192.168.175.104:9092 --topic my-replicated-topic
現在我們來消費這些訊息:
kafka-console-consumer.bat --bootstrap-server 192.168.175.104:9092 --from-beginning --topic my-replicated-topic
讓我們來測試一下容錯性。 Broker 1 現在是 leader,讓我們來殺了它:
> wmic process where "caption = 'java.exe' and commandline like '%server-2.properties%'" get processid
ProcessId
6016
> taskkill /pid 6016 /f
領導權已經切換到一個從屬節點,而且節點1也不在同步副本集中了:
> bin/kafka-topics.bat --describe --zookeeper 192.168.175.104:2181 --topic my-replicated-topic
Topic:my-replicated-topic PartitionCount:1 ReplicationFactor:3 Configs:
Topic: my-replicated-topic Partition: 0 Leader: 1 Replicas: 1,2,0 Isr: 1,0
不過,即便原先寫入訊息的leader已經不在,這些訊息仍可用於消費
五、使用Kafka Connect來匯入/匯出資料
從控制檯讀出資料並將其寫回是十分方便操作的,但你可能需要使用其他來源的資料或將資料從Kafka匯出到其他系統。針對這些系統, 你可以使用Kafka Connect來匯入或匯出資料,而不是寫自定義的整合程式碼。
Kafka Connect是Kafka的一個工具,它可以將資料匯入和匯出到Kafka。它是一種可擴充套件工具,通過執行connectors(聯結器), 使用自定義邏輯來實現與外部系統的互動。 在本文中,我們將看到如何使用簡單的connectors來執行Kafka Connect,這些connectors 將檔案中的資料匯入到Kafka topic中,並從中匯出資料到一個檔案。
首先,我們將建立一些種子資料來進行測試:
> echo foo> test.txt #新建test檔案並在檔案中寫入一行foo
> echo bar>> test.txt #test檔案並在檔案中寫入一行bar
接下來,我們將啟動兩個standalone(獨立)執行的聯結器,這意味著它們各自執行在一個單獨的本地專用 程序上。 我們提供三個配置檔案。首先是Kafka Connect的配置檔案,包含常用的配置,如Kafka brokers連線方式和資料的序列化格式。 其餘的配置檔案均指定一個要建立的聯結器。這些檔案包括聯結器的唯一名稱,類的例項,以及其他聯結器所需的配置。
bin\windows\connect-standalone.bat config\connect-standalone.properties config\connect-file-source.properties config\connect-file-sink.properties
這些包含在Kafka中的示例配置檔案使用您之前啟動的預設本地群集配置,並建立兩個聯結器: 第一個是源聯結器,用於從輸入檔案讀取行,並將其輸入到 Kafka topic。 第二個是接收器聯結器,它從Kafka topic中讀取訊息,並在輸出檔案中生成一行。
在啟動過程中,你會看到一些日誌訊息,包括一些聯結器正在例項化的指示。 一旦Kafka Connect程序啟動,源聯結器就開始從test.txt
讀取行並且 將它們生產到主題connect-test
中,同時接收器聯結器也開始從主題connect-test
中讀取訊息, 並將它們寫入檔案test.sink.txt
中。我們可以通過檢查輸出檔案的內容來驗證資料是否已通過整個pipeline進行交付。
資料儲存在Kafka topic` connect-test `中,因此我們也可以執行一個console consumer(控制檯消費者)來檢視 topic 中的資料(或使用custom consumer(自定義消費者)程式碼進行處理):
bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic connect-test --from-beginning
聯結器一直在處理資料,所以我們可以將資料新增到檔案中,並看到它在pipeline 中移動。您應該可以看到這一行出現在控制檯使用者輸出和接收器檔案中。
> echo testAnother line>> test.txt #test檔案並在檔案中寫入一行testAnother line
控制檯輸出: