Kafka的叢集安裝
修改server.properties
20 行 id 設定唯一識別 3臺機器 所以第一臺配置1 第二臺2第三臺3
50行 設定kafka的資料路徑
119行 加入zk的地址
檔案最後 如果有則修改屬性值,沒有就加上
#partion buffer中,訊息的條數達到閾值,將觸發flush到磁碟
log.flush.interval.messages=10000
#訊息buffer的時間,達到閾值,將觸發flush到磁碟
log.flush.interval.ms=3000
#刪除topic需要server.properties中設定delete.topic.enable=true否則只是標記刪除
delete.topic.enable=true
#此處的host.name為本機IP(重要),如果不改,則客戶端會丟擲:Producer connection to localhost:9092 unsuccessful 錯誤!
host.name=192.168.244.161
至此 配置完成,將本機的kafka檔案拷貝到其他機器
scp -r kafka ip:$PWD
下面介紹一些命令
開啟kafka
./bin/kafka-server-start.sh -daemon config/server.properties
關閉kafka
./bin/kafka-server-stop.sh config/server.properties
建立topic
./bin/kafka-topics.sh --create --zookeeper 192.168.244.160:2181,192.168.244.161:2181,192.168.244.162:2181 --partition 3 --replication-factor 3 --topic test01
檢視
./bin/kafka-topics.sh --list --zookeeper 192.168.244.160:2181,192.168.244.161:2181,192.168.244.162:2181
刪除
./bin/kafka-topics.sh --delete --zookeeper 192.168.244.141:2181,192.168.244.142:2181,192.168.244.143:2181 --topic test01
詳情
./bin/kafka-topics.sh --describe --zookeeper 192.168.244.160:2181,192.168.244.161:2181,192.168.244.162:2181 --topic test01
修改分割槽數 --不能減少 只能增加
./bin/kafka-topics.sh --alter --zookeeper 192.168.244.141:2181,192.168.244.142:2181,192.168.244.143:2181 --partition 5 --topic test03
delete.topic.enable=true如果為false的話 刪除一個test01 ,需要去zk下面刪除一系列附帶的資訊。然後還要刪除節點上面的3個備份數,
刪除zookeeper資訊:
刪除節點上的資料
API
啟動生產者
./bin/kafka-console-producer.sh --broker-list 192.168.244.160:9092,192.168.244.161:9092,192.138.244.162:9092 --topic test01
啟動消費者
./bin/kafka-console-consumer.sh --zookeeper 192.168.244.160:2181,192.168.244.161:2181,192.138.244.162:2181 --topic test01 --from-beginning