spark單元測試_[spark]一個SparkContext對應多個SparkSession
技術標籤:spark單元測試
原文連結:
https://www.waitingforcode.com/apache-spark-sql/multiple-sparksession-one-sparkcontext/readwww.waitingforcode.com本篇文章第一部分解釋了SparkSession和SparkContext兩個物件的作用。第二部分討論了為同一個SparkContext定義多個SparkSession的可能性,最後一部分嘗試給出它的一些用例。
SparkSession 和 SparkContext
為了更好地理解本文所討論的問題,定義我們將要討論的內容是很重要的。第一個討論點是SparkContext。它是位於drver端的Spark程式的入口點。它是一個到Spark叢集的物化連線,提供了建立rdd、累加器和廣播變數所需的所有抽象。我們只能使用一個有效活動的SparkContext,否則spark將丟擲一個在同一個JVM中只能有一個執行的SparkContext的錯誤(參見SPARK-2243)。不過,當我們設定spark.driver.allowMultipleContexts
"two SparkContexts created with allowMultipleContexts=true" should "work" in { val sparkConfiguration = new SparkConf().set("spark.driver.allowMultipleContexts", "true") val sparkContext1 = new SparkContext("local", "SparkContext#1", sparkConfiguration) val sparkContext2 = new SparkContext("local", "SparkContext#2", sparkConfiguration) } "two SparkContexts" should "fail the processing" in { val exception = intercept[Exception] { new SparkContext("local", "SparkContext#1") new SparkContext("local", "SparkContext#2") } exception.getMessage should startWith("Only one SparkContext may be running in this JVM (see SPARK-2243)") }
然而,在同一個JVM中擁有多個sparkcontext並不是一個好的實現。它對單元測試很有用,順便說一下,這是它在Apache Spark library的主要用途。在測試範圍之外,不能保證我們的程式在多個活動的SparkContext中正確地工作。此外,這也使得對我們的程式資料流管理更加困難。工作流並不是孤立的——一個上下文的潛在故障可能影響另一個上下文,甚至可能破壞整個JVM。它還會給driver程式做的所有事情帶來額外的硬體壓力。即使我們用toLocalIterator收集了一部分資料,但是要處理的資料總是比單獨的程序多很多倍。
SparkContext的一個缺點是隻能處理某一特定的場景。比如為了使用Hive,我們需要使用HiveContext,想處理結構化資料,就必須使用SQLContext例項,想處理實時流資料,就得用StreamingContext。然而SparSession解決了這個問題,它是所有不同管道的一個公共入口點。SparkSession的例項是用一個通用的構建器構造的,除了Hive需要呼叫enableHive()方法。
SparkSessions 共享 SparkContext
如前所述,同一個JVM擁有多個sparkcontext在技術上是可行的,但同時也被認為是一種不好的實現。Spark提供了一個工廠方法getOrCreate()來防止建立多個SparkContext:
"two SparkContext created with a factory method" should "not fail" in {
// getOrCreate is a factory method working with singletons
val sparkContext1 = SparkContext.getOrCreate(new SparkConf().setAppName("SparkContext#1").setMaster("local"))
val sparkContext2 = SparkContext.getOrCreate(new SparkConf().setAppName("SparkContext#2").setMaster("local"))
sparkContext1.parallelize(Seq(1, 2, 3))
sparkContext2.parallelize(Seq(4, 5, 6))
sparkContext1 shouldEqual sparkContext2
}
由於sparkSession的特點,有多個sparkSession是可能的。SparkSession是SparkContext的包裝器。context是由構建器隱式建立的,沒有任何額外的配置選項:
"Spark" should "create 2 SparkSessions" in {
val sparkSession1 = SparkSession.builder().appName("SparkSession#1").master("local").getOrCreate()
val sparkSession2 = sparkSession1.newSession()
sparkSession1.sparkContext shouldEqual sparkSession2.sparkContext
sparkSession1.stop()
// Since both SparkContexts are equal, stopping the one for sparkSession1 will make the context of
// sparkSession2 stopped too
sparkSession2.sparkContext.isStopped shouldBe true
// and that despite the different sessions
sparkSession1 shouldNot equal(sparkSession2)
}
但是可以看到,如果我們使用builder生成器,我們總是會得到相同的SparkSession例項:
"Spark" should "create 1 instance of SparkSession with builder"in {
val sparkSession1 = SparkSession.builder().appName("SparkSession#1").master("local").getOrCreate()
val sparkSession2 = SparkSession.builder().appName("SparkSession#2").master("local").getOrCreate()
sparkSession1 shouldEqual sparkSession2
}
多個SparkSessions的使用場景
有多個sparksession是很好的,至少我們不需要配置特定的選項。然而,為什麼我們需要多個sparkSession呢?第一個明顯的使用場景是,當我們需要使用來自不同sparksession的資料時,這些資料不能共享相同的配置。這個配置可能涉及到2個不同的Hive metastores而且它們的資料必須以某種方式混合在一起。為了便於說明,我用JSON檔案嘗試說明這個場景:
"Spark" should "launch 2 different apps for reading JSON files" in {
val commonDataFile = new File("/tmp/spark_sessions/common_data.jsonl")
val commonData =
"""
| {"id": 1, "name": "A"}
| {"id": 2, "name": "B"}
| {"id": 3, "name": "C"}
| {"id": 4, "name": "D"}
| {"id": 5, "name": "E"}
| {"id": 6, "name": "F"}
| {"id": 7, "name": "G"}
""".stripMargin
FileUtils.writeStringToFile(commonDataFile, commonData)
val dataset1File = new File("/tmp/spark_sessions/dataset_1.jsonl")
val dataset1Data =
"""
| {"value": 100, "join_key": 1}
| {"value": 300, "join_key": 3}
| {"value": 500, "join_key": 5}
| {"value": 700, "join_key": 7}
""".stripMargin
FileUtils.writeStringToFile(dataset1File, dataset1Data)
val dataset2File = new File("/tmp/spark_sessions/dataset_2.jsonl")
val dataset2Data =
"""
| {"value": 200, "join_key": 2}
| {"value": 400, "join_key": 4}
| {"value": 600, "join_key": 6}
""".stripMargin
FileUtils.writeStringToFile(dataset2File, dataset2Data)
// Executed against standalone cluster to better see that there is only 1 Spark application created
val sparkSession = SparkSession.builder().appName(s"SparkSession for 2 different sources").master("local")
.config("spark.executor.extraClassPath", sys.props("java.class.path"))
.getOrCreate()
val commonDataset = sparkSession.read.json(commonDataFile.getAbsolutePath)
commonDataset.cache()
import org.apache.spark.sql.functions._
val oddNumbersDataset = sparkSession.read.json(dataset1File.getAbsolutePath)
.join(commonDataset, col("id") === col("join_key"), "left")
val oddNumbers = oddNumbersDataset.collect()
// Without stop the SparkSession is represented under the same name in the UI and the master remains the same
// sparkSession.stop()
// But if you stop the session you won't be able to join the data from the second session with a dataset from the first session
SparkSession.clearActiveSession()
SparkSession.clearDefaultSession()
val sparkSession2 = SparkSession.builder().appName(s"Another Spark session").master("local")
.config("spark.executor.extraClassPath", sys.props("java.class.path"))
.getOrCreate()
SparkSession.setDefaultSession(sparkSession2)
val pairNumbersDataset = sparkSession2.read.json(dataset2File.getAbsolutePath)
.join(commonDataset, col("id") === col("join_key"), "left")
val pairNumbers = pairNumbersDataset.collect()
sparkSession shouldNot equal(sparkSession2)
def stringifyRow(row: Row): String = {
s"${row.getAs[Int]("id")}-${row.getAs[String]("name")}-${row.getAs[Int]("value")}"
}
val oddNumbersMapped = oddNumbers.map(stringifyRow(_))
oddNumbersMapped should have size 4
oddNumbersMapped should contain allOf("1-A-100", "3-C-300", "5-E-500", "7-G-700")
val pairNumbersMapped = pairNumbers.map(stringifyRow(_))
pairNumbersMapped should have size 3
pairNumbersMapped should contain allOf("2-B-200", "4-D-400", "6-F-600")
}
另外,至少在理論上,我們可以從一個公共程式碼中啟動兩個不同的獨立Spark作業。使用編排工具似乎是一個更好的主意,因為在第二個會話失敗的情況下,您只需要重新啟動它,而不需要重新計算第一個資料集。
另一個我們可以使用多個SparkSessions的純理論例子是,當一些外部輸入定義了不共享要啟動的相同配置的作業數量:
"Spark" should "launch 3 applications in 3 different threads" in {
val logAppender = InMemoryLogAppender.createLogAppender(Seq("SparkSession#0",
"SparkSession#1", "SparkSession#2"))
val latch = new CountDownLatch(3)
(0 until 3).map(nr => new Thread(new Runnable() {
override def run(): Unit = {
// You can submit this application to a standalone cluster to see in the UI that always only 1 app name
// is picked up and despite of that, all 3 applications are executed inside
val config = new SparkConf().setMaster("local").setAppName(s"SparkSession#${nr}")
.set("spark.executor.extraClassPath", sys.props("java.class.path"))
val sparkSession = SparkSession.builder().config(config)
.getOrCreate()
import sparkSession.implicits._
val dataset = (0 to 5).map(nr => (nr, s"${nr}")).toDF("id", "label")
val rowsIds = dataset.collect().map(row => row.getAs[Int]("id")).toSeq
// Give some time to make the observations
Thread.sleep(3000)
println(s"adding ${rowsIds}")
AccumulatedRows.data.appendAll(Seq(rowsIds))
latch.countDown()
}
}).start())
latch.await(3, TimeUnit.MINUTES)
// Add a minute to prevent against race conditions
Thread.sleep(1000L)
AccumulatedRows.data should have size 3
AccumulatedRows.data.foreach(rows => rows should contain allOf(0, 1, 2, 3, 4, 5))
logAppender.getMessagesText() should have size 1
// Since it's difficult to deduce which application was submitted, we check only the beginning of the log message
val firstMessage = logAppender.getMessagesText()(0)
firstMessage should startWith("Submitted application: SparkSession#")
val Array(_, submittedSessionId) = firstMessage.split("#")
val allSessions = Seq("0", "1", "2")
val missingSessionsFromLog = allSessions.diff(Seq(submittedSessionId))
missingSessionsFromLog should have size 2
}
雖然SparkSession封裝了SparkContext,並且由於預設情況下每個JVM只能有一個context,所以上面示例中的所有SparkSession都在單個應用程式的UI中表示——通常是第一個啟動的應用程式。下圖顯示了多次執行上述程式碼片段後的UI。如你所見,執行的應用程式總是使用3個提交的名字中的一個:
因此,即使我們為每個SparkSession指定了不同的配置,例如不同的主地址,它也不會有任何效果。Spark將始終使用第一個啟動會話的配置,因此,也將使用第一個建立的SparkContext的配置。當然,我們可以通過呼叫給定SparkSession例項的stop()方法強制停止上下文。但在這種情況下,我們失去了與停止會話建立的DataFrames互動的可能性。
前面描述的行為證明了除錯這樣的應用程式是困難的,而使用編排工具獨立提交這3個作業則要容易得多。除了監視功能之外,此解決方案還受益於更簡單的資料恢復過程和更簡單的程式碼——失敗或成功作業的邏輯留給編排工具,處理程式碼可以專注於它應該如何最好地處理資料。
這篇文章解釋了SparkContext和SparkSession之間的互動。第一部分介紹了兩個負責管理rdd、廣播變數、累加器和DataFrames的類。第二部分展示瞭如何在單個JVM中擁有SparkContext和SparkSession的多個例項。儘管這不是一個好的建議,但通過一些額外的配置工作,這是可能的。第三部分試圖解釋為什麼可以使用多個sparksession。但所提出的任何一個用例都不能讓人信服。每次多個作業的外部編排的解決方案似乎都更容易除錯、監控和恢復。