kafka的安裝和除錯
阿新 • • 發佈:2019-02-02
1. 簡介
kafka (官網地址:http://kafka.apache.org)是一款分散式訊息釋出和訂閱的系統,具有高效能和高吞吐率。
i. 訊息的釋出(publish)稱作producer,訊息的訂閱(subscribe)稱作consumer,中間的儲存陣列稱作broker。
ii. 多個broker協同合作,producer、consumer和broker三者之間通過zookeeper來協調請求和轉發。
iii. producer產生和推送(push)資料到broker,consumer從broker拉取(pull)資料並進行處理。
iv. broker端不維護資料的消費狀態,提升了效能。
v. 直接使用磁碟進行儲存,線性讀寫,速度快:避免了資料在JVM記憶體和系統記憶體之間的複製,減少耗效能的建立物件和垃圾回收。
vi. Kafka使用scala編寫,可以執行在JVM上。
2. 安裝:
a. 首先安裝JRE/JDK
Linux安裝JDK
b. 下載kafka
進入下載頁面:http://kafka.apache.org/downloads.html
選擇Binary downloads下載 (Source download需要編譯才能使用)
也可以直接在linux終端下載:
Shell程式碼
c. 解壓
Shell程式碼
目錄:
/bin 啟動和停止命令等。
/config 配置檔案
/libs 類庫
d. 修改配置
Kafka預設開啟JVM壓縮指標,但只是在64位的HotSpot VM受支援,如果安裝了32位的HotSpot VM,需要修改/bin/kafka-run-class.sh檔案
Shell程式碼
找到如下行:
Sh程式碼
去除-XX:+UseCompressedOops引數
3. 啟動和停止
啟動Zookeeper server:
Shell程式碼
&是為了能退出命令列
啟動Kafka server:
Shell程式碼
停止Kafka server
Shell程式碼
停止Zookeeper server:
Shell程式碼
4. 單機連通性測試
執行producer:
Shell程式碼
早版本的Kafka,--broker-list localhost:9092需改為--zookeeper localhost:2181
執行consumer:
Shell程式碼
在producer端輸入字串並回車,檢視consumer端是否顯示。
5. 分散式連通性測試
Zookeeper Server, Kafka Server, Producer都放在伺服器server1上,ip地址為192.168.1.10
Consumer放在伺服器server2上,ip地址為192.168.1.12。
分別執行server1的producer和server2的consumer,
Shell程式碼
Shell程式碼
在producer的console端輸入字串,consumer報Connection refused錯誤:
broker, producer和consumer都註冊到zookeeper上,producer和consumer的引數明確指定。問題出在broker的配置檔案server.properties上:
Properties程式碼
host名稱沒有指定,就是127.0.0.1,consumer去broker拿資料就有問題。設定為192.168.1.10,重啟服務就好了。
kafka (官網地址:http://kafka.apache.org)是一款分散式訊息釋出和訂閱的系統,具有高效能和高吞吐率。
i. 訊息的釋出(publish)稱作producer,訊息的訂閱(subscribe)稱作consumer,中間的儲存陣列稱作broker。
ii. 多個broker協同合作,producer、consumer和broker三者之間通過zookeeper來協調請求和轉發。
iii. producer產生和推送(push)資料到broker,consumer從broker拉取(pull)資料並進行處理。
iv. broker端不維護資料的消費狀態,提升了效能。
v. 直接使用磁碟進行儲存,線性讀寫,速度快:避免了資料在JVM記憶體和系統記憶體之間的複製,減少耗效能的建立物件和垃圾回收。
vi. Kafka使用scala編寫,可以執行在JVM上。
2. 安裝:
a. 首先安裝JRE/JDK
Linux安裝JDK
b. 下載kafka
進入下載頁面:http://kafka.apache.org/downloads.html
選擇Binary downloads下載 (Source download需要編譯才能使用)
也可以直接在linux終端下載:
Shell程式碼
- wget -q http://apache.fayea.com/apache-mirror/kafka/0.8.1/kafka_2.8.0-0.8.1.tgz
c. 解壓
Shell程式碼
- tar -xzvf kafka_2.8.0-0.8.1.tgz
- rm kafka_2.8.0-0.8.1.tgz
- cd kafka_2.8.0-0.8.1
目錄:
/bin 啟動和停止命令等。
/config 配置檔案
/libs 類庫
d. 修改配置
Kafka預設開啟JVM壓縮指標,但只是在64位的HotSpot VM受支援,如果安裝了32位的HotSpot VM,需要修改/bin/kafka-run-class.sh檔案
Shell程式碼
- vi bin/kafka-run-class.sh
找到如下行:
Sh程式碼
- KAFKA_JVM_PERFORMANCE_OPTS="-server -XX:+UseCompressedOops -XX:+UseParNewGC -XX:+UseConcMarkSweepGC -XX:+CMSClassUnloadingEnabled -XX:+CMSScavengeBeforeRemark -XX:+DisableExplicitGC -Djava.awt.headless=true"
去除-XX:+UseCompressedOops引數
3. 啟動和停止
啟動Zookeeper server:
Shell程式碼
- bin/zookeeper-server-start.sh config/zookeeper.properties &
&是為了能退出命令列
啟動Kafka server:
Shell程式碼
- bin/kafka-server-start.sh config/server.properties &
停止Kafka server
Shell程式碼
- bin/kafka-server-stop.sh
停止Zookeeper server:
Shell程式碼
- bin/zookeeper-server-stop.sh
4. 單機連通性測試
執行producer:
Shell程式碼
- bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
早版本的Kafka,--broker-list localhost:9092需改為--zookeeper localhost:2181
執行consumer:
Shell程式碼
- bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
在producer端輸入字串並回車,檢視consumer端是否顯示。
5. 分散式連通性測試
Zookeeper Server, Kafka Server, Producer都放在伺服器server1上,ip地址為192.168.1.10
Consumer放在伺服器server2上,ip地址為192.168.1.12。
分別執行server1的producer和server2的consumer,
Shell程式碼
- bin/kafka-console-producer.sh --broker-list 192.168.1.10:9092 --topic test
Shell程式碼
- bin/kafka-console-consumer.sh --zookeeper 192.168.1.10:2181 --topic test --from-beginning
在producer的console端輸入字串,consumer報Connection refused錯誤:
broker, producer和consumer都註冊到zookeeper上,producer和consumer的引數明確指定。問題出在broker的配置檔案server.properties上:
Properties程式碼
- # Hostname the broker will bind to. If not set, the server will bind to all interfaces
- #host.name=localhost
host名稱沒有指定,就是127.0.0.1,consumer去broker拿資料就有問題。設定為192.168.1.10,重啟服務就好了。