Spark運算元:transformation之map、flatMap和distinct
阿新 • • 發佈:2018-12-10
1、map
將RDD中的每個元素通過map中的函式對映為一個新的元素,並返回一個新型別的RDD。輸入時的分割槽數與輸出時的分割槽數保持一致。
//HDFS上的txt檔案 hadoop fs -cat /tmp/1.txt hello world hello spark hello hive //讀取HDFS檔案到RDD scala> var data = sc.textFile("/tmp/1.txt") data: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[1] at textFile at :21 //使用map運算元 scala> var mapresult = data.map(line => line.split("\\s+")) mapresult: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[2] at map at :23 其中Array[String]=Array(hello, world), Array(hello, spark), Array(hello, hive) //運算map運算元結果 scala> mapresult.collect res0: Array[Array[String]] = Array(Array(hello, world), Array(hello, spark), Array(hello, hive))
2、flatMap
(1)分兩步:第一步map,第二步flat即將所有的輸出分割槽結果合併到一個分割槽。
/使用flatMap運算元 scala> var flatmapresult = data.flatMap(line => line.split("\\s+")) flatmapresult: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[3] at flatMap at :23 //運算flagMap運算元結果 scala> flatmapresult.collect res1: Array[String] = Array(hello, world, hello, spark, hello, hive)
(2)flatMap會將字串當做一個數組
scala> data.map(_.toUpperCase).collect res32: Array[String] = Array(HELLO WORLD, HELLO SPARK, HELLO HIVE, HI SPARK) scala> data.flatMap(_.toUpperCase).collect res33: Array[Char] = Array(H, E, L, L, O, , W, O, R, L, D, H, E, L, L, O, , S, P, A, R, K, H, E, L, L, O, , H, I, V, E, H, I, , S, P, A, R, K)
(3)map與flatMap的處理結果對比
scala> data.map(x => x.split("\\s+")).collect
res34: Array[Array[String]] = Array(Array(hello, world), Array(hello, spark), Array(hello, hive), Array(hi, spark))
scala> data.flatMap(x => x.split("\\s+")).collect
res35: Array[String] = Array(hello, world, hello, spark, hello, hive, hi, spark)
flatMap只會將String扁平化成字元陣列,並不會把Array[String]也扁平化成字元陣列
3、distinct
對RDD中的元素去重。
scala> data.flatMap(line => line.split("\\s+")).collect
res61: Array[String] = Array(hello, world, hello, spark, hello, hive, hi, spark)
scala> data.flatMap(line => line.split("\\s+")).distinct.collect
res62: Array[String] = Array(hive, hello, world, spark, hi)