1. 程式人生 > >Spark Streaming 中如何實現 Exactly-Once 語義

Spark Streaming 中如何實現 Exactly-Once 語義

Exactly-once 語義是實時計算的難點之一。要做到每一條記錄只會被處理一次,即使伺服器或網路發生故障時也能保證沒有遺漏,這不僅需要實時計算框架本身的支援,還對上游的訊息系統、下游的資料儲存有所要求。此外,我們在編寫計算流程時也需要遵循一定規範,才能真正實現 Exactly-once。本文將講述如何結合 Spark Streaming 框架、Kafka 訊息系統、以及 MySQL 資料庫來實現 Exactly-once 的實時計算流程。

Spark Streaming

引例

首先讓我們實現一個簡單而完整的實時計算流程。我們從 Kafka 接收使用者訪問日誌,解析並提取其中的時間和日誌級別,並統計每分鐘錯誤日誌的數量,結果儲存到 MySQL 中。

示例日誌:

2017-07-30 14:09:08 ERROR some message
2017-07-30 14:09:20 INFO  some message
2017-07-30 14:10:50 ERROR some message

結果表結構,其中 log_time 欄位會擷取到分鐘級別:

create table error_log (
  log_time datetime primary key,
  log_count int not null default 0
);

Scala 專案通常使用 sbt 來管理。我們將下列依賴新增到 build.sbt 檔案中。本例使用的是 Spark 2.2 和 Kafka 0.10,資料庫操作類庫使用了 ScalikeJDBC 3.0。

scalaVersion := "2.11.11"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-streaming" % "2.2.0" % "provided",
  "org.apache.spark" %% "spark-streaming-kafka-0-10" % "2.2.0",
  "org.scalikejdbc" %% "scalikejdbc" % "3.0.1",
  "mysql" % "mysql-connector-java" % "5.1.43"
)

完整的示例程式碼已上傳至 GitHub(

連結),下面我僅選取重要的部分加以說明:

// 初始化資料庫連線
ConnectionPool.singleton("jdbc:mysql://localhost:3306/spark", "root", "")

// 建立 Spark Streaming 上下文
val conf = new SparkConf().setAppName("ExactlyOnce").setIfMissing("spark.master", "local[2]")
val ssc = new StreamingContext(conf, Seconds(5))

// 使用 Kafka Direct API 建立 DStream
val messages = KafkaUtils.createDirectStream[String, String](ssc,
   LocationStrategies.PreferConsistent,
   ConsumerStrategies.Subscribe[String, String](Seq("alog"), kafkaParams))

messages.foreachRDD { rdd =>
  // 日誌處理
  val result = rdd.map(_.value)
    .flatMap(parseLog) // 日誌解析函式
    .filter(_.level == "ERROR")
    .map(log => log.time.truncatedTo(ChronoUnit.MINUTES) -> 1)
    .reduceByKey(_ + _)
    .collect()

  // 結果儲存至資料庫
  DB.autoCommit { implicit session =>
    result.foreach { case (time, count) =>
      sql"""
      insert into error_log (log_time, log_count)
      value (${time}, ${count})
      on duplicate key update log_count = log_count + values(log_count)
      """.update.apply()
    }
  }
}

實時計算語義

實時計算有三種語義,分別是 At-most-once、At-least-once、以及 Exactly-once。一個典型的 Spark Streaming 應用程式會包含三個處理階段:接收資料、處理彙總、輸出結果。每個階段都需要做不同的處理才能實現相應的語義。

對於 接收資料,主要取決於上游資料來源的特性。例如,從 HDFS 這類支援容錯的檔案系統中讀取檔案,能夠直接支援 Exactly-once 語義。如果上游訊息系統支援 ACK(如RabbitMQ),我們就可以結合 Spark 的 Write Ahead Log 特性來實現 At-least-once 語義。對於非可靠的資料接收器(如 socketTextStream),當 Worker 或 Driver 節點發生故障時就會產生資料丟失,提供的語義也是未知的。而 Kafka 訊息系統是基於偏移量(Offset)的,它的 Direct API 可以提供 Exactly-once 語義。

在使用 Spark RDD 對資料進行 轉換或彙總 時,我們可以天然獲得 Exactly-once 語義,因為 RDD 本身就是一種具備容錯性、不變性、以及計算確定性的資料結構。只要資料來源是可用的,且處理過程中沒有副作用(Side effect),我們就能一直得到相同的計算結果。

結果輸出 預設符合 At-least-once 語義,因為 foreachRDD 方法可能會因為 Worker 節點失效而執行多次,從而重複寫入外部儲存。我們有兩種方式解決這一問題,冪等更新和事務更新。下面我們將深入探討這兩種方式。

使用冪等寫入實現 Exactly-once

如果多次寫入會產生相同的結果資料,我們可以認為這類寫入操作是冪等的。saveAsTextFile 就是一種典型的冪等寫入。如果訊息中包含唯一主鍵,那麼多次寫入相同的資料也不會在資料庫中產生重複記錄。這種方式也就能等價於 Exactly-once 語義了。但需要注意的是,冪等寫入只適用於 Map-only 型的計算流程,即沒有 Shuffle、Reduce、Repartition 等操作。此外,我們還需對 Kafka DStream 做一些額外設定:

  • enable.auto.commit 設定為 false。預設情況下,Kafka DStream 會在接收到資料後立刻更新自己的偏移量,我們需要將這個動作推遲到計算完成之後。
  • 開啟 Spark Streaming 的 Checkpoint 特性,用於存放 Kafka 偏移量。但若應用程式程式碼發生變化,Checkpoint 資料也將無法使用,這就需要改用下面的操作:
  • 在資料輸出之後手動提交 Kafka 偏移量。HasOffsetRanges 類,以及 commitAsync API 可以做到這一點:
messages.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  rdd.foreachPartition { iter =>
    // output to database
  }
  messages.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}

使用事務寫入實現 Exactly-once

在使用事務型寫入時,我們需要生成一個唯一 ID,這個 ID 可以使用當前批次的時間、分割槽號、或是 Kafka 偏移量來生成。之後,我們需要在一個事務中將處理結果和這個唯一 ID 一同寫入資料庫。這一原子性的操作將帶給我們 Exactly-once 語義,而且該方法可以同時適用於 Map-only 以及包含匯聚操作的計算流程。

我們通常會在 foreachPartition 方法中來執行資料庫寫入操作。對於 Map-only 流程來說是適用的,因為這種流程下 Kafka 分割槽和 RDD 分割槽是一一對應的,我們可以用以下方式獲取各分割槽的偏移量:

messages.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  rdd.foreachPartition { iter =>
    val offsetRange = offsetRanges(TaskContext.get.partitionId)
  }
}

但對於包含 Shuffle 的計算流程(如上文的錯誤日誌統計),我們需要先將處理結果拉取到 Driver 程序中,然後才能執行事務操作:

messages.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  val result = processLogs(rdd).collect() // parse log and count error
  DB.localTx { implicit session =>
    result.foreach { case (time, count) =>
      // save to error_log table
    }
    offsetRanges.foreach { offsetRange =>
      val affectedRows = sql"""
      update kafka_offset set offset = ${offsetRange.untilOffset}
      where topic = ${topic} and `partition` = ${offsetRange.partition}
      and offset = ${offsetRange.fromOffset}
      """.update.apply()

      if (affectedRows != 1) {
        throw new Exception("fail to update offset")
      }
    }
  }
}

如果偏移量寫入失敗,或者重複處理了某一部分資料(offset != $fromOffset 判斷條件不通過),該事務就會回滾,從而做到 Exactly-once。

總結

實時計算中的 Exactly-once 是比較強的一種語義,因而會給你的應用程式引入額外的開銷。此外,它尚不能很好地支援視窗型操作。因此,是否要在程式碼中使用這一語義就需要開發者自行判斷了。很多情況下,資料丟失或重複處理並不那麼重要。不過,瞭解 Exactly-once 的開發流程還是有必要的,對學習 Spark Streaming 也會有所助益。

參考資料