1. 程式人生 > >論Spark Streaming的資料可靠性和一致性

論Spark Streaming的資料可靠性和一致性

眼下大資料領域最熱門的詞彙之一便是流計算了,其中最耀眼的專案無疑是來自Spark社群的Spark Streaming專案,其從一誕生就受到廣泛關注並迅速發展,目前已有追趕並超越Storm的架勢。

對於流計算而言,毫無疑問最核心的特點是它的低時延能力,這主要是來自對資料不落磁碟就進行計算的內部機制,但這也帶來了資料可靠性的問題,即有節點失效或者網路異常時,如何在節點間進行合適的協商來進行重傳。更進一步的,若發生計劃外的資料重傳,怎麼能保證沒有產生重複的資料,所有資料都是精確一次的(Exact Once)?如果不解決這些問題,大資料的流計算將無法滿足大多數企業級可靠性要求而流於徒有虛名。

本文將重點分析Spark Streaming是如何設計可靠性機制並實現資料一致性的。

Driver HA

由於流計算系統是長期執行、資料不斷流入的,因此其Spark守護程序(Driver)的可靠性是至關重要的,它決定了Streaming程式能否一直正確地執行下去。


圖一 Driver資料持久化

Driver實現HA的解決方案就是將元資料持久化,以便重啟後的狀態恢復。如圖一所示,Driver持久化的元資料包括:

  • Block元資料(圖一中的綠色箭頭):Receiver從網路上接收到的資料,組裝成Block後產生的Block元資料;
  • Checkpoint資料(圖一中的橙色箭頭):包括配置項、DStream操作、未完成的Batch狀態、和生成的RDD資料等;


圖二 Driver故障恢復

Driver失敗重啟後:

  • 恢復計算(圖二中的橙色箭頭):使用Checkpoint資料重啟driver,重新構造上下文並重啟接收器。
  • 恢復元資料塊(圖二中的綠色箭頭):恢復Block元資料。
  • 恢復未完成的作業(圖二中的紅色箭頭):使用恢復出來的元資料,再次產生RDD和對應的job,然後提交到Spark叢集執行。

通過如上的資料備份和恢復機制,Driver實現了故障後重啟、依然能恢復Streaming任務而不丟失資料,因此提供了系統級的資料高可靠。

可靠的上下游IO系統

流計算主要通過網路socket通訊來實現與外部IO系統的資料互動。由於網路通訊的不可靠特點,傳送端與接收端需要通過一定的協議來保證資料包的接收確認、和失敗重發機制。

不是所有的IO系統都支援重發,這至少需要實現資料流的持久化,同時還要實現高吞吐和低時延。在Spark Streaming官方支援的data source裡面,能同時滿足這些要求的只有Kafka,因此在最近的Spark Streaming release裡面,也是把Kafka當成推薦的外部資料系統。

除了把Kafka當成輸入資料來源(inbound data source)之外,通常也將其作為輸出資料來源(outbound data source)。所有的實時系統都通過Kafka這個MQ來做資料的訂閱和分發,從而實現流資料生產者和消費者的解耦。

一個典型的企業大資料中心資料流向檢視如下所示:


圖三 企業大資料中心資料流向檢視

除了從源頭保證資料可重發之外,Kafka更是流資料Exact Once語義的重要保障。Kafka提供了一套低階API,使得client可以訪問topic資料流的同時也能訪問其元資料。Spark Streaming的每個接收任務可以從指定的Kafka topic、partition和offset去獲取資料流,各個任務的資料邊界很清晰,任務失敗後可以重新去接收這部分資料而不會產生“重疊的”資料,因而保證了流資料“有且僅處理一次”。

可靠的接收器

在Spark 1.3版本之前,Spark Streaming是通過啟動專用的Receiver任務來完成從Kafka叢集的資料流拉取。

Receiver任務啟動後,會使用Kafka的高階API來建立topicMessageStreams物件,並逐條讀取資料流快取,每個batchInerval時刻到來時由JobGenerator提交生成一個spark計算任務。

由於Receiver任務存在宕機風險,因此Spark提供了一個高階的可靠接收器-ReliableKafkaReceiver型別來實現可靠的資料收取,它利用了Spark 1.2提供的WAL(Write Ahead Log)功能,把接收到的每一批資料持久化到磁碟後,更新topic-partition的offset資訊,再去接收下一批Kafka資料。萬一Receiver失敗,重啟後還能從WAL裡面恢復出已接收的資料,從而避免了Receiver節點宕機造成的資料丟失(以下程式碼刪除了細枝末節的邏輯):

class ReliableKafkaReceiver{
  private var topicPartitionOffsetMap: mutable.HashMap[TopicAndPartition, Long] = null
  private var blockOffsetMap: ConcurrentHashMap[StreamBlockId, Map[TopicAndPartition, Long]] = null
  override def onStart(): Unit = {
    // Initialize the topic-partition / offset hash map.
    topicPartitionOffsetMap = new mutable.HashMap[TopicAndPartition, Long]
    // Initialize the block generator for storing Kafka message.
    blockGenerator = new BlockGenerator(new GeneratedBlockHandler, streamId, conf)
    messageHandlerThreadPool = Utils.newDaemonFixedThreadPool(
      topics.values.sum, "KafkaMessageHandler")
    blockGenerator.start()
    val topicMessageStreams = consumerConnector.createMessageStreams(
      topics, keyDecoder, valueDecoder)
    topicMessageStreams.values.foreach { streams =>
      streams.foreach { stream =>
        messageHandlerThreadPool.submit(new MessageHandler(stream))
      }
    }
  }

啟用WAL後雖然Receiver的資料可靠性風險降低了,但卻由於磁碟持久化帶來的開銷,系統整體吞吐率會有明顯的下降。因此,在最新發布的Spark 1.3版本里,Spark Streaming增加了使用Direct API的方式來實現Kafka資料來源的訪問。

引入了Direct API後,Spark Streaming不再啟動常駐的Receiver接收任務,而是直接分配給每個Batch及RDD最新的topic partition offset。job啟動執行後Executor使用Kafka的simple consumer API去獲取那一段offset的資料。

這樣做的好處不僅避免了Receiver宕機帶來的資料可靠性風險,同時也由於避免使用ZooKeeper做offset跟蹤,而實現了資料的精確一次性(以下程式碼刪除了細枝末節的邏輯):

class DirectKafkaInputDStream{
  protected val kc = new KafkaCluster(kafkaParams)
  protected var currentOffsets = fromOffsets
  override def compute(validTime: Time): Option[KafkaRDD[K, V, U, T, R]] = {
    val untilOffsets = clamp(latestLeaderOffsets(maxRetries))
    val rdd = KafkaRDD[K, V, U, T, R](
      context.sparkContext, kafkaParams, currentOffsets, untilOffsets, messageHandler)
    currentOffsets = untilOffsets.map(kv => kv._1 -> kv._2.offset)
    Some(rdd)
  }	  

預寫日誌 Write Ahead Log

Spark 1.2開始提供了預寫日誌能力,用於Receiver資料及Driver元資料的持久化和故障恢復。WAL之所以能提供持久化能力,是因為它利用了可靠的HDFS做資料儲存。

Spark Streaming預寫日誌機制的核心API包括:

  • 管理WAL檔案的WriteAheadLogManager
  • 讀/寫WAL的WriteAheadLogWriter和WriteAheadLogReader
  • 基於WAL的RDD:WriteAheadLogBackedBlockRDD
  • 基於WAL的Partition:WriteAheadLogBackedBlockRDDPartition

以上核心API在資料接收和恢復階段的互動示意圖如圖四所示。


圖四 基於WAL的資料接收和恢復示意圖

從WriteAheadLogWriter的原始碼裡可以清楚地看到,每次寫入一塊資料buffer到HDFS後都會呼叫flush方法去強制刷入磁碟,然後才去取下一塊資料。因此receiver接收的資料是可以保證持久化到磁碟了,因而做到了較好的資料可靠性。

private[streaming] class WriteAheadLogWriter{
  private lazy val stream = HdfsUtils.getOutputStream(path, hadoopConf)
  def write(data: ByteBuffer): WriteAheadLogFileSegment = synchronized {
    data.rewind() // Rewind to ensure all data in the buffer is retrieved
    val lengthToWrite = data.remaining()
    val segment = new WriteAheadLogFileSegment(path, nextOffset, lengthToWrite)
    stream.writeInt(lengthToWrite)
    if (data.hasArray) {
      stream.write(data.array())
    } else {
      while (data.hasRemaining) {
        val array = new Array[Byte](data.remaining)
        data.get(array)
        stream.write(array)
      }
    }
    flush()
    nextOffset = stream.getPos()
    segment
  }

結束語

得益於Kafka這類可靠的data source、以及自身的checkpoint/WAL等機制,Spark Streaming的資料可靠性得到了很好的保證,資料能保證“至少一次”(at least once)被處理。但由於其outbound端的一致性實現還未完善,因此Exact once語義仍然不能端到端保證。Spark Streaming社群已經在跟進這個特性的實現(SPARK-4122),預計很快將合入trunk釋出。

作者簡介:葉琪,華為軟體公司Universe產品部高階架構師,專注於大資料底層分散式儲存和計算基礎設施,是華為軟體公司Hadoop發行版的主要架構師,目前興趣點在流計算與Spark。