資料湖應用解析:Spark on Elasticsearch一致性問題
摘要:髒資料對資料計算的正確性帶來了很嚴重的影響。因此,我們需要探索一種方法,能夠實現Spark寫入Elasticsearch資料的可靠性與正確性。
概述
Spark與Elasticsearch(es)的結合,是近年來大資料解決方案很火熱的一個話題。一個是出色的分散式計算引擎,另一個是出色的搜尋引擎。近年來,越來越多的成熟方案落地到行業產品中,包括我們耳熟能詳的Spark+ES+HBase日誌分析平臺。
目前,華為雲資料湖探索(DLI)服務已全面支援Spark/Flink跨源訪問Elasticsearch。而之前在實現過程中也遇到過很多場景化問題,本文將挑選其中比較經典的分散式一致性問題進行探討。
分散式一致性問題
問題描述
資料容錯是大資料計算引擎面臨的主要問題之一。目前,主流的開源大資料比如Apache Spark和Apache Flink已經完全實現了Exactly Once語義,保證了內部資料處理的正確性。但是在將計算結果寫入到外部資料來源時,因為外部資料來源架構與訪問方式的多樣性,始終沒能找到一個統一的解決方案來保證一致性(我們稱為Sink運算元一致性問題)。再加上es本身沒有事務處理的能力,因此如何保證寫入es資料一致性成為了熱點話題。
我們舉一個簡單的例子來說明一下,圖1在SparkRDD中(這裡假設是一個task),每一條藍色的線代表100萬條資料,那麼10條藍色的線表示了有1000萬條資料準備寫入到CSS(華為雲搜尋服務,內部為es)的某個index中。在寫入過程中,系統發生了故障,導致只有一半(500萬條)資料成功寫入。
task是Spark執行任務的最小單元,如果task失敗了,當前task需要整個重新執行。所以,當我們重新執行寫入操作(圖2),並最終重試成功之後(這次用紅色來表示相同的1000萬條資料),上一次失敗留下的500萬條資料依然存在(藍色的線),變成髒資料。髒資料對資料計算的正確性帶來了很嚴重的影響。因此,我們需要探索一種方法,能夠實現Spark寫入es資料的可靠性與正確性。
圖1 Spark task失敗時向es寫入了部分資料
圖2 task重試成功後上一次寫入的部分資料成為髒資料
解決方案
1.寫覆蓋
從上圖中,我們可以很直觀的看出來,每次task插入資料前,先將es的index中的資料都清空就可以了。那麼,每次寫入操作可以看成是以下3個步驟的組合:
- 步驟一 判斷當前index中是否有資料
- 步驟二 清空當前index中的資料
- 步驟三 向index中寫入資料
換一種角度,我們可以理解為,不管之前是否執行了資料寫入,也不管之前資料寫入了多少次,我們只想要保證當前這一次寫入能夠獨立且正確地完成,這種思想我們稱為冪等。
冪等式寫入是大資料sink運算元解決一致性問題的一種常見思路,另一種說法叫做最終一致性,其中最簡單的做法就是“insert overwrite”。當Spark資料寫入es失敗並嘗試重新執行的時候,利用覆蓋式寫入,可以將index中的殘留資料覆蓋掉。
圖 使用overwrite模式,task重試時覆蓋上一次資料
在DLI中,可以在DataFrame接口裡將mode設定成“overwrite”來實現覆蓋寫es:
val dfWriter = sparkSession.createDataFrame(rdd, schema) // // 寫入資料至es // dfWriter.write .format("es") .option("es.resource", resource) .option("es.nodes", nodes) .mode(SaveMode.Overwrite) .save()
也可以直接使用sql語句:
// 插入資料至es sparkSession.sql("insert overwrite table es_table values(1, 'John'),(2, 'Bob')")
2.最終一致性
利用上述“overwrite”的方式解決容錯問題有一個很大的缺陷。如果es已經存在了正確的資料,這次只是需要追加寫入。那麼overwrite會把之前index的正確的資料都覆蓋掉。
比如說,有多個task併發執行寫入資料的操作,其中一個task執行失敗而其他task執行成功,重新執行失敗的task進行“overwrite”會將其他已經成功寫入的資料覆蓋掉。再比如說,Streaming場景中,每一批次資料寫入都變成覆蓋,這是不合理的方式。
圖 Spark追加資料寫入es
圖 用overwrite寫入會將原先正確的資料覆蓋掉
其實,我們想做的事情,只是清理髒資料而不是所有index中的資料。因此,核心問題變成了如何識別髒資料?借鑑其他資料庫解決方案,我們似乎可以找到方法。在MySQL中,有一個insert ignore into的語法,如果遇到主鍵衝突,能夠單單對這一行資料進行忽略操作,而如果沒有衝突,則進行普通的插入操作。這樣就可以將覆蓋資料的力度細化到了行級別。
es中有類似的功能麼?假如es中每一條資料都有主鍵,主鍵衝突時可以進行覆蓋(忽略和覆蓋其實都能解決這個問題),那麼在task失敗重試時,就可以僅針對髒資料進行覆蓋。
我們先來看一下Elasticsearch中的概念與關係型資料庫之間的一種對照關係:
我們知道,MySQL中的主鍵是對於一行資料(Row)的唯一標識。從表中可以看到,Row對應的就是es中的Document。那麼,Document有沒有唯一的標識呢?
答案是肯定的,每一個Document都有一個id,即doc_id。doc_id是可配置的,index、type、doc_id三者指定了唯一的一條資料(Document)。並且,在插入es時,index、type、doc_id相同,原先的document資料將會被覆蓋掉。因此,doc_id可以等效於“MySQL主鍵衝突忽略插入”功能,即“doc_id衝突覆蓋插入”功能。
因此,DLI的SQL語法中提供了配置項“es.mapping.id”,可以指定一個欄位作為Document id,例如:
create table es_table(id int, name string) using es options( 'es.nodes' 'localhost:9200', 'es.resource' '/mytest/anytype', 'es.mapping.id' 'id')")
這裡指定了欄位“id”作為es的doc_id,當插入資料時,欄位“id”的值將成為插入Document的id。值得注意的是,“id”的值要唯一,否則相同的“id”將會使資料被覆蓋。
這時,如果遇到作業或者task失敗的情況,直接重新執行即可。當最終作業執行成功時,es中將不會出現殘留的髒資料,即實現了最終一致性。
圖 在插入資料時將主鍵設為doc_id,利用冪等插入來實現最終一致性
總結
本文可以一句話總結為“利用doc_id實現寫入es的最終一致性”。而這種問題,實際上不需要如此大費周章的探索,因為在es的原生API中,插入資料是需要指定doc_id,這應該是一個基本常識:詳細API說明可以參考:https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html)
圖 es使用bulk介面進行資料寫入
權當消遣,聊以慰藉。
得益於Base理論,最終一致性成為分散式計算中重要的解決方案之一。儘管該解決方案還有一定的限制(比如本文的解決方案中資料必須使用主鍵),而業界還有很多分散式一致性的解決方案(比如2PC、3PC)。但個人認為,衡量工作量與最終效果,最終一致性是一種很有效且很簡約的解決方案。
擴充套件閱讀:Elasticsearch Datasource
簡介
Datasource是Apache Spark提供的訪問外部資料來源的統一介面。Spark提供了SPI機制對Datasource進行了外掛式管理,可以通過Spark的Datasource模組自定義訪問Elasticsearch的邏輯。
華為雲DLI(資料湖探索)服務已完全實現了es datasource功能,使用者只要通過簡單的SQL語句或者Spark DataFrame API就能實現Spark訪問es。
功能描述
通過Spark訪問es,可以在DLI官方文件中找到詳細資料:https://support.huaweicloud.com/usermanual-dli/dli_01_0410.html。(Elasticsearch是由華為雲CSS雲搜尋服務提供)。
可以使用Spark DataFrame API方式來進行資料的讀寫:
// // 初始化設定 // // 設定es的/index/type(es 6.x版本不支援同一個index中存在多個type,7.x版本不支援設定type) val resource = "/mytest/anytype"; // 設定es的連線地址(格式為”node1:port,node2:port...”,因為es的replica機制,即使訪問es叢集,只需要配置一個地址即可.) val nodes = "localhost:9200" // 構造資料 val schema = StructType(Seq(StructField("id", IntegerType, false), StructField("name", StringType, false))) val rdd = sparkSession.sparkContext.parallelize(Seq(Row(1, "John"),Row(2,"Bob"))) val dfWriter = sparkSession.createDataFrame(rdd, schema) // // 寫入資料至es // dfWriter.write .format("es") .option("es.resource", resource) .option("es.nodes", nodes) .mode(SaveMode.Append) .save() // // 從es讀取資料 // val dfReader = sparkSession.read.format("es").option("es.resource",resource).option("es.nodes", nodes).load() dfReader.show()
也可以使用Spark SQL來訪問:
// 建立一張關聯es /index/type的Spark臨時表,該表並不存放實際資料 val sparkSession = SparkSession.builder().getOrCreate() sparkSession.sql("create table es_table(id int, name string) using es options( 'es.nodes' 'localhost:9200', 'es.resource' '/mytest/anytype')") // 插入資料至es sparkSession.sql("insert into es_table values(1, 'John'),(2, 'Bob')") // 從es中讀取資料 val dataFrame = sparkSession.sql("select * from es_table") dataFrame.show()
點選關注,第一時間瞭解華為雲新鮮技