1. 程式人生 > >kafka_0.10.1.0叢集部署

kafka_0.10.1.0叢集部署

1. 背景簡介

Kafka是一個分散式流平臺,原本開發自LinkedIn,之後成為Apache專案的一部分,用於構建實時資料管道和流媒體應用,水平擴充套件、容錯,很多公司都在生產環境中使用:LinkedIn,Yahoo,Twitter,Uber,Oracle…..。
Kafka背景及架構介紹:
http://www.infoq.com/cn/articles/kafka-analysis-part-1/
http://sanwen8.cn/p/2036To3.html

2. 分散式部署

以3個節點為例,分別是master,slave1,slave2

2.1 配置java環境

Kafka是由scala語言編寫的,所以它運行於jvm上面,首先各個節點要安裝jdk以及配置環境變數:

export JAVA_HOME=/usr/java/default/
export PATH=$JAVA_HOME/bin:$PATH
export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar

從安全層面考慮,推薦使用JDK1.8,因為較舊的免費版本已經公開了安全漏洞。LinkedIn目前正在使用的是JDK1.8 u5(G1垃圾收集器),如果決定在JDK1.7上使用G1垃圾收集器,確定版本為u51或者更新(kafka預設是使用G1垃圾收集器的)。

2.2 安裝zookeeper叢集

Kafka內建有zookeeper服務,最好把zookeeper獨立出來,其它元件服務(hbase等)也可以共享。Kafka使用ZkClient與ZooKeepe互動,0.10.1.0版本對應ZkClient-0.9,而ZkClient-0.9則對應zookeeper-3.4.8。具體的叢集部署不再詳述,參考網上的文章。

2.3 下載kafka安裝包

選擇0.10.1.0版本安裝包,地址如下:http://kafka.apache.org/downloads.html
上傳安裝包到任意一個節點,解壓:tar -zxvf kafka_2.11-0.10.1.0.tgz
PS:2.11表示的是scala版本,安裝包已經內建scala庫,所以無需獨立安裝scala.

2.4 修改配置檔案

如果是想單純地快速啟動一個broker進行嘗試kafka的功能,則可以使用預設的配置,在分散式環境中建議至少修改以下4個配置專案,其它的配置再根據實際專案環境進行調優。以master節點配置為示例,其餘節點根據引數說明作相應調整:

broker.id=0
listeners=PLAINTEXT://master:9092   
log.dirs=/tmp/kafka-logs
zookeeper.connect=master:2181,slave1:2181,slave2:2181

配置說明:
broker.id:broker的id,必需是一個全域性(叢集)唯一的整數值,即叢集中每個kafka server的配置不能相同。
listeners:socket監聽的地址,格式為listeners = security_protocol://host_name:port.如果沒有配置該引數,則預設通過java的api(java.net.InetAddress.getCanonicalHostName())來獲取主機名,埠預設為9092,建議進行顯式配置,避免多網絡卡時解析有誤。
log.dirs:日誌儲存目錄,其實儲存著各個主題的訊息資料。多個目錄可以以逗號隔開,預設值為/tmp/kafka-logs,可以每個日誌目錄掛在不同的磁碟做來負載均衡。
zookeeper.connect:zookeeper連線地址,根據實際配置。

2.5 啟動叢集

啟動kafka服務之前先確定zookeeper叢集已啟動,通過以下命令以守護程序的方式啟動每個kafka節點:

bin/kafka-server-start.sh -daemon config/server.properties

啟動成功後可以看到:

這裡寫圖片描述

PS:這裡有一個地方要注意,停止指令碼(kafka-server-stop.sh)好像不能把服務停止,實際上這個指令碼也只是把”Kafka”程序kill掉,而指令碼不能正常獲取到程序pid,可以使用以下兩種方式來結束程序:
1. 先通過jps來獲取pid,然後: kill -s TERM $PIDS (不要直接kill -9)
2. 修改指令碼,使得可以正確解析出pid:

PIDS=$(ps ax | grep -i 'Kafka\.kafka' | grep java | grep -v grep | awk '{print $1}')

改成:

PIDS=$(ps ax | grep -i 'kafka' | grep java | grep -v grep | awk '{print $1}')

2.6 測試叢集

1.建立topic

bin/kafka-topics.sh --zookeeper master:2181 --create --replication-factor 1 --partitions 1 --topic test

建立了一個test主題,分割槽數為1,沒有備份數

2.檢視topic
這裡寫圖片描述

__consumer_offsets:這個是kafka內部的topic,不允許刪除

3.建立生產者producer
Kafka自帶命令列的客戶端,可以將檔案或者標準輸入的訊息傳送到kafka叢集中,預設一行作為一個訊息:
這裡寫圖片描述
可以看到,已經從生產者中傳送了兩個訊息(“hello”,”my kafka”)到test主題,接著建立一個消費者來消費這些訊息。

4.建立消費者consumer
這裡寫圖片描述
消費者已經接收到了剛才生產者傳送到test主題的訊息。由此,通過簡單的測試驗證了叢集能夠正常執行。

PS:當使用delete命令刪除主題是,預設只是進行標記,並沒有真正的刪除
這裡寫圖片描述
需要在config/server.properties配置檔案中開啟delete.topic.enable=true

3. API開發

Kafka通過獨立的協議來暴露它的所有功能,所有客戶端可以使用各種語言進行程式設計,但只有java作為kafka專案的一部分在維護,至於其它的語言則可以通過一些開源的專案。下面通過java api模擬生產者消費者,同時向叢集生產訊息以及消費訊息。

3.1 引入依賴包

<dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>0.10.1.0</version>
</dependency>

3.2 編寫生產者程式

編寫一個生產者,定時向kafka叢集中的test主題傳送一些測試的訊息:

Properties props = new Properties();
props.put("bootstrap.servers", "master:9092");
props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<>(props);
for (int i = 0; i < 100; i++){
    System.out.println("producer sends message: " + i);
    producer.send(new ProducerRecord<String, String>("test", i + "", i + ""));
}
producer.close();

3.3 編寫消費者程式

編寫消費者程式,從kafka叢集中的test主題消費訊息:

Properties props = new Properties();
props.put("bootstrap.servers", "master:9092");
props.put("group.id", "test");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("test"));
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records)
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}

3.4 測試執行

分別啟動生產者與消費者程式,並觀察執行結果,生產者不斷髮送訊息:
這裡寫圖片描述

消費者程式接收訊息:
這裡寫圖片描述

同時通過指令碼啟動的消費者也接收到訊息:
這裡寫圖片描述

4. 進階內容