Spark 2.x In Depth Series, Part 6: RDD Support for Java 8 Lambda Expressions
阿新 • Published: 2017-09-21
Tags: spark lambda java8 老湯 rdd
Before studying any specific Spark technology, make sure you first understand Spark itself; see the earlier post: 正確理解spark (Understanding Spark Correctly).
We already saw in http://7639240.blog.51cto.com/7629240/1966131 that a Scala function is, under the hood, an implementation of a Java interface. The same is true of Java 8 lambdas: a lambda expression is a concise implementation of a functional interface (an interface with a single abstract method). Let's start with the simplest Spark example, WordCount, implemented first without and then with Java 8 lambdas:
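The lambda-to-interface correspondence can be seen without Spark at all. The sketch below (plain Java, no Spark dependency) declares a single-abstract-method interface shaped like Spark's `Function<T, R>` and implements it twice, once with an anonymous inner class and once with a lambda; both produce an object of the same interface type. The names `MyFunction` and `LambdaDemo` are made up for illustration.

```java
// A single-abstract-method (SAM) interface, similar in shape to
// org.apache.spark.api.java.function.Function<T, R>.
@FunctionalInterface
interface MyFunction<T, R> {
    R call(T input) throws Exception;
}

public class LambdaDemo {
    public static void main(String[] args) throws Exception {
        // Pre-Java-8 style: an anonymous inner class implementing the interface.
        MyFunction<String, Integer> anon = new MyFunction<String, Integer>() {
            @Override
            public Integer call(String s) {
                return s.length();
            }
        };

        // Java 8 style: a lambda expression compiles to the same interface type.
        MyFunction<String, Integer> lambda = s -> s.length();

        System.out.println(anon.call("spark"));   // 5
        System.out.println(lambda.call("spark")); // 5
    }
}
```

Both variables hold an instance of `MyFunction<String, Integer>`; the lambda is just shorter syntax for the same thing, which is exactly why Spark's Java API accepts lambdas everywhere it accepts anonymous classes.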
1. The WordCount program in Java, without lambdas:
```java
import org.apache.spark.HashPartitioner;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;

import java.io.File;
import java.util.Arrays;
import java.util.Iterator;

public class WordCount {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("appName").setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);

        //JavaPairRDD<LongWritable, Text> inputRDD = sc.hadoopFile("hdfs://master:9999/user/word.txt",
        //        TextInputFormat.class, LongWritable.class, Text.class);
        JavaRDD<String> inputRDD = sc.textFile("file:///Users/tangweiqun/test.txt");

        JavaRDD<String> wordsRDD = inputRDD.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public Iterator<String> call(String s) throws Exception {
                return Arrays.asList(s.split(" ")).iterator();
            }
        });

        JavaPairRDD<String, Integer> keyValueWordsRDD = wordsRDD.mapToPair(new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(String s) throws Exception {
                return new Tuple2<String, Integer>(s, 1);
            }
        });

        JavaPairRDD<String, Integer> wordCountRDD = keyValueWordsRDD.reduceByKey(new HashPartitioner(2),
                new Function2<Integer, Integer, Integer>() {
                    @Override
                    public Integer call(Integer a, Integer b) throws Exception {
                        return a + b;
                    }
                });

        // Delete the output directory first if it already exists.
        File outputFile = new File("/Users/tangweiqun/wordcount");
        if (outputFile.exists()) {
            File[] files = outputFile.listFiles();
            for (File file : files) {
                file.delete();
            }
            outputFile.delete();
        }
        wordCountRDD.saveAsTextFile("file:///Users/tangweiqun/wordcount");

        System.out.println(wordCountRDD.collect());
    }
}
```
2. The WordCount code implemented with Java 8 lambdas:
```java
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

import java.io.File;
import java.util.Arrays;

public class WordCount {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("appName").setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);

        //JavaPairRDD<LongWritable, Text> inputRDD = sc.hadoopFile("hdfs://master:9999/user/word.txt",
        //        TextInputFormat.class, LongWritable.class, Text.class);
        JavaRDD<String> inputRDD = sc.textFile("file:///Users/tangweiqun/test.txt");

        JavaRDD<String> wordsRDD = inputRDD.flatMap(input -> Arrays.asList(input.split(" ")).iterator());

        JavaPairRDD<String, Integer> keyValueWordsRDD = wordsRDD.mapToPair(word -> new Tuple2<String, Integer>(word, 1));

        JavaPairRDD<String, Integer> wordCountRDD = keyValueWordsRDD.reduceByKey((a, b) -> a + b);

        // Delete the output directory first if it already exists.
        File outputFile = new File("/Users/tangweiqun/wordcount");
        if (outputFile.exists()) {
            File[] files = outputFile.listFiles();
            for (File file : files) {
                file.delete();
            }
            outputFile.delete();
        }
        wordCountRDD.saveAsTextFile("file:///Users/tangweiqun/wordcount");

        System.out.println(wordCountRDD.collect());
    }
}
```
As the comparison shows, the lambda version is considerably more concise, and it again illustrates that a lambda expression is simply an implementation of a Java functional interface.
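As a small aside (not in the original post): the reduce function `(a, b) -> a + b` has exactly the shape of `java.util.function.BinaryOperator<Integer>`, so in plain Java it can be written even more compactly as the method reference `Integer::sum`. A minimal sketch of the equivalence (the class name `MethodRefDemo` is made up for illustration):

```java
import java.util.function.BinaryOperator;

public class MethodRefDemo {
    public static void main(String[] args) {
        // Lambda form, as used in the reduceByKey call above.
        BinaryOperator<Integer> addLambda = (a, b) -> a + b;

        // Method-reference form: same functional interface, same behavior.
        BinaryOperator<Integer> addRef = Integer::sum;

        System.out.println(addLambda.apply(3, 4)); // 7
        System.out.println(addRef.apply(3, 4));    // 7
    }
}
```

Since Spark's `Function2` is itself a single-abstract-method interface, the same method reference should also be accepted by `reduceByKey(Integer::sum)` in the lambda version above.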
The combineByKey example we covered in http://7639240.blog.51cto.com/7629240/1966958, written with anonymous inner classes:
```java
JavaPairRDD<String, Integer> javaPairRDD = sc.parallelizePairs(Arrays.asList(new Tuple2("coffee", 1),
        new Tuple2("coffee", 2), new Tuple2("panda", 3), new Tuple2("coffee", 9)), 2);

// Applied to a key's value the first time that key is encountered in a partition.
Function<Integer, Tuple2<Integer, Integer>> createCombiner = new Function<Integer, Tuple2<Integer, Integer>>() {
    @Override
    public Tuple2<Integer, Integer> call(Integer value) throws Exception {
        return new Tuple2<>(value, 1);
    }
};

// Applied when a key already has a combiner (from createCombiner) in the same partition.
Function2<Tuple2<Integer, Integer>, Integer, Tuple2<Integer, Integer>> mergeValue =
        new Function2<Tuple2<Integer, Integer>, Integer, Tuple2<Integer, Integer>>() {
    @Override
    public Tuple2<Integer, Integer> call(Tuple2<Integer, Integer> acc, Integer value) throws Exception {
        return new Tuple2<>(acc._1() + value, acc._2() + 1);
    }
};

// Applied when combiners for the same key need to be merged across partitions.
Function2<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> mergeCombiners =
        new Function2<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>, Tuple2<Integer, Integer>>() {
    @Override
    public Tuple2<Integer, Integer> call(Tuple2<Integer, Integer> acc1, Tuple2<Integer, Integer> acc2) throws Exception {
        return new Tuple2<>(acc1._1() + acc2._1(), acc1._2() + acc2._2());
    }
};

JavaPairRDD<String, Tuple2<Integer, Integer>> combineByKeyRDD =
        javaPairRDD.combineByKey(createCombiner, mergeValue, mergeCombiners);

// Result: [(coffee,(12,3)), (panda,(3,1))]
System.out.println("combineByKeyRDD = " + combineByKeyRDD.collect());
```
can be rewritten with lambdas as:
```java
JavaPairRDD<String, Integer> javaPairRDD = sc.parallelizePairs(Arrays.asList(new Tuple2("coffee", 1),
        new Tuple2("coffee", 2), new Tuple2("panda", 3), new Tuple2("coffee", 9)), 2);

// Applied to a key's value the first time that key is encountered in a partition.
Function<Integer, Tuple2<Integer, Integer>> createCombiner = value -> new Tuple2<>(value, 1);

// Applied when a key already has a combiner (from createCombiner) in the same partition.
Function2<Tuple2<Integer, Integer>, Integer, Tuple2<Integer, Integer>> mergeValue =
        (acc, value) -> new Tuple2<>(acc._1() + value, acc._2() + 1);

// Applied when combiners for the same key need to be merged across partitions.
Function2<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> mergeCombiners =
        (acc1, acc2) -> new Tuple2<>(acc1._1() + acc2._1(), acc1._2() + acc2._2());

JavaPairRDD<String, Tuple2<Integer, Integer>> combineByKeyRDD =
        javaPairRDD.combineByKey(createCombiner, mergeValue, mergeCombiners);

// Result: [(coffee,(12,3)), (panda,(3,1))]
System.out.println("combineByKeyRDD = " + combineByKeyRDD.collect());
```
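To make the roles of `createCombiner` and `mergeValue` concrete without needing a Spark cluster, here is a plain-Java sketch that applies the same logic to the sample data with a `HashMap`, treating the whole list as a single "partition" (so `mergeCombiners` is not exercised). The class and method names are made up for illustration; an `int[]` of length 2 stands in for the `Tuple2<Integer, Integer>` (sum, count) accumulator.

```java
import java.util.AbstractMap;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class CombineByKeySketch {
    // Returns key -> {sum, count}, mirroring what combineByKey produces per key.
    static Map<String, int[]> combine(List<? extends Map.Entry<String, Integer>> pairs) {
        Map<String, int[]> acc = new HashMap<>();
        for (Map.Entry<String, Integer> p : pairs) {
            int[] c = acc.get(p.getKey());
            if (c == null) {
                // createCombiner: first time this key is seen in the partition.
                acc.put(p.getKey(), new int[]{p.getValue(), 1});
            } else {
                // mergeValue: the key already has a combiner in this partition.
                c[0] += p.getValue();
                c[1] += 1;
            }
        }
        return acc;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Integer>> data = Arrays.asList(
                new AbstractMap.SimpleEntry<>("coffee", 1),
                new AbstractMap.SimpleEntry<>("coffee", 2),
                new AbstractMap.SimpleEntry<>("panda", 3),
                new AbstractMap.SimpleEntry<>("coffee", 9));

        Map<String, int[]> result = combine(data);
        // coffee -> (12, 3) and panda -> (3, 1), matching the Spark output above.
        System.out.println(Arrays.toString(result.get("coffee"))); // [12, 3]
        System.out.println(Arrays.toString(result.get("panda")));  // [3, 1]
    }
}
```

In real Spark the data is spread over two partitions, so each partition builds its own (sum, count) accumulators this way, and `mergeCombiners` then adds the per-partition accumulators together.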
For a deeper, systematic understanding of the Spark RDD API, see: spark core RDD api原理詳解.