1. 程式人生 > >spark筆記之Spark Streaming整合kafka實戰

spark筆記之Spark Streaming整合kafka實戰

kafka作為一個實時的分散式訊息佇列,實時的生產和消費訊息,這裡我們可以利用SparkStreaming實時地讀取kafka中的資料,然後進行相關計算。

在Spark1.3版本後,KafkaUtils裡面提供了兩個建立dstream的方法,一種為KafkaUtils.createDstream,另一種為KafkaUtils.createDirectStream。

7.1 KafkaUtils.createDstream方式

KafkaUtils.createDstream(ssc, [zk], [group id], [per-topic,partitions] ) 使用了receivers接收器來接收資料,利用的是Kafka高層次的消費者api,對於所有的receivers接收到的資料將會儲存在

Sparkexecutors中,然後通過Spark Streaming啟動job來處理這些資料,預設會丟失,可啟用WAL日誌,它同步將接受到資料儲存到分散式檔案系統上比如HDFS。 所以資料在出錯的情況下可以恢復出來 。

A、建立一個receiver接收器來對kafka進行定時拉取資料,這裡產生的dstream中rdd分割槽和kafka的topic分割槽不是一個概念,故如果增加特定主體分割槽數僅僅是增加一個receiver中消費topic的執行緒數,並沒有增加spark的並行處理的資料量。 B、對於不同的group和topic可以使用多個receivers建立不同的DStream  C、如果啟用了WAL(spark.streaming.receiver.writeAheadLog.enable=true)

同時需要設定儲存級別(預設StorageLevel.MEMORY_AND_DISK_SER_2),

7.1.1 KafkaUtils.createDstream實戰

(1)新增kafka的pom依賴

[AppleScript] 純文字檢視 複製程式碼

?

1

2

3

4

5

<dependency>

<groupId>org.apache.spark</groupId>

<artifactId

>spark-streaming-kafka_0-8_2.11</artifactId>

<version>2.0.2</version>

</dependency>

(2)啟動zookeeper叢集

zkServer.sh start

(3)啟動kafka叢集

kafka-server-start.sh  /export/servers/kafka/config/server.properties

(4) 建立topic

kafka-topics.sh --create --zookeeper hdp-node-01:2181 --replication-factor 1 --partitions 3 --topic kafka_spark

(5) 向topic中生產資料

通過shell命令向topic傳送訊息

kafka-console-producer.sh --broker-list hdp-node-01:9092 --topic  kafka_spark

(6)編寫Spark Streaming應用程式

[AppleScript] 純文字檢視 複製程式碼

?

01

02

03

04

05

06

07

08

09

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

package cn.itcast.dstream.kafka

import org.apache.spark.{SparkConf, SparkContext}

import org.apache.spark.streaming.{Seconds, StreamingContext}

import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}

import org.apache.spark.streaming.kafka.KafkaUtils

import scala.collection.immutable

//todo:利用sparkStreaming對接kafka實現單詞計數----採用receiver(高階API)

object SparkStreamingKafka_Receiver {

def main(args: Array[String]): Unit = {

//1、建立sparkConf

val sparkConf: SparkConf = new SparkConf()

.setAppName("SparkStreamingKafka_Receiver")

.setMaster("local[4]")

.set("spark.streaming.receiver.writeAheadLog.enable","true") //開啟wal預寫日誌,儲存資料來源的可靠性

//2、建立sparkContext

val sc = new SparkContext(sparkConf)

sc.setLogLevel("WARN")

//3、建立StreamingContext

val ssc = new StreamingContext(sc,Seconds(5))

//設定checkpoint

ssc.checkpoint("./Kafka_Receiver")

//4、定義zk地址

val zkQuorum="node1:2181,node2:2181,node3:2181"

//5、定義消費者組

val groupId="spark_receiver1"

//6、定義topic相關資訊 Map[String, Int]

// 這裡的value並不是topic分割槽數,它表示的topic中每一個分割槽被N個執行緒消費

val topics=Map("spark_kafka" -> 2)

//7、通過KafkaUtils.createStream對接kafka

//這個時候相當於同時開啟3個receiver接受資料

val receiverDstream: immutable.IndexedSeq[ReceiverInputDStream[(String, String)]] = (1 to 3).map(x => {

val stream: ReceiverInputDStream[(String, String)] = KafkaUtils.createStream(ssc, zkQuorum, groupId, topics)

stream

}

)

//使用ssc.union方法合併所有的receiver中的資料

val unionDStream: DStream[(String, String)] = ssc.union(receiverDstream)

//8、獲取topic中的資料

val topicData: DStream[String] = unionDStream.map(_._2)

//9、切分每一行,每個單詞計為1

val wordAndOne: DStream[(String, Int)] = topicData.flatMap(_.split(" ")).map((_,1))

//10、相同單詞出現的次數累加

val result: DStream[(String, Int)] = wordAndOne.reduceByKey(_+_)

//11、列印輸出

result.print()

//開啟計算

ssc.start()

ssc.awaitTermination()

}

}

(7)執行程式碼,檢視控制檯結果資料

總結:

通過這種方式實現,剛開始的時候系統正常執行,沒有發現問題,但是如果系統異常重新啟動sparkstreaming程式後,發現程式會重複處理已經處理過的資料,這種基於receiver的方式,是使用Kafka的高階API,topic的offset偏移量在ZooKeeper中。這是消費Kafka資料的傳統方式。這種方式配合著WAL機制可以保證資料零丟失的高可靠性,但是卻無法保證資料只被處理一次,可能會處理兩次。因為Spark和ZooKeeper之間可能是不同步的。官方現在也已經不推薦這種整合方式,我們使用官網推薦的第二種方式kafkaUtils的createDirectStream()方式。

7.KafkaUtils.createDirectStream方式

這種方式不同於Receiver接收資料,它定期地從kafka的topic下對應的partition中查詢最新的偏移量,再根據偏移量範圍在每個batch裡面處理資料,Spark通過呼叫kafka簡單的消費者Api(低階api)讀取一定範圍的資料。   相比基於Receiver方式有幾個優點:  A、簡化並行

不需要建立多個kafka輸入流,然後union它們,sparkStreaming將會建立和kafka分割槽數相同的rdd的分割槽數,而且會從kafka中並行讀取資料,spark中RDD的分割槽數和kafka中的topic分割槽數是一一對應的關係。

B、高效,       

第一種實現資料的零丟失是將資料預先儲存在WAL中,會複製一遍資料,會導致資料被拷貝兩次,第一次是接受kafka中topic的資料,另一次是寫到WAL中。而沒有receiver的這種方式消除了這個問題。  C、恰好一次語義(Exactly-once-semantics)

Receiver讀取kafka資料是通過kafka高層次api把偏移量寫入zookeeper中,雖然這種方法可以通過資料儲存在WAL中保證資料不丟失,但是可能會因為sparkStreaming和ZK中儲存的偏移量不一致而導致資料被消費了多次。EOS通過實現kafka低層次api,偏移量僅僅被ssc儲存在checkpoint中,消除了zk和ssc偏移量不一致的問題。缺點是無法使用基於zookeeper的kafka監控工具。

7.2.1 KafkaUtils.createDirectStream實戰

[AppleScript] 純文字檢視 複製程式碼

?

01

02

03

04

05

06

07

08

09

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

package cn.itcast.dstream.kafka

import kafka.serializer.StringDecoder

import org.apache.spark.{SparkConf, SparkContext}

import org.apache.spark.streaming.{Seconds, StreamingContext}

import org.apache.spark.streaming.dstream.{DStream, InputDStream}

import org.apache.spark.streaming.kafka.KafkaUtils

//todo:利用sparkStreaming對接kafka實現單詞計數----採用Direct(低階API)

object SparkStreamingKafka_Direct {

def main(args: Array[String]): Unit = {

//1、建立sparkConf

val sparkConf: SparkConf = new SparkConf()

.setAppName("SparkStreamingKafka_Direct")

.setMaster("local[2]")

//2、建立sparkContext

val sc = new SparkContext(sparkConf)

sc.setLogLevel("WARN")

//3、建立StreamingContext

val ssc = new StreamingContext(sc,Seconds(5))

ssc.checkpoint("./Kafka_Direct")

//4、配置kafka相關引數

val kafkaParams=Map("metadata.broker.list"->"node1:9092,node2:9092,node3:9092","group.id"->"Kafka_Direct")

//5、定義topic

val topics=Set("spark01")

//6、通過 KafkaUtils.createDirectStream接受kafka資料,這裡採用是kafka低階api偏移量不受zk管理

val dstream: InputDStream[(String, String)] = KafkaUtils.createDirectStream[String,String,StringDecoder,StringDecoder](ssc,kafkaParams,topics)

//7、獲取kafka中topic中的資料

val topicData: DStream[String] = dstream.map(_._2)

//8、切分每一行,每個單詞計為1

val wordAndOne: DStream[(String, Int)] = topicData.flatMap(_.split(" ")).map((_,1))

//9、相同單詞出現的次數累加

val result: DStream[(String, Int)] = wordAndOne.reduceByKey(_+_)

//10、列印輸出

result.print()

//開啟計算

ssc.start()

ssc.awaitTermination()

}

}

(2)檢視對應的效果

向topic中新增資料

檢視控制檯的輸出: