kafka基礎使用方法(java)
阿新 • • 發佈:2019-01-09
操作步驟:
建立Topic
$ cd /opt/cloudera/parcels/KAFKA-2.1.1-1.2.1.1.p0.18 $ bin/kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
分析:雖然kafka對外開放的api中沒有建立topic的方法,但是實驗證明,當producer傳送一個新topic訊息到broker時會自動建立一個對應的topic。
不過自動建立的topic使用的是預設配置,若有需要還需手動修改配置。傳送資料到kafka
$ bin/kafka-console-producer --broker-list xxx.xxx.xxx.xxx:9092 --topic test
(紅框內的訊息需要手動輸入)
分析:kafka預設通過9092埠與producer和consumer進行資料互動。正式使用時該步驟由producer端的指令碼替代。建立Consumer消費資料
$ bin/kafka-console-consumer --zookeeper xxx.xxx.xxx.xxx:2181--bootstrap-server xxx.xxx.xxx.xxx:9092 --topic test --from-beginning
分析:成功讀出test中的資料。正式使用時該步驟由consumer端的指令碼替代。
問題彙總:
別用localhost!!!
問題再現:最開始按照官方文件的指引,使用一下命令來建立一個測試用的producer
$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
結果出現上圖中的問題:
org.apache.kafka.common.errors.TimeoutException:
Failed to update metadata after 60000 ms.
對於該問題作出一下猜想:認為問題出在zookeeper對於某ip的許可權限制上面,換句話說官方文件中的demo應該是在單機環境中的,此時zookeeper與kafka在同一機器上,zookeeper對該機器localhost有許可權更新metadata,而在使用多節點叢集時,zookeeper為了防止傳輸出現偏差,禁止了localhost的相應許可權。
要驗證以上猜想,需要在單機環境下進行測試,沒時間做。再說吧。╮(╯_╰)╭
解決方法:改用特定ip即可,就這麼簡單,但是問題找了很久,一點辦法都木有。
總結: