【Spark Series】--- Transformation Operators in Spark
阿新 • Published: 2018-02-01
I. Overview
Spark has two broad classes of operators. Transformations are lazily evaluated; actions execute immediately, and each action produces one job.
Roughly speaking, an operator that produces a new RDD from an RDD is a transformation, while an operator that turns an RDD into some other kind of result is an action.
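The lazy/eager split can be demonstrated without a cluster: Java's own Stream API behaves analogously, with intermediate operations (like transformations) deferred until a terminal operation (like an action) runs. This is a sketch by analogy, not Spark code:

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class LazyDemo {
    public static void main(String[] args) {
        AtomicInteger calls = new AtomicInteger(0);

        // Building the pipeline is like chaining transformations: nothing executes yet.
        Stream<String> pipeline = List.of("hello spark", "hello hadoop").stream()
                .map(s -> { calls.incrementAndGet(); return s + "~"; });
        System.out.println("after map: " + calls.get());    // prints 0 -- the map has not run

        // The terminal operation plays the role of an action: it triggers execution.
        List<String> out = pipeline.collect(Collectors.toList());
        System.out.println("after collect: " + calls.get()); // prints 2 -- now it ran
        System.out.println(out);                              // [hello spark~, hello hadoop~]
    }
}
```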
II. Common Transformation Operators
Assume the dataset is the file words.txt.
1. filter
Keeps the records that satisfy a condition: return true to keep a record, false to filter it out.
package com.spark.spark.transformations;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.VoidFunction;

/**
 * filter
 * Keeps the records that satisfy the condition: true keeps a record, false filters it out.
 */
public class Operator_filter {
    public static void main(String[] args) {
        /*
         * The SparkConf object holds the application's runtime settings:
         * 1. the run mode
         * 2. the application name
         * 3. the resource requirements
         */
        SparkConf conf = new SparkConf();
        conf.setMaster("local");
        conf.setAppName("filter");
        // JavaSparkContext is the context in which Spark runs and the only channel to the cluster.
        JavaSparkContext jsc = new JavaSparkContext(conf);
        JavaRDD<String> lines = jsc.textFile("./words.txt");
        JavaRDD<String> resultRDD = lines.filter(new Function<String, Boolean>() {
            private static final long serialVersionUID = 1L;

            @Override
            public Boolean call(String line) throws Exception {
                return !line.contains("hadoop"); // keep only the lines that do NOT contain "hadoop"
            }
        });
        resultRDD.foreach(new VoidFunction<String>() {
            private static final long serialVersionUID = 1L;

            @Override
            public void call(String line) throws Exception {
                System.out.println(line);
            }
        });
        jsc.stop();
    }
}
Function explanation:
A String goes in; a Boolean comes out.
Result:
2. map
Maps each element in the RDD to a new element via the function passed to map.
Characteristic: one record in, one record out.
package com.spark.spark.transformations;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.VoidFunction;

/**
 * map
 * Processes each element with the supplied function and returns a new dataset.
 * Characteristic: one record in, one record out.
 *
 * @author root
 */
public class Operator_map {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf();
        conf.setMaster("local");
        conf.setAppName("map");
        JavaSparkContext jsc = new JavaSparkContext(conf);
        JavaRDD<String> line = jsc.textFile("./words.txt");
        JavaRDD<String> mapResult = line.map(new Function<String, String>() {
            private static final long serialVersionUID = 1L;

            @Override
            public String call(String s) throws Exception {
                return s + "~"; // append "~" to every line
            }
        });
        mapResult.foreach(new VoidFunction<String>() {
            private static final long serialVersionUID = 1L;

            @Override
            public void call(String t) throws Exception {
                System.out.println(t);
            }
        });
        jsc.stop();
    }
}
Function explanation:
A String goes in; a String comes out.
Result:
3. flatMap (flattens the output: one record in, zero or more records out)
A map followed by a flatten. Similar to map, but each input element can be mapped to zero or more output elements.
package com.spark.spark.transformations;

import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.VoidFunction;

/**
 * flatMap
 * One record in, zero or more records out.
 *
 * @author root
 */
public class Operator_flatMap {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf();
        conf.setMaster("local");
        conf.setAppName("flatMap");
        JavaSparkContext jsc = new JavaSparkContext(conf);
        JavaRDD<String> lines = jsc.textFile("./words.txt");
        JavaRDD<String> flatMapResult = lines.flatMap(new FlatMapFunction<String, String>() {
            private static final long serialVersionUID = 1L;

            @Override
            public Iterable<String> call(String s) throws Exception {
                return Arrays.asList(s.split(" ")); // split each line into words
            }
        });
        flatMapResult.foreach(new VoidFunction<String>() {
            private static final long serialVersionUID = 1L;

            @Override
            public void call(String t) throws Exception {
                System.out.println(t);
            }
        });
        jsc.stop();
    }
}
Function explanation:
A String goes in; an Iterable<String> comes out (an Iterable is a collection whose iterator() walks through its elements).
Result:
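The one-in/one-out versus one-in/many-out contrast between map and flatMap can be shown with plain Java streams (an analogy, not Spark code), using the same kind of input lines as above:

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class MapVsFlatMap {
    public static void main(String[] args) {
        List<String> lines = List.of("hello spark", "hello hadoop");

        // map: one input line -> exactly one output element
        List<String> mapped = lines.stream()
                .map(s -> s + "~")
                .collect(Collectors.toList());
        System.out.println(mapped); // [hello spark~, hello hadoop~]

        // flatMap: one input line -> zero or more output elements (here, the words)
        List<String> flat = lines.stream()
                .flatMap(s -> Arrays.stream(s.split(" ")))
                .collect(Collectors.toList());
        System.out.println(flat); // [hello, spark, hello, hadoop]
    }
}
```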
4. sample (random sampling)
Random-sampling operator: draws a sample of the given fraction of the data, with or without replacement. The arguments are (withReplacement, fraction, seed).
withReplacement: true means sampling with replacement, false without.
fraction: the expected sampling proportion, a double; it is approximate, and the larger the dataset, the closer the actual proportion gets to it.
seed: the random seed; the same seed always draws the same sample, which is convenient for testing.
package com.spark.spark.transformations;

import java.util.ArrayList;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFlatMapFunction;
import org.apache.spark.api.java.function.VoidFunction;

import scala.Tuple2;

public class Operator_sample {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf();
        conf.setMaster("local");
        conf.setAppName("sample");
        JavaSparkContext jsc = new JavaSparkContext(conf);
        JavaRDD<String> lines = jsc.textFile("./words.txt");
        JavaPairRDD<String, Integer> flatMapToPair = lines.flatMapToPair(new PairFlatMapFunction<String, String, Integer>() {
            private static final long serialVersionUID = 1L;

            @Override
            public Iterable<Tuple2<String, Integer>> call(String t) throws Exception {
                List<Tuple2<String, Integer>> tupleList = new ArrayList<Tuple2<String, Integer>>();
                tupleList.add(new Tuple2<String, Integer>(t, 1));
                return tupleList;
            }
        });
        // The file has 7 records, so a fraction of 0.3 draws roughly 1-2 of them.
        JavaPairRDD<String, Integer> sampleResult = flatMapToPair.sample(true, 0.3, 4);
        sampleResult.foreach(new VoidFunction<Tuple2<String, Integer>>() {
            private static final long serialVersionUID = 1L;

            @Override
            public void call(Tuple2<String, Integer> t) throws Exception {
                System.out.println(t);
            }
        });
        jsc.stop();
    }
}
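Conceptually, without-replacement sampling keeps each element independently with probability fraction, and a fixed seed makes the draw reproducible. A plain-Java sketch of that idea (a simplification, not Spark's actual sampler implementation):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

public class SampleSketch {
    // Without-replacement sampling: keep each element independently with probability `fraction`.
    // A fixed seed makes the sample reproducible -- the point of sample's third argument.
    public static <T> List<T> sample(List<T> data, double fraction, long seed) {
        Random rnd = new Random(seed);
        List<T> out = new ArrayList<>();
        for (T t : data) {
            if (rnd.nextDouble() < fraction) {
                out.add(t);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Integer> data = List.of(1, 2, 3, 4, 5, 6, 7);
        System.out.println(sample(data, 0.3, 4)); // same seed -> same sample on every run
    }
}
```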
Result:
5. reduceByKey
Merges the values that share the same key using the supplied function.
package com.spark.spark.transformations;

import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;

import scala.Tuple2;

public class Operator_reduceByKey {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf();
        conf.setMaster("local").setAppName("reduceByKey");
        JavaSparkContext jsc = new JavaSparkContext(conf);
        JavaRDD<String> lines = jsc.textFile("./words.txt");
        JavaRDD<String> flatMap = lines.flatMap(new FlatMapFunction<String, String>() {
            private static final long serialVersionUID = 1L;

            @Override
            public Iterable<String> call(String t) throws Exception {
                return Arrays.asList(t.split(" ")); // split each line into words
            }
        });
        JavaPairRDD<String, Integer> mapToPair = flatMap.mapToPair(new PairFunction<String, String, Integer>() {
            private static final long serialVersionUID = 1L;

            @Override
            public Tuple2<String, Integer> call(String t) throws Exception {
                return new Tuple2<String, Integer>(t, 1); // (word, 1)
            }
        });
        // Sum the values per key; the second argument (10) sets the number of result partitions.
        JavaPairRDD<String, Integer> reduceByKey = mapToPair.reduceByKey(new Function2<Integer, Integer, Integer>() {
            private static final long serialVersionUID = 1L;

            @Override
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1 + v2;
            }
        }, 10);
        reduceByKey.foreach(new VoidFunction<Tuple2<String, Integer>>() {
            private static final long serialVersionUID = 1L;

            @Override
            public void call(Tuple2<String, Integer> t) throws Exception {
                System.out.println(t);
            }
        });
        jsc.stop();
    }
}
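The merge logic of reduceByKey (combine the values that share a key, here by summing) can be mimicked with plain Java collections. This sketch assumes the same (word, 1) pairs as the Spark example; it is an analogy for the semantics, not distributed code:

```java
import java.util.Arrays;
import java.util.Map;
import java.util.stream.Collectors;

public class ReduceByKeySketch {
    // Mimics mapToPair(word -> (word, 1)) followed by reduceByKey((a, b) -> a + b).
    public static Map<String, Integer> wordCount(String text) {
        return Arrays.stream(text.split(" "))
                // toMap's third argument is the merge function -- the reduceByKey logic.
                .collect(Collectors.toMap(w -> w, w -> 1, Integer::sum));
    }

    public static void main(String[] args) {
        System.out.println(wordCount("hello spark hello hadoop")); // counts per word, e.g. hello=2
    }
}
```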
Function explanation:
The Function2 receives two values for the same key and returns their combination (here, the sum); the second argument to reduceByKey (10) sets the number of partitions in the result.
Result: