1. 程式人生 > 實用技巧 >java8中parallelStream提升數倍查詢效率是怎樣實現的,來看看這篇文章

java8中parallelStream提升數倍查詢效率是怎樣實現的,來看看這篇文章

摘要:髒資料對資料計算的正確性帶來了很嚴重的影響。因此,我們需要探索一種方法,能夠實現Spark寫入Elasticsearch資料的可靠性與正確性。

概述

Spark與Elasticsearch(es)的結合,是近年來大資料解決方案很火熱的一個話題。一個是出色的分散式計算引擎,另一個是出色的搜尋引擎。近年來,越來越多的成熟方案落地到行業產品中,包括我們耳熟能詳的Spark+ES+HBase日誌分析平臺。

目前,華為雲資料湖探索(DLI)服務已全面支援Spark/Flink跨源訪問Elasticsearch。而之前在實現過程中也遇到過很多場景化問題,本文將挑選其中比較經典的分散式一致性問題進行探討。

分散式一致性問題

問題描述

資料容錯是大資料計算引擎面臨的主要問題之一。目前,主流的開源大資料比如Apache Spark和Apache Flink已經完全實現了Exactly Once語義,保證了內部資料處理的正確性。但是在將計算結果寫入到外部資料來源時,因為外部資料來源架構與訪問方式的多樣性,始終沒能找到一個統一的解決方案來保證一致性(我們稱為Sink運算元一致性問題)。再加上es本身沒有事務處理的能力,因此如何保證寫入es資料一致性成為了熱點話題。

我們舉一個簡單的例子來說明一下,圖1在SparkRDD中(這裡假設是一個task),每一條藍色的線代表100萬條資料,那麼10條藍色的線表示了有1000萬條資料準備寫入到CSS(華為雲搜尋服務,內部為es)的某個index中。在寫入過程中,系統發生了故障,導致只有一半(500萬條)資料成功寫入。

task是Spark執行任務的最小單元,如果task失敗了,當前task需要整個重新執行。所以,當我們重新執行寫入操作(圖2),並最終重試成功之後(這次用紅色來表示相同的1000萬條資料),上一次失敗留下的500萬條資料依然存在(藍色的線),變成髒資料。髒資料對資料計算的正確性帶來了很嚴重的影響。因此,我們需要探索一種方法,能夠實現Spark寫入es資料的可靠性與正確性。

圖1 Spark task失敗時向es寫入了部分資料

圖2 task重試成功後上一次寫入的部分資料成為髒資料

解決方案

1.寫覆蓋

從上圖中,我們可以很直觀的看出來,每次task插入資料前,先將es的index中的資料都清空就可以了。那麼,每次寫入操作可以看成是以下3個步驟的組合:

  • 步驟一 判斷當前index中是否有資料
  • 步驟二 清空當前index中的資料
  • 步驟三 向index中寫入資料

換一種角度,我們可以理解為,不管之前是否執行了資料寫入,也不管之前資料寫入了多少次,我們只想要保證當前這一次寫入能夠獨立且正確地完成,這種思想我們稱為冪等。

冪等式寫入是大資料sink運算元解決一致性問題的一種常見思路,另一種說法叫做最終一致性,其中最簡單的做法就是“insert overwrite”。當Spark資料寫入es失敗並嘗試重新執行的時候,利用覆蓋式寫入,可以將index中的殘留資料覆蓋掉。

圖 使用overwrite模式,task重試時覆蓋上一次資料

在DLI中,可以在DataFrame接口裡將mode設定成“overwrite”來實現覆蓋寫es:

val dfWriter = sparkSession.createDataFrame(rdd, schema)

//
// 寫入資料至es
//
dfWriter.write 
  .format("es") 
  .option("es.resource", resource) 
  .option("es.nodes", nodes) 
  .mode(SaveMode.Overwrite) 
  .save()

也可以直接使用sql語句:

// 插入資料至es
sparkSession.sql("insert overwrite table es_table values(1, 'John'),(2, 'Bob')")

2.最終一致性

利用上述“overwrite”的方式解決容錯問題有一個很大的缺陷。如果es已經存在了正確的資料,這次只是需要追加寫入。那麼overwrite會把之前index的正確的資料都覆蓋掉。

比如說,有多個task併發執行寫入資料的操作,其中一個task執行失敗而其他task執行成功,重新執行失敗的task進行“overwrite”會將其他已經成功寫入的資料覆蓋掉。再比如說,Streaming場景中,每一批次資料寫入都變成覆蓋,這是不合理的方式。

圖 Spark追加資料寫入es

圖 用overwrite寫入會將原先正確的資料覆蓋掉

其實,我們想做的事情,只是清理髒資料而不是所有index中的資料。因此,核心問題變成了如何識別髒資料?借鑑其他資料庫解決方案,我們似乎可以找到方法。在MySQL中,有一個insert ignore into的語法,如果遇到主鍵衝突,能夠單單對這一行資料進行忽略操作,而如果沒有衝突,則進行普通的插入操作。這樣就可以將覆蓋資料的力度細化到了行級別。

es中有類似的功能麼?假如es中每一條資料都有主鍵,主鍵衝突時可以進行覆蓋(忽略和覆蓋其實都能解決這個問題),那麼在task失敗重試時,就可以僅針對髒資料進行覆蓋。

我們先來看一下Elasticsearch中的概念與關係型資料庫之間的一種對照關係:

我們知道,MySQL中的主鍵是對於一行資料(Row)的唯一標識。從表中可以看到,Row對應的就是es中的Document。那麼,Document有沒有唯一的標識呢?

答案是肯定的,每一個Document都有一個id,即doc_id。doc_id是可配置的,index、type、doc_id三者指定了唯一的一條資料(Document)。並且,在插入es時,index、type、doc_id相同,原先的document資料將會被覆蓋掉。因此,doc_id可以等效於“MySQL主鍵衝突忽略插入”功能,即“doc_id衝突覆蓋插入”功能。

因此,DLI的SQL語法中提供了配置項“es.mapping.id”,可以指定一個欄位作為Document id,例如:

create table es_table(id int, name string) using es options(
'es.nodes' 'localhost:9200',
'es.resource' '/mytest/anytype',
'es.mapping.id' 'id')")

這裡指定了欄位“id”作為es的doc_id,當插入資料時,欄位“id”的值將成為插入Document的id。值得注意的是,“id”的值要唯一,否則相同的“id”將會使資料被覆蓋。

這時,如果遇到作業或者task失敗的情況,直接重新執行即可。當最終作業執行成功時,es中將不會出現殘留的髒資料,即實現了最終一致性。

圖 在插入資料時將主鍵設為doc_id,利用冪等插入來實現最終一致性

總結

本文可以一句話總結為“利用doc_id實現寫入es的最終一致性”。而這種問題,實際上不需要如此大費周章的探索,因為在es的原生API中,插入資料是需要指定doc_id,這應該是一個基本常識:詳細API說明可以參考:https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html

圖 es使用bulk介面進行資料寫入

權當消遣,聊以慰藉。

得益於Base理論,最終一致性成為分散式計算中重要的解決方案之一。儘管該解決方案還有一定的限制(比如本文的解決方案中資料必須使用主鍵),而業界還有很多分散式一致性的解決方案(比如2PC、3PC)。但個人認為,衡量工作量與最終效果,最終一致性是一種很有效且很簡約的解決方案。

擴充套件閱讀:Elasticsearch Datasource

簡介

Datasource是Apache Spark提供的訪問外部資料來源的統一介面。Spark提供了SPI機制對Datasource進行了外掛式管理,可以通過Spark的Datasource模組自定義訪問Elasticsearch的邏輯。

華為雲DLI(資料湖探索)服務已完全實現了es datasource功能,使用者只要通過簡單的SQL語句或者Spark DataFrame API就能實現Spark訪問es。

功能描述

通過Spark訪問es,可以在DLI官方文件中找到詳細資料:https://support.huaweicloud.com/usermanual-dli/dli_01_0410.html。(Elasticsearch是由華為雲CSS雲搜尋服務提供)。

可以使用Spark DataFrame API方式來進行資料的讀寫:

//
// 初始化設定
//

// 設定es的/index/type(es 6.x版本不支援同一個index中存在多個type,7.x版本不支援設定type)
val resource = "/mytest/anytype";

// 設定es的連線地址(格式為”node1:port,node2:port...”,因為es的replica機制,即使訪問es叢集,只需要配置一個地址即可.)
val nodes = "localhost:9200"

// 構造資料
val schema = StructType(Seq(StructField("id", IntegerType, false), StructField("name", StringType, false)))
val rdd = sparkSession.sparkContext.parallelize(Seq(Row(1, "John"),Row(2,"Bob")))
val dfWriter = sparkSession.createDataFrame(rdd, schema)

//
// 寫入資料至es
//
dfWriter.write 
  .format("es") 
  .option("es.resource", resource) 
  .option("es.nodes", nodes) 
  .mode(SaveMode.Append) 
  .save()

//
// 從es讀取資料
//
val dfReader = sparkSession.read.format("es").option("es.resource",resource).option("es.nodes", nodes).load()
dfReader.show()

也可以使用Spark SQL來訪問:

// 建立一張關聯es /index/type的Spark臨時表,該表並不存放實際資料
val sparkSession = SparkSession.builder().getOrCreate()
sparkSession.sql("create table es_table(id int, name string) using es options(
    'es.nodes' 'localhost:9200',
    'es.resource' '/mytest/anytype')")

// 插入資料至es
sparkSession.sql("insert into es_table values(1, 'John'),(2, 'Bob')")

// 從es中讀取資料
val dataFrame = sparkSession.sql("select * from es_table")
dataFrame.show()

點選關注,第一時間瞭解華為雲新鮮技術~