Spark Streaming: Using Kafka as a Data Source
Because Kafka introduced a new consumer API between versions 0.8 and 0.10, there are two separate integration packages. The 0.8 integration (spark-streaming-kafka-0-8) is compatible with later 0.9 and 0.10 brokers, while the 0.10 integration (spark-streaming-kafka-0-10) is not compatible with brokers older than 0.10.
---------------------------------------------------------------------------------------------------------------
1. Set up the Kafka environment: see my earlier blog post on Kafka introduction and deployment.
2. Integrate Kafka with Spark. There are two approaches, each described below.
-----------------------------------------------------------------------------------------------------------------
I. Receiver-based Approach
This approach uses a Receiver to receive the data. The Receiver is implemented with Kafka's high-level Consumer API: it receives data from Kafka and stores it in Spark executors, and Spark Streaming then launches jobs to process that data. However, under the default configuration this approach can lose data when a task fails (to guarantee zero data loss you additionally have to enable Write Ahead Logs; see the receiver reliability discussion in the Spark Streaming documentation).
Notes:
A. The partitions of a Kafka topic have no relationship to the partitions of the RDDs generated in Spark Streaming. Increasing the fourth argument of KafkaUtils.createStream() (the number of threads per topic) only increases the number of threads the Receiver uses to consume the topic; it does not increase the parallelism with which Spark processes the data.
B. Multiple Kafka input DStreams can be created with different groups and topics, so that data is received in parallel by multiple Receivers.
C. If you enable Write Ahead Logs on a replicated file system such as HDFS, the received data is already replicated in the log, so the storage level of the input DStream can be lowered to StorageLevel.MEMORY_AND_DISK_SER (that is, use KafkaUtils.createStream(..., StorageLevel.MEMORY_AND_DISK_SER)). A minimal sketch follows this list.
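As a concrete illustration of note C, here is a minimal sketch (not part of the original post) that enables the receiver Write Ahead Log and lowers the storage level accordingly; the ZooKeeper quorum, consumer group, topic name and checkpoint path are placeholders for your own environment.

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object ReceiverWithWALSketch {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf()
      .setAppName("ReceiverWithWALSketch")
      .setMaster("local[2]")
      // replicate received data into the Write Ahead Log instead of a second in-memory copy
      .set("spark.streaming.receiver.writeAheadLog.enable", "true")
    val ssc = new StreamingContext(sparkConf, Seconds(10))
    ssc.checkpoint("/tmp/receiver-wal-checkpoint")   // the WAL is written under the checkpoint directory

    // since the WAL already persists the data, MEMORY_AND_DISK_SER is sufficient for the input DStream
    val messages = KafkaUtils.createStream(
      ssc,
      "localhost:2181",             // placeholder ZooKeeper quorum
      "wal_demo_group",             // placeholder consumer group
      Map("wal_demo_topic" -> 1),   // placeholder topic -> number of receiver threads
      StorageLevel.MEMORY_AND_DISK_SER)

    messages.map(_._2).flatMap(_.split(",")).map((_, 1)).reduceByKey(_ + _).print()

    ssc.start()
    ssc.awaitTermination()
  }
}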
1. Linking
Add the dependency:
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
<version>2.3.1</version>
</dependency>
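If you build with sbt rather than Maven, the equivalent dependency line would be roughly the following (an assumption based on the artifact above, with scalaVersion set to 2.11.x):

// build.sbt: sbt equivalent of the Maven dependency above
libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka-0-8" % "2.3.1"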
2. Programming
import org.apache.spark.streaming.kafka._
val kafkaStream = KafkaUtils.createStream(streamingContext,
[ZK quorum], [consumer group id], [per-topic number of Kafka partitions to consume])
3. Full example
package com.ruozedata.streaming

import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

object KafkaWordCountApp {
  def main(args: Array[String]) {
    if (args.length != 4) {
      System.err.println("Usage: KafkaWordCountApp <zkQuorum> <groupId> <topics> <numThreads>")
      System.exit(1)
    }
    val Array(zkQuorum, groupId, topics, numThreads) = args
    val sparkConf = new SparkConf()
      .setAppName("KafkaWordCountApp")
      .setMaster("local[2]")
    val ssc = new StreamingContext(sparkConf, Seconds(10))

    // map each topic to the number of receiver threads used to consume it
    val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
    val messages = KafkaUtils.createStream(ssc, zkQuorum, groupId, topicMap)

    messages.map(_._2)            // the stream yields (key, value) pairs; we only need the value
      .flatMap(_.split(","))
      .map((_, 1))
      .reduceByKey(_ + _)
      .print()

    ssc.start()
    ssc.awaitTermination()
  }
}
4. Pass in the program arguments: <zkQuorum> <groupId> <topics> <numThreads>.
5. Run the program, type some data into a Kafka console producer and press Enter; the word counts are printed to the console.
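If you would rather feed test data from code than from the console producer, the following is a rough sketch using the Kafka producer client; the broker address (localhost:9092) and topic name (test_topic) are placeholders, and it assumes the kafka-clients jar is on the classpath.

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

object TestProducer {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", "localhost:9092")   // placeholder broker address
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

    val producer = new KafkaProducer[String, String](props)
    // send a few comma-separated lines, matching the split(",") in the word count above
    producer.send(new ProducerRecord[String, String]("test_topic", "hello,world,hello"))
    producer.send(new ProducerRecord[String, String]("test_topic", "spark,streaming,kafka"))
    producer.close()
  }
}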
-------------------------------------------------------------------------------------------------------------------------------
II. Direct Approach (No Receivers)
This approach was introduced in Spark 1.3. Instead of using receivers, it periodically queries Kafka for the latest offsets in each topic and partition, and accordingly defines the offset ranges to process in each batch. When the processing jobs are launched, Kafka's simple Consumer API is used to read the defined offset ranges from Kafka (much like reading files from a file system).
This approach has the following advantages over the Receiver-based approach:
A. Simplified parallelism: there is no need to create multiple input DStreams and union them. With directStream, Spark Streaming creates as many RDD partitions as there are Kafka partitions to consume, and all of them read from Kafka in parallel. This one-to-one mapping between Kafka partitions and RDD partitions is easier to understand and to tune.
## To increase the parallelism with which Spark Streaming reads from Kafka, simply increase the number of Kafka partitions. This is a very important tuning point.
B. Efficiency: achieving zero data loss with the first approach requires the data to be replicated again in a Write Ahead Log, which is inefficient. Because the direct approach has no Receiver, it needs no Write Ahead Log: as long as you have sufficient Kafka retention (the log.retention.hours parameter), messages can be recovered directly from Kafka.
C. Exactly-once semantics: the first approach uses Kafka's high-level API to consume data and stores the consumed offsets in ZooKeeper. Combined with Write Ahead Logs this guarantees zero data loss under at-least-once semantics (if data has been consumed but the offset was not yet written to ZooKeeper, the restarted driver consumes it again from the Write Ahead Log), so some records may be processed twice after a failure. In the direct approach, offsets are not stored in ZooKeeper but tracked by Spark Streaming's checkpoints, so records are received exactly once even when tasks fail.
Drawback: because offsets are not stored in ZooKeeper, ZooKeeper-based Kafka monitoring tools cannot show the consumption progress in their UI. You can work around this by accessing the offsets of each batch yourself and updating ZooKeeper (a sketch of how to access them follows).
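As a rough sketch of that workaround (not part of the original post), the offset ranges of each batch can be read from the RDDs of a direct stream via HasOffsetRanges; writing them back to ZooKeeper would additionally need a ZooKeeper client, which is only indicated by a placeholder comment here. The `messages` stream refers to the one created in the full example below.

import org.apache.spark.streaming.kafka.{HasOffsetRanges, OffsetRange}

// `messages` is the DStream returned by KafkaUtils.createDirectStream (see the full example below)
messages.foreachRDD { rdd =>
  // the RDDs of a direct stream carry their Kafka offset ranges
  val offsetRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  offsetRanges.foreach { o =>
    println(s"topic=${o.topic} partition=${o.partition} from=${o.fromOffset} until=${o.untilOffset}")
    // here you could write o.untilOffset to ZooKeeper with a ZooKeeper client of your choice (placeholder)
  }
}

Note that the cast to HasOffsetRanges only succeeds on the RDDs of the direct stream itself, i.e. in the first operation applied to it, before any further transformations.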
1. Linking
Add the dependency:
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
<version>2.3.1</version>
</dependency>
2. Programming
import org.apache.spark.streaming.kafka._
val directKafkaStream = KafkaUtils.createDirectStream[
[key class], [value class], [key decoder class], [value decoder class] ](
streamingContext, [map of Kafka parameters], [set of topics to consume])
3. Full example
package com.ruozedata.streaming

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

object DirectKafkaWordCountApp {
  def main(args: Array[String]) {
    if (args.length != 2) {
      System.err.println("Usage: DirectKafkaWordCountApp <topics> <brokers>")
      System.exit(1)
    }
    val Array(topics, brokers) = args
    val sparkConf = new SparkConf()
      .setAppName("DirectKafkaWordCountApp")
      .setMaster("local[2]")
    val ssc = new StreamingContext(sparkConf, Seconds(10))

    val topicSet = topics.split(",").toSet
    val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
    val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc,
      kafkaParams,
      topicSet
    )

    messages.map(_._2)            // the stream yields (key, value) pairs; we only need the value
      .flatMap(_.split(","))
      .map((_, 1))
      .reduceByKey(_ + _)
      .print()

    ssc.start()
    ssc.awaitTermination()
  }
}
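To actually benefit from the checkpoint-based offset tracking mentioned in advantage C, the driver is usually created through StreamingContext.getOrCreate with a checkpoint directory, so that a restarted driver resumes from the stored offsets. Below is a minimal sketch under that assumption; the checkpoint path, broker address and topic name are placeholders.

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

object DirectKafkaWithCheckpoint {
  val checkpointDir = "/tmp/direct-kafka-checkpoint"   // placeholder checkpoint path

  // all DStream setup must happen inside the creating function
  def createContext(): StreamingContext = {
    val sparkConf = new SparkConf()
      .setAppName("DirectKafkaWithCheckpoint")
      .setMaster("local[2]")
    val ssc = new StreamingContext(sparkConf, Seconds(10))
    ssc.checkpoint(checkpointDir)

    val kafkaParams = Map[String, String]("metadata.broker.list" -> "localhost:9092") // placeholder broker
    val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, Set("test_topic"))   // placeholder topic

    messages.map(_._2).flatMap(_.split(",")).map((_, 1)).reduceByKey(_ + _).print()
    ssc
  }

  def main(args: Array[String]): Unit = {
    // on restart, the context (including the tracked offsets) is recovered from the checkpoint
    val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)
    ssc.start()
    ssc.awaitTermination()
  }
}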
4. Pass in the program arguments: <topics> <brokers>.
5. Run the program, type some data into a Kafka console producer and press Enter; the word counts are printed to the console.
-----------------------------------------------------------------------------------------------------------------------------
UI address: http://<driver-ip>:4040/streaming
IV. Tuning
Notes:
A. A DStream is associated with a single receiver. To attain read parallelism, multiple receivers, i.e. multiple DStreams, need to be created. A receiver runs within an executor and occupies one core. Ensure that there are enough cores left for processing after the receiver slots are booked, i.e. spark.cores.max should take the receiver slots into account. Receivers are allocated to executors in a round-robin fashion.
B. When data is received from a stream source, the receiver creates blocks of data. A new block is generated every blockInterval milliseconds, so N = batchInterval / blockInterval blocks are created during each batchInterval. These blocks are distributed by the BlockManager of the current executor to the block managers of other executors. After that, the Network Input Tracker running on the driver is informed about the block locations for further processing.
C. An RDD is created on the driver for the blocks produced during the batchInterval; these blocks are the partitions of the RDD, and each partition is a task in Spark. blockInterval == batchInterval would mean that a single partition is created, and it is probably processed locally.
D. The map tasks on the blocks are processed in the executors that have the blocks (the one that received a block and the one where it was replicated), irrespective of the block interval, unless non-local scheduling kicks in. A bigger blockInterval means bigger blocks. A high value of spark.locality.wait increases the chance of processing a block on the local node; a balance needs to be found between these two parameters to ensure that the bigger blocks are processed locally.
E. Instead of relying on batchInterval and blockInterval, you can define the number of partitions by calling inputDstream.repartition(n). This reshuffles the data in the RDD randomly to create n partitions, giving greater parallelism at the cost of a shuffle.
F. An RDD's processing is scheduled by the driver's JobScheduler as a job. At a given point in time only one job is active, so if one job is executing the other jobs are queued.
G. If you have two DStreams, two RDDs are formed and two jobs are created, which are scheduled one after the other. To avoid this, you can union the two DStreams; this ensures that a single unionRDD is formed for the two RDDs and is then processed as a single job. The partitioning of the RDDs is not affected. See the sketch below.
H. If the batch processing time is larger than the batchInterval, the receiver's memory will start filling up and will eventually throw exceptions (most probably BlockNotFoundException). There is currently no way to pause the receiver, but the receiver's rate can be limited with the SparkConf setting spark.streaming.receiver.maxRate.
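To make points E, G, and H concrete, here is a small sketch (not from the original post) that unions two receiver-based streams, repartitions the result, and caps the receiver rate; the ZooKeeper quorum, group and topic names are placeholders.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object TuningSketch {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf()
      .setAppName("TuningSketch")
      .setMaster("local[4]")                              // 2 cores for the receivers + cores for processing
      .set("spark.streaming.receiver.maxRate", "1000")    // point H: at most 1000 records/sec per receiver
    val ssc = new StreamingContext(sparkConf, Seconds(10))

    // point G: two receiver-based DStreams, unioned so each batch runs as a single job
    val stream1 = KafkaUtils.createStream(ssc, "localhost:2181", "tuning_group", Map("topic_a" -> 1))
    val stream2 = KafkaUtils.createStream(ssc, "localhost:2181", "tuning_group", Map("topic_b" -> 1))
    val unioned = stream1.union(stream2)

    // point E: explicitly control the number of partitions (and hence tasks) per batch
    val repartitioned = unioned.repartition(8)

    repartitioned.map(_._2).flatMap(_.split(",")).map((_, 1)).reduceByKey(_ + _).print()

    ssc.start()
    ssc.awaitTermination()
  }
}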