kafka入門、安裝詳解
kafka是什麼?
Kafka是一個快速、可擴充套件的、高吞吐、可容錯的分散式釋出訂閱訊息系統。Kafka具有高吞吐量、內建分割槽、支援資料副本和容錯的特性,它可以處理消費者規模的網站中的所有動作流資料,具有高效能、持久化、多副本備份、橫向擴充套件能力,適合在大規模訊息處理場景中使用。
l 分散式系統,易於向外擴充套件;
l 同時為釋出和訂閱提供高吞吐量;
l 支援多訂閱者,當失敗時能自動平衡消費者;
l 將訊息持久化到磁碟,可用於批量消費;
一、安裝kafka
kafka的安裝需要依賴於jdk和zookeeper。(kafka 2.11-1.1.0版本才與JDK1.7相容,更高版本需要JDK1.8
1、yum 安裝
yum過於簡單,此處不再贅述!
2、二進位制安裝
Kafka官方下載:http://kafka.apache.org/downloads.html
Zookeeper官方下載:https://zookeeper.apache.org/releases.html
Jdk官方下載:https://www.oracle.com/java/technologies/downloads/
需要說明的是,kafka的安裝依賴於Zookeeper,所以執行kafka需要先啟動Zookeeper,zk的部署可參考《Zookeeper介紹與基本部署》。當然,kafka預設也內建了zk的啟動指令碼,在kafka
# 下載並解壓kafka部署包至/usr/local/目錄 tar -zxvf kafka_2.13-3.1.0.tgz -C /usr/local/ # 修改配置檔案 vim config/zookeeper.properties vim config/server.properties # 重點配置引數說明: broker.id=0 #唯一標識,叢集裡每個broker的id需不同 listeners=PLAINTEXT://192.168.121.132:9092 #暴露服務,否則連線超時log.dirs=/data/kafka #日誌路徑,路徑需提前建立好,且必須有讀寫許可權 zookeeper.connect=localhost:2181 #設定zk的連線地址及埠
啟動zk:./bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
啟動kafka:./bin/kafka-server-start.sh -daemon config/server.properties
3、測試驗證
注意:kafka高版本已不再使用”--zookeeper”引數來建立topic,而是採用” --bootstrap-server”來代替。
#建立topic ./bin/kafka-topics.sh --create --zookeeper localhost:9092 --replication-factor 1 --partitions 1 --topic panwei #舊版本建立方式 ./bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic panwei #查詢topic詳情 ./bin/kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic panwei #查詢所有topic ./bin/kafka-topics.sh --bootstrap-server localhost:9092 --list #修改topic引數配置 ./bin/kafka-topics.sh --bootstrap-server localhost:9092 --alter --topic panwei --parti-tions count #刪除topic ./bin/kafka-topics.sh --bootstrap-server localhost:9092 --delete --topic panwei #建立生產者 ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic panwei #建立消費者(有非必須引數,分割槽與consumer之間的關係:一個分割槽不能分給兩個consumer,但是兩個分割槽可以分給一個consumer) ./bin/kafka-console-consumer.sh --bootstrap-server localhost:2181 --topic panwei --from-beginning --group testgroup
二、kafka配置檔案
Kafka配置檔案目錄:../config/server.properties
#是否允許刪除topic,預設false不能手動刪除 delete.topic.enable=true #當前機器在叢集中的唯一標識,和zookeeper的myid性質一樣 broker.id=0 #當前kafka服務偵聽的地址和埠,埠預設是9092 listeners = PLAINTEXT://192.168.100.21:9092 #這個是borker進行網路處理的執行緒數 num.network.threads=3 #這個是borker進行I/O處理的執行緒數 num.io.threads=8 #傳送緩衝區buffer大小,資料不是一下子就傳送的,先會儲存到緩衝區到達一定的大小後在傳送,能提高效能 socket.send.buffer.bytes=102400 #kafka接收緩衝區大小,當資料到達一定大小後在序列化到磁碟 socket.receive.buffer.bytes=102400 #這個引數是向kafka請求訊息或者向kafka傳送訊息的請請求的最大數,這個值不能超過java的堆疊大小 socket.request.max.bytes=104857600 #訊息日誌存放的路徑 log.dirs=/opt/module/kafka_2.11-1.1.0/logs #預設的分割槽數,一個topic預設1個分割槽數 num.partitions=1 #每個資料目錄用來日誌恢復的執行緒數目 num.recovery.threads.per.data.dir=1 #預設訊息的最大持久化時間,168小時,7天 log.retention.hours=168 #這個引數是:因為kafka的訊息是以追加的形式落地到檔案,當超過這個值的時候,kafka會新起一個檔案 log.segment.bytes=1073741824 #每隔300000毫秒去檢查上面配置的log失效時間 log.retention.check.interval.ms=300000 #是否啟用log壓縮,一般不用啟用,啟用的話可以提高效能 log.cleaner.enable=false #設定zookeeper的連線埠,叢集多個地址用,分隔 zookeeper.connect=node21:2181,node22:2181,node23:2181 #設定zookeeper的連線超時時間 zookeeper.connection.timeout.ms=6000
---