1. 程式人生 > >kafka的安裝和除錯

kafka的安裝和除錯

1. 簡介

kafka (官網地址:http://kafka.apache.org)是一款分散式訊息釋出和訂閱的系統,具有高效能和高吞吐率。



i. 訊息的釋出(publish)稱作producer,訊息的訂閱(subscribe)稱作consumer,中間的儲存陣列稱作broker。

ii. 多個broker協同合作,producer、consumer和broker三者之間通過zookeeper來協調請求和轉發。

iii. producer產生和推送(push)資料到broker,consumer從broker拉取(pull)資料並進行處理。

iv. broker端不維護資料的消費狀態,提升了效能。

v. 直接使用磁碟進行儲存,線性讀寫,速度快:避免了資料在JVM記憶體和系統記憶體之間的複製,減少耗效能的建立物件和垃圾回收。

vi. Kafka使用scala編寫,可以執行在JVM上。

2. 安裝:


a. 首先安裝JRE/JDK

Linux安裝JDK

b. 下載kafka

進入下載頁面:http://kafka.apache.org/downloads.html

選擇Binary downloads下載 (Source download需要編譯才能使用)

也可以直接在linux終端下載:
Shell程式碼  收藏程式碼
  1. wget -q http://apache.fayea.com/apache-mirror/kafka/0.8.1/kafka_2.8.0-0.8.1.tgz  


c. 解壓
Shell程式碼  收藏程式碼
  1. tar -xzvf kafka_2.8.0-0.8.1.tgz  
  2. rm kafka_2.8.0-0.8.1.tgz  
  3. cd kafka_2.8.0-0.8.1


目錄:


/bin 啟動和停止命令等。
/config 配置檔案
/libs 類庫

d. 修改配置
Kafka預設開啟JVM壓縮指標,但只是在64位的HotSpot VM受支援,如果安裝了32位的HotSpot VM,需要修改/bin/kafka-run-class.sh檔案
Shell程式碼  收藏程式碼
  1. vi bin/kafka-run-class.sh  


找到如下行:
Sh程式碼  收藏程式碼
  1. KAFKA_JVM_PERFORMANCE_OPTS="-server -XX:+UseCompressedOops -XX:+UseParNewGC -XX:+UseConcMarkSweepGC -XX:+CMSClassUnloadingEnabled -XX:+CMSScavengeBeforeRemark -XX:+DisableExplicitGC -Djava.awt.headless=true"

去除-XX:+UseCompressedOops引數

3. 啟動和停止

啟動Zookeeper server:
Shell程式碼  收藏程式碼
  1. bin/zookeeper-server-start.sh config/zookeeper.properties &  

&是為了能退出命令列

啟動Kafka server:
Shell程式碼  收藏程式碼
  1. bin/kafka-server-start.sh config/server.properties &  


停止Kafka server
Shell程式碼  收藏程式碼
  1. bin/kafka-server-stop.sh  


停止Zookeeper server:
Shell程式碼  收藏程式碼
  1. bin/zookeeper-server-stop.sh  


4. 單機連通性測試

執行producer:
Shell程式碼  收藏程式碼
  1. bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test  

早版本的Kafka,--broker-list localhost:9092需改為--zookeeper localhost:2181

執行consumer:
Shell程式碼  收藏程式碼
  1. bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning  

在producer端輸入字串並回車,檢視consumer端是否顯示。

5. 分散式連通性測試

Zookeeper Server, Kafka Server, Producer都放在伺服器server1上,ip地址為192.168.1.10
Consumer放在伺服器server2上,ip地址為192.168.1.12。

分別執行server1的producer和server2的consumer,
Shell程式碼  收藏程式碼
  1. bin/kafka-console-producer.sh --broker-list 192.168.1.10:9092 --topic test  


Shell程式碼  收藏程式碼
  1. bin/kafka-console-consumer.sh --zookeeper 192.168.1.10:2181 --topic test --from-beginning  


在producer的console端輸入字串,consumer報Connection refused錯誤:


broker, producer和consumer都註冊到zookeeper上,producer和consumer的引數明確指定。問題出在broker的配置檔案server.properties上:
Properties程式碼  收藏程式碼
  1. # Hostname the broker will bind to. If not set, the server will bind to all interfaces  
  2. #host.name=localhost  

host名稱沒有指定,就是127.0.0.1,consumer去broker拿資料就有問題。設定為192.168.1.10,重啟服務就好了。