Spark Streaming 中如何實現 Exactly-Once 語義
Exactly-once 語義是實時計算的難點之一。要做到每一條記錄只會被處理一次,即使伺服器或網路發生故障時也能保證沒有遺漏,這不僅需要實時計算框架本身的支援,還對上游的訊息系統、下游的資料儲存有所要求。此外,我們在編寫計算流程時也需要遵循一定規範,才能真正實現 Exactly-once。本文將講述如何結合 Spark Streaming 框架、Kafka 訊息系統、以及 MySQL 資料庫來實現 Exactly-once 的實時計算流程。
引例
首先讓我們實現一個簡單而完整的實時計算流程。我們從 Kafka 接收使用者訪問日誌,解析並提取其中的時間和日誌級別,並統計每分鐘錯誤日誌的數量,結果儲存到 MySQL 中。
示例日誌:
2017-07-30 14:09:08 ERROR some message
2017-07-30 14:09:20 INFO some message
2017-07-30 14:10:50 ERROR some message
結果表結構,其中 log_time
欄位會擷取到分鐘級別:
create table error_log (
log_time datetime primary key,
log_count int not null default 0
);
Scala 專案通常使用 sbt
來管理。我們將下列依賴新增到 build.sbt
檔案中。本例使用的是 Spark 2.2 和 Kafka 0.10,資料庫操作類庫使用了 ScalikeJDBC 3.0。
scalaVersion := "2.11.11"
libraryDependencies ++= Seq(
"org.apache.spark" %% "spark-streaming" % "2.2.0" % "provided",
"org.apache.spark" %% "spark-streaming-kafka-0-10" % "2.2.0",
"org.scalikejdbc" %% "scalikejdbc" % "3.0.1",
"mysql" % "mysql-connector-java" % "5.1.43"
)
完整的示例程式碼已上傳至 GitHub(
// 初始化資料庫連線
ConnectionPool.singleton("jdbc:mysql://localhost:3306/spark", "root", "")
// 建立 Spark Streaming 上下文
val conf = new SparkConf().setAppName("ExactlyOnce").setIfMissing("spark.master", "local[2]")
val ssc = new StreamingContext(conf, Seconds(5))
// 使用 Kafka Direct API 建立 DStream
val messages = KafkaUtils.createDirectStream[String, String](ssc,
LocationStrategies.PreferConsistent,
ConsumerStrategies.Subscribe[String, String](Seq("alog"), kafkaParams))
messages.foreachRDD { rdd =>
// 日誌處理
val result = rdd.map(_.value)
.flatMap(parseLog) // 日誌解析函式
.filter(_.level == "ERROR")
.map(log => log.time.truncatedTo(ChronoUnit.MINUTES) -> 1)
.reduceByKey(_ + _)
.collect()
// 結果儲存至資料庫
DB.autoCommit { implicit session =>
result.foreach { case (time, count) =>
sql"""
insert into error_log (log_time, log_count)
value (${time}, ${count})
on duplicate key update log_count = log_count + values(log_count)
""".update.apply()
}
}
}
實時計算語義
實時計算有三種語義,分別是 At-most-once、At-least-once、以及 Exactly-once。一個典型的 Spark Streaming 應用程式會包含三個處理階段:接收資料、處理彙總、輸出結果。每個階段都需要做不同的處理才能實現相應的語義。
對於 接收資料,主要取決於上游資料來源的特性。例如,從 HDFS 這類支援容錯的檔案系統中讀取檔案,能夠直接支援 Exactly-once 語義。如果上游訊息系統支援 ACK(如RabbitMQ),我們就可以結合 Spark 的 Write Ahead Log 特性來實現 At-least-once 語義。對於非可靠的資料接收器(如 socketTextStream
),當 Worker 或 Driver 節點發生故障時就會產生資料丟失,提供的語義也是未知的。而 Kafka 訊息系統是基於偏移量(Offset)的,它的 Direct API 可以提供 Exactly-once 語義。
在使用 Spark RDD 對資料進行 轉換或彙總 時,我們可以天然獲得 Exactly-once 語義,因為 RDD 本身就是一種具備容錯性、不變性、以及計算確定性的資料結構。只要資料來源是可用的,且處理過程中沒有副作用(Side effect),我們就能一直得到相同的計算結果。
結果輸出 預設符合 At-least-once 語義,因為 foreachRDD
方法可能會因為 Worker 節點失效而執行多次,從而重複寫入外部儲存。我們有兩種方式解決這一問題,冪等更新和事務更新。下面我們將深入探討這兩種方式。
使用冪等寫入實現 Exactly-once
如果多次寫入會產生相同的結果資料,我們可以認為這類寫入操作是冪等的。saveAsTextFile
就是一種典型的冪等寫入。如果訊息中包含唯一主鍵,那麼多次寫入相同的資料也不會在資料庫中產生重複記錄。這種方式也就能等價於 Exactly-once 語義了。但需要注意的是,冪等寫入只適用於 Map-only 型的計算流程,即沒有 Shuffle、Reduce、Repartition 等操作。此外,我們還需對 Kafka DStream 做一些額外設定:
- 將
enable.auto.commit
設定為false
。預設情況下,Kafka DStream 會在接收到資料後立刻更新自己的偏移量,我們需要將這個動作推遲到計算完成之後。 - 開啟 Spark Streaming 的 Checkpoint 特性,用於存放 Kafka 偏移量。但若應用程式程式碼發生變化,Checkpoint 資料也將無法使用,這就需要改用下面的操作:
- 在資料輸出之後手動提交 Kafka 偏移量。
HasOffsetRanges
類,以及commitAsync
API 可以做到這一點:
messages.foreachRDD { rdd =>
val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
rdd.foreachPartition { iter =>
// output to database
}
messages.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}
使用事務寫入實現 Exactly-once
在使用事務型寫入時,我們需要生成一個唯一 ID,這個 ID 可以使用當前批次的時間、分割槽號、或是 Kafka 偏移量來生成。之後,我們需要在一個事務中將處理結果和這個唯一 ID 一同寫入資料庫。這一原子性的操作將帶給我們 Exactly-once 語義,而且該方法可以同時適用於 Map-only 以及包含匯聚操作的計算流程。
我們通常會在 foreachPartition
方法中來執行資料庫寫入操作。對於 Map-only 流程來說是適用的,因為這種流程下 Kafka 分割槽和 RDD 分割槽是一一對應的,我們可以用以下方式獲取各分割槽的偏移量:
messages.foreachRDD { rdd =>
val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
rdd.foreachPartition { iter =>
val offsetRange = offsetRanges(TaskContext.get.partitionId)
}
}
但對於包含 Shuffle 的計算流程(如上文的錯誤日誌統計),我們需要先將處理結果拉取到 Driver 程序中,然後才能執行事務操作:
messages.foreachRDD { rdd =>
val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
val result = processLogs(rdd).collect() // parse log and count error
DB.localTx { implicit session =>
result.foreach { case (time, count) =>
// save to error_log table
}
offsetRanges.foreach { offsetRange =>
val affectedRows = sql"""
update kafka_offset set offset = ${offsetRange.untilOffset}
where topic = ${topic} and `partition` = ${offsetRange.partition}
and offset = ${offsetRange.fromOffset}
""".update.apply()
if (affectedRows != 1) {
throw new Exception("fail to update offset")
}
}
}
}
如果偏移量寫入失敗,或者重複處理了某一部分資料(offset != $fromOffset
判斷條件不通過),該事務就會回滾,從而做到 Exactly-once。
總結
實時計算中的 Exactly-once 是比較強的一種語義,因而會給你的應用程式引入額外的開銷。此外,它尚不能很好地支援視窗型操作。因此,是否要在程式碼中使用這一語義就需要開發者自行判斷了。很多情況下,資料丟失或重複處理並不那麼重要。不過,瞭解 Exactly-once 的開發流程還是有必要的,對學習 Spark Streaming 也會有所助益。