Common Spark operator operations
阿新 · Published: 2018-12-23
package com.yzc.lilei.spark.transformoperate;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/**
 * Common operator operations
 * @author lilei
 * @description
 * @Date 2018/11/22 14:13
 * @Version 1.0
 **/
public class TransformOperate {

    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
                .setAppName("TransformOperate");
                // .setMaster("local");  // uncomment (and drop the semicolon above) to run locally
        JavaSparkContext jsc = new JavaSparkContext(conf);

        map(jsc);
        System.out.println("=========================================");
        flatMap(jsc);
        System.out.println("=========================================");
        mapPartitions(jsc);
        System.out.println("=========================================");
        glom(jsc);
        System.out.println("=========================================");
        union(jsc);
        System.out.println("=========================================");
        mapPartitionsWithIndex(jsc);
        System.out.println("=========================================");
        sample(jsc);
        System.out.println("=========================================");
        intersection(jsc);
        System.out.println("=========================================");
        distinct(jsc);
        System.out.println("=========================================");
        groupByKey(jsc);
        System.out.println("=========================================");
        reduceByKey(jsc);
        System.out.println("=========================================");
        aggregateByKey(jsc);
        System.out.println("=========================================");
        sortByKey(jsc);
        System.out.println("=========================================");
        join(jsc);
        System.out.println("=========================================");
        cogroup(jsc);
        System.out.println("=========================================");
        cartesian(jsc);
        System.out.println("=========================================");
        coalesce(jsc);
        System.out.println("=========================================");
        repartition(jsc);

        jsc.close();
    }

    /**
     * repartition operator: can increase or decrease the number of partitions
     * @param jsc
     */
    public static void repartition(JavaSparkContext jsc) {
        List<String> staffList = Arrays.asList("張三", "李四", "王二", "麻子", "趙六",
                "王五", "李大個", "王大妞", "小明", "小倩");
        JavaRDD<String> staffRDD = jsc.parallelize(staffList, 3);

        JavaRDD<String> rdd1 = staffRDD.mapPartitionsWithIndex((index, iterator) -> {
            List<String> list = new ArrayList<>();
            while (iterator.hasNext()) {
                String staff = iterator.next();
                list.add("repartition1::::partition[" + (index + 1) + "], " + staff);
            }
            return list.iterator();
        }, true);
        rdd1.foreach(tuple -> System.out.println(tuple));

        JavaRDD<String> rdd2 = rdd1.repartition(6);
        JavaRDD<String> rdd3 = rdd2.mapPartitionsWithIndex((index, iterator) -> {
            List<String> list = new ArrayList<>();
            while (iterator.hasNext()) {
                String staff = iterator.next();
                list.add("repartition2::::partition[" + (index + 1) + "], " + staff);
            }
            return list.iterator();
        }, true);
        rdd3.foreach(tuple -> System.out.println(tuple));
    }

    /**
     * coalesce operator: used to reduce the number of partitions and consolidate data
     * @param jsc
     */
    public static void coalesce(JavaSparkContext jsc) {
        List<String> staffList = Arrays.asList("張三", "李四", "王二", "麻子", "趙六",
                "王五", "李大個", "王大妞", "小明", "小倩");
        JavaRDD<String> staffRDD = jsc.parallelize(staffList, 6);

        JavaRDD<String> rdd1 = staffRDD.mapPartitionsWithIndex((index, iterator) -> {
            List<String> list = new ArrayList<>();
            while (iterator.hasNext()) {
                String staff = iterator.next();
                list.add("coalesce1::::partition[" + (index + 1) + "], " + staff);
            }
            return list.iterator();
        }, true);
        rdd1.foreach(tuple -> System.out.println(tuple));

        JavaRDD<String> rdd2 = rdd1.coalesce(3);
        JavaRDD<String> rdd3 = rdd2.mapPartitionsWithIndex((index, iterator) -> {
            List<String> list = new ArrayList<>();
            while (iterator.hasNext()) {
                String staff = iterator.next();
                list.add("coalesce2::::partition[" + (index + 1) + "], " + staff);
            }
            return list.iterator();
        }, true);
        rdd3.foreach(tuple -> System.out.println(tuple));
    }

    /**
     * cartesian operator: Cartesian product of two RDDs
     * @param jsc
     */
    public static void cartesian(JavaSparkContext jsc) {
        List<String> clothes = Arrays.asList("夾克", "T恤", "皮衣", "風衣");
        JavaRDD<String> clothesRDD = jsc.parallelize(clothes);
        List<String> trousers = Arrays.asList("皮褲", "運動褲", "牛仔褲", "休閒褲");
        JavaRDD<String> trousersRDD = jsc.parallelize(trousers);
        JavaPairRDD<String, String> pairsRDD = clothesRDD.cartesian(trousersRDD);
        // Print the result
        pairsRDD.foreach(tuple -> System.out.println("cartesian:" + tuple));
    }

    /**
     * cogroup operator
     * @param jsc
     */
    public static void cogroup(JavaSparkContext jsc) {
        List<String> list1 = new ArrayList<>();
        List<String> list2 = new ArrayList<>();
        list1.add("dog"); list1.add("dog"); list1.add("dog"); list1.add("dog"); list1.add("dog");
        list1.add("dog1"); list1.add("dog1"); list1.add("dog1"); list1.add("dog1");
        list1.add("dog2"); list1.add("dog2"); list1.add("dog2");
        list2.add("dog3"); list2.add("dog3"); list2.add("dog3");
        list1.add("salmon"); list2.add("salmon");
        list1.add("salmon1"); list2.add("salmon1");
        list2.add("rat"); list2.add("rat");
        list1.add("elephant"); list2.add("elephant");

        JavaRDD<String> rdd1 = jsc.parallelize(list1, 3);
        JavaRDD<String> rdd2 = jsc.parallelize(list2, 3);
        JavaPairRDD<String, Integer> pairRDD1 = rdd1.mapToPair(str -> new Tuple2<>(str, 1));
        JavaPairRDD<String, Integer> pairRDD2 = rdd2.mapToPair(str -> new Tuple2<>(str, 1));

        JavaPairRDD<String, Tuple2<Iterable<Integer>, Iterable<Integer>>> cogroupRDD = pairRDD1.cogroup(pairRDD2);
        cogroupRDD.foreach(str ->
                System.out.println("cogroupRDD:" + str._1 + ":::[" + str._2._1 + "=" + str._2._2 + "]"));
    }

    /**
     * join operator
     * @param jsc
     */
    public static void join(JavaSparkContext jsc) {
        List<String> list1 = new ArrayList<>();
        List<String> list2 = new ArrayList<>();
        list1.add("dog"); list1.add("dog"); list1.add("dog"); list1.add("dog"); list1.add("dog");
        list1.add("dog1"); list1.add("dog1"); list1.add("dog1"); list1.add("dog1");
        list1.add("dog2"); list1.add("dog2"); list1.add("dog2");
        list2.add("dog3"); list2.add("dog3"); list2.add("dog3");
        list1.add("salmon"); list2.add("salmon");
        list1.add("salmon1"); list2.add("salmon1");
        list2.add("rat"); list2.add("rat");
        list1.add("elephant"); list2.add("elephant");

        JavaRDD<String> rdd1 = jsc.parallelize(list1, 3);
        JavaRDD<String> rdd2 = jsc.parallelize(list2, 3);
        JavaPairRDD<String, Integer> pairRDD1 = rdd1.mapToPair(str -> new Tuple2<>(str, 1));
        JavaPairRDD<String, Integer> pairRDD2 = rdd2.mapToPair(str -> new Tuple2<>(str, 1));

        JavaPairRDD<String, Tuple2<Integer, Integer>> joinRDD = pairRDD1.join(pairRDD2);
        joinRDD.foreach(str ->
                System.out.println("joinRDD:" + str._1 + ":::[" + str._2._1 + "=" + str._2._2 + "]"));
    }

    /**
     * sortByKey operator: sorting
     * @param jsc
     */
    public static void sortByKey(JavaSparkContext jsc) {
        List<String> list = new ArrayList<>();
        list.add("dog1"); list.add("dog1"); list.add("dog1");
        list.add("tuple2"); list.add("tuple2");
        list.add("tuple3");
        list.add("tuple4"); list.add("tuple4"); list.add("tuple4");
        JavaRDD<String> rdd = jsc.parallelize(list, 3);

        // mapToPair maps each element to a key-value pair
        JavaPairRDD<String, Integer> rdd2pairRDD = rdd.mapToPair(str -> new Tuple2<>(str, 1));
        JavaPairRDD<String, Integer> reduceByKeyRDD = rdd2pairRDD.reduceByKey((v1, v2) -> v1 + v2);
        // Swap key and value so the counts become the sort key
        JavaPairRDD<Integer, String> reduceByKeyPairRDD =
                reduceByKeyRDD.mapToPair(tuple -> new Tuple2<>(tuple._2, tuple._1));

        // sortByKey takes one optional flag: ascending by default, pass false for descending order
        JavaPairRDD<Integer, String> sortByKeyRDD = reduceByKeyPairRDD.sortByKey(false);

        // Swap back and print; this step is only for display
        sortByKeyRDD.mapToPair(tuple -> new Tuple2<>(tuple._2, tuple._1))
                .foreach(tuple -> System.out.println("sortByKeyRDD:" + tuple._1 + "=" + tuple._2));
    }

    /**
     * aggregateByKey operator
     * @param jsc
     */
    public static void aggregateByKey(JavaSparkContext jsc) {
        List<String> list = new ArrayList<>();
        list.add("dog1"); list.add("dog1"); list.add("dog1");
        list.add("tuple2"); list.add("tuple2");
        list.add("tuple3");
        list.add("tuple4"); list.add("tuple4"); list.add("tuple4");
        JavaRDD<String> rdd = jsc.parallelize(list, 3);
        JavaPairRDD<String, Integer> rdd2pairRDD = rdd.mapToPair(str -> new Tuple2<>(str, 1));

        // Three parameters: the zero value, the function that merges values within each partition,
        // and the function that merges the partial results across partitions (this step involves a shuffle)
        JavaPairRDD<String, Integer> aggregateByKeyRDD =
                rdd2pairRDD.aggregateByKey(0, (v1, v2) -> v1 + v2, (v1, v2) -> v1 + v2);
        aggregateByKeyRDD.foreach(tuple ->
                System.out.println("aggregateByKeyRDD:" + tuple._1 + "=" + tuple._2));
    }

    /**
     * map operator
     */
    public static void map(JavaSparkContext jsc) {
        List<String> list = new ArrayList<>();
        list.add("dog"); list.add("salmon"); list.add("salmon111"); list.add("rat"); list.add("elephant");
        JavaRDD<String> mapRDD = jsc.parallelize(list, 3);
        // Zip each word with its length and print the resulting pairs
        mapRDD.zip(mapRDD.map(word -> word.length()))
                .foreach(tuple -> System.out.println("map:" + tuple._1 + "=" + tuple._2));
    }

    /**
     * flatMap operator
     * @param jsc
     */
    public static void flatMap(JavaSparkContext jsc) {
        List<String> list = new ArrayList<>();
        list.add("do g"); list.add("sal mon"); list.add("sal mon1"); list.add("ra t"); list.add("ele phant");
        JavaRDD<String> flatMapRDD = jsc.parallelize(list, 3);
        JavaRDD<String> resultRDD = flatMapRDD.flatMap(line -> Arrays.asList(line.split(" ")).iterator());
        // Print the elements of this RDD; foreach is an action and triggers job submission
        resultRDD.foreach(str -> System.out.println("flatMap:" + str));
    }

    /**
     * mapPartitions operator
     */
    public static void mapPartitions(JavaSparkContext jsc) {
        List<String> list = new ArrayList<>();
        list.add("dog"); list.add("dog1"); list.add("dog2"); list.add("dog3");
        list.add("salmon"); list.add("salmon1"); list.add("rat"); list.add("elephant");
        JavaRDD<String> mapPartitionsRDD = jsc.parallelize(list, 3);
        // Keep only the elements longer than 5 characters, processing one partition at a time
        JavaRDD<String> resultRDD = mapPartitionsRDD.mapPartitions(iterator -> {
            List<String> list1 = new ArrayList<>();
            while (iterator.hasNext()) {
                String next = iterator.next();
                if (next.length() > 5) {
                    list1.add(next);
                }
            }
            return list1.iterator();
        });
        // Print the elements of this RDD; foreach is an action and triggers job submission
        resultRDD.foreach(str -> System.out.println("mapPartitions:" + str));
    }

    /**
     * mapPartitionsWithIndex operator
     * @param jsc
     */
    public static void mapPartitionsWithIndex(JavaSparkContext jsc) {
        List<String> list = new ArrayList<>();
        list.add("dog"); list.add("dog1"); list.add("dog2"); list.add("dog3");
        list.add("salmon"); list.add("salmon1"); list.add("rat"); list.add("elephant");
        JavaRDD<String> mapPartitionsWithIndexRDD = jsc.parallelize(list, 3);
        JavaRDD<String> resultRDD = mapPartitionsWithIndexRDD.mapPartitionsWithIndex((index, iterator) -> {
            List<String> result = new ArrayList<>();
            while (iterator.hasNext()) {
                result.add(index + "::" + iterator.next());
            }
            return result.iterator();
        }, true);
        // Print the elements of this RDD; foreach is an action and triggers job submission
        resultRDD.foreach(str -> System.out.println("mapPartitionsWithIndex:" + str));
    }

    /**
     * sample operator
     * @param jsc
     */
    public static void sample(JavaSparkContext jsc) {
        List<String> list = new ArrayList<>();
        list.add("dog"); list.add("dog1"); list.add("dog2"); list.add("dog3");
        list.add("salmon"); list.add("salmon1"); list.add("rat"); list.add("elephant");
        JavaRDD<String> sampleRDD = jsc.parallelize(list, 3);
        // Variant 1: sampling with replacement (true); sampled elements may repeat
        // JavaRDD<String> resultRDD = sampleRDD.sample(true, 0.6);
        // Variant 2: sampling without replacement (false); sampled elements will not repeat
        // JavaRDD<String> resultRDD = sampleRDD.sample(false, 0.6);
        // Variant 3: with an explicit seed, so the sample is reproducible
        JavaRDD<String> resultRDD = sampleRDD.sample(false, 0.6, 9);
        // Print the elements of this RDD; foreach is an action and triggers job submission
        resultRDD.foreach(str -> System.out.println("sample:" + str));
    }

    /**
     * glom operator
     * @param jsc
     */
    public static void glom(JavaSparkContext jsc) {
        List<String> list = new ArrayList<>();
        list.add("dog"); list.add("dog1"); list.add("dog2"); list.add("dog3");
        list.add("salmon"); list.add("salmon1"); list.add("rat"); list.add("elephant");
        JavaRDD<String> glom = jsc.parallelize(list, 3);
        // glom collects the elements of each partition into a single list
        glom.glom().foreach(list1 -> System.out.println("glom:" + list1));
    }

    /**
     * union operator
     * @param jsc
     */
    public static void union(JavaSparkContext jsc) {
        List<String> list1 = new ArrayList<>();
        List<String> list2 = new ArrayList<>();
        list1.add("dog"); list1.add("dog1"); list1.add("dog2");
        list2.add("dog3"); list2.add("salmon"); list2.add("salmon1"); list2.add("rat"); list2.add("elephant");
        JavaRDD<String> rdd1 = jsc.parallelize(list1, 3);
        JavaRDD<String> rdd2 = jsc.parallelize(list2, 3);
        JavaRDD<String> unionRDD = rdd1.union(rdd2);
        unionRDD.foreach(str -> System.out.println("union:" + str));
    }

    /**
     * intersection operator
     * @param jsc
     */
    public static void intersection(JavaSparkContext jsc) {
        List<String> list1 = new ArrayList<>();
        List<String> list2 = new ArrayList<>();
        list1.add("dog"); list1.add("dog1"); list1.add("dog2");
        list2.add("dog"); list2.add("salmon"); list2.add("salmon1"); list2.add("rat"); list2.add("elephant");
        JavaRDD<String> rdd1 = jsc.parallelize(list1, 3);
        JavaRDD<String> rdd2 = jsc.parallelize(list2, 3);
        JavaRDD<String> intersectionRDD = rdd1.intersection(rdd2);
        intersectionRDD.foreach(str -> System.out.println("intersection:" + str));
    }

    /**
     * distinct operator
     * @param jsc
     */
    public static void distinct(JavaSparkContext jsc) {
        List<String> list1 = new ArrayList<>();
        list1.add("dog"); list1.add("dog"); list1.add("dog");
        list1.add("dog1"); list1.add("dog1"); list1.add("dog1");
        list1.add("dog2"); list1.add("dog2"); list1.add("dog2");
        JavaRDD<String> rdd1 = jsc.parallelize(list1, 3);
        // distinct takes an optional numPartitions argument; if given, it sets the number of partitions
        JavaRDD<String> distinctRDD = rdd1.distinct();
        distinctRDD.foreach(str -> System.out.println("distinct:" + str));
    }

    /**
     * groupByKey operator
     * @param jsc
     */
    public static void groupByKey(JavaSparkContext jsc) {
        List<String> list = new ArrayList<>();
        list.add("dog1"); list.add("dog1"); list.add("dog1");
        list.add("tuple2"); list.add("tuple2");
        list.add("tuple3");
        list.add("tuple4"); list.add("tuple4"); list.add("tuple4");
        JavaRDD<String> rdd = jsc.parallelize(list, 3);
        JavaPairRDD<String, Integer> pairRDD = rdd.mapToPair(str -> new Tuple2<>(str, 1));
        // groupByKey also takes an optional numPartitions argument; if given, it sets the number of partitions
        JavaPairRDD<String, Iterable<Integer>> groupByKeyRDD = pairRDD.groupByKey();
        groupByKeyRDD.foreach(str -> System.out.println("groupByKeyRDD:" + str._1 + ":::" + str._2));
    }

    /**
     * reduceByKey operator
     * @param jsc
     */
    public static void reduceByKey(JavaSparkContext jsc) {
        List<String> list = new ArrayList<>();
        list.add("dog1"); list.add("dog1"); list.add("dog1");
        list.add("tuple2"); list.add("tuple2");
        list.add("tuple3");
        list.add("tuple4"); list.add("tuple4"); list.add("tuple4");
        JavaRDD<String> rdd = jsc.parallelize(list, 3);
        JavaPairRDD<String, Integer> reduceByKeyRDD = rdd.mapToPair(str -> new Tuple2<>(str, 1));
        // reduceByKey also takes an optional numPartitions argument; if given, it sets the number of partitions
        JavaPairRDD<String, Integer> reduceByKeyPairRDD = reduceByKeyRDD.reduceByKey((v1, v2) -> v1 + v2);
        reduceByKeyPairRDD.foreach(str -> System.out.println("reduceByKeyPairRDD:" + str._1 + ":::" + str._2));
    }
}