1. 程式人生 > >Spark學習(二)

Spark學習(二)

RDD(resilient distributed dataset)

RDD概念

RDD(Resilient Distributed Dataset)是一個彈性分散式資料集,是SPark提供的抽象的彈性分散式資料集(RDD),它是可以並行操作的跨叢集節點的元素集合。RDDs是從Hadoop檔案系統中的一個檔案(或任何其他Hadoop支援的檔案系統)或驅動程式中現有的Scala集合開始建立的,並對其進行轉換。使用者也可以要求spark快取記憶體中的RDD,允許在並行操作中有效地重用RDD。最後,RDDs從節點故障中自動恢復。 它代表一個不可變(immutable)、可分割槽(partioned)、裡面的元素可平行計算(parallel)的集合。 RDD具有資料流模型的特點:自動容錯

、位置感知性排程和可伸縮性。RDD允許使用者在執行多個查詢時顯式地將工作集快取在記憶體中,後續的查詢能夠重用工作集,這極大地提升了查詢速度。

彈性:可大可小,在(記憶體或者磁碟)計算 分散式:並行處理

RDD屬性

一組分片(Partition),即資料集的基本組成單位。基於分片做計算,對於RDD來說,每個分片都會被一個計算任務處理,並決定平行計算的粒度。使用者可以在建立RDD時指定RDD的分片個數,如果沒有指定,那麼就會採用預設值。預設值就是程式所分配到的CPU Core的數目。 一個計算每個分割槽的函式。Spark中RDD的計算是以分片為單位的,每個RDD都會實現compute函式以達到這個目的。compute函式會對迭代器進行復合,不需要儲存每次計算的結果。 RDD之間的依賴關係。RDD的每次轉換都會生成一個新的RDD,所以RDD之間就會形成類似於流水線一樣的前後依賴關係。在部分分割槽資料丟失時,Spark可以通過這個依賴關係重新計算丟失的分割槽資料,而不是對RDD的所有分割槽進行重新計算。 一個Partitioner,即RDD的分片函式。(hash分割槽器,範圍分割槽器)當前Spark中實現了兩種型別的分片函式,一個是基於雜湊的HashPartitioner,另外一個是基於範圍的RangePartitioner。只有對於key-value的RDD,才會有Partitioner,非key-value的RDD的Parititioner的值是None。Partitioner函式不但決定了RDD本身的分片數量,也決定了parentRDD Shuffle輸出時的分片數量。 一個列表,儲存存取每個Partition的優先位置(preferred location)。對於一個HDFS檔案來說,這個列表儲存的就是每個Partition所在的塊的位置。按照“移動資料不如移動計算”的理念,Spark在進行任務排程的時候,會盡可能地將計算任務分配到其所要處理資料塊的儲存位置。(就近原則)

建立RDD

/usr/local/spark-1.6.3-bin-hadoop2.6/bin/spark-shel(sparkContext會馬上建立) 建立方式(3種)

1)由一個Scala集合或陣列以並行化的方式建立

RDD建立:

val rdd = sc.parallelize(Array(3,4,5,6,7))

RDD建立的過程中,一個重要引數是把資料集切分的分割槽數。Spark針對叢集中的每一個分割槽執行一個任務(task),一般,叢集中每一個CPU分配2-4個分割槽,Spark會基於叢集自動設定分割槽數目,但是,也可以手動指定,比如:

val rdd1 = sc.parallelize(Array(3,4,5,6,7),5)

2)由外部儲存系統的資料集建立

包括本地的檔案系統,還有所有Hadoop支援的資料集,比如HDFS、Cassandra、HDFS, Cassandra, HBase, Amazon S3等,Spark支援文字檔案、序列檔案(SequenceFiles are flat files consisting of binary key/value pairs."Flat"which means it has no structure for indexing and there are usually no structural relationships between the records. )以及任何Hadoop支援的資料集。

//textFile需要一個表示檔案地址的URI引數,該檔案可以是本地檔案,或者 hdfs://, s3a://等
//textFile讀取檔案的結果是以檔案每一行作為一個元素的集合
val rdd2 = sc.textFile("hdfs://hadoop01:9000/input/01/test")

使用Spark讀取檔案需要注意的點:

  • 如果讀取的是本地檔案,該檔案必須是worker節點可以存取的,通過拷貝檔案到所有的worker節點或者使用基於掛載的共享檔案系統;

  • spark中所有引數是檔案的方法,包括文字檔案,都支援目錄、壓縮檔案和萬用字元檔案,比如可以使用:

    textFile("/my/directory") textFile("/my/directory/.txt") textFile("/my/directory/.gz")

  • textFile 方法也有一個可選引數控制檔案的分割槽數量,預設情況下,Spark檔案的每一個block生成一個partition分割槽(HDFS預設快大小為128MB),但是也可以傳入一個更大的值獲取更多的分割槽,但是無法獲取少於block個數的分割槽個數。

3)使用已有的RDD

通過Transformation運算元生成一個新的RDD

val rdd1 = rdd.map(_*10)//RDD一旦建立,分散式的資料集就可以執行平行計算

擴充套件:檢視RDD:rdd1.collect()

RDD的運算元

RDD的運算元分兩種型別:Transformation和Action,具有Transformation屬性的運算元,具有惰性,不會直接計算結果,具有Action屬性的運算元,觸發運算元的執行

Transformation

RDD中的所有轉換都是延遲載入的,具有懶惰的屬性。也就是說,它們並不會直接計算結果。相反的,它們只是記住這些應用到基礎資料集(例如一個檔案)上的轉換動作。只有當發生一個要求返回結果給Driver的動作時,這些轉換才會真正執行。這種設計讓Spark更加有效率地執行。

常用的Transformation運算元:

map:對RDD中的所有元素施加一個函式對映,返回一個新的RDD。

val rdd0 = sc.parallelize(Array(1,2,3,56))
val rdd1 = rdd0.map(x=>x*10)

filter:返回滿足條件的元素組成的新RDD

val rdd2 = rdd1.filter(_<100)
rdd2.collect()

filterByRange:通過範圍過濾

flatMap:把集合中的集合取出來,先map再flat,map後一定要是集合

rdd0.flatMap(1 to _)

union,++:對集合進行合併

val rdd3 = sc.parallelize(1 to 10)
val rdd4 = rdd3 union res5
rdd4.collection

intersection 交集

val rdd5 = rdd3 intersection res5
rdd5.collect()

distinct:去重元素之後的RDD

val rdd6 = rdd4.distinct()
rdd6.collect()

join:對兩個需要連線的 RDD 進行 cogroup函式操作,將相同 key 的資料能夠放到一個分割槽,在 cogroup 操作之後形成的新 RDD 對每個key 下的元素進行笛卡爾積的操作,返回的結果再展平,對應 key 下的所有元組形成一個集合。最後返回 RDD[(K,(V, W))]

join運算元(join,leftOuterJoin,rightOuterJoin) 1)只能通過PairRDD使用 2)join運算元操作的Tuple2<Object1, Object2>型別中,Object1是連線鍵

join(otherDataset, [numTasks])是連線操作,將輸入資料集(K,V)和另外一個數據集(K,W)進行Join, 得到(K,(V,W));該操作是對於相同K的V和W集合進行笛卡爾積 操作,也即V和W的所有組合

val rdd7 = sc.parallelize(Array(("zhangsan",100),("lisi",80),("wangwu",78)))
val rdd8 = sc.parallelize(Array(("zhangsan",90),("lisi",88),("wangwu",69)))
val rdd9 = rdd7.join(rdd8)
rdd9.collect()
val rdd10 =rdd7 leftOuterJoin rdd8
rdd10.collect()

mapPartitions 對整個分割槽進行操作,map對每一個元素處理 mapPartitions是map的一個變種。map的輸入函式應用於RDD中的每個元素,而mapPartitions的輸入函式應用於每個分割槽,也就是把每個分割槽中的內容作為整體來處理的。

val rdd11 = sc.parallelize(Array("a","b","c"))
rdd11.mapPartitions(iter=>iter.map(_+"char"))//索引
res12.collect()

val rdd1 = sc.parallelize(Array(1,2,3,4,5,6,7,8),3) rdd1.getNumPartitions

分割槽索引號,迭代器 def fun(index:Int,iter:Iterator[Int])={ iter.map(x=>index +" "+x) }

mapPartitionsWithIndex:map函式同樣作用於整個分割槽,同時可以獲取分割槽的index資訊。

val rdd2  = rdd1.mapPartitionsWithIndex(fun)
rdd2.collect()

比較 map flatmap mapPartitions mapPartitionsWithIndex

sample:隨機取(取樣),返回抽樣得到的子集。

withReplacement:是否重複  fraction:取樣的比率  seed:種子(偽隨機,每次隨機數一樣)
val a = sc.parallelize(1 to 1000,3)
val b = a.sample(true,0.05,100)
b.count
b.collect()

keyBy:通過在每個元素上應用一個函式生成鍵值對中的鍵,最後構成一個鍵值對元組。(fun(x),x) 產生一個元組,一個元素對映為一個元組((1,a), (1,b), (1,c))傳入一個函式,得到一個結果 clean 函式及依賴(函式之外的東西)作為一個閉包處理

val rdd1 = sc.parallelize(Array("a","b","c"))
val rdd2 = rdd1.keyBy(_.length)
rdd2.collect()

mapValues:針對(Key,Value)型資料中的 Value 進行 Map 操作,而不對 Key 進行處理(只對value做操作)

val rdd3 = sc.parallelize(Array("zhangsan","lisi"))
val rdd4 = rdd3.map(x=>(x,x.length))
rdd4.collect()
val rdd5 = rdd4.mapValues(_+1)
rdd5.collect()

flatMapvalues:對value做操作後壓平

groupByKey:按照key值進行分組(根據key做聚合),每次結果可能不同,因為發生shuffle

val rdd6 = sc.parallelize(Array("a","b","c"))
val rdd7 = rdd6.keyBy(_.length)
val rdd8 = rdd7.groupByKey()
rdd8.collect()

reduceByKey:對元素為KV對的RDD中Key相同的元素的Value進行reduce操作

val a = sc.parallelize(List("dog","cat","owl","gnu","ant"),2)
val b = a.map(x=>(x.length,x))
b.reduceByKey(_+_).collect

aggregateByKey:key相同的才可以進行聚合操作

val pairRDD = sc.parallelize(List( ("cat",2), ("cat", 5), ("mouse", 4),("cat", 12), ("dog", 12),("mouse", 2)), 2)

// lets have a look at what is in the partitions
def myfunc(index: Int, iter: Iterator[(String, Int)]) : Iterator[String] = {
iter.map(x => "[partID:" + index + ", val: " + x + "]")
}
pairRDD.mapPartitionsWithIndex(myfunc).collect
pairRDD.aggregateByKey(0)(math.max(_, _), _ + _).collect
pairRDD.aggregateByKey(100)(math.max(_, _), _ + _).collect

foldByKey:針對鍵值對的RDD進行聚合

val a = sc.parallelize(List("dog", "cat", "owl", "gnu", "ant"), 2)
val b = a.map(x => (x.length, x))
b.foldByKey("")(_ + _).collect

Action運算元: 觸發Spark作業的執行,真正觸發轉換運算元的計算

動作 含義
reduce(func) 通過func函式聚集RDD中的所有元素,這個功能必須是可交換且可並聯的
collect() 在驅動程式中,以陣列的形式返回資料集的所有元素
count() 返回RDD的元素個數
first() 返回RDD的第一個元素(類似於take(1))
take(n) 返回一個由資料集的前n個元素組成的陣列
takeSample(withReplacement,num,[seed]) 返回一個數組,該陣列由從資料集中隨機取樣的num個元素組成,可以選擇是否用隨機數替換不足的部分,seed用於指定隨機數生成器種子
takeOrdered(n, [ordering]) takeOrdered和top類似,只不過以和top相反的順序返回元素
saveAsTextFile(path) 將資料集的元素以textfile的形式儲存到HDFS檔案系統或者其他支援的檔案系統,對於每個元素,Spark將會呼叫toString方法,將它裝換為檔案中的文字
saveAsSequenceFile(path) 將資料集中的元素以Hadoop sequencefile的格式儲存到指定的目錄下,可以使HDFS或者其他Hadoop支援的檔案系統。
saveAsObjectFile(path)
countByKey() 針對(K,V)型別的RDD,返回一個(K,Int)的map,表示每一個key對應的元素個數。
foreach(func) 在資料集的每一個元素上,執行函式func進行更新。

基礎知識

基本認識

val lines = sc.textFile("data.txt")
val lineLengths = lines.map(s => s.length)
val totalLength = lineLengths.reduce((a, b) => a + b)

第一行定義了一個RDD,資料集不會載入到記憶體,或者不會有基於該資料集的操作,lines僅僅是一個指向該檔案的一個指標; 第二行定義了lineLengths作為map轉換函式的結果,由於惰性,lineLengths 不會立即計算; 最後,呼叫reduce,它是一個action運算元,這是Spark把計算分成多個任務在各個機上執行,每臺機器執行自己這部分資料的map和reduction,返回結果給驅動程式。 如果,之後還打算使用lineLengths,可以新增:lineLengths.persist(),在reduce之前,可以把lineLengths在首次計算後儲存在記憶體。

傳入函式

Spark 有很多API需要提供函式式的傳入引數,有兩種方式推薦:

匿名函式,在執行程式碼較少的情況下使用; 使用全域性單例物件中的靜態方法

object MyFunctions {
	def func1(s: String): String = { ... }
}
myRdd.map(MyFunctions.func1)

Note that while it is also possible to pass a reference to a method in a class instance (as opposed to a singleton object), this requires sending the object that contains that class along with the method. Forexample, consider:

class MyClass {
	def func1(s: String): String = { ... }
	def doStuff(rdd: RDD[String]): RDD[String] = { rdd.map(func1) }
}

Here, if we create a new MyClass instance and call doStuff on it, the map inside there references the func1method of that MyClass instance, so the whole object needs to be sent to the cluster. It is similar to writing rdd.map(x => this.func1(x)). In a similar way, accessing fields of the outer object will reference the whole object:

class MyClass {
	val field = "Hello"
	def doStuff(rdd: RDD[String]): RDD[String] = { rdd.map(x => field + x) }
}

is equivalent to writing rdd.map(x => this.field + x), which references all of this. To avoid this issue, the simplest way is to copy field into a local variable instead of accessing it externally:

def doStuff(rdd: RDD[String]): RDD[String] = {
	val field_ = this.field
	rdd.map(x => field_ + x)
}

理解閉包

var counter = 0
var add = sc.parallelize(data)
//Wrong:Don't do this
rdd.foreach (x=>counter += x)
println("Counter value:"+counter)

本地和叢集模式

上面的程式碼行為是不確定的,並且可能無法按預期正常工作。執行作業時,Spark 會分解 RDD 操作到每個executor 中的 task 裡。在執行之前,Spark 計算任務的 closure(閉包)。閉包是指 executor 要在RDD上進行計算時必須對執行節點可見的那些變數和方法(在這裡是foreach())。閉包被序列化並被髮送到每個 executor。 閉包的變數副本發給每個 executor ,當 counter 被 foreach 函式引用的時候,它已經不再是 driver node 的counter 了。雖然在 driver node 仍然有一個 counter 在記憶體中,但是對 executors 已經不可見。executor 看到的只是序列化的閉包一個副本。所以 counter 最終的值還是 0,因為對 counter 所有的操作均引用序列化的closure 內的值。 在 local 本地模式,在某些情況下的 foreach 功能實際上是同一 JVM 上的驅動程式中執行,並會引用同一個原始的counter計數器,實際上可能更新. 為了確保這些型別的場景明確的行為應該使用的 Accumulator 累加器。當一個執行的任務分配到叢集中的各個worker 結點時,Spark 的累加器是專門提供安全更新變數的機制。

列印 RDD 的 elements

另一種常見的語法用於列印 RDD 的所有元素使用 rdd.foreach(println) 或 rdd.map(println)。在一臺機器上,這將產生預期的輸出和列印 RDD 的所有元素。然而,在叢集 cluster 模式下,stdout 輸出正在被執行寫操作executors 的 stdout 代替,而不是在一個驅動程式上,因此 stdout 的 driver 程式不會顯示這些!要列印 driver程式的所有元素,可以使用的 collect() 方法首先把 RDD 放到 driver 程式節點上: rdd.collect().foreach(println)。這可能會導致 driver 程式耗盡記憶體,因為 collect() 獲取整個 RDD 到一臺機器; 如果你只需要列印 RDD 的幾個元素,一個更安全的方法是使用 take(): rdd.take(100).foreach(println)。

鍵值對

雖然大多數 Spark 操作工作在包含任何型別物件的 RDDs 上,只有少數特殊的操作可用於 Key-Value 對的 RDDs.最常見的是分散式 “shuffle” 操作,如通過元素的 key 來進行 grouping 或 aggregating 操作.

例如,下面的程式碼使用的 Key-Value 對的 reduceByKey 操作統計文字檔案中每一行出現了多少次:

val lines = sc.textFile("data.txt")
val pairs = lines.map(s => (s, 1))
val counts = pairs.reduceByKey((a, b) => a + b)

我們也可以使用 counts.sortByKey() ,例如,在對按字母順序排序,最後 counts.collect() 把他們作為一個數據物件返回給 driver 程式。 注意: 當在 key-value 對操作中使用自定義的 objects 作為 key 時, 必須確保有一個自定義的 equals() 方法和hashCode() 方法。

分割槽

為什麼要分割槽

分割槽的目標之一就是減少資料通過網路傳輸的量,最大化的利用分割槽有益於執行作業的效率,因為分割槽後可以並行執行大量task在不同叢集節點上。

如何進行分割槽

預設情況下,spark對每一個RDD會自動計算分割槽,但是我們也可以在建立RDD的時候指定分割槽數。

1)系統分割槽

對於鍵值對資料進行分割槽,可以基於分割槽器,有兩種分割槽器可供選擇:

  • 雜湊分割槽:分割槽基於儲存物件的雜湊碼,目前使用的計算公式是:partition = hashCode % numberOfPartitions
  • 範圍分割槽:通過數值範圍分割槽。 比如我們有一個list,包含1到100,100個數,如果我們想儲存5個分割槽中,範圍分割槽首先計算各個範圍 (5: [1-20],[21-40], [41-60], [61-80], [81-100]),然後,資料會根據資料的Key值匹配範圍值,例如,key值為5的資料放在範圍[1-20]分割槽內,key值為59的資料放在範圍[41-60]分割槽內。

2)自定義分割槽器

implementations of abstract class org.apache.spark.Partitioner

3)重新分割槽

coalesce(numPartitions) - 減少分割槽數,該方法在很多分割槽資料較少的情況下可以使用,通過減少分割槽提高執行效率. 如果允許shuffle,不僅可以減少分割槽,也可以增加,同時shuffle會因為資料分割槽導致資料通過網路移動,有可能導致效能風險。 repartition(numPartitions)

repartitionAndSortWithinPartitions(partitioner)在shuffle的過程中重新指定分割槽器

分割槽的屬性

  1. 一個RDD有多個分割槽分散在不同節點上
  2. 一個分割槽的資料都來自於某一個RDD
  3. 一個分割槽的資料不可能來自於兩個不同的RDD
  4. 一個分割槽必定儲存於某一臺機器(節點)上,在一個節點上被處理
  5. 一臺機器(節點)可以儲存多個不同分割槽(每個CPU分配2-4個分割槽)

確定分割槽數目

分割槽數目的多少會導致什麼問題?

分割槽數目代表了並行度,太多小的分割槽會導致任務排程的代價巨大,同時資源釋放導致垃圾回收的壓力巨大 ,太少的分割槽導致並行度降低,確定一個合理的分割槽數目,考慮以下幾點:

  • 一般分割槽數目在100到10000之間
  • 最小閾值 = 2*叢集中總核數
  • 最大閾值 = 任務在100ms內能處理完畢
  • 最後一個stage的RDD決定了分割槽的數目,但是coalesce 或者 repartition會改變這個數目。

分散式資料分割槽

從HDFS中讀檔案,一個分割槽對應一個hdfs檔案分片,因此Spark使用已有的機制對HDFS資料進行分割槽。 如果是將Driver端的Scala集合並行化建立RDD,沒有指定RDD的分割槽數目,分割槽數等於叢集中分配給該應用的總核數。 如果是從HDFS中讀取資料建立RDD,並且設定了最新分割槽數量是1,那麼RDD的分割槽資料即是輸入切片的資料,如果不設定最小分割槽的數量,spark呼叫textfile時預設傳入2,那麼RDD的分割槽數量大於等於切片的數量。

block和split的理解: 兩者是從不同的角度來定義的:HDFS以固定大小的block為基本單位儲存資料(分散式檔案系統,實際儲存角度,物理儲存單位),而MapReduce以split作為處理單位(程式設計模型角度,邏輯單位)。 對於檔案中的一行記錄,可能會劃分到不同的block中,也可能劃分到不同的split中。 split是邏輯上的概念,它只包含一些元資料資訊,比如資料起始位置、資料長度、資料所在節點等,它的劃分方法完全由使用者自己決定。split的多少決定Map Task的數目,因為每個split交給一個Map Task處理。

大小關係:>,=,<均有可能。