1. 程式人生 > >彈性分散式資料集(RDD)

彈性分散式資料集(RDD)

並行集合

並行集合 (Parallelized collections) 的建立是通過在一個已有的集合(Scala Seq)上呼叫 SparkContext 的 parallelize 方法實現的。集合中的元素被複制到一個可並行操作的分散式資料集中。例如,這裡演示瞭如何在一個包含 1 到 5 的陣列中建立並行集合:

val data = Array(1, 2, 3, 4, 5)
val distData = sc.parallelize(data)



一旦建立,分散式資料集(distData)可以並行操作。例如,我們可能呼叫distData.reduce((a, b) => a + b)將陣列的元素相加。

並行收集的一個重要引數是要將資料集剪下成的分割槽數量。Spark將為群集的每個分割槽執行一個任務。通常情況下,您需要為群集中的每個CPU分配2-4個分割槽。通常情況下,Spark會嘗試根據您的群集自動設定分割槽數量。但是,也可以通過將其作為第二個引數傳遞給parallelize(eg sc.parallelize(data, 10))來進行手動設定。注意:程式碼中的一些地方使用術語切片(分割槽的同義詞)來維持向後相容性。

scala> distData.partitions.size
res4: Int = 4  //一般根據電腦的核數為預設分割槽


scala> val distData_5 = sc.parallelize
(data,5)//可以指定分割槽 distData_5: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[7] at parallelize at <console>:29 scala> distData_5.partitions.size res6: Int = 5

外部資料集

Spark可以從Hadoop支援的任何儲存源(包括本地檔案系統,HDFS,Cassandra,HBase,Amazon S3等)建立分散式資料集.Spark支援文字檔案,SequenceFile和任何其他Hadoop InputFormat。

文字檔案RDDS可以使用建立SparkContext的textFile方法。此方法需要一個URI的檔案(本地路徑的機器上,或一個hdfs://,s3n://等URI),並讀取其作為行的集合。這是一個示例呼叫:


scala> val textFile = sc.textFile("../README.md")
textFile: org.apache.spark.rdd.RDD[String] = README.md MapPartitionsRDD[5] at textFile at <console>:27

一旦建立,分散式資料集(textFile)可以並行操作。例如,textFile.flatMap(line => line.split(” “)).map(word => (word, 1)).reduceByKey((a, b) => a + b)單詞統計。

使用Spark讀取檔案的一些注意事項:

  • 如果在本地檔案系統上使用路徑,則該檔案也必須可以在工作節點上的相同路徑上訪問。將檔案複製到所有工作人員或使用網路安裝的共享檔案系統。

  • Spark的所有基於檔案的輸入方法,包括textFile支援在目錄,壓縮檔案和萬用字元上執行。例如,你可以使用textFile(“/my/directory”),textFile(“/my/directory/.txt”)和textFile(“/my/directory/.gz”)。

  • 該textFile方法還採用可選的第二個引數來控制檔案的分割槽數量。預設情況下,Spark為檔案==的每個塊建立一個分割槽(HDFS中的塊預設為128MB)==,但是您也可以通過傳遞更大的值來請求更多的分割槽。請注意,您不能有比塊更少的分割槽。具體的分割槽請參考:
    spark Rdd的預設分割槽

除了文字檔案外,Spark的Scala API還支援其他幾種資料格式:

SparkContext.wholeTextFiles讓你閱讀一個包含多個小文字檔案的目錄,並將它們作為(檔名,內容)對返回。這與textFile每個檔案每行返回一條記錄相反。分割槽由資料區域性性決定,在某些情況下可能導致分割槽太少。對於這些情況,wholeTextFiles提供一個可選的第二個引數來控制分割槽的最小數量。

對於SequenceFiles,使用SparkContext的sequenceFile[K, V]方法,其中K和V是檔案中的鍵和值的型別。這些應該是Hadoop的Writable介面的子類,如IntWritable和Text。另外,Spark允許您為幾個常見Writable指定本機型別; 例如,sequenceFile[Int, String]將自動讀取IntWritables和文字。

對於其他Hadoop InputFormats,可以使用該SparkContext.hadoopRDD方法,該方法採用任意的JobConf輸入格式類,關鍵類和值類。將它們設定為您使用輸入源進行Hadoop作業的方式相同。也可以使用SparkContext.newAPIHadoopRDD基於“新”MapReduce API(org.apache.hadoop.mapreduce)的InputFormats 。

RDD.saveAsObjectFile並SparkContext.objectFile支援以包含序列化Java物件的簡單格式儲存RDD。雖然這不像Avro這樣的專業格式,但它提供了一種簡單的方法來儲存任何RDD。

RDD操作

RDDS支援兩種型別的操作:轉變(transformations),從現有的建立一個新的資料集和行動,動作(action)的資料集的計算後的值返回驅動程式。例如,map是一個通過函式傳遞每個資料集元素的變換,並返回表示結果的新RDD。另一方面,reduce是一個動作,使用某個函式來聚合RDD的所有元素,並將最終結果返回給驅動程式(儘管也有一個並行reduceByKey返回分散式資料集)。

Spark中的所有轉換(transformations)都是懶惰的,因為它們不會馬上計算結果。相反,他們只記得應用於某些基礎資料集(例如檔案)的轉換。只有在動作(action
)時驅動程式(diver)才會計算轉換。這種設計使Spark能夠更高效地執行。例如,我們可以認識到,通過建立的資料集map將被用於action中,reduce並且只返回reduce給驅動程式的結果,而不是更大的對映資料集。

預設情況下,每次對其執行操作時,每個已轉換的RDD都可能重新計算。但是,您也可以使用(或)方法將RDD 保留在記憶體中,在這種情況下,Spark將保留群集中的元素,以便在下次查詢時快速訪問。還支援在磁碟上持久化RDD,或在多個節點上覆制RDD.persistcache

基礎

為了說明RDD基礎知識,請考慮下面的簡單程式:


val lines = sc.textFile("data.txt")
val lineLengths = lines.map(s => s.length)
val totalLength = lineLengths.reduce((a, b) => a + b)

第一行定義了來自外部檔案的基本RDD。這個資料集不會被載入到記憶體中或者作用於其他地方:lines僅僅是一個指向檔案的指標。

第二行定義lineLengths為map轉換的結果。再次,lineLengths 是不是馬上計算,由於懶惰。

最後,我們跑reduce,這是一個行動。在這一點上,Spark將計算分解為在不同機器上執行的任務,每臺機器既執行其地圖部分又執行區域性縮減,只返回驅動程式的答案。

如果我們還想以後再使用lineLengths,可以新增:

lineLengths.persist()

之前reduce的結果ineLengths在第一次計算後儲存在記憶體中。

spark函式的應用

匿名函式的應用:

def add(x: Int, y: Int): Int = x + y
println(add(1, 2)) // 3

全域性單例物件裡的靜態方法,例如,您可以定義object MyFunctions並傳遞MyFunctions.func1,如下所示:

object MyFunctions {
  def func1(s: String): String = { ... }
}

myRdd.map(MyFunctions.func1)

類函式的應用:

class MyClass {
  def func1(s: String): String = { ... }
  def doStuff(rdd: RDD[String]): RDD[String] = { rdd.map(func1) }
}

object = new MyClass()
object.doStuff(myRdd) 等價於 rdd.map(x => object.func1(x))。

Shuffle

Spark中的某些操作會觸發一個稱為shuffle的事件。洗牌是Spark重新分配資料的機制,以便在不同分割槽之間進行分組。這通常涉及在執行者和機器之間複製資料,使得洗牌成為複雜而昂貴的操作。

為了理解在洗牌過程中會發生什麼,我們可以考慮reduceByKey操作的例子 。該reduceByKey操作將生成一個新的RDD,其中單個鍵的所有值都組合為一個元組 - 鍵和對與該鍵相關的所有值執行reduce函式的結果。面臨的挑戰是,並不是所有的單個金鑰的值都必須位於同一個分割槽,甚至是同一個機器上,但是它們必須位於同一地點才能計算出結果。

在Spark中,資料通常不是跨分割槽分佈,而是在特定操作的必要位置。在計算過程中,單個任務將在單個分割槽上執行 - 因此,要組織所有資料reduceByKey以執行單個reduce任務,Spark需要執行全部操作。它必須從所有分割槽中讀取所有鍵的值,然後將各個分割槽上的值彙總在一起,以計算每個鍵的最終結果 - 這就是所謂的Shuffle。

雖然新洗牌資料的每個分割槽中的元素集合是確定性的,分割槽本身的排序也是確定性的,但這些元素的整體排序並不是確定。如果希望shuffle之後得到可預測的有序資料,那麼可以使用:

mapPartitions 使用例如對每個分割槽進行排序
repartitionAndSortWithinPartitions 在對分割槽進行有效分類的同時進行分割槽
sortBy 做一個全球有序的RDD

可以進行shuffle的還有:

repartition and coalesce
groupByKey and reduceByKey
cogroup and join.

效能影響:
shuffle的操作的代價是非常高昂的,,因為它涉及的磁碟I / O,資料序列,和網路I / O。

RDD永續性

Spark中最重要的功能之一就是在記憶體中持續(或快取)一個數據集。當持久化RDD時,每個節點儲存它在記憶體中計算的所有分割槽,並在該資料集的其他操作(或從中派生的資料集)中重用它們。這使未來的行動更快(通常超過10倍)。快取是迭代演算法和快速互動式使用的關鍵工具。

cache() 預設MEMORY_ONLY
persist() 可以選擇不同的儲存等級:

儲存 級別 含義
MEMORY_ONLY 將RDD作為反序列化的Java物件儲存在JVM中。如果RDD不適合記憶體,某些分割槽將不會被快取,並且每次需要時都會重新進行計算。這是預設級別。
MEMORY_AND_DISK 將RDD作為反序列化的Java物件儲存在JVM中。如果RDD不適合記憶體,也不適合磁碟儲存將不會被快取,並在需要時從中讀取。
MEMORY_ONLY_SER (Java和Scala) 將RDD儲存為序列化的 Java物件(每個分割槽一個位元組的陣列)。這通常比反序列化的物件更節省空間,特別是在使用 快速序列化器的情況下,但需要消耗更多的CPU資源。
MEMORY_AND_DISK_SER (Java和Scala) 與MEMORY_ONLY_SER類似,但是將不適合記憶體的分割槽溢位到磁碟,而不是每次需要時重新計算它們。
DISK_ONLY 將RDD分割槽僅儲存在磁碟上。

備註:spark在shuffer的時候預設會進行persist(),但是為了安全我們會再次呼叫persist()快取

選擇哪個儲存級別?

Spark的儲存級別旨在提供記憶體使用和CPU效率之間的不同折衷。我們建議通過以下過程來選擇一個:
- 如果你的RDD適合預設的儲存級別(MEMORY_ONLY),那就留下來吧。這是CPU效率最高的選項,允許RDD上的操作儘可能快地執行。

  • 如果沒有,嘗試使用MEMORY_ONLY_SER和選擇一個快速的序列化庫,使物件更加節省空間,但仍然相當快地訪問。(Java和Scala)
  • 除非計算您的資料集的計算很昂貴,否則他們會過濾大量資料,否則不要寫到磁碟。否則,重新計算可能與從磁碟讀取一樣快。

- 如果要快速恢復故障,請使用複製的儲存級別

刪除資料

Spark會自動監視每個節點上的快取記憶體使用情況,並以最近最少使用(LRU)方式刪除舊的資料分割槽。如果您想要手動刪除RDD,而不是等待其從快取中刪除,請使用該RDD.unpersist()方法。

共享變數

廣播變數

廣播變數允許程式設計師在每臺機器上儲存一個只讀變數,而不是用任務傳送一個只讀變數的副本。例如,可以使用它們以有效的方式為每個節點提供大型輸入資料集的副本。Spark還嘗試使用高效的廣播演算法來分發廣播變數,以降低通訊成本。

scala> val broadcastVar = sc.broadcast(Array(1, 2, 3))
broadcastVar: org.apache.spark.broadcast.Broadcast[Array[Int]] = Broadcast(0)

scala> broadcastVar.value
res0: Array[Int] = Array(1, 2, 3)

備註:廣播變數不可以被修

累加器

數值累加器可以通過呼叫SparkContext.longAccumulator()或SparkContext.doubleAccumulator() 累加Long或Double型別的值來建立。在叢集上執行的任務可以使用該add方法新增到叢集中。但是,叢集無法拿到累加器值。只有驅動程式可以使用其value方法讀取累加器的值。

下面的程式碼顯示了一個累加器被用來加總一個數組的元素:

scala> val accum = sc.longAccumulator("My Accumulator")
accum: org.apache.spark.util.LongAccumulator = LongAccumulator(id: 0, name: Some(My Accumulator), value: 0)

scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum.add(x))
...
10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s

scala> accum.value
res2: Long = 10

對於僅在動作內執行的累加器更新,Spark保證每個任務對累加器的更新只應用一次,即重新啟動的任務不會更新值。在轉換中,使用者應該意識到,如果任務或作業階段被重新執行,每個任務的更新可能會被應用多次。