
Spark Source Code Analysis: DStream

0x00 Preface

This is the second post in the Spark source code analysis series. Working from the source code, it examines the most important concept in the design of Spark Streaming: DStream.

The focus is the DStream in Spark Streaming. Its importance hardly needs stating: once you understand these core data structures, it becomes much easier to get an overall grasp of Spark.

As in the RDD post, even though the subject is DStream, the whole article is organized around one concrete example. Think of it as an overview of the Spark Streaming source code.

Article structure

  • Some Spark Streaming concepts, mainly the ones related to DStream
  • The overall design of DStream
  • An in-depth walkthrough of a concrete example

0x01 Concepts

What is Spark Streaming

Scalable, high-throughput, fault-tolerant stream processing of live data streams!

A real-time, or more precisely near-real-time, system. I won't describe it further here.

One point worth making: Streaming jobs are ultimately translated into Spark jobs and executed by the Spark engine.

DStream

It represents a continuous stream of data, either the input data stream received from source, or the processed data stream generated by transforming the input stream.

An RDD is defined as a read-only, partitioned collection of records, and a DStream is in turn a template for RDDs, so we can think of a DStream as a dataset as well.

My simple way of putting it: a DStream is a data structure wrapped one layer above RDDs. Below is the diagram from the official docs describing DStream.

How Spark Streaming differs from other real-time processing systems

The following comes from the paper by the Spark authors. It is well written, so rather than translating it I will simply quote the points I care about.

The paper divides real-time processing frameworks into two kinds: the record-at-a-time model and the D-Stream processing model.

Record-at-a-time processing model:

Each node continuously receives records, updates internal state, and sends new records. Fault tolerance is typically achieved through replication, using a synchronization protocol like Flux or DPC to ensure that replicas of each node see records in the same order (e.g., when they have multiple parent nodes).

D-Stream processing model:

In each time interval, the records that arrive are stored reliably across the cluster to form an immutable, partitioned dataset. This is then processed via deterministic parallel operations to compute other distributed datasets that represent program output or state to pass to the next interval. Each series of datasets forms one D-Stream.

The problem with record-at-a-time:

In a record-at-a-time system, the major recovery challenge is rebuilding the state of a lost, or slow, node.

0x02 Source Code Analysis

DStream

A DStream internally is characterized by a few basic properties:

  • A list of other DStreams that the DStream depends on
  • A time interval at which the DStream generates an RDD
  • A function that is used to generate an RDD after each time interval

Three things matter most in the DStream data structure:

  • Its parent dependencies
  • The time interval at which it generates RDDs
  • A function that generates an RDD

They correspond to the members in the code below; each has concrete implementations in the subclasses, which we will see in the analysis that follows. Now let's work through the example step by step.

abstract class DStream[T: ClassTag] (
    @transient private[streaming] var ssc: StreamingContext
  ) extends Serializable with Logging {
  /** Time interval after which the DStream generates an RDD */
  def slideDuration: Duration
  /** List of parent DStreams on which this DStream depends on */
  def dependencies: List[DStream[_]]
  /** Method that generates an RDD for the given time */
  def compute(validTime: Time): Option[RDD[T]]
  // RDDs generated, marked as private[streaming] so that testsuites can access it
  @transient
  private[streaming] var generatedRDDs = new HashMap[Time, RDD[T]]()
  // Reference to whole DStream graph
  private[streaming] var graph: DStreamGraph = null
 }
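
Before following the example, it may help to see how small a typical concrete DStream is. The sketch below is a hypothetical subclass written for illustration (modeled on real ones such as FilteredDStream; those live in the org.apache.spark.streaming.dstream package, which is why they can use the private[streaming] members ssc and getOrCompute). It fills in exactly the three members listed above:

package org.apache.spark.streaming.dstream

import scala.reflect.ClassTag

import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Duration, Time}

// Hypothetical example, not taken from the Spark source: a DStream that keeps
// only the elements of its parent that satisfy a predicate.
class FilteredExampleDStream[T: ClassTag](
    parent: DStream[T],
    pred: T => Boolean
  ) extends DStream[T](parent.ssc) {

  // 1. Parent dependencies: this stream depends only on `parent`.
  override def dependencies: List[DStream[_]] = List(parent)

  // 2. RDD generation interval: the same batch interval as the parent.
  override def slideDuration: Duration = parent.slideDuration

  // 3. How to build the RDD for one batch: take the parent's RDD for the same
  //    batch time (if it exists) and apply an ordinary RDD transformation.
  override def compute(validTime: Time): Option[RDD[T]] = {
    parent.getOrCompute(validTime).map(_.filter(pred))
  }
}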

An example

The most basic word count example from the official docs, similar to the one for Spark core. Simple, but very representative.

val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
val ssc = new StreamingContext(conf, Seconds(1))
val lines = ssc.socketTextStream("localhost", 9999)
val words = lines.flatMap(_.split(" "))
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _)

// Print the first ten elements of each RDD generated in this DStream to the console
wordCounts.print()
ssc.start()             // Start the computation
ssc.awaitTermination()  // Wait for the computation to terminate
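
To try this locally, the official docs suggest feeding text into port 9999 from another terminal with a tool such as Netcat (nc -lk 9999); every line you type becomes input for the next one-second batch.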

This involves the transformations between DStreams as well as the generation of RDDs. Let's first look at the DStream transformations.

DStream dependency relationships

It is worth getting the DStream dependency relationships straight first, otherwise the rest is hard to follow. The full DStream class hierarchy is large; we only list the classes relevant to this post.

I won't describe every component in detail here, just the diagram; come back to it while reading the source code and things will be clearer.

1. Source code analysis: StreamingContext

These are the main components of StreamingContext. We won't expand on what StreamingContext does; we'll go through the concrete example first, and later posts will analyze the main components such as DStreamGraph and JobGenerator.

  • JobScheduler: periodically generates and runs Spark jobs; it contains the two components below
  • JobGenerator
  • JobExecutor
  • DStreamGraph: the container that holds the dependency relationships between DStreams
  • StreamingJobProgressListener: listens to Streaming jobs and updates the StreamingTab
  • StreamingTab: the tab page for Streaming jobs, which the SparkUI is responsible for displaying

class StreamingContext private[streaming] (
    _sc: SparkContext,
    _cp: Checkpoint,
    _batchDur: Duration
  ) extends Logging {...}
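
For reference, user code normally goes through one of the public auxiliary constructors, which delegate to this private primary constructor. A minimal illustration (reusing the names from the wordcount example above):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")

// Build a StreamingContext from a SparkConf (a SparkContext is created internally) ...
val ssc = new StreamingContext(conf, Seconds(1))

// ... or wrap an existing SparkContext instead:
// val sc = new SparkContext(conf)
// val ssc = new StreamingContext(sc, Seconds(1))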

Let's see what the first line of code does: val lines = ssc.socketTextStream("localhost", 9999). If you have read the RDD source post you will remember that a single line like this already involves quite a few DStream constructions and conversions. Let's go through it slowly.

socketTextStream returns a SocketInputDStream. So what exactly is a SocketInputDStream?

  def socketTextStream(
      hostname: String,
      port: Int,
      storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
    ): ReceiverInputDStream[String] = withNamedScope("socket text stream") {
    socketStream[String](hostname, port, SocketReceiver.bytesToLines, storageLevel)
  }

  def socketStream[T: ClassTag](
      hostname: String,
      port: Int,
      converter: (InputStream) => Iterator[T],
      storageLevel: StorageLevel
    ): ReceiverInputDStream[T] = {
    new SocketInputDStream[T](this, hostname, port, converter, storageLevel)
  }

2. Source code analysis: SocketInputDStream

Here we see that SocketInputDStream actually extends ReceiverInputDStream; this is the first level of the inheritance hierarchy, and you can look back at the diagram shown earlier.

It does not do much on its own: it mainly provides its own SocketReceiver, while the other important methods are inherited from ReceiverInputDStream.

class SocketInputDStream[T: ClassTag](
    _ssc: StreamingContext,
    host: String,
    port: Int,
    bytesToObjects: InputStream => Iterator[T],
    storageLevel: StorageLevel
  ) extends ReceiverInputDStream[T](_ssc) {

  def getReceiver(): Receiver[T] = {
    new SocketReceiver(host, port, bytesToObjects, storageLevel)
  }
}
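
For context, the SocketReceiver returned by getReceiver is itself quite small. Roughly speaking (a simplified sketch, not the verbatim Spark code, so check the source of your Spark version), it opens a socket on a background thread, turns the input stream into an iterator with the bytesToObjects converter (SocketReceiver.bytesToLines in our example), and hands every element to Spark via store:

import java.io.InputStream
import java.net.Socket

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

// Simplified sketch of what SocketReceiver does.
class SocketReceiverSketch[T](
    host: String,
    port: Int,
    bytesToObjects: InputStream => Iterator[T],
    storageLevel: StorageLevel
  ) extends Receiver[T](storageLevel) {

  override def onStart(): Unit = {
    // Receive the data on a separate thread so that onStart returns quickly.
    new Thread("Socket Receiver Sketch") {
      override def run(): Unit = receive()
    }.start()
  }

  override def onStop(): Unit = { /* the real receiver closes the socket here */ }

  private def receive(): Unit = {
    val socket = new Socket(host, port)
    val iterator = bytesToObjects(socket.getInputStream)
    while (!isStopped() && iterator.hasNext) {
      store(iterator.next())   // hand each record to Spark, which stores it as blocks
    }
    socket.close()
  }
}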

3. Source code analysis: ReceiverInputDStream

ReceiverInputDStream is a fairly important class: a large share of DStreams inherit from it, for example Kafka's input DStream. So it is worth a close look.

Abstract class for defining any [[org.apache.spark.streaming.dstream.InputDStream]] that has to start a receiver on worker nodes to receive external data.
Specific implementations of ReceiverInputDStream must define [[getReceiver]] function that gets the receiver object of type [[org.apache.spark.streaming.receiver.Receiver]] that will be sent to the workers to receive data.

Note: it overrides the important compute method, which determines how the RDD for each batch is generated.

Also, ReceiverInputDStream itself extends InputDStream.

abstract class ReceiverInputDStream[T: ClassTag](_ssc: StreamingContext)
  extends InputDStream[T](_ssc) {
  /**
   * Generates RDDs with blocks received by the receiver of this stream. */
  override def compute(validTime: Time): Option[RDD[T]] = {
    val blockRDD = {

      if (validTime < graph.startTime) {
        // If this is called for any time before the start time of the context,
        // then this returns an empty RDD. This may happen when recovering from a
        // driver failure without any write ahead log to recover pre-failure data.
        new BlockRDD[T](ssc.sc, Array.empty)
      } else {
        // Otherwise, ask the tracker for all the blocks that have been allocated to this stream
        // for this batch
        val receiverTracker = ssc.scheduler.receiverTracker
        val blockInfos = receiverTracker.getBlocksOfBatch(validTime).getOrElse(id, Seq.empty)

        // Register the input blocks information into InputInfoTracker
        val inputInfo = StreamInputInfo(id, blockInfos.flatMap(_.numRecords).sum)
        ssc.scheduler.inputInfoTracker.reportInfo(validTime, inputInfo)

        // Create the BlockRDD
        createBlockRDD(validTime, blockInfos)
      }
    }
    Some(blockRDD)
  }
  ...
}

4. Source code analysis: InputDStream

InputDStream is an important abstraction: it is the abstract class for all input-related DStreams, such as FileInputDStream and the ReceiverInputDStream we just looked at.

This is the abstract base class for all input streams. This class provides methods start() and stop() which are called by Spark Streaming system to start and stop receiving data, respectively.

Input streams that can generate RDDs from new data by running a service/thread only on the driver node (that is, without running a receiver on worker nodes), can be implemented by directly inheriting this InputDStream.

For example, FileInputDStream, a subclass of InputDStream, monitors a HDFS directory from the driver for new files and generates RDDs with the new files.

For implementing input streams that requires running a receiver on the worker nodes, use [[org.apache.spark.streaming.dstream.ReceiverInputDStream]] as the parent class.

abstract class InputDStream[T: ClassTag](_ssc: StreamingContext) extends DStream[T](_ssc) {

  override def dependencies: List[DStream[_]] = List()

  override def slideDuration: Duration = {
    if (ssc == null) throw new Exception("ssc is null")
    if (ssc.graph.batchDuration == null) throw new Exception("batchDuration is null")
    ssc.graph.batchDuration
  }
...
}
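
As the scaladoc above says, an input stream that can produce its data on the driver alone may extend InputDStream directly, without a receiver. Here is a toy sketch of such a stream, for illustration only (the closest real analogue in Spark is ConstantInputDStream); it simply re-parallelizes a fixed sequence for every batch:

import scala.reflect.ClassTag

import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{StreamingContext, Time}
import org.apache.spark.streaming.dstream.InputDStream

// Hypothetical driver-side input stream: no receiver, no external service.
class FixedSeqInputDStream[T: ClassTag](
    _ssc: StreamingContext,
    data: Seq[T]
  ) extends InputDStream[T](_ssc) {

  // StreamingContext.sparkContext is public, so we can keep a handle to it.
  private val sc = _ssc.sparkContext

  override def start(): Unit = {}   // nothing to start: there is no receiver thread
  override def stop(): Unit = {}

  override def compute(validTime: Time): Option[RDD[T]] = {
    Some(sc.parallelize(data))
  }
}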

Note: at this point we have only finished the first line of code, the one that reads the data.

5. Source code analysis: DStream.flatMap (and how a DStream generates RDDs)

DStream itself was introduced above, so there is no need to repeat it; from here on we follow the example in order.

Look at our first transformation, flatMap. It returns a FlatMappedDStream and passes the function into it.

def flatMap[U: ClassTag](flatMapFunc: T => TraversableOnce[U]): DStream[U] = ssc.withScope {
    new FlatMappedDStream(this, context.sparkContext.clean(flatMapFunc))
  }

Next we turn to FlatMappedDStream, which brings us to how the RDDs are actually generated.

class FlatMappedDStream[T: ClassTag, U: ClassTag](
    parent: DStream[T],
    flatMapFunc: T => TraversableOnce[U]
  ) extends DStream[U](parent.ssc) {

  override def dependencies: List[DStream[_]] = List(parent)
  override def slideDuration: Duration = parent.slideDuration
  override def compute(validTime: Time): Option[RDD[U]] = {
    parent.getOrCompute(validTime).map(_.flatMap(flatMapFunc))
  }
}

How does a DStream generate RDDs?

Get the RDD corresponding to the given time; either retrieve it from cache or compute-and-cache it.

Internally, a DStream uses a variable of type HashMap, generatedRDDs, to record the RDDs that have already been generated.

Note: compute(time) is what actually generates the RDD.

  private[streaming] final def getOrCompute(time: Time): Option[RDD[T]] = {
    // Look the RDD up in generatedRDDs: if it exists, return it; otherwise run the orElse block below
    generatedRDDs.get(time).orElse {
      // Compute the RDD if time is valid (e.g. correct time in a sliding window)
      // of RDD generation, else generate nothing.
      // Check whether the time is valid
      if (isTimeValid(time)) {
        // Call compute(time) to obtain the RDD instance, and store it in rddOption
        val rddOption = createRDDWithLocalProperties(time, displayInnerRDDOps = false) {
          // Disable checks for existing output directories in jobs launched by the streaming
          // scheduler, since we may need to write output to an existing directory during checkpoint
          // recovery; see SPARK-4835 for more details. We need to have this call here because
          // compute() might cause Spark jobs to be launched.
          // This lives in the RDD-side code (PairRDDFunctions); I don't fully understand it, but from its comment it roughly means: skip the check for existing output directories.
          PairRDDFunctions.disableOutputSpecValidation.withValue(true) {
            compute(time)
          }
        }

        rddOption.foreach { case newRDD =>
          // Register the generated RDD for caching and checkpointing
          if (storageLevel != StorageLevel.NONE) {
            newRDD.persist(storageLevel)
            logDebug(s"Persisting RDD ${newRDD.id} for time $time to $storageLevel")
          }
          if (checkpointDuration != null && (time - zeroTime).isMultipleOf(checkpointDuration)) {
            newRDD.checkpoint()
            logInfo(s"Marking RDD ${newRDD.id} for time $time for checkpointing")
          }
          // Put the RDD we just created into generatedRDDs at the corresponding time
          generatedRDDs.put(time, newRDD)
        }
        rddOption
      } else {
        None
      }
    }
  }
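
Putting getOrCompute together with the compute implementations we have seen: for every batch time, asking the last DStream in the pipeline for its RDD walks recursively up the parent chain. For the wordcount example the chain looks roughly like this (an illustration of the recursion, not actual code):

// For one batch time t in the wordcount example (simplified):
//
// wordCounts (ShuffledDStream).getOrCompute(t)
//   -> ShuffledDStream.compute(t)
//     -> pairs (MappedDStream).getOrCompute(t)
//       -> MappedDStream.compute(t)
//         -> words (FlatMappedDStream).getOrCompute(t)
//           -> FlatMappedDStream.compute(t)
//             -> lines (SocketInputDStream).getOrCompute(t)
//               -> ReceiverInputDStream.compute(t)  // builds a BlockRDD from the received blocks
//
// Each DStream caches the RDD it produced for t in its own generatedRDDs map,
// so asking for the same batch time again does not recompute the whole lineage.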

6. Source code analysis: DStream.map

/** Return a new DStream by applying a function to all elements of this DStream. */
  def map[U: ClassTag](mapFunc: T => U): DStream[U] = ssc.withScope {
    new MappedDStream(this, context.sparkContext.clean(mapFunc))
  }

One thing worth pointing out: in the compute method, parent.getOrCompute(validTime).map(_.map[U](mapFunc)) again calls DStream's getOrCompute. If the parent has already generated the RDD for validTime, it is not regenerated but fetched from the parent's generatedRDDs.

Then the .map(_.map[U](mapFunc)) part is applied on top of it (the outer map is on the Option, the inner map is the RDD transformation).

class MappedDStream[T: ClassTag, U: ClassTag] (
    parent: DStream[T],
    mapFunc: T => U
  ) extends DStream[U](parent.ssc) {

  override def dependencies: List[DStream[_]] = List(parent)

  override def slideDuration: Duration = parent.slideDuration

  override def compute(validTime: Time): Option[RDD[U]] = {
    parent.getOrCompute(validTime).map(_.map[U](mapFunc))
  }
}

7. Source code analysis: reduceByKey

With the experience of reading the RDD source, it is easy to find that reduceByKey lives in the PairDStreamFunctions class. Here is its source.

Return a new DStream by applying reduceByKey to each RDD. The values for each key are merged using the supplied reduce function. org.apache.spark.Partitioner is used to control the partitioning of each RDD.

  def reduceByKey(
      reduceFunc: (V, V) => V,
      partitioner: Partitioner): DStream[(K, V)] = ssc.withScope {
    combineByKey((v: V) => v, reduceFunc, reduceFunc, partitioner)
  }
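
The wordcount example actually calls reduceByKey(_ + _) without a partitioner; that overload just fills in a default partitioner and delegates to the version above. Paraphrased from the Spark source (check your version for the exact code):

  // Overload used by the wordcount example: supply a default HashPartitioner
  // (sized by the default parallelism) and delegate to the variant shown above.
  def reduceByKey(reduceFunc: (V, V) => V): DStream[(K, V)] = ssc.withScope {
    reduceByKey(reduceFunc, defaultPartitioner())
  }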

Combine elements of each key in DStream’s RDDs using custom functions. This is similar to the combineByKey for RDDs.

Here we seem to recognize the pattern: the design is remarkably consistent with that of RDDs.

This is where ShuffledDStream comes in. The actual shuffle process is a little more involved and I won't cover it for now; the shuffle internals deserve a more detailed look of their own.

  def combineByKey[C: ClassTag](
      createCombiner: V => C,
      mergeValue: (C, V) => C,
      mergeCombiner: (C, C) => C,
      partitioner: Partitioner,
      mapSideCombine: Boolean = true): DStream[(K, C)] = ssc.withScope {
    val cleanedCreateCombiner = sparkContext.clean(createCombiner)
    val cleanedMergeValue = sparkContext.clean(mergeValue)
    val cleanedMergeCombiner = sparkContext.clean(mergeCombiner)
    new ShuffledDStream[K, V, C](
      self,
      cleanedCreateCombiner,
      cleanedMergeValue,
      cleanedMergeCombiner,
      partitioner,
      mapSideCombine)
  }
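
For completeness, ShuffledDStream follows the same template as the other derived DStreams: its compute takes the parent's RDD for the batch and applies combineByKey at the RDD level, so the DStream layer stays a thin wrapper. Roughly, paraphrased from the Spark source:

  override def compute(validTime: Time): Option[RDD[(K, C)]] = {
    parent.getOrCompute(validTime) match {
      case Some(rdd) =>
        Some(rdd.combineByKey[C](
          createCombiner, mergeValue, mergeCombiner, partitioner, mapSideCombine))
      case None => None
    }
  }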

8. Source code analysis: DStream.print

The final print call is also interesting: it goes through DStream's print method.

The line firstNum.take(num).foreach(println) is what prints the contents of the RDD.

  def print(num: Int): Unit = ssc.withScope {
    def foreachFunc: (RDD[T], Time) => Unit = {
      (rdd: RDD[T], time: Time) => {
        val firstNum = rdd.take(num + 1)
        // scalastyle:off println
        println("-------------------------------------------")
        println(s"Time: $time")
        println("-------------------------------------------")
        firstNum.take(num).foreach(println)
        if (firstNum.length > num) println("...")
        println()
        // scalastyle:on println
      }
    }
    foreachRDD(context.sparkContext.clean(foreachFunc), displayInnerRDDOps = false)
  }

And then?

Here we come across yet another DStream: ForEachDStream. According to the comments, the print call above should produce a ForEachDStream, but I didn't track down the code that creates it, so I'll set that aside for now.

An internal DStream used to represent output operations like DStream.foreachRDD.

class ForEachDStream[T: ClassTag] (
    parent: DStream[T],
    foreachFunc: (RDD[T], Time) => Unit,
    displayInnerRDDOps: Boolean
  ) extends DStream[Unit](parent.ssc) {

  override def dependencies: List[DStream[_]] = List(parent)

  override def slideDuration: Duration = parent.slideDuration

  override def compute(validTime: Time): Option[RDD[Unit]] = None

  override def generateJob(time: Time): Option[Job] = {
    parent.getOrCompute(time) match {
      case Some(rdd) =>
        val jobFunc = () => createRDDWithLocalProperties(time, displayInnerRDDOps) {
          foreachFunc(rdd, time)
        }
        Some(new Job(time, jobFunc))
      case None => None
    }
  }
}
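
To close that small gap: in the Spark source, DStream.foreachRDD (which print calls, as shown above) roughly wraps the current DStream and the user function in a ForEachDStream and registers it as an output stream on the DStreamGraph; it is these registered output streams whose generateJob is later invoked for every batch. A paraphrased sketch (verify against the exact code of your Spark version):

  // Paraphrased from DStream.foreachRDD: build a ForEachDStream and register it
  // as an output stream, so DStreamGraph.generateJobs(time) will call its
  // generateJob for each batch.
  private def foreachRDD(
      foreachFunc: (RDD[T], Time) => Unit,
      displayInnerRDDOps: Boolean): Unit = {
    new ForEachDStream(this,
      context.sparkContext.clean(foreachFunc, false), displayInnerRDDOps).register()
  }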

0x03 Summary

That wraps up the DStream-related source code. Like the RDD post, this one is fairly basic: it mainly walks through the overall flow, and later posts will dig into the finer details.


Personal homepage: http://dantezhao.com
This article may be reproduced, but the original source and author information must be credited with a hyperlink.