
Spark Streaming Fundamentals

What is Spark Streaming
As part of the UC Berkeley cloud-computing software stack, Spark Streaming is an application framework built on top of Spark. It uses Spark's underlying machinery as its execution foundation and adds the DStream abstraction on top of it. Using the API that DStream provides, users can perform operations such as count, join, and aggregate on data streams in real time.

  A Spark Streaming application is very similar to a Spark application; it consists of a driver program that runs the user's main function and continuously executes various parallel operations on input streams of data. The main abstraction Spark Streaming provides is a discretized stream (DStream), which is a continuous sequence of RDDs (distributed collections of elements) representing a continuous stream of data. DStreams can be created from live incoming data (such as data from a socket, Kafka, etc.) or can be generated by transforming existing DStreams using parallel operators like map, reduce, and window.


How to Use Spark Streaming
As an application framework built on top of Spark, Spark Streaming inherits Spark's programming style, so users who already know Spark can pick it up quickly. Let's walk through word count as an example of how Spark Streaming is used:
  import spark.streaming.{Seconds, StreamingContext}
  import spark.streaming.StreamingContext._
  ...
  // Create the context and set up a network input stream to receive from a host:port
  val ssc = new StreamingContext(args(0), "NetworkWordCount", Seconds(1))
  val lines = ssc.socketTextStream(args(1), args(2).toInt)
  // Split the lines into words, count them, and print some of the counts on the master
  val words = lines.flatMap(_.split(" "))
  val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
  wordCounts.print()
  // Start the computation
  ssc.start()

1. Create a StreamingContext object

Just as a Spark program starts by creating a SparkContext, a Spark Streaming program starts by creating a StreamingContext. The parameters needed to construct a StreamingContext are essentially the same as for a SparkContext: the master URL and an application name (here NetworkWordCount). The parameter Seconds(1) deserves special attention: Spark Streaming must be told the interval at which to process data. With 1s as in the example above, Spark Streaming processes data in one-second windows. This parameter should be set according to the application's requirements and the cluster's processing capacity.
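The example above targets the early spark.streaming package, where the master URL and application name are passed straight into the constructor. In newer releases the same setup is normally written against org.apache.spark.streaming through a SparkConf; a minimal sketch, with the master and the 5-second batch interval chosen purely for illustration:

  import org.apache.spark.SparkConf
  import org.apache.spark.streaming.{Seconds, StreamingContext}

  // Build the context from a SparkConf rather than a raw master string.
  // The 5-second batch interval is an illustrative value, not a recommendation.
  val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
  val ssc = new StreamingContext(conf, Seconds(5))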

2. Create an InputDStream

Like Storm's Spout, Spark Streaming must be told where its data comes from. In the example above, socketTextStream makes Spark Streaming read its data over a socket connection. Spark Streaming of course supports many other sources, including kafkaStream, flumeStream, fileStream, and networkStream.
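As a rough sketch of what two of these sources look like in code (written against the newer org.apache.spark.streaming API and reusing the ssc created above; the host, port, and directory are placeholders):

  import org.apache.spark.streaming.dstream.DStream

  // Text lines received over a TCP socket connection.
  val socketLines: DStream[String] = ssc.socketTextStream("localhost", 9999)
  // Text lines read from new files that appear in a monitored directory.
  val fileLines: DStream[String] = ssc.textFileStream("hdfs:///tmp/streaming-input")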

3. Operate on the DStream

The user can apply all kinds of operations to the DStream obtained from the source. The example above is a typical word count pipeline: the data received from the source within the current batch window is first split into words, then mapped and reduced in classic MapReduce style, and finally print() outputs the result.
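Besides the per-batch counting shown above, DStreams also offer windowed operations. A sketch of a sliding-window variant of the word count, reusing the lines stream from the example (the 30-second window and 10-second slide are illustrative values and must be multiples of the batch interval):

  // Count words over a 30-second window that slides forward every 10 seconds.
  val windowedCounts = lines
    .flatMap(_.split(" "))
    .map(word => (word, 1))
    .reduceByKeyAndWindow((a: Int, b: Int) => a + b, Seconds(30), Seconds(10))
  windowedCounts.print()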

4. Start Spark Streaming

Everything up to this point only builds the execution pipeline: the program has not actually connected to the data source, nor has it processed any data; it has merely set up the execution plan. Only once ssc.start() is called does the program really carry out all of the intended operations.
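One caveat: with the newer org.apache.spark.streaming API the driver is normally also made to block after start() and to shut down explicitly, roughly as follows:

  ssc.start()             // start receiving data and scheduling jobs
  ssc.awaitTermination()  // block until stop() is called or an error occurs
  // ssc.stop(stopSparkContext = true)  // graceful shutdown, e.g. from a shutdown hook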

By now we have a rough picture of how Spark Streaming is used; next, let's look into the code behind it.

Spark Streaming Source Code Analysis
StreamingContext
Spark Streaming exposes its public interface through StreamingContext; users build their Spark Streaming applications with the API that StreamingContext provides.

StreamingContext internally maintains a SparkContext instance and performs RDD operations through it (a short sketch of this follows the list).
When instantiating a StreamingContext you must specify batchDuration, which sets the period of Spark Streaming's recurring job.
StreamingContext offers a variety of interfaces for creating DStreams from different data sources.
StreamingContext provides the API for starting and stopping the streaming job.
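As a small illustration of the first point, the wrapped SparkContext can be used directly, for example to build a static lookup RDD that every batch is joined against through transform(). A sketch against the newer API, reusing the words stream from the earlier example; the lookup data is made up:

  // The StreamingContext wraps a SparkContext; use it to create a static lookup RDD.
  val lookup = ssc.sparkContext.parallelize(Seq(("spark", "engine"), ("kafka", "broker")))
  // transform() exposes each batch as a plain RDD, so ordinary RDD operations such as join apply.
  val tagged = words.map(w => (w, 1)).transform(rdd => rdd.join(lookup))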

DStream
Spark Streaming is built on top of Spark: it wraps Spark's RDDs and, on top of them, abstracts a streaming representation of data, the DStream:
  A Discretized Stream (DStream), the basic abstraction in Spark Streaming, is a continuous sequence of RDDs (of the same type) representing a continuous stream of data. DStreams can either be created from live data (such as data from HDFS, Kafka or Flume) or they can be generated by transforming existing DStreams using operations such as map, window and reduceByKeyAndWindow. While a Spark Streaming program is running, each DStream periodically generates an RDD, either from live data or by transforming the RDD generated by a parent DStream.



The main internal structure of DStream is shown below:
  abstract class DStream[T: ClassManifest] (
      @transient protected[streaming] var ssc: StreamingContext
    ) extends Serializable with Logging {
    initLogging()
    // =======================================================================
    // Methods that should be implemented by subclasses of DStream
    // =======================================================================
    /** Time interval after which the DStream generates a RDD */
    def slideDuration: Duration
    /** List of parent DStreams on which this DStream depends on */
    def dependencies: List[DStream[_]]
    /** Method that generates a RDD for the given time */
    /** The core function of DStream: every subclass must implement compute().
        Each concrete DStream implements its own specific logic in compute(),
        and the result of the computation is the newly generated RDD. */
    def compute (validTime: Time): Option[RDD[T]]
    // =======================================================================
    // Methods and fields available on all DStreams
    // =======================================================================
    // RDDs generated, marked as protected[streaming] so that testsuites can access it
    /** The HashMap of RDDs that every DStream maintains internally. A DStream is
        essentially a set of RDDs keyed by Time, and every operation on the DStream
        is internally mapped to operations on these RDDs. */
    @transient
    protected[streaming] var generatedRDDs = new HashMap[Time, RDD[T]] ()
    // Time zero for the DStream
    protected[streaming] var zeroTime: Time = null
    // Duration for which the DStream will remember each RDD created
    protected[streaming] var rememberDuration: Duration = null
    // Storage level of the RDDs in the stream
    protected[streaming] var storageLevel: StorageLevel = StorageLevel.NONE
    // Checkpoint details
    protected[streaming] val mustCheckpoint = false
    protected[streaming] var checkpointDuration: Duration = null
    protected[streaming] val checkpointData = new DStreamCheckpointData(this)
    // Reference to whole DStream graph
    /** Every DStream is registered with a DStreamGraph; the DStreamGraph drives the
        execution of all DStreams together with all of their dependencies. */
    protected[streaming] var graph: DStreamGraph = null
    protected[streaming] def isInitialized = (zeroTime != null)
    // Duration for which the DStream requires its parent DStream to remember each RDD created
    protected[streaming] def parentRememberDuration = rememberDuration
    ...


Internally, a DStream maintains a time-indexed set of RDDs; transformations and output operations on a DStream are translated internally into transformations and output operations on those RDDs.
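For instance, the DStream returned by map is, in a simplified sketch of the source of this era, just a thin wrapper that records its parent and, for every batch time, asks the parent for its RDD and applies the map function to it:

  class MappedDStream[T: ClassManifest, U: ClassManifest] (
      parent: DStream[T],
      mapFunc: T => U
    ) extends DStream[U](parent.ssc) {

    override def dependencies = List(parent)

    override def slideDuration: Duration = parent.slideDuration

    // Ask the parent for its RDD at validTime and apply mapFunc to every element.
    override def compute(validTime: Time): Option[RDD[U]] = {
      parent.getOrCompute(validTime).map(_.map[U](mapFunc))
    }
  }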

Let's now look at how a computation on a DStream is mapped onto a computation on RDDs.
  protected[streaming] def getOrCompute(time: Time): Option[RDD[T]] = {
    // If this DStream was not initialized (i.e., zeroTime not set), then do it
    // If RDD was already generated, then retrieve it from HashMap
    generatedRDDs.get(time) match {
      // If an RDD was already generated and is being reused, then
      // probably all RDDs in this DStream will be reused and hence should be cached
      case Some(oldRDD) => Some(oldRDD)
      // if RDD was not generated, and if the time is valid
      // (based on sliding time of this DStream), then generate the RDD
      case None => {
        if (isTimeValid(time)) {
          /** For every computation, the DStream calls the compute() implemented by
              its subclass to produce the new RDD. */
          compute(time) match {
            case Some(newRDD) =>
              if (storageLevel != StorageLevel.NONE) {
                newRDD.persist(storageLevel)
                logInfo("Persisting RDD " + newRDD.id + " for time " + time + " to " + storageLevel + " at time " + time)
              }
              if (checkpointDuration != null && (time - zeroTime).isMultipleOf (checkpointDuration)) {
                newRDD.checkpoint()
                logInfo("Marking RDD " + newRDD.id + " for time " + time + " for checkpointing at time " + time)
              }
              /** The newly generated RDD is put into the HashMap. */
              generatedRDDs.put(time, newRDD)
              Some(newRDD)
            case None =>
              None
          }
        } else {
          None
        }
      }
    }
  }


Each submitted job performs its computation by calling getOrCompute():
  protected[streaming] def generateJob(time: Time): Option[Job] = {
    getOrCompute(time) match {
      case Some(rdd) => {
        val jobFunc = () => {
          val emptyFunc = { (iterator: Iterator[T]) => {} }
          context.sparkContext.runJob(rdd, emptyFunc)
        }
        Some(new Job(time, jobFunc))
      }
      case None => None
    }
  }


Job & Scheduler
From the DStream code we know that when generateJob() is called, the DStream computes or transforms itself through getOrCompute(). So when does Spark Streaming call generateJob()?

When a StreamingContext is instantiated, the user is required to set batchDuration, which defines the period of the recurring job: each time a batchDuration elapses, a new job is generated to compute the DStreams. This can be seen in the Scheduler code:
  val clockClass = System.getProperty("spark.streaming.clock", "spark.streaming.util.SystemClock")
  val clock = Class.forName(clockClass).newInstance().asInstanceOf[Clock]
  /** Spark Streaming creates a RecurringTimer inside the Scheduler. The timer's period is
      the user-specified batchDuration; whenever it fires, it calls the Scheduler's generateJobs(). */
  val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds,
    longTime => generateJobs(new Time(longTime)))

The Scheduler's generateJobs(), shown below, calls DStreamGraph's generateJobs() and hands every resulting job to the JobManager to run:

  def generateJobs(time: Time) {
    SparkEnv.set(ssc.env)
    logInfo("\n-----------------------------------------------------\n")
    graph.generateJobs(time).foreach(jobManager.runJob)
    latestTime = time
    doCheckpoint(time)
  }


In DStreamGraph, generateJobs() looks like this:
  def generateJobs(time: Time): Seq[Job] = {
    this.synchronized {
      logInfo("Generating jobs for time " + time)
      val jobs = outputStreams.flatMap(outputStream => outputStream.generateJob(time))
      logInfo("Generated " + jobs.length + " jobs for time " + time)
      jobs
    }
  }


For each outputStream, generateJob() is called to transform or compute the DStream. Because computing an output depends on computing its dependencies, all of its dependencies end up being computed as well, which finally yields the output stream.
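The reason DStreamGraph only has to walk over outputStreams is that output operations such as print() and foreach() are what register a DStream with the graph in the first place. A simplified, paraphrased sketch of that registration (not verbatim source):

  def print() {
    // Wrap the printing logic in a ForEachDStream and register it as an output stream,
    // so that DStreamGraph.generateJobs(time) will generate a job for it every batch.
    def foreachFunc = (rdd: RDD[T], time: Time) => {
      println("-------------------------------------------")
      println("Time: " + time)
      rdd.take(11).foreach(println)
    }
    val outputStream = new ForEachDStream(this, foreachFunc)
    ssc.registerOutputStream(outputStream)
  }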

All of these operations are set in motion only after StreamingContext's start function is called:
  def start() {
    if (checkpointDir != null && checkpointDuration == null && graph != null) {
      checkpointDuration = graph.batchDuration
    }
    validate()
    /** The StreamingContext registers and starts all of the input streams. */
    val networkInputStreams = graph.getInputStreams().filter(s => s match {
        case n: NetworkInputDStream[_] => true
        case _ => false
      }).map(_.asInstanceOf[NetworkInputDStream[_]]).toArray
    if (networkInputStreams.length > 0) {
      // Start the network input tracker (must start before receivers)
      networkInputTracker = new NetworkInputTracker(this, networkInputStreams)
      networkInputTracker.start()
    }
    Thread.sleep(1000)
    // Start the Scheduler that drives the streaming computation
    scheduler = new Scheduler(this)
    scheduler.start()
  }


By now you should have a basic understanding of how Spark Streaming is used and how it is structured internally; we close this article with a flow chart of what happens after Spark Streaming starts.