必讀:再講Spark與kafka 0.8.2.1+整合
Kafka在0.8和0.10版本引入了新的消費者API,所以spark Streaming與kafka的整合提供了兩個包。 請根據你的叢集選用正確的包。注意, 0.8和後期的版本0.9及0.10是相容的,但是0.10整合是不相容之前的版本的。
包與版本特性之間的對應關係如下:
本文主要講述spark Streaming與kafka 0.8.2.1+版本整合,要求kafka叢集的版本是0.8.2.1或者更高版本。
基於Receiver的方式
這種方式使用一個Receiver來接受資料。Receiver是使用kafka的高階消費者API來實現的。所有的Receiver從kafka裡面接受資料,然後儲存於Executors,spark Streaming再生成任務來處理資料。
然而,預設配置的情況,這種方式在失敗的情況下有可能丟失資料,為了確保零資料丟失,可以配置預寫日誌(WAL,從spark1.2引入)。這會將Receiver接收到的資料寫入分散式檔案系統,如hdfs,所以所有的資料可以在從失敗恢復執行的時候載入到。
導包(MVN或者sbt):
groupId = org.apache.spark
artifactId = spark-streaming-kafka-0-8_2.11
version = 2.2.1
測試程式碼如下:
val sparkConf = new SparkConf().setAppName("KafkaWordCount") val ssc = new StreamingContext(sparkConf, Seconds(2)) ssc.checkpoint("checkpoint") val topics = "topic1,topic2 1" val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2) val words = lines.flatMap(_.split(" ")) val wordCounts = words.map(x => (x, 1L)) .reduceByKeyAndWindow(_ + _, _ - _, Minutes(10), Seconds(2), 2) wordCounts.print() ssc.start() ssc.awaitTermination()
注意事項:
1,打包的時候 spark-streaming-kafka-0-8對應的jar包一定要帶上。
2,消費的kafka分割槽和生成的RDD分割槽並不是一一對應的。所以,增加KafkaUtils.createStream()命令中topic指定的分割槽,也即map裡面topic名字對應的value,只會增加消費該命令建立的Receiver的內部消費者執行緒數目,不會增加spark處理資料的並行度,恰當執行緒數會增加Receiver的接收資料的速度。
3,KafkaUtils.createStream()命令執行只會建立一個Receiver,我們可以結合消費的topic分割槽和group名稱來多建立幾個Receiver,來增加接收資料的並行度。
4,如果你啟動了預寫日誌,日誌儲存系統時hdfs,日誌已經會被儲存副本。所以,可以設定儲存等級為StorageLevel.MEMORY_AND_DISK_SER.
5,要配置該機制,首先要呼叫 StreamingContext 的 checkpoint ( ) 方法設定一個 checkpoint 目錄,然後需要將 spark.streaming.receiver.writeAheadLog.enable 引數設定為 true。
Direct Approach
在spark 1.3以後引入了一種新的spark Streaming api,新的api回自己在driver內部維護一個偏移,然後自動計算指定的topic+partition該批次需要拉去資料的範圍,然後從kafka拉去資料來計算。不同於基於Receiver的方式,direct模式不會將偏移記錄到Zookeeper,以保證故障恢復從上次偏移處消費訊息。Direct模式你可以通過Checkpoint或者自己編寫工具來實現偏移的維護,保證資料消費不丟失。
這種方式相比於基於Receiver的方式有以下優勢:
1, 簡化並行度:不需要建立多個kafka stream,然後union他們。使用directStream,spark streaming 生成的RDD分割槽和kafka的分割槽是一一對應的,這種方式理解起來更簡單而且便於調優。
2, 高效:基於Receiver的方式要保證資料不丟失,必須啟用預寫日誌。這個行為實際上是非常抵消的,資料會被複制兩次,一次是kafka叢集,一次是預寫日誌。Direct方式解決了這個問題,由於沒有Receiver,故而也不需要預寫日誌。只要你kafka裡面存有資料,那麼訊息就可以從kafka裡面恢復。
3, 僅一次消費語義:基於Receiver的會把偏移提交到Zookeeper。這種方式結合預寫日誌能保證資料不丟失,也即是最少一次消費語義,但是有機率導致消費者在存在失敗的情況下消費訊息兩次。比如,訊息處理並經過儲存之後,但是偏移並沒有提交到Zookeeper,這個時候發生故障了,那麼恢復之後,就會按照Zookeeper上的偏移再一次消費資料並處理,導致訊息重複處理。但是direct 方式偏移不會提交到Zookeeper,是spark streaming在driver使用記憶體變數加Checkpoint進行追蹤的,所以儘管會存在任務失敗,但是仍然能保證消費的一次處理。
注意,由於direct方式不會提交偏移到Zookeeper,所以,基於Zookeeper的kafka監控工具就不能監控到spark streaming的消費情況。然而,你可以自己講偏移提交道Zookeeper,來滿足你的需求。
導包(MVN或者sbt):
groupId = org.apache.spark
artifactId = spark-streaming-kafka-0-8_2.11
version = 2.2.1
測試程式碼如下:
val Array(brokers, topics) = args
// Create context with 2 second batch interval
val sparkConf = new SparkConf().setAppName("DirectKafkaWordCount")
val ssc = new StreamingContext(sparkConf, Seconds(2))
// Create direct kafka stream with brokers and topics
val topicsSet = topics.split(",").toSet
val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
ssc, kafkaParams, topicsSet)
// Get the lines, split them into words, count the words and print
val lines = messages.map(_._2)
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1L)).reduceByKey(_ + _)
wordCounts.print()
// Start the computation
ssc.start()
ssc.awaitTermination()
關於自己編程式碼提交到Zookeeper,限於篇幅的原因,不在這裡囉嗦。
調優限速
現實系統中會有流量尖峰,比如淘寶的雙十一,那一秒鐘的流量,大的嚇人,假如有spark streaming處理的話,會有可能導致訊息不能及時處理,甚至出現故障,應對這種流量尖峰,spark streaming內部實現了一個控制器,基於PID,具體PID的概念是啥,請自行百度。
這裡只是想介紹兩個主要的引數:
基於Receiver的要配置的引數是spark.streaming.receiver.maxRate
基於direct的要配置的引數是spark.streaming.kafka.maxRatePerPartition
通過我們壓測我們的spark streaming任務每秒鐘最大消費處理的訊息數,然後使用這兩個引數限消費訊息的速率,來避免高峰期一批次消費過量訊息導致應用不正常執行。