kafka_0.11.0.0叢集部署
1. 簡介
kafka (官網地址:http://kafka.apache.org)是一款分散式訊息釋出和訂閱的系統,具有高效能和高吞吐率。
i. 訊息的釋出(publish)稱作producer,訊息的訂閱(subscribe)稱作consumer,中間的儲存陣列稱作broker。
ii. 多個broker協同合作,producer、consumer和broker三者之間通過zookeeper來協調請求和轉發。
iii. producer產生和推送(push)資料到broker,consumer從broker拉取(pull)資料並進行處理。
iv. broker端不維護資料的消費狀態,提升了效能。
v. 直接使用磁碟進行儲存,線性讀寫,速度快:避免了資料在JVM記憶體和系統記憶體之間的複製,減少耗效能的建立物件和垃圾回收。
vi. Kafka使用scala編寫,可以執行在JVM上。
2. 安裝:
建立使用者:
[[email protected] ~]##建立使用者,並指定所屬用組
[[email protected] ~]#useradd -d /home/kafka -g escgrp -m kafka
[[email protected] ~]##修改使用者密碼
[[email protected] ~]#passwd kafka
passwd: all authentication tokens updated successfully.
安裝JRE/JDK
$pwd
/home/kafka
$unzip jdk1.8.0_112.zip
$chmod -R 755 jdk1.8.0_112
檢查修改各使用者配置檔案引數
$vi .bash_profile
JAVA_HOME=/home/kafka/jdk1.8.0_11
export JAVA_HOME
PATH=$JAVA_HOME/bin:$HOME/bin:$PATH
export PATH
export LANG=zh_CN.utf8
儲存退出,使用之生效
$. .bash_profile
檢查jdk變數是否設定成功
$java -version
在安裝kafka之前先把zookeeper叢集開啟,安裝部署zookeeper網上找教程
3.下載kafka
進入下載頁面:
解壓 :
tar -xzvf kafka_2.11-0.11.tgz
http://kafka.apache.org/downloads.html
修改配置
$kafka/kafka_2.11-0.11.0.0/config/server.properties
#broker標識,id為正數,且全域性不能重複(有幾臺kafkaserver配置幾個broker)
broker.id=0
# broker監聽請求的socket地址
listeners=PLAINTEXT://:9092
port=9092
host.name=192.168.1.181
#日誌檔案儲存目錄(此目錄可以自定義)
log.dirs=/home/kafka/kafka-2.11_myself_logs
#zookeeper管理的地址ip1:port1,ip2:port2……
zookeeper.connect=localhost:2181
4、啟動Kafka
bin/kafka-server-start.sh -daemon config/server.properties &
檢視kafka程序:
ps -ef|grep java
或者利用jps:
關閉kafka
- kafka-server-stop.sh總是無法成功所以利用以下辦法:
kill -s TERM $(jps -l | grep 'kafka\.Kafka' | awk '{print $1}')
測試:
啟動2個XSHELL客戶端,一個用於生產者傳送訊息,一個用於消費者接受訊息。
執行producer,隨機敲入幾個字元,相當於把這個敲入的字元訊息傳送給佇列。
bin/kafka-console-producer.sh --broker-list 192.168.75.131:9092 --topic test
說明:早版本的Kafka,–broker-list 192.168.75.131:9092需改為–zookeeper 192.168.75.131:2181
執行consumer,可以看到剛才傳送的訊息列表。
bin/kafka-console-consumer.sh --zookeeper 192.168.75.131:2181 --topic test --from-beginning
注意:
producer,指定的Socket(192.168.75.131+9092),說明生產者的訊息要發往kafka,也即是broker
consumer, 指定的Socket(192.168.75.131+2181),說明消費者的訊息來自zookeeper(協調轉發)
上面的IP地址要看你具體的地址
搭建一個多個broker的偽叢集
- 仿照上面的配置檔案Server.properties,建立另個Server1.properties
#broker標識,id為正數,且全域性不能重複(有幾臺kafkaserver配置幾個broker)
broker.id=1
# broker監聽請求的socket地址
listeners=PLAINTEXT://:9093
port=9093
host.name=192.168.1.181
#日誌檔案儲存目錄(此目錄可以自定義)
log.dirs=/home/kafka/kafka-2.11_myself_logs1
#zookeeper管理的地址ip1:port1,ip2:port2……
zookeeper.connect=localhost:2181
再建立Server2.properties 上面配置類推
2. 啟動所有的broker
命令如下:
bin/kafka-server-start.sh -daemon config/server.properties & #啟動broker
bin/kafka-server-start.sh -daemon config/server1.properties & #啟動broker1
bin/kafka-server-start.sh -daemon config/server2.properties & #啟動broker2
(3)建立topic
bin/kafka-topics.sh --create --topic agent2flow --partitions 1 --replication-factor 3 \--zookeeper localhost:2181
可以再次輸入檢視一共有哪些主題
./kafka-topics.sh --list --zookeeper 192.168.75.131:2181
再建立生產者和消費者嘗試傳送資訊即可測試是否成功