kafka急速入門與核心API解析
阿新 • • 發佈:2020-08-03
kafka環境安裝
上一節課我們已經對kafka的基本概念、核心思想有了一定的瞭解和認知,並且掌握了kafka在實際工作中的一些主要的應用場景。那麼接下來,我們就一起快速進入kafka的安裝吧。
-
kafka下載地址:http://kafka.apache.org/downloads.html
-
kafka安裝環境介紹:
節點名稱 節點作用 節點備註 hostname:192.168.11.111 zookeeper節點 kafka註冊、配置中心 hostname:192.168.11.112 zookeeper節點 kafka註冊、配置中心 hostname:192.168.11.113 zookeeper節點 kafka註冊、配置中心 hostname:192.168.11.51 kafka節點 此節點為kafka broker -
kafka安裝步驟:首先kafka安裝需要依賴與zookeeper,所以小夥伴們先準備好zookeeper環境(三個節點即可),然後我們來一起構建kafka broker。
## 解壓命令: tar -zxvf kafka_2.12-2.1.0.tgz -C /usr/local/ ## 改名命令: mv kafka_2.12-2.1.0/ kafka_2.12 ## 進入解壓後的目錄,修改server.properties檔案: vim /usr/local/kafka_2.12/config/server.properties ## 修改配置: broker.id=0 port=9092 host.name=192.168.11.51 advertised.host.name=192.168.11.51 log.dirs=/usr/local/kafka_2.12/kafka-logs num.partitions=2 zookeeper.connect=192.168.11.111:2181,192.168.11.112:2181,192.168.11.113:2181 ## 建立日誌資料夾: mkdir /usr/local/kafka_2.12/kafka-logs ##啟動kafka: /usr/local/kafka_2.12/bin/kafka-server-start.sh /usr/local/kafka_2.12/config/server.properties &
kafka常用命令
我們接下來一起了解幾個非常重要的命令,通過這些命令我們對kafka topic partition 進行檢視和操作。
-
常用命令:
## 簡單操作: #(1)建立topic主題命令:(建立名為test的topic, 1個分割槽分別存放資料,資料備份總共1份) kafka-topics.sh --zookeeper 192.168.11.111:2181 --create --topic topic1 --partitions 1 --replication-factor 1 ## --zookeeper 為zookeeper服務列表地址配置項,這裡任意指定zookeeper其中一個服務列表地址即可 ## --create 命令後 --topic 為建立topic 並指定 topic name ## --partitions 為指定分割槽數量配置項 ## --replication-factor 為指定副本集數量配置項 #(2)檢視topic列表命令: kafka-topics.sh --zookeeper 192.168.11.111:2181 --list #(3)kafka命令傳送資料:(然後我們就可以編寫資料傳送出去了) kafka-console-producer.sh --broker-list 192.168.11.51:9092 --topic topic1 ## --brokerlist kafka服務的broker節點列表 #(4)kafka命令接受資料:(然後我們就可以看到消費的資訊了) kafka-console-consumer.sh --bootstrap-server 192.168.11.51:9092 --topic topic1 --from-beginning #(5)刪除topic命令: kafka-topics.sh --zookeeper 192.168.11.111:2181 --delete --topic topic1 #(6)kafka檢視消費進度:(當我們需要檢視一個消費者組的消費進度時,則使用下面的命令) kafka-consumer-groups.sh --bootstrap-server 192.168.11.51:9092 --describe --group group1 ## --describe --group 為訂閱組, 後面指定 group name
急速入門
下面我們一起使用kafka最基本的API來對kafka進行操作!
-
kafka依賴包:
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka_2.12</artifactId> </dependency>
-
kafka生產者:
package com.bfxy.mix.kafka; import java.util.Properties; import org.apache.kafka.clients.producer.Callback; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import com.alibaba.fastjson.JSON; public class CollectKafkaProducer { // 建立一個kafka生產者 private final KafkaProducer<String, String> producer; // 定義一個成員變數為topic private final String topic; // 初始化kafka的配置檔案和例項:Properties & KafkaProducer public CollectKafkaProducer(String topic) { Properties props = new Properties(); // 配置broker地址 props.put("bootstrap.servers", "192.168.11.51:9092"); // 定義一個 client.id props.put("client.id", "demo-producer-test"); // 其他配置項: // props.put("batch.size", 16384); //16KB -> 滿足16KB傳送批量訊息 // props.put("linger.ms", 10); //10ms -> 滿足10ms時間間隔傳送批量訊息 // props.put("buffer.memory", 33554432); //32M -> 快取提效能 // kafka 序列化配置: props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // 建立 KafkaProducer 與 接收 topic this.producer = new KafkaProducer<>(props); this.topic = topic; } // 傳送訊息 (同步或者非同步) public void send(Object message, boolean syncSend) throws InterruptedException { try { // 同步傳送 if(syncSend) { producer.send(new ProducerRecord<>(topic, JSON.toJSONString(message))); } // 非同步傳送(callback實現回撥監聽) else { producer.send(new ProducerRecord<>(topic, JSON.toJSONString(message)), new Callback() { @Override public void onCompletion(RecordMetadata recordMetadata, Exception e) { if (e != null) { System.err.println("Unable to write to Kafka in CollectKafkaProducer [" + topic + "] exception: " + e); } } }); } } catch (Exception e) { e.printStackTrace(); } } // 關閉producer public void close() { producer.close(); } // 測試函式 public static void main