1. 程式人生 > >kafka基礎使用方法(java)

kafka基礎使用方法(java)

操作步驟:

  1. 建立Topic

    $ cd /opt/cloudera/parcels/KAFKA-2.1.1-1.2.1.1.p0.18 
    $ bin/kafka-topics --create
    --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

    這裡寫圖片描述
    分析:雖然kafka對外開放的api中沒有建立topic的方法,但是實驗證明,當producer傳送一個新topic訊息到broker時會自動建立一個對應的topic。
    不過自動建立的topic使用的是預設配置,若有需要還需手動修改配置。

  2. 傳送資料到kafka

    $ bin/kafka-console-producer --broker-list xxx.xxx.xxx.xxx:9092 --topic test

    這裡寫圖片描述
    (紅框內的訊息需要手動輸入)
    分析:kafka預設通過9092埠與producer和consumer進行資料互動。正式使用時該步驟由producer端的指令碼替代。

  3. 建立Consumer消費資料

    $ bin/kafka-console-consumer --zookeeper xxx.xxx.xxx.xxx:2181--bootstrap-server xxx.xxx.xxx.xxx:9092 --topic
    test --from-beginning

    這裡寫圖片描述
    分析:成功讀出test中的資料。正式使用時該步驟由consumer端的指令碼替代。

問題彙總:

  1. 別用localhost!!!

    問題再現:最開始按照官方文件的指引,使用一下命令來建立一個測試用的producer

    $ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test 

    這裡寫圖片描述
    結果出現上圖中的問題:
    org.apache.kafka.common.errors.TimeoutException:
    Failed to update metadata after 60000 ms.

    對於該問題作出一下猜想:認為問題出在zookeeper對於某ip的許可權限制上面,換句話說官方文件中的demo應該是在單機環境中的,此時zookeeper與kafka在同一機器上,zookeeper對該機器localhost有許可權更新metadata,而在使用多節點叢集時,zookeeper為了防止傳輸出現偏差,禁止了localhost的相應許可權。
    要驗證以上猜想,需要在單機環境下進行測試,沒時間做。再說吧。╮(╯_╰)╭
    解決方法:改用特定ip即可,就這麼簡單,但是問題找了很久,一點辦法都木有。
    總結:

    別用localhost!!有必要熟悉一下zookeeper的原理。