Spark RDD-1-常用運算元
阿新 • • 發佈:2018-12-02
目錄
1、RDD簡介
Spark對資料的一種核心抽象,Resilient Distributed Dataset,彈性分散式資料集,不可變,是val型別
RDD資料儲存在記憶體中,採購伺服器時,需選擇記憶體較大的機器,計算能力強
def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U
儲存型:硬碟大
計算型:記憶體大
彈性:服務閒置較多時,減少服務;服務訪問壓力過大時,增加服務
分散式:叢集上多個節點,不一定某部分資料分給誰;
資料集:資料的容器,比如之前的map、list、set。
2、RDD建立
- 資料來源在程式內部,練習用,實際開發一般不會是這種方式,預設分割槽數2
val data = "Though my daily life is extremely monotonous" sc.parallelize(data.split(" "))
- 外部資料來源
sc.textFile("spark/src/test.txt")
- testFile:
/** * Read a text file from HDFS, a local file system (available on all nodes), or any * * * Hadoop-supported file system URI, and return it as an RDD of Strings. * * @path: * 比如local file:"src/test.txt" * Hadoop-supported:local,"file://test.txt" * hdfs, "hdfs://namenode:test.txt" */ def textFile(path: String, minPartitions: Int = defaultMinPartitions): RDD[String] = withScope { assertNotStopped() hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text], minPartitions).map(pair => pair._2.toString).setName(path) }
- path:本地檔案路徑,或者所有hadoop支援的檔案系統uri(寫法見程式碼中path舉例)
- minPatitions:指定RDD的分割槽數(RDD一個分割槽,就是一個Task),預設是
defaultMinPartitions: Int = math.min(defaultParallelism, 2) // defaultParallelism是配置檔案中指定的或程式中設定的引數
- 支援萬用字元*:比如textFile("/my/*.txt")、textFile("my/*")
- 支援讀取壓縮檔案:textFile("/my/*.gz")
- 支援指定多個檔案:textFile("/my/text1.txt","/my/text2.txt")
- testFile:
3、常用RDD運算元
(1)Action RDD
- foreach:遍歷每個元素,無返回值,一般用在將結果儲存到資料庫中使用
- saveAsTextFile儲存到hdfs,RDD每個partition存到hdfs的一個block塊
- saveAsObjectFile:儲存到hdfs,將每個partition的資料序列化後,以sequenceFile(序列化)格式存到hdfs
- collect:將RDD轉換為本地資料集合
- collectAsMap:PairRDD型別,可以轉換為本地的map型別
- count:RDD集合中資料量
- countByValue:各個元素在RDD中出現的次數,比如結果為{(2,1), (3,2)} ,2出現1次,3出現2次
- countByKey():Returns a hashmap of (K, Int) pairs with the count of each key
- lookup(k: Key):(PairRDD型別)選出與k匹配元素,將結果以sequence形式返回本地集
- top(num:Int) / take(num:Int):取出最大(最小)的num個元素
- 可以結合count做分頁
- 可以用來批量傳輸資料
- first:取第一個元素
- reduce(f: (T, T) => T): T
- 二元運算函式聚合rdd的元素,最後合成一個值
- 聚合,each partition先做聚合,再mergeResult將分割槽聚合結果再聚合
- fold(zeroValue: T)(op: (T, T) => T):設定了初始值的聚合,也是先對each partition內用op聚合,再將每個partition的計算結果用op再聚合
// 21,分割槽內聚合得11,再對所有分割槽結果再聚合得21 println(sc.parallelize(List(1), 1).fold(10)((x:Int,y:Int)=>x+y)))
// 33,兩個分割槽結果分別是11,12,之後再聚合10+11,再+12,33 println(sc.parallelize(List(1,2), 2).fold(10)((x:Int,y:Int)=>x+y))
- 類似reduce,區別是多了初始值,各個分割槽算一次zeroValue,最後聚合分割槽結果時再算一次
- aggregate:先歸併後聚合
- 類似fold:區別是聚合前後資料型別可以不一致,而reduce/fold聚合前後型別一致
- 計算:注意初始值會多次加入到結果中,分割槽數(M)+最後combine result次數=初始值參與M+1次
println(sc.parallelize(List(1, 2,3,4,5), 2).aggregate("s")( (str: String, num: Int) => { println(str + "\t\t\t" + num) s"${str}_${num}" }, (str1: String, str2: String) => { println(str1 + "\t" + str2) s"${str1}/${str2}" } )) 結果 s 1 s_1 2 s 3 s_3 4 s_3_4 5 s s_3_4_5 s/s_3_4_5 s_1_2 s/s_3_4_5/s_1_2 // s 初始值加入了3次
- seqOp:an operator used to accumulate results within a partition,初始值是zeroValue
- combOp:an associative operator used to combine results from different partitions,初始值是zeroValue
(2)單個RDD的 Transformation (惰性)
- filter:過濾結果集
- distinct:去重,返回新的RDD
- map:遍歷元素執行操作函式,返回值型別由操作函式決定,一個數據建立一個連線
- mapPartition:
- 和map類似,都是遍歷,區別是遍歷的是分割槽,函式是作用於每個分割槽而不是每個元素
- 適用於需要頻繁建立外部連結的情況
- 比如spark streaming消費kafka訊息之後需要操作資料庫時,需要建立資料庫連結,此時用mapPartition而非map,一個分割槽建立一個連結,以減少連線數
- mapPartitionWithIndex:和mapPartition一樣,只不過多提供了一個引數partition index,內部原始碼都是通過new MapPartitionsRDD
val source1 = sc.parallelize(List("apple", "apple", "pig", "apple", "pig"), 2) source1.mapPartitionsWithIndex{ (index, iter) => { var res = List[(String,Int)]() while(iter.hasNext) res = res.::(iter.next(), index) res.iterator } }.foreach(println) 結果: (apple,0) (apple,0) (pig,1) (apple,1) (pig,1)
- flatMap:壓平,將map後每個元素的子元素取出來(比如map處理後每個元素是個list,list裡每個元素提出來)
val sourceFlatMap = sc.parallelize(List(("apple,banana"), ("pig,cat,dog"))) println(sourceFlatMap.flatMap(_.split(",")).collect.mkString(" ")) 結果: apple banana pig cat dog
- 拉平元素
- 類似map,區別是可以把二維RDD轉成一維,如果map後不是集合,最終結果和map沒區別
- flatMapValues[U](f: V => TraversableOnce[U])
val source = sc.parallelize(List(("fruit", "apple,banana"), ("animal", "pig,cat,dog"))) println(source.flatMapValues(_.split(",")).collect.mkString(" * ")) 結果 (fruit,apple) * (fruit,banana) * (animal,pig) * (animal,cat) * (animal,dog)
- 拉平value
-
keyBy[K](f: T => K):將rdd轉成 key-value元組資料結構的pair rdd,根據value(T)得出k
val source1 = sc.parallelize(List("apple", "banana", "pig", "cat", "dog")) println(source1.keyBy(_.length).collect.mkString(" ")) 結果: (5,apple) (6,banana) (3,pig) (3,cat) (3,dog)
- groupBy[K](f: T => K, p: Partitioner):
def groupBy[K](f: T => K, p: Partitioner) (implicit kt: ClassTag[K], ord: Ordering[K] = null): RDD[(K, Iterable[T])] = withScope { val cleanF = sc.clean(f) this.map(t => (cleanF(t), t)).groupByKey(p) }
- 函式f:用來指定key,將rdd中每個元素,轉成K
- 根據K,做groupByKey
- combineByKey(createCombiner: V => C,mergeValue: (C, V) => C,mergeCombiners: (C, C) => C): RDD[(K, C)]
def combineByKey[C]( createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C): RDD[(K, C)] = self.withScope { combineByKeyWithClassTag(createCombiner, mergeValue, mergeCombiners)(null) }
- 下面的groupByKey、reduceByKey、aggregateByKey內部,都用了和combineByKey同樣的combineByKeyWithClassTag
- createCombiner:分割槽內,遍歷分割槽內元素,遇到一個新的key就用這個函式操作一次轉成C型別,不是新的key就不操作了(不是新的key操作的是下面的mergeValue)
- mergeValue:分割槽內的,對上面結果C,又遇到同樣的key,做mergeValue操作
- mergeCombiners:分割槽間函式,合併分割槽結果
- 比如求平均值
val initialScores = Array(("Fred", 88.0), ("Fred", 95.0), ("Fred", 91.0), ("Wilma", 93.0), ("Wilma", 95.0), ("Wilma", 98.0)) val d1 = sc.parallelize(initialScores) type MVType = (Int, Double) //定義一個元組型別(科目計數器,分數) d1.combineByKey( score => (1, score), (c1: MVType, newScore) => (c1._1 + 1, c1._2 + newScore), (c1: MVType, c2: MVType) => (c1._1 + c2._1, c1._2 + c2._2) ).map { case (name, (num, socre)) => (name, socre / num) }.collect
res1: Array[(String, Double)] = Array((Wilma,95.33333333333333), (Fred,91.33333333333333))
- 下面的groupByKey、reduceByKey、aggregateByKey內部,都用了和combineByKey同樣的combineByKeyWithClassTag
- groupByKey():RDD[(K, Iterable[V])]:按key分組
def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])] = self.withScope { // 1、V型別轉成Buffer,不做計算 val createCombiner = (v: V) => CompactBuffer(v) // 2、merge V into Buffer,第一步計算,但其實沒算,只是把值放到buffer中 val mergeValue = (buf: CompactBuffer[V], v: V) => buf += v // 3、combine two CompactBuffer[V] into one CompactBuffer[V] val mergeCombiners = (c1: CompactBuffer[V], c2: CompactBuffer[V]) => c1 ++= c2 val bufs = combineByKeyWithClassTag[CompactBuffer[V]]( createCombiner, mergeValue, mergeCombiners, partitioner, mapSideCombine = false) bufs.asInstanceOf[RDD[(K, Iterable[V])]] }
- pair RDD的運算元
- 較耗時,如果是希望做聚合,using `PairRDDFunctions.aggregateByKey`or `PairRDDFunctions.reduceByKey效能更好:
- 原因是:groupByKey不能在map端做combine,需要把所有資料都insert into compactBuffer中,然後combine,這樣會造成 more objects in the old gen(jvm老生帶會有大量沒用的物件)
- 每次的結果,相同key對應的Iterable,元素順序可能不同
- reduceByKey:
def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] = self.withScope { // 比較groupByKey,這裡func作用了兩個地方,一個是merge,一個是combine // merge:相當於是在本地map端先彙總 // combine:之後到reduce端再次彙總 combineByKeyWithClassTag[V]((v: V) => v, func, func, partitioner) }
- 類似與groupByKey,區別是先在map端做一次聚合,再到reduce再聚合
- map端做一次彙總,減少資料IO傳輸,效能比groupBykey好
- aggregateByKey:
def aggregateByKey[U: ClassTag](zeroValue: U, partitioner: Partitioner)(seqOp: (U, V) => U, combOp: (U, U) => U): RDD[(K, U)]
- 類似與reduceByKey,也是分別在map和reduce端做聚合,區別是多了初始值的設定
- zeroValue:初始值,對每種key作用一次
- seqOp:在分割槽內,Aggregate the values of each key(聚合每個key的values),用zeroValue,結果型別是U
- comOp:a operation for merging two U's between partitions,在分割槽間執行
- sortByKey:
- 預設對Key做升序排序
- 如果是輸出到檔案,會寫入多個 part-X 檔案中
(3)多個RDD的Transformation
- union:兩個RDD並行處理
- 沒有shuffle,只是將兩個rdd合併,可能涉及資料的移動
-
intersection:交集且去重 (比如用在物品歸類)
-
cartesian:笛卡兒積
-
subtract(other: RDD[T]):作差(不去重)
-
Return an RDD with the elements from `this` that are not in `other`.
-
-
zip:兩個rdd一對一關聯處理元素
-
要求兩個rdd的元素數量要相同,有相同的值
-
比如表垂直拆分之後合併就用zip
-
-
join:pair RDD的操作,通過key進行關聯查詢,內部通過cogroup實現
def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))] = self.withScope { this.cogroup(other, partitioner).flatMapValues( pair => // pair的兩個iterator都有值,才能到yield for (v <- pair._1.iterator; w <- pair._2.iterator) yield (v, w) ) }
-
返回資料形式為: (k, (v1, v2)) tuple
-
-
leftOuterJoin:pair RDD的操作,左關聯查詢,內部通過cogroup實現
-
rightOuterJoin:pair RDD的操作,右,內部通過cogroup實現
-
fullOuterJoin:pair RDD的操作,全連線,內部通過cogroup實現,左邊沒有,以右邊為主,右邊沒有,以左邊為主