kafka簡單使用
阿新 • • 發佈:2018-11-10
● kafka的一些概念 :
Broker : 安裝Kafka服務的那臺叢集就是一個broker(broker的id要全域性唯一)
Producer :訊息的生產者,負責將資料寫入到broker中(push)
Consumer:訊息的消費者,負責從kafka中讀取資料(pull),老版本的消費者需要依賴zk,新版本的不需要
Topic: 主題,相當於是資料的一個分類,不同topic存放不同的資料
Consumer Group: 消費者組,一個topic可以有多個消費者同時消費,多個消費者如果在一個消費者組中,那麼他們不能重複消費資料
● 建立topic
#replication : 副本數量 , partitions : 分割槽
[ [email protected] apps]# ./kafka_2.11-0.8.2.2/bin/kafka-topics.sh --create --zookeeper hadoop-01:2181,hadoop-02:2181,hadoop-03:2181 --replication-factor 3 --partitions 3 --topic ws
● 列出所有topic
[[email protected] apps]# ./kafka_2.11-0.8.2.2/bin/kafka-topics.sh --list --zookeeper hadoop-01:2181,hadoop-02:2181,hadoop-03:2181 ws
● 往Kafka的topic中寫入資料(命令列的生成者)
# -topic ws : ws 生產的主題
[[email protected] apps]#./kafka_2.11-0.8.2.2/bin/kafka-console-producer.sh --broker-list hadoop-01:9092,hadoop-02:9092,hadoop-03:9092 -topic ws
● 啟動消費者
#舊的topic不會消費,只會消費當前producer新產生的
[[email protected] kafka_2.11-0.8.2.2]# ./bin/kafka-console-consumer.sh --zookeeper hadoop-01:2181,hadoop-02:2181,hadoop-03:2181 -topic ws
#從頭開始消費
[ [email protected] kafka_2.11-0.8.2.2]# ./bin/kafka-console-consumer.sh --zookeeper hadoop-01:2181,hadoop-02:2181,hadoop-03:2181 -topic ws --from-beginning
● scala編寫producer
package com.ws
import java.util.Properties
import kafka.producer.{KeyedMessage, Producer, ProducerConfig}
/**
* kafka生產者
*/
object ProducerDemo {
def main(args: Array[String]): Unit = {
val prop = new Properties()
prop.put("metadata.broker.list","hadoop-01:9092,hadoop-02:9092,hadoop-03:9092")
prop.setProperty("serializer.class","kafka.serializer.StringEncoder")
val config = new ProducerConfig(prop)
val producer = new Producer[String,String](config)
for (i <- 1 to 1000){
//引數1:指定topic ,引數2:寫入的資料
producer.send(new KeyedMessage[String,String]("ws","hello scala "+i))
}
}
}