1. 程式人生 > 其它 >Spark權威指南(中文版)----第16章 開發Spark應用程式

Spark權威指南(中文版)----第16章 開發Spark應用程式

在第15章中,您瞭解了Spark如何在叢集上執行程式碼。現在,我們將向您展示開發一個獨立的Spark應用程式並將其部署到叢集上是多麼容易。我們將使用一個簡單的模板來實現這一點,該模板分享了一些關於如何構建應用程式的簡單技巧,包括設定構建工具和單元測試。這個模板可以在本書的程式碼儲存庫中找到。這個模板實際上並不是必需的,因為從頭編寫應用程式並不困難,但是它很有幫助。讓我們從第一個應用程式開始。

16.1.編寫Spark應用程式

Spark應用程式是兩種東西的組合:Spark叢集和程式碼。在這種情況下,叢集將是本地模式,應用程式將是預定義的模式。讓我們瀏覽一下每種語言中的應用程式。

16.1.1. 一個簡單的基於scala的應用程式

Scala是Spark的“原生”語言,自然是編寫應用程式的好方法。這和編寫Scala應用程式沒什麼不同。

提示Scala看起來有些嚇人,這取決於您的背景,但如果只是為了更好地理解Spark,那麼還是值得學習的。此外,你不需要學習這門語言的所有細節;從基礎開始,您會發現在Scala中很快就可以提高生產力。使用Scala還將開啟許多大門。通過一點實踐,通過Spark的程式碼庫進行程式碼級跟蹤並不困難。

您可以使用兩個基於Java虛擬機器(JVM)的構建工具sbt或Apache Maven構建應用程式。與任何構建工具一樣,它們都有各自的怪癖,但是從sbt開始可能是最簡單的。您可以在sbt網站上下載、安裝和了解sbt。您也可以從Maven各自的網站上安裝Maven。

要為Scala應用程式配置sbt build,我們需要指定一個build.sbt檔案,用於管理包資訊。在build.sbt檔案中,有幾個關鍵的地方:

  • 專案元資料(包名稱、包版本控制資訊等)

  • 在哪裡解決依賴關係

  • 庫所需的依賴項

您可以指定更多的選項;但是,它們超出了本書的範圍(您可以在web和sbt文件中找到相關資訊)。也有一些關於這個主題的書籍,可以作為一個有用的參考。下面是Scala build.sbt檔案的示例(以及我們在模板中包含的檔案)。注意,我們必須指定Scala版本和Spark版本:

現在我們已經定義了構建檔案,實際上可以開始向專案新增程式碼了。我們將使用標準的Scala專案結構,你可以在sbt參考手冊中找到(這是與Maven專案相同的目錄結構):

我們將原始碼放在Scala和Java目錄中。在本例中,我們將如下內容放入檔案中;這將初始化SparkSession,執行應用程式,然後退出:

注意,我們定義了一個包括main方法的類,當使用spark-submit將其提交到叢集執行時,可以從命令列執行這個類。

現在我們已經設定好了我們的專案並向其添加了一些程式碼,是時候構建它了。我們可以使用sbt assemble構建一個“超級JAR”或“胖JAR”,其中包含一個JAR中的所有依賴項。對於某些部署,這可能很簡單,但對於其他部署,這可能會導致複雜性(尤其是依賴衝突)。一個輕量級的方法是執行sbt package,它將把所有依賴項收集到目標資料夾中,但不會將所有依賴項打包到一個大JAR中。

執行程式

目標資料夾包含我們可以用作spark-submit引數的JAR。在構建Scala包之後,您可以使用以下程式碼在本地機器上進行spark-submit(此程式碼片段利用別名建立$SPARK_HOME變數;你可以將$SPARK_HOME替換為包含你下載的Spark版本的目錄):

16.1.2. 編寫Python應用程式

編寫PySpark應用程式實際上與編寫普通的Python應用程式或包沒有什麼不同。它特別類似於編寫命令列應用程式。Spark沒有構建概念,只有Python指令碼,因此要執行應用程式,只需對叢集執行指令碼。

為了促進程式碼重用,通常將多個Python檔案打包到Spark程式碼的egg或ZIP檔案中。要包含這些檔案,您可以使用spark-submit的——py-files引數來新增.py、.zip或.egg檔案,以便與應用程式一起分發。

當執行程式碼時,用Python建立一個等價於“Scala/Java main類”的類。將某個指令碼指定為構建SparkSession的可執行指令碼。這是我們傳遞給spark-submit的主要引數:

當您這樣做時,您將獲得一個SparkSession,您可以將它傳遞給您的應用程式。最佳實踐是在執行時傳遞這個變數,而不是在每個Python類中例項化它。

在Python中開發時,一個有用的技巧是使用pip將PySpark指定為依賴項。您可以通過執行命令pip install pyspark來實現這一點。這允許您以可能使用其他Python包的方式使用它。這也使得許多編輯器中的程式碼完成非常有用。這是Spark 2.2中全新的一個版本,因此可能需要一兩個版本才能完全投入生產,但是Python在Spark社群中非常流行,它肯定是Spark未來的基石。

執行程式

編寫完程式碼之後,就可以提交程式碼執行了。(我們正在執行與專案模板中相同的程式碼。)您只需要呼叫spark-submit與該資訊:

16.1.3. 編寫Java應用程式

編寫Java Spark應用程式與編寫Scala應用程式是一樣的。核心差異涉及到如何指定依賴項。

本例假設您正在使用Maven指定依賴項。在本例中,您將使用以下格式。在Maven中,您必須新增Spark Packages儲存庫,以便從這些位置獲取依賴項:

當然,您遵循與Scala專案版本相同的目錄結構(因為它們都符合Maven規範)。然後,我們只需遵循相關的Java示例來實際構建和執行程式碼。現在,我們可以建立一個簡單的例子,指定一個main類,讓我們執行(更多關於這個在本章末尾):

然後,我們使用mvn package對它進行打包(需要安裝Maven)。

執行程式

這個操作將與執行Scala應用程式(或者Python應用程式)完全相同。簡單地使用spark-submit:

16.2. 測試Spark應用程式

現在您已經知道編寫和執行一個Spark應用程式需要什麼,所以讓我們轉到一個不那麼令人興奮但仍然非常重要的主題:測試。測試Spark應用程式依賴於幾個關鍵原則和策略,在編寫應用程式時應該牢記這些原則和策略。

16.2.1. 策略原則

測試資料pipelines和Spark應用程式與實際編寫它們一樣重要。這是因為您希望確保它們對未來的資料、邏輯和輸出方面的更改具有彈性。在本節中,我們將首先討論您可能希望在典型的Spark應用程式中測試什麼,然後討論如何組織程式碼以便進行簡單的測試。

輸入資料的彈性

對不同型別的輸入資料保持彈性對於如何編寫資料管道非常重要。資料將會改變,因為業務需求將會改變。因此,您的Spark應用程式和管道應該至少對輸入資料中的某種程度的更改具有彈性,或者確保以一種優雅而有彈性的方式處理這些故障。在大多數情況下,這意味著要聰明地編寫測試來處理不同輸入的邊緣情況。

業務邏輯彈性和演化

管道中的業務邏輯和輸入資料都可能發生更改。更重要的是,您希望確保從原始資料中推匯出來的是您實際認為自己在推導的東西。這意味著您將需要對實際資料進行健壯的邏輯測試,以確保您實際上得到了您想要的結果。這裡需要注意的一件事是,嘗試編寫一組“Spark單元測試”,只測試Spark的功能。你可能不想這樣做;相反,您希望測試您的業務邏輯,並確保您設定的複雜業務管道實際上正在做您認為它應該做的事情。

輸出的彈性和原子性

假設您已經為輸入資料結構中的更改做好了準備,並且您的業務邏輯經過了良好的測試,現在您需要確保您的輸出結構是您所期望的。這意味著您需要優雅地處理輸出模式解析。通常情況下,資料不會被簡單地轉儲到某個位置,再也不會被讀取—您的大多數Spark管道可能正在為其他Spark管道提供資料。因為這個原因你要確保下游消費者理解的“狀態”,可能意味著它更新的頻率,以及資料是否“完整的”(例如,沒有後期資料),或者不會有任何最後一分鐘修正資料。

前面提到的所有問題都是構建資料管道時應該考慮的原則(實際上,無論是否使用Spark)。這種戰略思維對於為您想要構建的系統打下基礎非常重要。

16.2.2. 戰術

雖然戰略思維很重要,但是讓我們更詳細地討論一些可以使應用程式易於測試的策略。最高價值的方法是通過使用適當的單元測試來驗證您的業務邏輯是正確的,並確保您對不斷變化的輸入資料具有彈性,或者已經對其進行了結構化,以便將來模式演化不會失去作用。如何做到這一點,很大程度上取決於作為開發人員的您,因為這將根據您的業務領域和領域專長而有所不同。

管理SparkSessions

使用單元測試框架(如JUnit或ScalaTest)測試Spark程式碼相對容易,因為Spark具有本地模式——只需建立一個本地模式SparkSession作為測試工具的一部分來執行它。然而,要使此工作正常,您應該在管理程式碼中的Spark時儘可能多地執行依賴項注入。也就是說,只初始化SparkSession一次,並在執行時將其傳遞給相關的函式和類,以便在測試期間方便地進行替換。這使得在單元測試中使用一個虛擬的SparkSession測試每個單獨的函式更加容易。

使用哪個Spark API ?

Spark提供了多種api的選擇,從SQL到DataFrames和Datasets,每一種api都可能對應用程式的可維護性和可測試性產生不同的影響。坦白地說,正確的API取決於您的團隊及其需求:一些團隊和專案將需要不那麼嚴格的SQL和DataFrame API來提高開發速度,而其他團隊則希望使用型別安全的資料集或RDDs。

通常,我們建議對每個函式的輸入和輸出型別進行文件化和測試,而不管使用哪種API。型別安全API自動為您的函式強制執行一個最小的契約,這使得其他程式碼很容易在此基礎上進行構建。如果您的團隊更喜歡使用DataFrames或SQL,那麼請花一些時間記錄和測試每個函式返回什麼,以及它接受什麼型別的輸入,以避免以後出現意外,就像在任何動態型別的程式語言中一樣。雖然較低層的RDD API也是靜態型別的,但我們建議只在需要資料集中不存在的底層特性(比如分割槽)時才使用它,這應該不是很常見;Dataset API允許更多的效能優化,並且將來可能提供更多的效能優化。

對於應用程式使用哪種程式語言也有類似的考慮:對於每個團隊當然沒有正確的答案,但是根據您的需要,每種語言將提供不同的好處。我們一般建議使用靜態型別語言,像Scala和Java為更大的應用程式或者那些你希望能夠進入低階程式碼完全控制性能,但Python和R可能是更好的在其他情況下,示例中,如果您需要使用一些其他的庫。Spark程式碼應該很容易在每種語言的標準單元測試框架中進行測試。

連線到單元測試框架

要對程式碼進行單元測試,我們建議使用語言中的標準框架(例如JUnit或ScalaTest),並設定測試工具來為每個測試建立和清理SparkSession。不同的框架提供了不同的機制來實現這一點,例如“before”和“after”方法。我們在本章的應用程式模板中包含了一些單元測試程式碼示例。

連線到資料來源

儘可能地,您應該確保您的測試程式碼不連線到生產資料來源,這樣,如果這些資料來源發生更改,開發人員就可以輕鬆地單獨執行它。實現這一點的一個簡單方法是讓所有業務邏輯函式都以DataFrames或資料集作為輸入,而不是直接連線到各個源;畢竟,無論資料來源是什麼,後續程式碼都將以相同的方式工作。如果您在Spark中使用結構化api,實現這一點的另一種方法是命名錶:您可以簡單地註冊一些虛擬資料集(例如,從小文字檔案或記憶體物件載入)作為各種表名,然後從那裡開始。

16.3. 開發過程

使用Spark應用程式的開發過程類似於您可能已經使用過的開發工作流。首先,您可能要維護一個劃痕空間,比如互動式筆記本或其他類似的東西,然後在構建關鍵元件和演算法時,將它們移動到更持久的位置,比如庫或包。筆記本體驗是我們經常推薦的體驗之一(我們也經常用它來寫這本書),因為它在實驗中很簡單。還有一些工具,比如Databricks,允許您將筆記本作為生產應用程式執行。

在本地機器上執行時,spark-shell及其各種特定於語言的實現可能是開發應用程式的最佳方法。在大多數情況下,shell用於互動式應用程式,而Spark -submit用於Spark叢集上的生產應用程式。您可以使用shell以互動方式執行Spark,就像我們在本書開頭介紹的那樣。這是執行PySpark、Spark SQL和SparkR的模式。在bin資料夾中,當您下載Spark時,您將找到啟動這些shell的各種方法。只需執行spark-shell(對於Scala)、spark-sql、pyspark和sparkR。

在您完成應用程式並建立要執行的包或指令碼之後,spark-submit將成為您向叢集提交此作業的最好朋友。

16.4. 執行程式

執行Spark應用程式的最常見方法是通過Spark -submit。在本章前面,我們向您展示瞭如何執行spark-submit;您只需指定選項、應用程式JAR或指令碼以及相關引數:

在使用Spark -submit提交Spark作業時,始終可以指定是在客戶機模式還是在叢集模式下執行。但是,您應該幾乎總是傾向於在叢集模式下執行(或者在叢集本身的客戶機模式下),以減少執行者和驅動程式之間的延遲。

提交applciations時,在.jar中傳遞一個.py檔案,並將Python .zip、.egg或.py新增到搜尋路徑中,其中包含—py檔案。

為了便於參考,表16-1列出了所有可用的spark-submit選項,包括一些叢集管理器特有的選項。要自己列舉所有這些選項,請執行spark-submit with——help。

還有一些特定於部署的配置(參見表16-2)。

16.4.1. 應用程式執行的例子

在本章之前,我們已經介紹了一些本地模式的應用程式示例,但是值得一看的是我們如何使用前面提到的一些選項。Spark還在下載Spark時包含的examples目錄中包含幾個示例和演示應用程式。如果你糾結於如何使用某些引數,你可以先在本地機器上試試,然後使用SparkPi類作為主類:

下面的程式碼片段對Python也做了同樣的操作。您可以從Spark目錄執行它,這將允許您向獨立叢集管理器提交一個Python應用程式(都在一個指令碼中)。您還可以設定與前一個示例相同的執行器限制:

16.5. 配置應用程式

Spark包含許多不同的配置,其中一些我們在第15章中已經介紹過。根據您希望實現的目標,有許多不同的配置。本節將詳細介紹這些內容。大多數情況下,這些資訊都是供參考的,可能只值得略讀,除非您正在尋找特定的內容。大多數配置可分為以下幾類:

Spark提供了三個位置來配置

  • Spark屬性控制大多數應用程式引數,可以通過使用SparkConf物件來設定

  • Java系統屬性

  • 硬編碼的配置檔案

您可以使用幾個模板,您可以在Spark home資料夾的根目錄中找到/conf目錄。您可以將這些屬性設定為應用程式中的硬編碼變數,或者在執行時指定它們。您可以使用環境變數在每個節點上通過conf/spark-env.sh指令碼設定每臺機器的設定,例如IP地址。最後,您可以通過log4j.properties配置日誌記錄。

16.5.1. SparkConf

SparkConf管理所有應用程式配置。您可以通過import語句建立一個,如下面的示例所示。建立之後,SparkConf對於特定的Spark應用程式是不可變的:

您可以使用SparkConf配置具有Spark屬性的單個Spark應用程式。這些Spark屬性控制Spark應用程式的執行方式和叢集的配置方式。下面的示例將本地叢集配置為有兩個執行緒,並指定在Spark UI中顯示的應用程式名稱。

您可以在執行時配置它們,正如您在本章前面通過命令列引數看到的那樣。這有助於啟動一個Spark Shell,它將自動為您包含一個基本的Spark應用程式;例如:

值得注意的是,在設定基於時間期限的屬性時,應該使用以下格式:

16.5.2. 應用程式屬性

應用程式屬性是您從Spark -submit或建立Spark應用程式時設定的屬性。它們定義了基本的應用程式元資料以及一些執行特性。表16-3給出了當前應用程式屬性的列表。

通過應用程式web UI(Driver程式埠4040)的”Environment”選項卡,檢視所有的配置資訊,可以確保正確設定了這些值。只有通過spark-defaults、SparkConf或命令列顯式設定的值才會出現在選項卡中。對於所有其他配置屬性,可以假設使用預設值。

16.5.3. 執行環境屬性配置

雖然不太常見,但有時可能還需要配置應用程式的執行時環境。由於空間限制,我們不能在這裡包含整個配置集。請參考Spark文件中有關執行時環境的相關配置表(http://spark.apache.org/docs/latest/configuration.html#runtime-environment)。這些屬性允許您為Driver程式和Executor程式配置額外的類路徑和python路徑、python相關員配置以及其他日誌記錄配置屬性。

16.5.4. 執行時屬性配置

這些配置是您需要配置的最相關的配置之一,因為它們為您提供了對程式實際執行的更細粒度控制。由於空間限制,我們不能在這裡包含整個配置集。請參考Spark文件中有關執行行為的相關配置表(http://spark.apache.org/docs/latest/configuration.html#execution-behavior)。最常見的更改配置是spark.executor.cores(控制可用cores的數量)和spark.files.maxPartitionBytes(讀取檔案時的最大分割槽大小)。

16.5.5. 配置記憶體管理

有時您可能需要手動管理記憶體選項來嘗試優化應用程式。其中許多配置項與終端使用者無關,因為它們涉及許多遺留概念或在Spark 2中因自動記憶體管理而被排除的細粒度控制配置項。由於空間限制,我們不能在這裡包含整個配置集。請參閱Spark文件中有關記憶體管理的相關配置表(http://spark.apache.org/docs/latest/configuration.html#memory-management)。

16.5.6. 配置shuffle行為

我們已經強調過,由於Spark作業的通訊開銷很高,所以shuffle可能會成為瓶頸。因此,有許多低層配置用於控制shuffle行為。由於空間限制,我們不能在這裡包含整個配置集。請參考Spark文件中有關Shuffle行為的相關配置表(http://spark.apache.org/docs/latest/configuration.html#shuffle-behavior)。

16.5.7. 環境變數

您可以通過環境變數配置某些Spark設定,這些環境變數來自安裝Spark的目錄中的conf/ Spark-env.sh指令碼。在Standalone模式和Mesos模式中,該檔案可以提供特定於機器的資訊,比如主機名。當執行本地Spark應用程式或提交指令碼時,它也會被引用。

注意,預設情況下,安裝Spark時不存在conf/ Spark-env.sh檔案,可以通過拷貝conf/spark-env.sh.template並重命名獲得。

以下變數可以在spark-env.sh中設定:

JAVA-HOME安裝Java的位置(如果它不在預設路徑上)。PYSPARK_PYTHONPython二進位制可執行檔案,用於Driver程式和Worker程式中的PySpark(如果可用,預設為python2.7;否則,python)。如果spark.pyspark.python屬性設定了,則spark.pyspark.python屬性優先順序高於此環境變數。PYSPARK_DRIVER_PYTHONPython二進位制可執行檔案,僅用於Driver程式中的PySpark(預設為PYSPARK_PYTHON)。屬性spark.pyspark.driver.python如果設定了,則它優先。SPARKR_DRIVER_R用於SparkR shell的二進位制可執行檔案(預設為R)。屬性spark.r.shell.command在設定時優先。SPARK_LOCAL_IP要繫結到的機器IP地址。SPARK_PUBLIC_DNSSpark應用程式將要通知的其他機器的主機名。除了列出的變數之外,還有設定Spark獨立叢集指令碼的選項,比如在每臺機器上使用的核心數量和最大記憶體。因為spark-env.sh是一個shell指令碼,所以可以通過程式設計設定其中一些;例如,您可以通過查詢特定網路介面的IP來計算SPARK_LOCAL_IP。

注意在cluster模式下對Yarn執行Spark時,需要使用Spark .YARN.appmasterenv設定環境變數。在conf/spark-default.conf檔案中設定[EnvironmentVariableName]屬性。在spark-env.sh中設定的環境變數不會在cluster模式下反映在Yarn Application Master中。有關更多資訊,請參見Yarn相關的Spark屬性。
16.5.8. 應用程式中的Job排程

在給定的Spark應用程式中,如果從單獨的執行緒提交多個並行作業,則可以同時執行它們。在本節中,我們所說的job指的是一個Spark action和任何需要執行以計算該action的任務。Spark的排程程式是完全執行緒安全的,並且支援此用例來啟用服務於多個請求的應用程式(例如,針對多個使用者的查詢)。

預設情況下,Spark的排程程式以FIFO方式執行作業。如果佇列頭部的作業不需要使用整個叢集,則稍後的作業可以立即開始執行,但是如果佇列頭部的作業很大,則稍後的作業可能會顯著延遲。

還可以配置作業之間的公平共享。在公平共享下,Spark以迴圈方式在作業之間分配任務,以便所有作業獲得大致相等的叢集資源共享。這意味著在長作業執行時提交的短作業可以立即開始接收資源,並且仍然可以在不等待長作業完成的情況下獲得良好的響應時間。這種模式最適合多使用者設定。

要啟用公平排程程式,設定spark.scheduler.mode為Fair,可以在SparkContext中設定。

Fair排程程式還支援將作業分組到池中,併為每個池設定不同的排程選項或權重。這可以為更重要的作業建立高優先順序池,或者將每個使用者的作業分組在一起,並給使用者平等的共享,而不管他們有多少併發作業,而不是給作業平等的共享。該方法模仿Hadoop Fair排程程式。

在不進行任何干預的情況下,新提交的作業將進入預設池,可以通過設定spark.scheduler.pool屬性來設定作業池。這是這樣做的(假設sc是您的SparkContext:

sc.setLocalProperty("spark.scheduler.pool","pool1")

設定此LocalProperty後,此執行緒中提交的所有作業都將使用此池名稱。設定為每個執行緒,以便讓一個執行緒可以方便地代表同一個使用者執行多個作業。如果希望清除執行緒關聯的池,請將其設定為null。

16.6. 結束語

本章涵蓋了很多關於Spark應用程式的內容;我們學習瞭如何用Spark的所有語言編寫、測試、執行和配置它們。在第17章中,我們將討論在執行Spark應用程式時的部署和叢集管理選項。