1. 程式人生 > >Spark的Action執行、 Transformation轉換、Controller控制三種操作型別的使用

Spark的Action執行、 Transformation轉換、Controller控制三種操作型別的使用

通過WordCount詳解過程圖,觀察Spark的執行過程,思考RDD操作型別有幾種?有了初步的思考,下文會給出RDD操作型別區別和實戰演示

有如下三種:

(1)transformation:進行資料狀態的轉換,對已有的RDD建立新的RDD。

(2)Action:觸發具體的作業,對RDD最後取結果的一種操作

(3)Controller:對效能效率和容錯方面的支援。persist , cache, checkpoint

1. Transformation:進行資料狀態的轉換,對已有的RDD建立新的RDD。

例如:
(1)map(func) :返回一個新的分散式資料集,由每個原元素經過func函式轉換後組成
(2)filter(func) : 返回一個新的資料集,由經過func函式後返回值為true的原元素組成 
(3)flatMap(func) : 類似於map,但是每一個輸入元素,會被對映為0到多個輸出元素(因此,func函式的返回值是一個Seq,而不是單一元素)
(4)sample(withReplacement, frac, seed) : 根據給定的隨機種子seed,隨機抽樣出數量為frac的資料
(5)union(otherDataset) : 返回一個新的資料集,由原資料集和引數聯合而成
(6)groupByKey([numTasks]) : 在一個由(K,V)對組成的資料集上呼叫,返回一個(K,Seq[V])對的資料集。注意:預設情況下,使用8個並行任務進行分組,你可以傳入numTask可選引數,根據資料量設定不同數目的Task
(7)reduceByKey(func, [numTasks]) : 在一個(K,V)對的資料集上使用,返回一個(K,V)對的資料集,key相同的值,都被使用指定的reduce函式聚合到一起。和groupbykey類似,任務的個數是可以通過第二個可選引數來配置的。
(8)join(otherDataset, [numTasks]) : 在型別為(K,V)和(K,W)型別的資料集上呼叫,返回一個(K,(V,W))對,每個key中的所有元素都在一起的資料集
(9)groupWith(otherDataset, [numTasks]) : 在型別為(K,V)和(K,W)型別的資料集上呼叫,返回一個數據集,組成元素為(K, Seq[V], Seq[W]) Tuples。這個操作在其它框架,稱為CoGroup
(10)cartesian(otherDataset) : 笛卡爾積。但在資料集T和U上呼叫時,返回一個(T,U)對的資料集,所有元素互動進行笛卡爾積。

2. Action:觸發具體的作業,對RDD最後取結果的一種操作。

例如:
(1)reduce(func):通過函式func先聚集各分割槽的資料集,再聚集分割槽之間的資料,func接收兩個引數,返回一個新值,新值再做為引數繼續傳遞給函式func,直到最後一個元素
(2)collect():以資料的形式返回資料集中的所有元素給Driver程式,為防止Driver程式記憶體溢位,一般要控制返回的資料集大小
(3)count():返回資料集元素個數
(4)first():返回資料集的第一個元素
(5)take(n):以陣列的形式返回資料集上的前n個元素
(6)top(n):按預設或者指定的排序規則返回前n個元素,預設按降序輸出 
(7)takeOrdered(n,[ordering]): 按自然順序或者指定的排序規則返回前n個元素

(8)saveAsTextFile()

Spark action級別的操作,特點是內部都呼叫runJob方法,常用的action:reduce(1+2+3+4—-》1+2=3+3=6+4=10),aggregate,collection(把所有的結果放在driver上),count執行結果有多少條,take獲取執行結果的某幾條,saveAsTextFile把執行結果儲存在Hadoop支援的檔案系統上,foreach迴圈每一條元素;countByKey:可看一個key有多少個value;

注意:reduceByKey不是action,它是一個transformation

action級別操作舉例:

val numbers = sc.parallelize(1 to 100)
scala> numbers.reduce(_+_) 觸發作業
val result = numbers.map(2*_)
scala> result.collect

原始碼: 
 /**
   *Return an array that contains all of the elements in this RDD.
   */
  defcollect(): Array[T] = withScope {
    valresults = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
   Array.concat(results: _*)

  }

Driver會收集每個task的執行結果,每個Executor上會有很多的task。(通過網路通訊傳輸收集)

如果想在命令終端上看到執行結果就必須collect

凡是Action的級別的操作都會觸發sc.runJob。言外之意:一個應用程式會有很多的job。

scala> result.count()
def count(): Long = sc.runJob(this,Utils.getIteratorSize _).sum
scala> val topN = numbers.take(5)

countByKey:可看一個key有多少value

scala> val scores =Array(Tuple2(1,100),Tuple2(2,89),Tuple2(2, 99), Tuple2(3, 88))
scores: Array[(Int, Int)] = Array((1,100),(2,89), (2,99), (3,88))
scala> val scoresRDD =sc.parallelize(scores)
scoresRDD: org.apache.spark.rdd.RDD[(Int,Int)] = ParallelCollectionRDD[1] at parallelize at <console>:29
 defcountByKey(): Map[K, Long] = self.withScope {
   self.mapValues(_ => 1L).reduceByKey(_ + _).collect().toMap

  }

3. Controller:對效能效率和容錯方面的支援。persist , cache, checkpoint

Q2:假定input/*下面有10個小於128M的檔案,這個過程產生幾個Partition?
12個
Q2:並行度,Shuffle,Partition之間的關係?

並行度和Shuffle沒有任何關係,並行度是涉及到執行效率的,Shuffle由RDD的依賴關係決定,如上如果不指定並行度,並行度會傳遞過去的。設定多少個並行度就有多少個Partition。