Spark一些常用的資料處理方法-1.RDD計算
在Spark實際應用中,會用到很多數值處理方法,我將一些比較常用的方法寫在這裡,供新手向的學習參考一下。
1.1 讀取檔案至RDD
var rdd = sc.textFile("檔案路徑")
var rddfromhdfs = sc.textFile("hdfs://hadoop-master-001:9000/hdfs檔案路徑")
這個當中有很多方法,比較常用的是
//返回一個K,V形式的RDD
sequenceFile[K,V] ()
1.2 快取到記憶體
快取是可以隨時用的,但是過多的快取非常消耗記憶體資源,所以用的時候需要合理設計
//常用的,持久化
cache()
//不常用的,返回一個List,太佔資源,拿來學習用還可以
collect()
//切斷“血統”,避免過度佔用資源
checkpoint()
1.3 基礎轉換
沒有什麼是用基礎轉換做不了的,如果有那就多用幾次。
//把RDD中的元素從一種型別或狀態變成另一種,元素個數保持不變
map[U] (f:(T)=>U):RDD[U]
//去重,返回一個沒有重複元素的RDD
distinct():RDD[T]
//合併成一個系列的元素,常用於多個Array或者Seq合併成一個Array或者Seq
flapMap[U] (f:(T)=>TraversableOnce[U]):RDD[U]
//同類型RDD合併,並集,元素不去重
union(other:RDD[T]):RDD[T]
//同類型RDD比較,交集,元素去重
intersection(other:RDD[T]):RDD[T]
//同類型RDD比較,非otherRDD元素集,元素不去重
subtract(other:RDD[T]):RDD[T]
//同類型RDD合併,變成K,V形式RDD,要求元素數目相同
zip[U] (other:RDD[U]):RDD[(T,U)]
//將元素和索引號組合成鍵值對
zipWithIndex():RDD[(T,Long)]
//值型別轉換
//是否是某一型別
isInstanceOf[T]():Boolean
//轉換為某一型別
asInstanceOf[T]():T
//過濾
filter (f:(T)=>T):RDD[T]
1.4 鍵值轉換
鍵值轉換通常針對的是[K,V]組合的RDD進行的操作,在聚合或者處理JSON資料經常會用的到
//修改value
mapValues[U] (f: (V)=>U): RDD[(K,U)]
//將value拆分成多個,並使用同一個K
flatMapValue[U] (f:(V)=>TraversableOnce[U]):RDD[(K,U)]
//摺疊處理,一般用於相同K值合併,一般採用累和(0)(_ +_)或者累積(1)( _* _)之類
foldByKey(zreoValue:V)(func:(V,V)=>V):RDD[(K,V)]
//聚合,將相同的K合為一個渠道處理,返回RDD
groupByKey():RDD[(K,Interable[V])]
//聚合,以函式形式處理相同K下的value,返回RDD,是一種簡化的聚合
reduceByKey(func:(V,V)=>V):RDD[(K,V)]
//聚合,與上一個相同,返回值為Map
reduceByKeyLocally(func:(V,V)=>V):Map[K,V]
//全外關聯,關聯不上為空,最多支援三個RDD作為引數同時關聯,是底層用法,實際場景用join居多
cogroup[W]( other:RDD[(K,W)]):RDD[(K,(Iterable[V], Iterable[W])]
//內連線
join[W]( other:RDD[(K,W)]):RDD[(K,(V, W)]
//全連線
fullOuterJoin[W]( other:RDD[(K,W)]):RDD[(K,(Option[V], Option[W])]
//左連線
leftOuterJoin[W]( other:RDD[(K,W)]):RDD[(K,(Option[V], W)]
//右連線
rightOuterJoin[W]( other:RDD[(K,W)]):RDD[(K,(V, Option[W]))]
//刪除與otherRDD中相同Key的元素
subtractByKey( other:RDD[(K,W)]):RDD[(K,V)]
1.5 行動操作
//第一個元素,非排序的
first()
//元素總數
count()
//元素二元計算 如+ - × ÷
reduce(f: (T,T)=>T)
//按下標獲取元素,不排序
take(num:Int):Array[T]
//按降序返回前num個元素
top(num:Int):Array[T]
//按升序返回前num個元素
takeOrdered(num:Int):Array[T]
//聚合,給一個初始值,而後累計操作
fold(zeroValue:T)(op:(T,T)=>T)
//遍歷
foreach(f:(T)=>Unit):Unit
//排序,true為升序,false為降序
sordBy[K]( f:(T)=>K, ascending:Bollean=true):RDD[T]
//[K,V]操作,
//返回某一K下的所有V
lookup(key:K):Seq[V]
//統計K的數量
countByKey:Map[K,Long]
1.6 儲存
//儲存為文字檔案
saveAsTextFile(path:String):Unit
//儲存為SequenceFile檔案儲存在HDFS上,底層預設呼叫
saveAsSequenceFile(path:String):Unit
//儲存為序列化的物件
saveAsObjectFile(path:String):Unit
//儲存到新版Hadoop的HDFS上
saveAsHadoopFile(path:String):Unit