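0x00 Preface

This is the second post in my series on the Spark source code. It uses the source to examine the most important concept in the design of Spark Streaming: the DStream.

The importance of DStream hardly needs arguing. Once you understand these few core Spark data structures, it becomes much easier to get an overall grasp of Spark.

As in the RDD post, the analysis of DStream is built around one concrete example, so the article also serves as an overview of the Spark Streaming source code.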


This post covers:

  • Some Spark Streaming concepts, mainly those related to DStream
  • The overall design of DStream
  • An in-depth walkthrough of a concrete example

0x01 Concepts

What is Spark Streaming

Scalable, high-throughput, fault-tolerant stream processing of live data streams!


One point worth noting: every Streaming job is eventually turned into ordinary Spark jobs and executed by the Spark engine.


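What is a DStream

Discretized Stream or DStream is the basic abstraction provided by Spark Streaming. It represents a continuous stream of data, either the input data stream received from source, or the processed data stream generated by transforming the input stream.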

An RDD is defined as a read-only, partitioned collection of records, and a DStream is in turn a template for RDDs (at each batch interval it produces one RDD), so we can treat a DStream as a dataset as well.


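How Spark Streaming differs from other real-time processing systems


Real-time processing frameworks can be divided into two kinds: the record-at-a-time model and the D-Stream processing model.


The two models compare as follows: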


Record-at-a-time processing model. Each node continuously receives records, updates internal state, and sends new records. Fault tolerance is typically achieved through replication, using a synchronization protocol like Flux or DPC to ensure that replicas of each node see records in the same order (e.g., when they have multiple parent nodes).

D-Stream processing model. In each time interval, the records that arrive are stored reliably across the cluster to form an immutable, partitioned dataset. This is then processed via deterministic parallel operations to compute other distributed datasets that represent program output or state to pass to the next interval. Each series of datasets forms one D-Stream.


In a record-at-a-time system, the major recovery challenge is rebuilding the state of a lost, or slow, node.
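To make the D-Stream side concrete, here is a toy sketch in plain Scala (no Spark involved): timestamped records are bucketed into fixed intervals, each interval is frozen into an immutable batch, and a deterministic function computes the output of each batch. Because the function is deterministic, a lost result can be recomputed from its input batch instead of rebuilding replicated node state. `Event`, `MicroBatch` and the millisecond timestamps are assumptions made for illustration, not Spark APIs.

```scala
// Toy model of the D-Stream idea; hypothetical names, not Spark code.
final case class Event(timeMs: Long, value: String)

object MicroBatch {
  // Assign each record to the interval [k * intervalMs, (k + 1) * intervalMs)
  // and freeze every interval into an immutable Vector (one "RDD" per interval).
  def discretize(events: Seq[Event], intervalMs: Long): Map[Long, Vector[String]] =
    events
      .groupBy(e => e.timeMs / intervalMs * intervalMs)
      .map { case (start, es) => start -> es.sortBy(_.timeMs).map(_.value).toVector }

  // A deterministic per-batch computation: word counts for one interval.
  def process(batch: Vector[String]): Map[String, Int] =
    batch.groupBy(identity).map { case (w, ws) => w -> ws.size }
}
```

Running `process` twice on the same batch always yields the same map, which is what makes recomputation a valid recovery strategy in this model.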

0x02 Source Code Analysis


A DStream internally is characterized by a few basic properties:

  • A list of other DStreams that the DStream depends on
  • A time interval at which the DStream generates an RDD
  • A function that is used to generate an RDD after each time interval


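In short, the three properties are the parent dependencies (dependencies), the interval at which RDDs are generated (slideDuration), and the function that generates an RDD (compute).

In code they appear as the following abstract members, which concrete subclasses implement, as the later analysis will show. After that, we follow the example step by step.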

abstract class DStream[T: ClassTag] (
    @transient private[streaming] var ssc: StreamingContext
  ) extends Serializable with Logging {

  /** Time interval after which the DStream generates an RDD */
  def slideDuration: Duration

  /** List of parent DStreams on which this DStream depends on */
  def dependencies: List[DStream[_]]

  /** Method that generates an RDD for the given time */
  def compute(validTime: Time): Option[RDD[T]]

  // RDDs generated, marked as private[streaming] so that testsuites can access it
  private[streaming] var generatedRDDs = new HashMap[Time, RDD[T]]()

  // Reference to whole DStream graph
  private[streaming] var graph: DStreamGraph = null

  // ... (remaining members omitted)
}

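The three properties can be mirrored without Spark. In the sketch below (a hypothetical model: `ToyStream`, `ToySource`, `ToyMapped` and `Long` millisecond times are my own assumptions) a batch is a `Vector` instead of an RDD, but the shape is the same: a source has no parents and uses the batch duration as its slide duration, while a derived stream has one parent, inherits its slide duration, and computes its batch from the parent's batch.

```scala
// Hypothetical, Spark-free model of the three DStream properties.
abstract class ToyStream[T] {
  def dependencies: List[ToyStream[_]]         // parent streams
  def slideDurationMs: Long                     // interval between batches
  def compute(timeMs: Long): Option[Vector[T]]  // builds the batch for one time
}

// A source with no parents; its slide duration is the batch duration.
class ToySource[T](batchMs: Long, data: Long => Vector[T]) extends ToyStream[T] {
  def dependencies: List[ToyStream[_]] = Nil
  def slideDurationMs: Long = batchMs
  def compute(timeMs: Long): Option[Vector[T]] = Some(data(timeMs))
}

// A derived stream: one parent, same slide duration, batch = parent batch mapped by f.
class ToyMapped[T, U](parent: ToyStream[T], f: T => U) extends ToyStream[U] {
  def dependencies: List[ToyStream[_]] = List(parent)
  def slideDurationMs: Long = parent.slideDurationMs
  def compute(timeMs: Long): Option[Vector[U]] = parent.compute(timeMs).map(_.map(f))
}
```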


The example we follow throughout is the classic NetworkWordCount:

import org.apache.spark._
import org.apache.spark.streaming._

val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
val ssc = new StreamingContext(conf, Seconds(1))
val lines = ssc.socketTextStream("localhost", 9999)
val words = lines.flatMap(_.split(" "))
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _)

// Print the first ten elements of each RDD generated in this DStream to the console
wordCounts.print()
ssc.start()             // Start the computation
ssc.awaitTermination()  // Wait for the computation to terminate

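For a single batch, what this pipeline computes can be reproduced with ordinary Scala collections. The sketch assumes a batch is just the `Seq[String]` of lines received during one second; `WordCountBatch` is a hypothetical name, and each step is annotated with the DStream operation it stands for.

```scala
// Plain-collections sketch of what NetworkWordCount does to ONE batch; not Spark code.
object WordCountBatch {
  def run(lines: Seq[String]): Map[String, Int] = {
    val words = lines.flatMap(_.split(" "))   // lines.flatMap(_.split(" "))
    val pairs = words.map(word => (word, 1))  // words.map(word => (word, 1))
    // pairs.reduceByKey(_ + _): group by word, then add up the 1s
    pairs.groupBy(_._1).map { case (w, ps) => w -> ps.map(_._2).reduce(_ + _) }
  }
}
```

In Spark the same three steps run once per batch interval, each on the RDD produced for that interval.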




1. Source analysis: StreamingContext


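StreamingContext ties together the following components:

  • JobScheduler: schedules the jobs of each batch and runs them on Spark. It contains:
      • JobGenerator: periodically (once per batch duration) generates Spark jobs from the DStreamGraph
      • JobExecutor: the thread pool that executes those jobs
  • DStreamGraph: the container that holds the DStreams and the dependencies between them
  • StreamingJobProgressListener: listens to streaming jobs and updates the StreamingTab
  • StreamingTab: the Streaming tab of the web UI
  • SparkUI: renders the UI
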
class StreamingContext private[streaming] (
    _sc: SparkContext,
    _cp: Checkpoint,
    _batchDur: Duration
  ) extends Logging {...}
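JobGenerator drives everything with a timer that fires once per batch duration. A tiny sketch of the timing arithmetic, assuming (as in my reading of Spark's RecurringTimer) that the first tick is rounded up to the next multiple of the batch duration; `BatchClock` is hypothetical and only computes the batch times, it does not schedule anything.

```scala
// Hypothetical batch clock: ticks at multiples of the batch duration.
object BatchClock {
  // First tick strictly after nowMs, rounded up to a multiple of periodMs.
  def firstTick(nowMs: Long, periodMs: Long): Long =
    (nowMs / periodMs + 1) * periodMs

  // Batch times a generator would produce from nowMs up to untilMs (inclusive).
  def batchTimes(nowMs: Long, untilMs: Long, periodMs: Long): Seq[Long] =
    (firstTick(nowMs, periodMs) to untilMs by periodMs).toList
}
```

Aligning batch times to multiples of the batch duration keeps them predictable, which matters later when a DStream checks whether a time is valid.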

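Let's first see what the first line of the example, val lines = ssc.socketTextStream("localhost", 9999), does. Readers of the RDD post will remember that a single line of user code can hide a lot of work; here too, this one line already passes through several DStream classes. Let's go through them slowly.

socketTextStream returns a SocketInputDStream (declared as a ReceiverInputDStream[String]). So what is a SocketInputDStream?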

  def socketTextStream(
      hostname: String,
      port: Int,
      storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
    ): ReceiverInputDStream[String] = withNamedScope("socket text stream") {
    socketStream[String](hostname, port, SocketReceiver.bytesToLines, storageLevel)
  }

  def socketStream[T: ClassTag](
      hostname: String,
      port: Int,
      converter: (InputStream) => Iterator[T],
      storageLevel: StorageLevel
    ): ReceiverInputDStream[T] = {
    new SocketInputDStream[T](this, hostname, port, converter, storageLevel)
  }

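The `converter` parameter turns the raw socket `InputStream` into an `Iterator[T]`; for text, SocketReceiver.bytesToLines yields one element per line. Below is a hypothetical stand-in written with `java.io` only (not Spark's implementation), assuming UTF-8 input; the iterator reads lazily, so records are produced as they arrive.

```scala
import java.io.{BufferedReader, InputStream, InputStreamReader}
import java.nio.charset.StandardCharsets

// Hypothetical stand-in for a bytes-to-lines converter: InputStream => Iterator[String].
object LineConverter {
  def bytesToLines(in: InputStream): Iterator[String] = {
    val reader = new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8))
    // readLine() returns null at end of stream, which ends the iterator.
    Iterator.continually(reader.readLine()).takeWhile(_ != null)
  }
}
```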
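2. Source analysis: SocketInputDStream

SocketInputDStream itself is thin: its only method is getReceiver, which builds the SocketReceiver that will run on a worker and read from the socket.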

class SocketInputDStream[T: ClassTag](
    _ssc: StreamingContext,
    host: String,
    port: Int,
    bytesToObjects: InputStream => Iterator[T],
    storageLevel: StorageLevel
  ) extends ReceiverInputDStream[T](_ssc) {

  def getReceiver(): Receiver[T] = {
    new SocketReceiver(host, port, bytesToObjects, storageLevel)
  }
}

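getReceiver only constructs the receiver; Spark ships it to a worker, where the receiver pushes records with store() and the received data is grouped into blocks (cut by time, see spark.streaming.blockInterval). A toy single-threaded model of that buffering follows; `ToyReceiver` is hypothetical, and cutting blocks by record count instead of by time is a simplification to keep it deterministic.

```scala
import scala.collection.mutable.ArrayBuffer

// Toy, single-threaded receiver: store() buffers records, full buffers become blocks.
// Spark cuts blocks by time; cutting by count here is a simplifying assumption.
class ToyReceiver[T](blockSize: Int) {
  require(blockSize > 0, "blockSize must be positive")
  private val current = ArrayBuffer.empty[T]
  private val blocks = ArrayBuffer.empty[Vector[T]]

  def store(item: T): Unit = {
    current += item
    if (current.size == blockSize) flush()
  }

  // Seal whatever is buffered into an immutable block.
  def flush(): Unit =
    if (current.nonEmpty) { blocks += current.toVector; current.clear() }

  def receivedBlocks: Vector[Vector[T]] = blocks.toVector
}
```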
3. Source analysis: ReceiverInputDStream

ReceiverInputDStream is a key class: a large share of input DStreams extend it, for example the receiver-based Kafka input stream.

Abstract class for defining any [[org.apache.spark.streaming.dstream.InputDStream]] that has to start a receiver on worker nodes to receive external data.
Specific implementations of ReceiverInputDStream must define [[getReceiver]] function that gets the receiver object of type [[org.apache.spark.streaming.receiver.Receiver]] that will be sent to the workers to receive data.

Note: it overrides an important method, compute, which determines how RDDs are generated.


abstract class ReceiverInputDStream[T: ClassTag](_ssc: StreamingContext)
  extends InputDStream[T](_ssc) {

  // ... (other members omitted)

  /**
   * Generates RDDs with blocks received by the receiver of this stream. */
  override def compute(validTime: Time): Option[RDD[T]] = {
    val blockRDD = {

      if (validTime < graph.startTime) {
        // If this is called for any time before the start time of the context,
        // then this returns an empty RDD. This may happen when recovering from a
        // driver failure without any write ahead log to recover pre-failure data.
        new BlockRDD[T](ssc.sc, Array.empty)
      } else {
        // Otherwise, ask the tracker for all the blocks that have been allocated to this stream
        // for this batch
        val receiverTracker = ssc.scheduler.receiverTracker
        val blockInfos = receiverTracker.getBlocksOfBatch(validTime).getOrElse(id, Seq.empty)

        // Register the input blocks information into InputInfoTracker
        val inputInfo = StreamInputInfo(id, blockInfos.flatMap(_.numRecords).sum)
        ssc.scheduler.inputInfoTracker.reportInfo(validTime, inputInfo)

        // Create the BlockRDD
        createBlockRDD(validTime, blockInfos)
      }
    }
    Some(blockRDD)
  }
}

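Stripped of Spark types, compute is a lookup: before the start time it returns an empty batch, otherwise it takes the blocks the tracker allocated to this stream id for this batch and wraps them, and either way it returns Some. A hypothetical model of that branching (`ToyBlock`, `ToyTracker`, `Int` stream ids and `Long` times are assumptions):

```scala
// Hypothetical model of ReceiverInputDStream.compute's branching; not Spark code.
final case class ToyBlock(streamId: Int, records: Vector[String])

class ToyTracker(allocated: Map[Long, Seq[ToyBlock]]) {
  // Blocks allocated to one batch, grouped by stream id (shaped like getBlocksOfBatch).
  def blocksOfBatch(timeMs: Long): Map[Int, Seq[ToyBlock]] =
    allocated.getOrElse(timeMs, Seq.empty).groupBy(_.streamId)
}

object ToyReceiverInput {
  def compute(id: Int, startMs: Long, timeMs: Long, tracker: ToyTracker): Option[Vector[String]] = {
    val batch =
      if (timeMs < startMs) Vector.empty[String] // before start time: empty batch
      else tracker.blocksOfBatch(timeMs).getOrElse(id, Seq.empty).flatMap(_.records).toVector
    Some(batch) // always Some, like Some(blockRDD)
  }
}
```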
4. Source analysis: InputDStream


This is the abstract base class for all input streams. This class provides methods start() and stop() which are called by Spark Streaming system to start and stop receiving data, respectively.

Input streams that can generate RDDs from new data by running a service/thread only on the driver node (that is, without running a receiver on worker nodes), can be implemented by directly inheriting this InputDStream.

For example, FileInputDStream, a subclass of InputDStream, monitors a HDFS directory from the driver for new files and generates RDDs with the new files.

For implementing input streams that requires running a receiver on the worker nodes, use [[org.apache.spark.streaming.dstream.ReceiverInputDStream]] as the parent class.

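abstract class InputDStream[T: ClassTag](_ssc: StreamingContext) extends DStream[T](_ssc) {

  // ... (other members omitted)

  // An input stream has no parent DStreams
  override def dependencies: List[DStream[_]] = List()

  // An input stream generates one RDD per batch, so its slide duration is the batch duration
  override def slideDuration: Duration = {
    if (ssc == null) throw new Exception("ssc is null")
    if (ssc.graph.batchDuration == null) throw new Exception("batchDuration is null")
    ssc.graph.batchDuration
  }
}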

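For a driver-side input stream such as FileInputDStream, no receiver is involved: at each batch time the driver decides which new files belong to the batch. A simplified sketch of that selection, assuming a file belongs to the batch ending at `t` when its modification time falls in `(t - batch, t]`; Spark's actual rules have more conditions, so treat `NewFileSelector` as hypothetical.

```scala
// Hypothetical driver-side selection of new files for one batch; not Spark's exact rule.
final case class FileEntry(path: String, modTimeMs: Long)

object NewFileSelector {
  def select(listing: Seq[FileEntry], batchTimeMs: Long, batchMs: Long): Seq[String] =
    listing
      .filter(f => f.modTimeMs > batchTimeMs - batchMs && f.modTimeMs <= batchTimeMs)
      .map(_.path)
}
```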
Note: at this point we have covered only the first line of the example, the one that reads the data.

5. Source analysis: DStream.flatMap (and how a DStream generates RDDs)



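flatMap only wraps the current DStream and the (cleaned) function in a new FlatMappedDStream:

  def flatMap[U: ClassTag](flatMapFunc: T => TraversableOnce[U]): DStream[U] = ssc.withScope {
    new FlatMappedDStream(this, context.sparkContext.clean(flatMapFunc))
  }

FlatMappedDStream declares its parent as its only dependency, reuses the parent's slideDuration, and in compute asks the parent for its RDD of the same time:

class FlatMappedDStream[T: ClassTag, U: ClassTag](
    parent: DStream[T],
    flatMapFunc: T => TraversableOnce[U]
  ) extends DStream[U](parent.ssc) {

  override def dependencies: List[DStream[_]] = List(parent)
  override def slideDuration: Duration = parent.slideDuration
  override def compute(validTime: Time): Option[RDD[U]] = {
    parent.getOrCompute(validTime).map(_.flatMap(flatMapFunc))
  }
}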

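The pattern is the same for every transformation: DStream.flatMap is an RDD flatMap applied to whatever batch the parent produces for the same time. A minimal sketch with a stream modeled as a function from time to `Option[Vector[T]]` (a hypothetical simplification of `Option[RDD[T]]`; `FlatMapSketch` is not a Spark API):

```scala
// Hypothetical: a stream is a function from batch time to an optional batch.
object FlatMapSketch {
  type Batches[T] = Long => Option[Vector[T]]

  // DStream-level flatMap == "flatMap the parent's batch for the same time".
  def flatMapStream[T, U](parent: Batches[T])(f: T => Iterable[U]): Batches[U] =
    time => parent(time).map(_.flatMap(f))
}
```

If the parent has no batch for a time (None), the derived stream has none either, just as `parent.getOrCompute(validTime).map(...)` propagates None.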

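FlatMappedDStream.compute obtains its input through the parent's getOrCompute, which is documented as:

Get the RDD corresponding to the given time; either retrieve it from cache or compute-and-cache it.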

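Internally, a DStream records every RDD it has already generated in generatedRDDs, a HashMap[Time, RDD[T]].

Note: compute(time) is what actually creates an RDD; getOrCompute calls it only when generatedRDDs has no entry for that time and the time is valid.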

  private[streaming] final def getOrCompute(time: Time): Option[RDD[T]] = {
    // Look the RDD up in generatedRDDs: return it if present, otherwise run the orElse block
    generatedRDDs.get(time).orElse {
      // Compute the RDD if time is valid (e.g. correct time in a sliding window)
      // of RDD generation, else generate nothing.
      // Check whether time is valid
      if (isTimeValid(time)) {
        // Call compute(time) to obtain the RDD and keep it in rddOption
        val rddOption = createRDDWithLocalProperties(time, displayInnerRDDOps = false) {
          // Disable checks for existing output directories in jobs launched by the streaming
          // scheduler, since we may need to write output to an existing directory during checkpoint
          // recovery; see SPARK-4835 for more details. We need to have this call here because
          // compute() might cause Spark jobs to be launched.
          // This function lives in the RDD code. I looked at it but did not fully understand it;
          // judging from the comment, it roughly means "do not check output directories".
          PairRDDFunctions.disableOutputSpecValidation.withValue(true) {
            compute(time)
          }
        }

        rddOption.foreach { case newRDD =>
          // Register the generated RDD for caching and checkpointing
          if (storageLevel != StorageLevel.NONE) {
            newRDD.persist(storageLevel)
            logDebug(s"Persisting RDD ${newRDD.id} for time $time to $storageLevel")
          }
          if (checkpointDuration != null && (time - zeroTime).isMultipleOf(checkpointDuration)) {
            newRDD.checkpoint()
            logInfo(s"Marking RDD ${newRDD.id} for time $time for checkpointing")
          }
          // Store the newly created RDD in generatedRDDs under its time
          generatedRDDs.put(time, newRDD)
        }
        rddOption
      } else {
        None
      }
    }
  }

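getOrCompute boils down to memoization keyed by batch time plus a validity check. In the toy version below, `isTimeValid` follows the same idea as Spark's (the time must lie after zeroTime and a whole number of slide durations away from it), in simplified form; `CachedStream` and its counter are hypothetical, and persist/checkpoint are left out.

```scala
import scala.collection.mutable

// Toy memoizing stream shaped like getOrCompute; not Spark's implementation.
class CachedStream(zeroTimeMs: Long, slideMs: Long, make: Long => Vector[Int]) {
  val generated = mutable.HashMap.empty[Long, Vector[Int]] // plays the role of generatedRDDs
  var computeCalls = 0                                      // how often compute actually ran

  private def isTimeValid(t: Long): Boolean =
    t > zeroTimeMs && (t - zeroTimeMs) % slideMs == 0

  private def compute(t: Long): Option[Vector[Int]] = {
    computeCalls += 1
    Some(make(t))
  }

  def getOrCompute(t: Long): Option[Vector[Int]] =
    generated.get(t).orElse {
      if (isTimeValid(t)) {
        val result = compute(t)
        result.foreach(batch => generated.put(t, batch)) // cache for later callers
        result
      } else None
    }
}
```

Caching per time matters because several child streams may ask the same parent for the same batch; without it, the parent would compute that batch once per child.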
6. Source analysis: DStream.map

  /** Return a new DStream by applying a function to all elements of this DStream. */
  def map[U: ClassTag](mapFunc: T => U): DStream[U] = ssc.withScope {
    new MappedDStream(this, context.sparkContext.clean(mapFunc))
  }

class MappedDStream[T: ClassTag, U: ClassTag] (
    parent: DStream[T],
    mapFunc: T => U
  ) extends DStream[U](parent.ssc) {

  override def dependencies: List[DStream[_]] = List(parent)

  override def slideDuration: Duration = parent.slideDuration

  override def compute(validTime: Time): Option[RDD[U]] = {
    parent.getOrCompute(validTime).map(_.map[U](mapFunc))
  }
}

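Calling map computes nothing; it only links a new node to its parent. Work happens later, when someone asks the last node for a batch and the request walks up the chain through getOrCompute. The toy chain below (hypothetical, Spark-free classes) counts compute calls to make that laziness and the per-time caching visible.

```scala
import scala.collection.mutable

// Hypothetical lazy chain: building nodes does no work; getOrCompute pulls from parents.
abstract class LazyNode[T] {
  private val cache = mutable.HashMap.empty[Long, Vector[T]]
  var computeCalls = 0
  protected def compute(t: Long): Vector[T]
  def getOrCompute(t: Long): Vector[T] =
    cache.getOrElseUpdate(t, { computeCalls += 1; compute(t) })
  def map[U](f: T => U): LazyNode[U] = new MappedNode(this, f)
}

class SourceNode(data: Long => Vector[String]) extends LazyNode[String] {
  protected def compute(t: Long): Vector[String] = data(t)
}

class MappedNode[T, U](parent: LazyNode[T], f: T => U) extends LazyNode[U] {
  // Same shape as MappedDStream.compute: take the parent's batch, then map it.
  protected def compute(t: Long): Vector[U] = parent.getOrCompute(t).map(f)
}
```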
7. Source analysis: reduceByKey


Return a new DStream by applying reduceByKey to each RDD. The values for each key are merged using the supplied reduce function. org.apache.spark.Partitioner is used to control the partitioning of each RDD.

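These methods live in PairDStreamFunctions, which a DStream[(K, V)] reaches through an implicit conversion. The one-argument reduceByKey(_ + _) used in the example forwards to the overload below with a default HashPartitioner:

  def reduceByKey(
      reduceFunc: (V, V) => V,
      partitioner: Partitioner): DStream[(K, V)] = ssc.withScope {
    combineByKey((v: V) => v, reduceFunc, reduceFunc, partitioner)
  }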

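The partitioner decides which reduce-side partition each key lands in. With a HashPartitioner, the default for the one-argument reduceByKey, the partition is nonNegativeMod(key.hashCode, numPartitions), and null keys go to partition 0. The sketch reimplements that arithmetic in plain Scala; `ToyHashPartitioner` is a hypothetical name, not Spark's class.

```scala
// Plain-Scala reimplementation of hash-partitioning arithmetic (illustrative only).
class ToyHashPartitioner(val numPartitions: Int) {
  require(numPartitions > 0, s"Number of partitions ($numPartitions) must be positive.")

  // hashCode may be negative, so shift negative remainders into [0, mod).
  private def nonNegativeMod(x: Int, mod: Int): Int = {
    val rawMod = x % mod
    rawMod + (if (rawMod < 0) mod else 0)
  }

  def getPartition(key: Any): Int = key match {
    case null => 0
    case _    => nonNegativeMod(key.hashCode, numPartitions)
  }
}
```

Equal keys always hash to the same partition, which is what allows each reduce task to see all values for its keys.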
So reduceByKey delegates to combineByKey, whose documentation reads:

Combine elements of each key in DStream’s RDDs using custom functions. This is similar to the combineByKey for RDDs.



  def combineByKey[C: ClassTag](
      createCombiner: V => C,
      mergeValue: (C, V) => C,
      mergeCombiner: (C, C) => C,
      partitioner: Partitioner,
      mapSideCombine: Boolean = true): DStream[(K, C)] = ssc.withScope {
    val cleanedCreateCombiner = sparkContext.clean(createCombiner)
    val cleanedMergeValue = sparkContext.clean(mergeValue)
    val cleanedMergeCombiner = sparkContext.clean(mergeCombiner)
    new ShuffledDStream[K, V, C](
      self,
      cleanedCreateCombiner,
      cleanedMergeValue,
      cleanedMergeCombiner,
      partitioner,
      mapSideCombine)
  }

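The resulting ShuffledDStream applies RDD combineByKey to its parent's RDD in every batch. To see why reduceByKey is just combineByKey(identity, f, f), here is a plain-collections model of the three functions; the partitions are given explicitly as nested `Seq`s, and `CombineSketch` is a hypothetical helper, not how Spark actually shuffles.

```scala
// Plain-collections model of combineByKey's three functions; not Spark's shuffle.
object CombineSketch {
  // Map-side step within one partition: createCombiner for a key's first value,
  // mergeValue for every later value of that key.
  private def combinePartition[K, V, C](
      part: Seq[(K, V)],
      createCombiner: V => C,
      mergeValue: (C, V) => C): Map[K, C] =
    part.foldLeft(Map.empty[K, C]) { case (acc, (k, v)) =>
      acc.updated(k, acc.get(k).fold(createCombiner(v))(c => mergeValue(c, v)))
    }

  // Reduce-side step: mergeCombiners joins the partial results of different partitions.
  def combineByKey[K, V, C](partitions: Seq[Seq[(K, V)]])(
      createCombiner: V => C,
      mergeValue: (C, V) => C,
      mergeCombiners: (C, C) => C): Map[K, C] =
    partitions
      .map(part => combinePartition[K, V, C](part, createCombiner, mergeValue))
      .foldLeft(Map.empty[K, C]) { (acc, partial) =>
        partial.foldLeft(acc) { case (m, (k, c)) =>
          m.updated(k, m.get(k).fold(c)(prev => mergeCombiners(prev, c)))
        }
      }

  // reduceByKey(f) is combineByKey(identity, f, f), mirroring the Spark code above.
  def reduceByKey[K, V](partitions: Seq[Seq[(K, V)]])(f: (V, V) => V): Map[K, V] =
    combineByKey[K, V, V](partitions)((v: V) => v, f, f)
}
```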
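8. Source analysis: DStream.print

The print() call in the example is shorthand for print(10). The work happens in print(num), which builds a printing function and hands it to foreachRDD: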

  def print(num: Int): Unit = ssc.withScope {
    def foreachFunc: (RDD[T], Time) => Unit = {
      (rdd: RDD[T], time: Time) => {
        val firstNum = rdd.take(num + 1)
        // scalastyle:off println
        println("-------------------------------------------")
        println(s"Time: $time")
        println("-------------------------------------------")
        firstNum.take(num).foreach(println)
        if (firstNum.length > num) println("...")
        println()
        // scalastyle:on println
      }
    }
    foreachRDD(context.sparkContext.clean(foreachFunc), displayInnerRDDOps = false)
  }

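The trick in print is take(num + 1): fetching one extra element is enough to decide whether to print "..." without counting the whole RDD. The sketch reproduces the output format with a `Seq` standing in for the RDD; `PrintSketch.render` is hypothetical and returns the lines instead of printing them.

```scala
// Hypothetical rendering of DStream.print's output for one batch; a Seq stands in for the RDD.
object PrintSketch {
  def render[T](batch: Seq[T], time: String, num: Int): List[String] = {
    val firstNum = batch.take(num + 1) // one extra element, like rdd.take(num + 1)
    val sep = "-------------------------------------------"
    val header = List(sep, s"Time: $time", sep)
    val body = firstNum.take(num).map(_.toString).toList
    val more = if (firstNum.length > num) List("...") else Nil
    header ++ body ++ more ++ List("")
  }
}
```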


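foreachRDD wraps the function in a ForEachDStream and calls register() on it, which adds it to the DStreamGraph as an output stream. The scaladoc of ForEachDStream reads:

An internal DStream used to represent output operations like DStream.foreachRDD.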

class ForEachDStream[T: ClassTag] (
    parent: DStream[T],
    foreachFunc: (RDD[T], Time) => Unit,
    displayInnerRDDOps: Boolean
  ) extends DStream[Unit](parent.ssc) {

  override def dependencies: List[DStream[_]] = List(parent)

  override def slideDuration: Duration = parent.slideDuration

  override def compute(validTime: Time): Option[RDD[Unit]] = None

  override def generateJob(time: Time): Option[Job] = {
    parent.getOrCompute(time) match {
      case Some(rdd) =>
        val jobFunc = () => createRDDWithLocalProperties(time, displayInnerRDDOps) {
          foreachFunc(rdd, time)
        }
        Some(new Job(time, jobFunc))
      case None => None
    }
  }
}
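0x03 Summary

Putting the pieces together: the example builds a chain of DStreams, SocketInputDStream → FlatMappedDStream → MappedDStream → ShuffledDStream → ForEachDStream. Each transformation merely wraps its parent in a new DStream and records it in dependencies; nothing is computed at that point. After ssc.start(), the JobGenerator asks the DStreamGraph for jobs once per batch interval. Each output stream (here the ForEachDStream created by print) calls getOrCompute on its parent, and the request walks up the dependencies: every DStream's compute builds the RDD for that time from its parent's RDD, until the ReceiverInputDStream turns the blocks stored by the receiver into a BlockRDD. The resulting chain of RDDs is then executed as ordinary Spark jobs.
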
2017-05-25 23:16:00 lkds

This article may be reposted, but the original source and author information must be credited with a hyperlink.