1. 程式人生 > 其它 >spark單元測試_[spark]一個SparkContext對應多個SparkSession

spark單元測試_[spark]一個SparkContext對應多個SparkSession

技術標籤:spark單元測試

原文連結:

https://www.waitingforcode.com/apache-spark-sql/multiple-sparksession-one-sparkcontext/read​www.waitingforcode.com

本篇文章第一部分解釋了SparkSession和SparkContext兩個物件的作用。第二部分討論了為同一個SparkContext定義多個SparkSession的可能性,最後一部分嘗試給出它的一些用例。

SparkSession 和 SparkContext

為了更好地理解本文所討論的問題,定義我們將要討論的內容是很重要的。第一個討論點是SparkContext。它是位於drver端的Spark程式的入口點。它是一個到Spark叢集的物化連線,提供了建立rdd、累加器和廣播變數所需的所有抽象。我們只能使用一個有效活動的SparkContext,否則spark將丟擲一個在同一個JVM中只能有一個執行的SparkContext的錯誤(參見SPARK-2243)。不過,當我們設定spark.driver.allowMultipleContexts

等於true時,這個異常可以避免。

 "two SparkContexts created with allowMultipleContexts=true" should "work" in {
    val sparkConfiguration = new SparkConf().set("spark.driver.allowMultipleContexts", "true")
    val sparkContext1 = new SparkContext("local", "SparkContext#1", sparkConfiguration)
    val sparkContext2 = new SparkContext("local", "SparkContext#2", sparkConfiguration)
  }

  "two SparkContexts" should "fail the processing" in {
    val exception = intercept[Exception] {
      new SparkContext("local", "SparkContext#1")
      new SparkContext("local", "SparkContext#2")
    }

    exception.getMessage should startWith("Only one SparkContext may be running in this JVM (see SPARK-2243)")
  }

然而,在同一個JVM中擁有多個sparkcontext並不是一個好的實現。它對單元測試很有用,順便說一下,這是它在Apache Spark library的主要用途。在測試範圍之外,不能保證我們的程式在多個活動的SparkContext中正確地工作。此外,這也使得對我們的程式資料流管理更加困難。工作流並不是孤立的——一個上下文的潛在故障可能影響另一個上下文,甚至可能破壞整個JVM。它還會給driver程式做的所有事情帶來額外的硬體壓力。即使我們用toLocalIterator收集了一部分資料,但是要處理的資料總是比單獨的程序多很多倍。

SparkContext的一個缺點是隻能處理某一特定的場景。比如為了使用Hive,我們需要使用HiveContext,想處理結構化資料,就必須使用SQLContext例項,想處理實時流資料,就得用StreamingContext。然而SparSession解決了這個問題,它是所有不同管道的一個公共入口點。SparkSession的例項是用一個通用的構建器構造的,除了Hive需要呼叫enableHive()方法。

SparkSessions 共享 SparkContext

如前所述,同一個JVM擁有多個sparkcontext在技術上是可行的,但同時也被認為是一種不好的實現。Spark提供了一個工廠方法getOrCreate()來防止建立多個SparkContext:

  "two SparkContext created with a factory method" should "not fail" in {
    // getOrCreate is a factory method working with singletons
    val sparkContext1 = SparkContext.getOrCreate(new SparkConf().setAppName("SparkContext#1").setMaster("local"))
    val sparkContext2 = SparkContext.getOrCreate(new SparkConf().setAppName("SparkContext#2").setMaster("local"))

    sparkContext1.parallelize(Seq(1, 2, 3))
    sparkContext2.parallelize(Seq(4, 5, 6))

    sparkContext1 shouldEqual sparkContext2
  }

由於sparkSession的特點,有多個sparkSession是可能的。SparkSession是SparkContext的包裝器。context是由構建器隱式建立的,沒有任何額外的配置選項:

  "Spark" should "create 2 SparkSessions" in {
    val sparkSession1 = SparkSession.builder().appName("SparkSession#1").master("local").getOrCreate()
    val sparkSession2 = sparkSession1.newSession()

    sparkSession1.sparkContext shouldEqual sparkSession2.sparkContext
    sparkSession1.stop()
    // Since both SparkContexts are equal, stopping the one for sparkSession1 will make the context of
    // sparkSession2 stopped too
    sparkSession2.sparkContext.isStopped shouldBe true
    // and that despite the different sessions
    sparkSession1 shouldNot equal(sparkSession2)
  }

但是可以看到,如果我們使用builder生成器,我們總是會得到相同的SparkSession例項:

 "Spark" should "create 1 instance of SparkSession with builder"in {
    val sparkSession1 = SparkSession.builder().appName("SparkSession#1").master("local").getOrCreate()
    val sparkSession2 = SparkSession.builder().appName("SparkSession#2").master("local").getOrCreate()

    sparkSession1 shouldEqual sparkSession2
  }

多個SparkSessions的使用場景

有多個sparksession是很好的,至少我們不需要配置特定的選項。然而,為什麼我們需要多個sparkSession呢?第一個明顯的使用場景是,當我們需要使用來自不同sparksession的資料時,這些資料不能共享相同的配置。這個配置可能涉及到2個不同的Hive metastores而且它們的資料必須以某種方式混合在一起。為了便於說明,我用JSON檔案嘗試說明這個場景:

"Spark" should "launch 2 different apps for reading JSON files" in {
    val commonDataFile = new File("/tmp/spark_sessions/common_data.jsonl")
    val commonData =
      """
        | {"id": 1, "name": "A"}
        | {"id": 2, "name": "B"}
        | {"id": 3, "name": "C"}
        | {"id": 4, "name": "D"}
        | {"id": 5, "name": "E"}
        | {"id": 6, "name": "F"}
        | {"id": 7, "name": "G"}
      """.stripMargin
    FileUtils.writeStringToFile(commonDataFile, commonData)
    val dataset1File = new File("/tmp/spark_sessions/dataset_1.jsonl")
    val dataset1Data =
      """
        | {"value": 100, "join_key": 1}
        | {"value": 300, "join_key": 3}
        | {"value": 500, "join_key": 5}
        | {"value": 700, "join_key": 7}
      """.stripMargin
    FileUtils.writeStringToFile(dataset1File, dataset1Data)
    val dataset2File = new File("/tmp/spark_sessions/dataset_2.jsonl")
    val dataset2Data =
      """
        | {"value": 200, "join_key": 2}
        | {"value": 400, "join_key": 4}
        | {"value": 600, "join_key": 6}
      """.stripMargin
    FileUtils.writeStringToFile(dataset2File, dataset2Data)
    // Executed against standalone cluster to better see that there is only 1 Spark application created
    val sparkSession = SparkSession.builder().appName(s"SparkSession for 2 different sources").master("local")
      .config("spark.executor.extraClassPath", sys.props("java.class.path"))
      .getOrCreate()
    val commonDataset = sparkSession.read.json(commonDataFile.getAbsolutePath)
    commonDataset.cache()
    import org.apache.spark.sql.functions._
    val oddNumbersDataset = sparkSession.read.json(dataset1File.getAbsolutePath)
      .join(commonDataset, col("id") === col("join_key"), "left")
    val oddNumbers = oddNumbersDataset.collect()


    // Without stop the SparkSession is represented under the same name in the UI and the master remains the same
    // sparkSession.stop()
    // But if you stop the session you won't be able to join the data from the second session with a dataset from the first session
    SparkSession.clearActiveSession()
    SparkSession.clearDefaultSession()

    val sparkSession2 = SparkSession.builder().appName(s"Another Spark session").master("local")
      .config("spark.executor.extraClassPath", sys.props("java.class.path"))
      .getOrCreate()

    SparkSession.setDefaultSession(sparkSession2)
    val pairNumbersDataset = sparkSession2.read.json(dataset2File.getAbsolutePath)
      .join(commonDataset, col("id") === col("join_key"), "left")

    val pairNumbers = pairNumbersDataset.collect()

    sparkSession shouldNot equal(sparkSession2)
    def stringifyRow(row: Row): String = {
      s"${row.getAs[Int]("id")}-${row.getAs[String]("name")}-${row.getAs[Int]("value")}"
    }
    val oddNumbersMapped = oddNumbers.map(stringifyRow(_))
    oddNumbersMapped should have size 4
    oddNumbersMapped should contain allOf("1-A-100", "3-C-300", "5-E-500", "7-G-700")
    val pairNumbersMapped = pairNumbers.map(stringifyRow(_))
    pairNumbersMapped should have size 3
    pairNumbersMapped should contain allOf("2-B-200", "4-D-400", "6-F-600")
  }

另外,至少在理論上,我們可以從一個公共程式碼中啟動兩個不同的獨立Spark作業。使用編排工具似乎是一個更好的主意,因為在第二個會話失敗的情況下,您只需要重新啟動它,而不需要重新計算第一個資料集。

另一個我們可以使用多個SparkSessions的純理論例子是,當一些外部輸入定義了不共享要啟動的相同配置的作業數量:

"Spark" should "launch 3 applications in 3 different threads" in {
    val logAppender = InMemoryLogAppender.createLogAppender(Seq("SparkSession#0",
      "SparkSession#1", "SparkSession#2"))
    val latch = new CountDownLatch(3)
    (0 until 3).map(nr => new Thread(new Runnable() {
      override def run(): Unit = {
        // You can submit this application to a standalone cluster to see in the UI that always only 1 app name
        // is picked up and despite of that, all 3 applications are executed inside
        val config = new SparkConf().setMaster("local").setAppName(s"SparkSession#${nr}")
          .set("spark.executor.extraClassPath", sys.props("java.class.path"))
        val sparkSession = SparkSession.builder().config(config)
          .getOrCreate()
        import sparkSession.implicits._
        val dataset = (0 to 5).map(nr => (nr, s"${nr}")).toDF("id", "label")
        val rowsIds = dataset.collect().map(row => row.getAs[Int]("id")).toSeq
        // Give some time to make the observations
        Thread.sleep(3000)
        println(s"adding ${rowsIds}")
        AccumulatedRows.data.appendAll(Seq(rowsIds))
        latch.countDown()
      }
    }).start())
    latch.await(3, TimeUnit.MINUTES)
    // Add a minute to prevent against race conditions
    Thread.sleep(1000L)

    AccumulatedRows.data should have size 3
    AccumulatedRows.data.foreach(rows => rows should contain allOf(0, 1, 2, 3, 4, 5))
    logAppender.getMessagesText() should have size 1
    // Since it's difficult to deduce which application was submitted, we check only the beginning of the log message
    val firstMessage = logAppender.getMessagesText()(0)
    firstMessage should startWith("Submitted application: SparkSession#")
    val Array(_, submittedSessionId) = firstMessage.split("#")
    val allSessions = Seq("0", "1", "2")
    val missingSessionsFromLog = allSessions.diff(Seq(submittedSessionId))
    missingSessionsFromLog should have size 2
  }

雖然SparkSession封裝了SparkContext,並且由於預設情況下每個JVM只能有一個context,所以上面示例中的所有SparkSession都在單個應用程式的UI中表示——通常是第一個啟動的應用程式。下圖顯示了多次執行上述程式碼片段後的UI。如你所見,執行的應用程式總是使用3個提交的名字中的一個:

661360301ab090a1df2542913cc5c8f8.png

因此,即使我們為每個SparkSession指定了不同的配置,例如不同的主地址,它也不會有任何效果。Spark將始終使用第一個啟動會話的配置,因此,也將使用第一個建立的SparkContext的配置。當然,我們可以通過呼叫給定SparkSession例項的stop()方法強制停止上下文。但在這種情況下,我們失去了與停止會話建立的DataFrames互動的可能性。

前面描述的行為證明了除錯這樣的應用程式是困難的,而使用編排工具獨立提交這3個作業則要容易得多。除了監視功能之外,此解決方案還受益於更簡單的資料恢復過程和更簡單的程式碼——失敗或成功作業的邏輯留給編排工具,處理程式碼可以專注於它應該如何最好地處理資料。

這篇文章解釋了SparkContext和SparkSession之間的互動。第一部分介紹了兩個負責管理rdd、廣播變數、累加器和DataFrames的類。第二部分展示瞭如何在單個JVM中擁有SparkContext和SparkSession的多個例項。儘管這不是一個好的建議,但通過一些額外的配置工作,這是可能的。第三部分試圖解釋為什麼可以使用多個sparksession。但所提出的任何一個用例都不能讓人信服。每次多個作業的外部編排的解決方案似乎都更容易除錯、監控和恢復。