1. 程式人生 > >kafka簡單使用

kafka簡單使用

● kafka的一些概念 :

	Broker : 安裝Kafka服務的那臺叢集就是一個broker(broker的id要全域性唯一)
	Producer :訊息的生產者,負責將資料寫入到broker中(push)
	Consumer:訊息的消費者,負責從kafka中讀取資料(pull),老版本的消費者需要依賴zk,新版本的不需要
	Topic: 主題,相當於是資料的一個分類,不同topic存放不同的資料
	Consumer Group: 消費者組,一個topic可以有多個消費者同時消費,多個消費者如果在一個消費者組中,那麼他們不能重複消費資料

● 建立topic

#replication : 副本數量 , partitions : 分割槽
[
[email protected]
apps]# ./kafka_2.11-0.8.2.2/bin/kafka-topics.sh --create --zookeeper hadoop-01:2181,hadoop-02:2181,hadoop-03:2181 --replication-factor 3 --partitions 3 --topic ws

● 列出所有topic

[[email protected] apps]# ./kafka_2.11-0.8.2.2/bin/kafka-topics.sh --list --zookeeper hadoop-01:2181,hadoop-02:2181,hadoop-03:2181
ws

● 往Kafka的topic中寫入資料(命令列的生成者)

# -topic ws : ws 生產的主題
[[email protected] apps]#./kafka_2.11-0.8.2.2/bin/kafka-console-producer.sh --broker-list hadoop-01:9092,hadoop-02:9092,hadoop-03:9092 -topic ws

● 啟動消費者

#舊的topic不會消費,只會消費當前producer新產生的
[[email protected] kafka_2.11-0.8.2.2]# ./bin/kafka-console-consumer.sh --zookeeper hadoop-01:2181,hadoop-02:2181,hadoop-03:2181 -topic ws

#從頭開始消費
[
[email protected]
kafka_2.11-0.8.2.2]# ./bin/kafka-console-consumer.sh --zookeeper hadoop-01:2181,hadoop-02:2181,hadoop-03:2181 -topic ws --from-beginning

● scala編寫producer

package com.ws
import java.util.Properties
import kafka.producer.{KeyedMessage, Producer, ProducerConfig}
/**
  * kafka生產者
  */
object ProducerDemo {

  def main(args: Array[String]): Unit = {
    val prop = new Properties()
    prop.put("metadata.broker.list","hadoop-01:9092,hadoop-02:9092,hadoop-03:9092")
    prop.setProperty("serializer.class","kafka.serializer.StringEncoder")

    val config = new ProducerConfig(prop)

    val producer = new Producer[String,String](config)

    for (i <- 1 to 1000){
      //引數1:指定topic  ,引數2:寫入的資料
      producer.send(new KeyedMessage[String,String]("ws","hello scala "+i))
    }
  }
}