1. 程式人生 > 其它 >spark怎麼呼叫hadoop_Spark程式設計指南(二)

spark怎麼呼叫hadoop_Spark程式設計指南(二)

技術標籤:spark怎麼呼叫hadoop

點選上方“藍字”關注我

彈性分散式資料集(RDD)

Spark圍繞彈性分散式資料集(RDD)的概念展開,RDD是可並行操作的可容錯的元素集合。有兩種方法可以建立RDD:並行化一個驅動程式中的已存在的集合,或引用外部儲存系統(例如共享檔案系統、HDFS、HBase或提供Hadoop InputFormat的任何資料來源)中的資料集。

並行集合

通過在驅動程式中已存在的集合(Scala Seq)上呼叫SparkContext的parallelize方法來建立並行集合。複製集合的元素以形成可以並行操作的分散式資料集。例如,如下是建立包含數字1到5的並行化集合的方法:

valdata=Array(1,2,3,4,5)
valdistData=sc.parallelize(data)

一旦建立好後,我們就可以並行的處理分散式資料集(distData)。例如,我們可以呼叫distData.reduce((a, b) => a + b)來對陣列中的元素進行累加。我們將在稍後介紹對分散式資料集的操作。

並行集合的一個重要引數是將資料集切分的分割槽數。Spark將為叢集的每個分割槽執行一個任務。通常,叢集中的每個CPU都需要2-4個分割槽。通常,Spark會嘗試根據您的叢集自動設定分割槽數。但是,您也可以通過將其作為第二個引數傳遞來進行手動設定以進行並行化(例如sc.parallelize(data, 10))

)。注意:程式碼中的某些地方使用術語切片(分割槽的同義詞)來保持向後相容性。

外部資料集

Spark可以從Hadoop支援的任何儲存源建立分散式資料集,包括您的本地檔案系統,HDFS,Cassandra,HBase,Amazon S3等。Spark支援文字檔案,SequenceFiles和任何其他Hadoop InputFormat。

可以使用SparkContext的textFile方法建立文字檔案RDD。這個方法需要檔案的URI作為引數(機器上的本地路徑、或hdfs://s3n://等URI),並將其讀取為行的集合。如下是一個示例呼叫:

scala>valdistFile=sc.textFile("data.txt")
distFile:org.apache.spark.rdd.RDD[String]=data.txtMapPartitionsRDD[10]attextFileat:26

distFile一旦建立好後,我們就可以對其進行資料集的操作。例如,我們可以使用map函式和reduce函式對所有資料行的大小進行累加求和:distFile.map(s => s.length).reduce((a, b) => a + b)

關於使用Spark讀取檔案的一些注意事項:

  • 如果使用本地檔案系統路徑,則必須在工作節點上的使用相同路徑去訪問該檔案。要麼將檔案複製給所有的worker 節點,要麼使用網路安裝的共享檔案系統。
  • Spark的所有基於檔案的輸入方法,包括textFile,都支援傳入檔案的目錄、壓縮檔案以及萬用字元,例如,您可以使用textFile("/my/directory")textFile("/my/directory/*.txt")textFile("/my/directory/*.gz")
  • textFile方法還帶有一個可選的第二個引數,用於控制檔案的分割槽數。預設情況下,Spark為檔案的每個塊建立一個分割槽(HDFS中塊的預設大小為128MB),但是您也可以通過傳遞更大的值來請求更多數量的分割槽。請注意,分割槽的數量不能少於塊的數量。

除文字檔案外,Spark的Scala API還支援其他幾種資料格式

  • SparkContext.wholeTextFiles方法可讓您讀取包含多個小文字檔案的目錄,並對每一個小文字檔案以(filename, content)對的形式返回。這與textFile方法相反,textFile方法將在每個檔案的每一行返回一條記錄。
  • 對於SequenceFiles,請使用SparkContext的sequenceFile[K,V]方法,其中K和V是檔案中key和value的型別。key和value的型別需要實現Hadoop中的Writable介面,例如IntWritable和Text。此外,Spark允許您為一些常見的Writables指定本地型別。例如,sequenceFile [Int,String]將自動讀取IntWritables和Texts。
  • 對於其他Hadoop InputFormat,你可以使用SparkContext.hadoopRDD方法,該方法需要輸入任意的JobConf、input format型別、鍵型別和值型別。如何設定這些引數是與對輸入源進行Hadoop作業的方式相同。您還可以基於新的MapReduce API(org.apache.hadoop.mapreduce)SparkContext.newAPIHadoopRDD用於InputFormats。
  • RDD.saveAsObjectFileSparkContext.objectFile支援以包含序列化Java物件的簡單格式來儲存RDD。儘管這不像Avro這樣的專用格式有效,但它提供了一種儲存任何RDD的簡便方法。

RDD操作

RDD支援兩種型別的操作:transformations(從已存在的資料集建立新的資料集)和actions(在資料集上執行計算後,將值返回給驅動程式)。例如,map是一種transformation 操作,它將每個資料集元素通過一個函式傳遞,並返回代表結果的新RDD。另一方面,reduce是使用某些函式聚合RDD的所有元素並將最終結果返回給驅動程式的actions操作(儘管還有並行的reduceByKey返回分散式資料集)。

Spark中的所有transformations操作都是懶載入的,因為它們不會立即計算出結果。相反,他們只記得應用於某些基本資料集(例如檔案)的transformations操作。僅當action操作被執行才會計算transformations操作並將計算結果返回給驅動程式。這種設計使Spark可以更高效地執行。例如,我們可以知道到通過map建立的資料集將用於reduce中,並且僅將reduce的結果返回給驅動程式,而不是將較大的做過map計算的資料集返回給驅動程式。

預設情況下,每次在執行操作時,可能會重新計算每個轉換後的RDD。但是,您也可以使用persist(或cache)方法將RDD保留在記憶體中,在這種情況下,Spark會將元素保留在叢集中,以便下次查詢時可以更快地進行訪問。還支援將RDD持久儲存在磁碟上,或在多個節點之間複製。

基礎

為了說明RDD的基礎知識,請對如下簡單程式進行思考:

vallines=sc.textFile("data.txt")
vallineLengths=lines.map(s=>s.length)
valtotalLength=lineLengths.reduce((a,b)=>a+b)

第一行從外部檔案定義了一個基本RDD。該資料集不會載入到記憶體中或沒有其他的作用:行僅僅是檔案的引用。第二行將mapLengths定義為map轉換的結果。同樣,由於懶載入,不會立即計算出lineLengths的結果。最後,我們執行reduce,這是一個action運算元。此時,Spark將計算分解為任務並在不同的機器上執行,並且每臺機器都執行map計算的一部分以及在每臺機器上進行reduce計算,僅將計算的結果返回給驅動程式。

如果我們以後也想再次使用lineLengths,可以新增:

lineLengths.persist()

在執行reduce運算元之前,這將會使在第一次計算之後將lineLengths儲存在記憶體中。

將函式傳遞給Spark

Spark的API在很大程度上依賴於在驅動程式中傳遞函式來在叢集上執行。有兩種推薦的方法可以做到這一點:

  • 使用匿名函式的語法,可用於簡短的程式碼片段。

  • 全域性單例物件中的靜態方法。例如,您可以定義MyFunctions物件,然後傳遞MyFunctions.func1,如下所示:

objectMyFunctions{
deffunc1(s:String):String={...}
}

myRdd.map(MyFunctions.func1)

需要注意的是雖然也可以在類例項中傳遞對方法的引用(與單例物件相對),但這需要將包含該類的物件與方法一起傳送。例如,一起思考如下例子:

classMyClass{
deffunc1(s:String):String={...}
defdoStuff(rdd:RDD[String]):RDD[String]={rdd.map(func1)}
}

在這裡,如果我們建立一個新的MyClass例項並在其上呼叫doStuff方法,則其中的map將引用該MyClass例項的func1方法,因此需要將整個物件傳送到叢集。這類似於編寫rdd.map(x => this.func1(x))

以類似的方式,訪問外部物件的欄位將引用整個物件:

classMyClass{
valfield="Hello"
defdoStuff(rdd:RDD[String]):RDD[String]={rdd.map(x=>field+x)}
}

等價於rdd.map(x => this.field + x),引用了this物件。為避免此類問題,最簡單的方法是將欄位複製到區域性變數中,而不是從外部訪問它:

defdoStuff(rdd:RDD[String]):RDD[String]={
valfield_=this.field
rdd.map(x=>field_+x)
}

理解閉包

關於Spark的難點之一是在跨叢集執行程式碼時理解變數和方法的作用域和生命週期。修改超出其作用域的變數的RDD操作可能經常引起混亂。在下面的示例中,我們將介紹使用foreach()遞增計數器的程式碼,但是其他操作也會發生類似的問題。

案例

思考下面計算普通RDD元素的總和,執行結果可能會有所不同,具體取決於是否在同一個JVM虛擬機器中執行。一個常見的例子是在本地模式下執行Spark(--master = local [n])而不是將Spark應用程式部署到叢集上(例如,通過將spark-submit提交給YARN):

varcounter=0
varrdd=sc.parallelize(data)

//Wrong:Don'tdothis!!
rdd.foreach(x=>counter+=x)

println("Countervalue:"+counter)

本地模式和叢集模式的對比

上面的程式碼的執行結果是不能確定的,可能無法按預期工作。為了執行作業,Spark將RDD操作的處理分解為多個任務,每個任務都由一個執行器執行。在執行之前,Spark會計算任務的閉包。閉包是執行器在RDD上執行其計算所必須可見的那些變數和方法(如本例中foreach())。此閉包在被序列化併發送給每個執行器。

傳送給每個執行器的閉包中的變數已經被拷貝,因此,在foreach函式中引用counter變數時,它不再是驅動程式節點上的counter變數。驅動程式節點的記憶體中仍然存在一個counter 變數,但是該counter 變數不再對執行器可見!執行器僅從序列化的閉包中看到副本。因此,因為對counter 變數的所有操作都引用了序列化閉包內的值,所以最終counter 變數的值將仍然為零。

在本地模式中,在某些情況下,foreach函式實際上將在與驅動程式相同的JVM中執行,並且將引用相同的原始counter 變數,並且可能會對其進行實際更新。

為了確保在這種情況下的執行結果明確,應該使用累加器。Spark中的累加器專門用於提供一種機制,用於在叢集中的各個工作節點之間拆分執行時安全地更新變數。本指南的累加器部分將詳細討論這些內容。

通常,閉包-類似於迴圈或區域性定義的方法之類的構造,不應該用來改變某些全域性狀態。Spark不定義或保證從閉包外部引用的物件的突變行為。某些執行此操作的程式碼可能會在本地模式下能正常工作,但這只是偶然的情況,此類程式碼在分散式模式下將無法正常執行。如果需要某些全域性聚合,請使用累加器。

列印RDD的元素

另一個常見用法是嘗試使用rdd.foreach(println)rdd.map(println)打印出RDD的元素。在單臺機器上,這將產生預期的輸出並列印所有RDD的元素。但是,在叢集模式下,執行器正在呼叫的stdout輸出是正在寫入執行器的stdout,而不是驅動程式上的那個,因此驅動程式上的stdout不會顯示這些資訊!要在驅動程式上列印所有元素,可以使用collect()方法首先將RDD帶到驅動程式節點:rdd.collect().foreach(println)。但是,這可能會導致驅動程式用盡記憶體,因為collect()方法會將整個RDD提取到一臺機器上。如果只是需要列印RDD的一些元素,則更安全的方法是使用take()方法:rdd.take(100).foreach(println)

ce635daf1d7093e960869d4639a56bdb.png 0a938c8bca20fc1aeb2950674f39743b.png 公眾號ID:ldc11235 掃碼關注最新動態,跟我一起學大資料 aeebf202eae2db9f432394b6f8016993.gif