1. 程式人生 > 其它 >kafka入門、安裝詳解

kafka入門、安裝詳解

kafka是什麼?

Kafka是一個快速、可擴充套件的、高吞吐、可容錯的分散式釋出訂閱訊息系統。Kafka具有高吞吐量、內建分割槽、支援資料副本和容錯的特性,它可以處理消費者規模的網站中的所有動作流資料,具有高效能、持久化、多副本備份、橫向擴充套件能力適合在大規模訊息處理場景中使用。

分散式系統,易於向外擴充套件;

同時為釋出和訂閱提供高吞吐量;

支援多訂閱者,當失敗時能自動平衡消費者;

將訊息持久化到磁碟,可用於批量消費;

一、安裝kafka

 kafka的安裝需要依賴於jdkzookeeperkafka 2.11-1.1.0版本才與JDK1.7相容,更高版本需要JDK1.8

1yum 安裝

 yum過於簡單,此處不再贅述!

2、二進位制安裝

Kafka官方下載:http://kafka.apache.org/downloads.html

Zookeeper官方下載:https://zookeeper.apache.org/releases.html

Jdk官方下載:https://www.oracle.com/java/technologies/downloads/

需要說明的是,kafka的安裝依賴於Zookeeper所以執行kafka需要先啟動Zookeeper,zk的部署可參考《Zookeeper介紹與基本部署》。當然,kafka預設也內建了zk的啟動指令碼,在kafka

安裝路徑的bin目錄下,名稱為zookeeper-server-start.sh,如果不想獨立安裝zk,可直接使用該指令碼。

# 下載並解壓kafka部署包至/usr/local/目錄
tar -zxvf kafka_2.13-3.1.0.tgz -C /usr/local/
# 修改配置檔案
vim config/zookeeper.properties
vim config/server.properties

# 重點配置引數說明:
broker.id=0        #唯一標識,叢集裡每個broker的id需不同
listeners=PLAINTEXT://192.168.121.132:9092       #暴露服務,否則連線超時
log.dirs=/data/kafka #日誌路徑,路徑需提前建立好,且必須有讀寫許可權 zookeeper.connect=localhost:2181 #設定zk的連線地址及埠

啟動zk:./bin/zookeeper-server-start.sh -daemon config/zookeeper.properties

啟動kafka:./bin/kafka-server-start.sh -daemon config/server.properties

3、測試驗證

注意:kafka高版本已不再使用”--zookeeper”引數來建立topic,而是採用” --bootstrap-server”來代替。

#建立topic

./bin/kafka-topics.sh --create --zookeeper localhost:9092 --replication-factor 1 --partitions 1 --topic panwei  #舊版本建立方式

./bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic panwei

#查詢topic詳情

./bin/kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic panwei

#查詢所有topic

./bin/kafka-topics.sh --bootstrap-server localhost:9092 --list

#修改topic引數配置

./bin/kafka-topics.sh --bootstrap-server localhost:9092 --alter --topic panwei --parti-tions count

#刪除topic

./bin/kafka-topics.sh --bootstrap-server localhost:9092 --delete --topic panwei


#建立生產者

./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic panwei

#建立消費者(有非必須引數,分割槽與consumer之間的關係:一個分割槽不能分給兩個consumer,但是兩個分割槽可以分給一個consumer)

./bin/kafka-console-consumer.sh --bootstrap-server localhost:2181 --topic panwei --from-beginning --group testgroup

二、kafka配置檔案

Kafka配置檔案目錄:../config/server.properties

#是否允許刪除topic,預設false不能手動刪除

delete.topic.enable=true

#當前機器在叢集中的唯一標識,和zookeeper的myid性質一樣

broker.id=0

#當前kafka服務偵聽的地址和埠,埠預設是9092

listeners = PLAINTEXT://192.168.100.21:9092

#這個是borker進行網路處理的執行緒數

num.network.threads=3

#這個是borker進行I/O處理的執行緒數

num.io.threads=8

#傳送緩衝區buffer大小,資料不是一下子就傳送的,先會儲存到緩衝區到達一定的大小後在傳送,能提高效能

socket.send.buffer.bytes=102400

#kafka接收緩衝區大小,當資料到達一定大小後在序列化到磁碟

socket.receive.buffer.bytes=102400

#這個引數是向kafka請求訊息或者向kafka傳送訊息的請請求的最大數,這個值不能超過java的堆疊大小

socket.request.max.bytes=104857600

#訊息日誌存放的路徑

log.dirs=/opt/module/kafka_2.11-1.1.0/logs

#預設的分割槽數,一個topic預設1個分割槽數

num.partitions=1

#每個資料目錄用來日誌恢復的執行緒數目

num.recovery.threads.per.data.dir=1

#預設訊息的最大持久化時間,168小時,7天

log.retention.hours=168

#這個引數是:因為kafka的訊息是以追加的形式落地到檔案,當超過這個值的時候,kafka會新起一個檔案

log.segment.bytes=1073741824

#每隔300000毫秒去檢查上面配置的log失效時間

log.retention.check.interval.ms=300000

#是否啟用log壓縮,一般不用啟用,啟用的話可以提高效能

log.cleaner.enable=false

#設定zookeeper的連線埠,叢集多個地址用,分隔

zookeeper.connect=node21:2181,node22:2181,node23:2181

#設定zookeeper的連線超時時間

zookeeper.connection.timeout.ms=6000

---