1. 程式人生 > >讓Kafka在scala裏面跑起來

讓Kafka在scala裏面跑起來

時間 all tex 區分 系統變量 factor 命名 zoo.cfg topic

Kafka集群對消息的保存是根據Topic進行歸類的,由消息生產者(Producer)和消息消費者(Consumer)組成,另外,每一個Server稱為一個Broker(經紀人)。對於Kafka集群而言,Producer和Consumer都依賴於ZooKeeper來保證數據的一致性。

  在每條消息輸送到Kafka集群後,消息都會有一個Type,這個Type被稱為一個Topic,不同的Topic的消息是分開存儲的。每個Topic可以被分割為多個Partition,在每條消息中,它在文件中的位置稱為Offset,用於標記唯一一條消息。在Kafka中,消息被消費後,消息仍然會被保留一定時間後在刪除,比如在配置信息中,文件信息保留7天,那麽7天後,不管Kafka中的消息是否被消費,都會被刪除;以此來釋放磁盤空間,減少磁盤的IO消耗。

  在Kafka中,一個Topic的多個分區,被分布在Kafka集群的多個Server上,每個Server負責分區中消息的讀寫操作。另外,Kafka還可以配置分區需要備份的個數,以便提高可用行。由於用到ZK來協調,每個分區都有一個Server為Leader狀態,服務對外響應(如讀寫操作),若該Leader宕機,會由其他的Follower來選舉出新的Leader來保證集群的高可用性。

一個Topic中的消息數據按照多個分區組織,分區是kafka消息隊列組織的最小單位,一個分區可以看作是一個FIFO( First Input First Output)的隊列。

kafka分區是提高kafka性能的關鍵所在,當你發現你的集群性能不高時,常用手段就是增加Topic的分區,分區裏面的消息是按照從新到老的順序進行組織,消費者從隊列頭訂閱消息,生產者從隊列尾添加消息。

編譯報錯:WARN Selector: Error in I/O with localhost/127.0.0.1

java.io.EOFException

錯誤:https://issues.apache.org/jira/browse/KAFKA-3205

環境準備:

1. Java JDK (jdk1.8.0_112 64-bit)

安裝完成後添加系統變量:

JAVA_HOME= C:\Program Files\Java\jdk1.8.0_112

並在系統變量Path後添加 ;%JAVA_HOME%\bin;

2. Apache ZooKeeper

① 解壓進入目錄E:\Zookeeper\zookeeper-3.4.11\conf

② 將“zoo_sample.cfg”重命名為“zoo.cfg”

③ 打開“zoo.cfg”找到並編輯dataDir=E:\Zookeeper\zookeeper-3.4.11\data

④ 添加系統變量:ZOOKEEPER_HOME= E:\Zookeeper\zookeeper-3.4.11,並在系統變量Path後添加;%ZOOKEEPER_HOME%\bin;

⑤ 查看zoo.cfg文件clientPort默認值是否為2181

⑥ cmd中輸入zkServer ,如下圖則表示ZooKeeper啟動成功(窗口不要關閉)

3. Apache Kafka(請選擇Binary downloads)

① 壓進入目錄E:\kafka_2.12-1.0.0\config

② 打開server.properties並編輯log.dirs=E:\kafka_2.12-1.0.0\kafka-logs

③ 確認zookeeper.connect=localhost:2181

④ 進入目錄E:\kafka_2.12-1.0.0\並Shift+右鍵,選擇“打開命令窗口”,輸入:

.\bin\windows\kafka-server-start.bat .\config\server.properties

如果是第二次啟動,則需要進入目錄刪除文件夾kafka_2.12-1.0.0kafka-logs後重新運行命令。

代碼測試(scala maven項目):

1. pom.xml

<dependency>

<groupId>org.apache.spark</groupId>

<artifactId>spark-core_2.10</artifactId>

<version>1.5.1</version>

</dependency>

<dependency>

<groupId>org.apache.kafka</groupId>

<artifactId>kafka-clients</artifactId>

<version>0.9.0.1</version>

</dependency>

說明:kafka-clients不要使用0.8.2.1版本,因為它的poll函數直接返回了null

另外在0.9.0版本之後,consumer api不再區分high-level和low-level了。

2. producer.scala

import java.io.{File, FileInputStream}

import java.util.Properties

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}

import org.apache.spark.rdd.RDD

import org.apache.spark.{SparkConf, SparkContext}

object producer {

def main(args: Array[String]): Unit = {

val sparkConf = new SparkConf()

sparkConf.setAppName("kafka").setMaster("local[4]").set("SPARK_EXECUTOR_CORES","1")

val sc = new SparkContext(sparkConf)

val topic: RDD[String] = sc.textFile(s"F:/123456.txt")

val kafkaProducerCfg = "E:\\kafka_2.12-1.0.0\\config\\producer.properties"

val kafkaprop = new Properties()

kafkaprop.load(new FileInputStream(new File(kafkaProducerCfg)))

topic.repartition(1).foreachPartition((partisions: Iterator[String]) => {

val producer: KafkaProducer[String, String] = new KafkaProducer[String, String](kafkaprop)

var pr = new ProducerRecord[String, String]("my-topic","my-key", "hi laosiji ")

producer.send(pr)

pr = new ProducerRecord[String, String]("finally","kafka", "可用了 ")

producer.send(pr)

partisions.foreach((line: String) => {

try {

val pr = new ProducerRecord[String, String]("my-topic","my-key", line)

producer.send(pr)

} catch {

case ex: Exception => println(ex.getMessage, ex)

}

})

producer.close()

})

}

}

3. consumer.scala

import java.io.{File, FileInputStream}

import java.util.Properties

import scala.collection.JavaConverters._

import org.apache.kafka.clients.consumer.{ConsumerRecords, KafkaConsumer}

object consumer {

def main(args: Array[String]): Unit = {

val kafkaProducerCfg = "E:\\kafka_2.12-1.0.0\\config\\consumer.properties"

val is = new FileInputStream(new File(kafkaProducerCfg))

val kafkaprop = new Properties()

// kafkaprop.load(is)

is.close()

kafkaprop.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")

kafkaprop.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")

// kafkaprop.put("partition.assignment.strategy", "range")

kafkaprop.put("group.id", "test")

kafkaprop.put("enable.auto.commit", "true")

kafkaprop.put("auto.commit.interval.ms", "1000")

kafkaprop.put("session.timeout.ms", "6000")

kafkaprop.put("bootstrap.servers","localhost:9092")

try{

val consumer: KafkaConsumer[String,String] = new KafkaConsumer[String,String](kafkaprop)

consumer.subscribe(java.util.Arrays.asList("my-topic","finally"))

while (true) {

val records = consumer.poll(1000)

var it = records.records("my-topic").iterator()

while (it.hasNext){

println(it.next())

}

it = records.records("finally").iterator()

while (it.hasNext){

println(it.next())

}

}

consumer.close()

} catch {

case e => e.printStackTrace()

}

}

}

4. 先啟動consumer,再啟動producer,可以在控制臺看到運行結果。

控制臺測試:

1. 進入目錄E:\kafka_2.12-1.0.0創建Topic

.\bin\windows\kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 –topic HelloKafka

查看Topic:

.\bin\windows\kafka-topics.bat --list --zookeeper localhost:2181

2. 進入目錄E:\kafka_2.12-1.0.0打開cmd創建生產者:

.\bin\windows\kafka-console-producer.bat --broker-list localhost:9092 --topic HelloKafka

3. 進入目錄E:\kafka_2.12-1.0.0打開cmd創建消費者:

.\bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic HelloKafka --from-beginning

讓Kafka在scala裏面跑起來