
Common Spark operator examples

package com.yzc.lilei.spark.transformoperate;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;


/**
 * Examples of common Spark operators
 * @author lilei
 * @description
 * @Date 2018/11/22 14:13
 * @Version 1.0
 **/
public class TransformOperate {

    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
                .setAppName("TransformOperate");
//                .setMaster("local"); // uncomment to run locally without a cluster
        JavaSparkContext jsc = new JavaSparkContext(conf);

        map(jsc);
        System.out.println("=========================================");
        flatMap(jsc);
        System.out.println("=========================================");
        mapPartitions(jsc);
        System.out.println("=========================================");
        glom(jsc);
        System.out.println("=========================================");
        union(jsc);
        System.out.println("=========================================");
        mapPartitionsWithIndex(jsc);
        System.out.println("=========================================");
        sample(jsc);
        System.out.println("=========================================");
        intersection(jsc);
        System.out.println("=========================================");
        distinct(jsc);
        System.out.println("=========================================");
        groupByKey(jsc);
        System.out.println("=========================================");
        reduceByKey(jsc);
        System.out.println("=========================================");
        aggregateByKey(jsc);
        System.out.println("=========================================");
        sortByKey(jsc);
        System.out.println("=========================================");
        join(jsc);
        System.out.println("=========================================");
        cogroup(jsc);
        System.out.println("=========================================");
        cartesian(jsc);
        System.out.println("=========================================");
        coalesce(jsc);
        System.out.println("=========================================");
        repartition(jsc);

        jsc.close();
    }

    /**
     * repartition: increase or decrease the number of partitions
     * (always performs a full shuffle; equivalent to coalesce(n, shuffle = true))
     * @param jsc
     */
    public static void repartition(JavaSparkContext jsc){
        List<String> staffList = Arrays.asList("張三", "李四", "王二", "麻子",
                "趙六", "王五", "李大個", "王大妞", "小明", "小倩");
        JavaRDD<String> staffRDD = jsc.parallelize(staffList, 3);
        JavaRDD<String> rdd1 = staffRDD.mapPartitionsWithIndex((index, iterator) -> {
            List<String> list = new ArrayList<>();

            while (iterator.hasNext()) {
                String staff = iterator.next();
                list.add("repartition1::::分割槽[" + (index + 1) + "], " + staff);
            }
            return list.iterator();
        }, true);

        rdd1.foreach(str -> System.out.println(str));


        JavaRDD<String> rdd2 = rdd1.repartition(6);
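        // repartition always shuffles, so the names are redistributed across
        // the six new partitions non-deterministically.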

        JavaRDD<String> rdd3 = rdd2.mapPartitionsWithIndex((index, iterator) -> {
            List<String> list = new ArrayList<>();

            while (iterator.hasNext()) {
                String staff = iterator.next();
                list.add("repartition2::::分割槽[" + (index + 1) + "], " + staff);
            }
            return list.iterator();
        }, true);

        rdd3.foreach(str -> System.out.println(str));
    }

    /**
     * coalesce: reduce the number of partitions and consolidate data
     * (no shuffle by default, so it is cheaper than repartition for shrinking)
     * @param jsc
     */
    public static void coalesce(JavaSparkContext jsc){
        List<String> staffList = Arrays.asList("張三", "李四", "王二", "麻子",
                "趙六", "王五", "李大個", "王大妞", "小明", "小倩");
        JavaRDD<String> staffRDD = jsc.parallelize(staffList, 6);

        JavaRDD<String> rdd1 = staffRDD.mapPartitionsWithIndex((index, iterator) -> {
            List<String> list = new ArrayList<>();

            while (iterator.hasNext()) {
                String staff = iterator.next();
                list.add("coalesce1::::分割槽[" + (index + 1) + "], " + staff);
            }
            return list.iterator();
        }, true);

        rdd1.foreach(str -> System.out.println(str));


        JavaRDD<String> rdd2 = rdd1.coalesce(3);
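        // coalesce(3) merges the six partitions down to three without a shuffle
        // (a boolean shuffle flag can be passed to force one).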

        JavaRDD<String> rdd3 = rdd2.mapPartitionsWithIndex((index, iterator) -> {
            List<String> list = new ArrayList<>();

            while (iterator.hasNext()) {
                String staff = iterator.next();
                list.add("coalesce2::::分割槽[" + (index + 1) + "], " + staff);
            }
            return list.iterator();
        }, true);

        rdd3.foreach(str -> System.out.println(str));
    }

    /**
     * cartesian: Cartesian product of two RDDs
     * @param jsc
     */
    public static void cartesian(JavaSparkContext jsc){
        List<String> clothes = Arrays.asList("jacket", "T-shirt", "leather jacket", "trench coat");
        JavaRDD<String> clothesRDD = jsc.parallelize(clothes);

        List<String> trousers = Arrays.asList("leather pants", "sweatpants", "jeans", "casual pants");

        JavaRDD<String> trousersRDD = jsc.parallelize(trousers);

        JavaPairRDD<String, String> pairsRDD = clothesRDD.cartesian(trousersRDD);

        /**
         * Print the results
         */
        pairsRDD.foreach(tuple -> System.out.println("cartesian:"+tuple));
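        // 4 tops x 4 bottoms = 16 pairs in total.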

    }

    /**
     * cogroup: for each key, group the values from both RDDs
     * @param jsc
     */
    public static void cogroup(JavaSparkContext jsc){
        List<String> list1 = new ArrayList<>();
        List<String> list2 = new ArrayList<>();
        list1.add("dog");
        list1.add("dog");
        list1.add("dog");
        list1.add("dog");
        list1.add("dog");
        list1.add("dog1");
        list1.add("dog1");
        list1.add("dog1");
        list1.add("dog1");
        list1.add("dog2");
        list1.add("dog2");
        list1.add("dog2");
        list2.add("dog3");
        list2.add("dog3");
        list2.add("dog3");
        list1.add("salmon");
        list2.add("salmon");
        list1.add("salmon1");
        list2.add("salmon1");
        list2.add("rat");
        list2.add("rat");
        list1.add("elephant");
        list2.add("elephant");

        JavaRDD<String> rdd1 = jsc.parallelize(list1, 3);
        JavaRDD<String> rdd2 = jsc.parallelize(list2, 3);

        JavaPairRDD<String, Integer> pairRDD1 = rdd1.mapToPair(str -> new Tuple2<>(str, 1));
        JavaPairRDD<String, Integer> pairRDD2 = rdd2.mapToPair(str -> new Tuple2<>(str, 1));

        JavaPairRDD<String, Tuple2<Iterable<Integer>, Iterable<Integer>>> cogroupRDD = pairRDD1.cogroup(pairRDD2);

        cogroupRDD.foreach(str -> System.out.println("cogroupRDD:"+str._1+":::["+str._2._1+"="+str._2._2+"]"));
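
        // Unlike join, cogroup also keeps keys that occur in only one RDD;
        // e.g. "dog" prints something like cogroupRDD:dog:::[[1, 1, 1, 1, 1]=[]]
        // and "rat" prints cogroupRDD:rat:::[[]=[1, 1]].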
    }

    /**
     * join: inner join on key; only keys present in both RDDs survive
     * @param jsc
     */
    public static void join(JavaSparkContext jsc){
        List<String> list1 = new ArrayList<>();
        List<String> list2 = new ArrayList<>();
        list1.add("dog");
        list1.add("dog");
        list1.add("dog");
        list1.add("dog");
        list1.add("dog");
        list1.add("dog1");
        list1.add("dog1");
        list1.add("dog1");
        list1.add("dog1");
        list1.add("dog2");
        list1.add("dog2");
        list1.add("dog2");
        list2.add("dog3");
        list2.add("dog3");
        list2.add("dog3");
        list1.add("salmon");
        list2.add("salmon");
        list1.add("salmon1");
        list2.add("salmon1");
        list2.add("rat");
        list2.add("rat");
        list1.add("elephant");
        list2.add("elephant");

        JavaRDD<String> rdd1 = jsc.parallelize(list1, 3);
        JavaRDD<String> rdd2 = jsc.parallelize(list2, 3);

        JavaPairRDD<String, Integer> pairRDD1 = rdd1.mapToPair(str -> new Tuple2<>(str, 1));
        JavaPairRDD<String, Integer> pairRDD2 = rdd2.mapToPair(str -> new Tuple2<>(str, 1));

        JavaPairRDD<String, Tuple2<Integer, Integer>> joinRDD = pairRDD1.join(pairRDD2);

        joinRDD.foreach(str -> System.out.println("joinRDD:"+str._1+":::["+str._2._1+"="+str._2._2+"]"));
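
        // Only the shared keys survive the inner join, so this prints
        // joinRDD:salmon:::[1=1], joinRDD:salmon1:::[1=1] and
        // joinRDD:elephant:::[1=1] (order may vary across partitions).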
    }

    /**
     * sortByKey: sort an RDD by key
     * @param jsc
     */
    public static void sortByKey(JavaSparkContext jsc){
        List<String> list = new ArrayList<>();
        list.add("dog1");
        list.add("dog1");
        list.add("dog1");
        list.add("tuple2");
        list.add("tuple2");
        list.add("tuple3");
        list.add("tuple4");
        list.add("tuple4");
        list.add("tuple4");

        JavaRDD<String> rdd = jsc.parallelize(list, 3);

        /**
         * mapToPair maps each word into a (word, 1) key-value pair
         */
        JavaPairRDD<String, Integer> rdd2pairRDD = rdd.mapToPair(str -> new Tuple2<>(str, 1));

        JavaPairRDD<String, Integer> reduceByKeyRDD = rdd2pairRDD.reduceByKey((v1, v2) -> v1 + v2);

        JavaPairRDD<Integer, String> reduceByKeyPairRDD = reduceByKeyRDD.mapToPair(tuple -> new Tuple2<>(tuple._2, tuple._1));
        /**
         * sortByKey takes one optional argument: ascending by default,
         * pass false for descending order
         */
        JavaPairRDD<Integer, String> sortByKeyRDD = reduceByKeyPairRDD.sortByKey(false);

        /**
         * Swap back to (word, count) just for printing
         */
        sortByKeyRDD.mapToPair(tuple -> new Tuple2<>(tuple._2,tuple._1))
                .foreach(tuple -> System.out.println("sortByKeyRDD:"+tuple._1+"="+tuple._2));
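
        // The RDD is now ordered by count descending (dog1=3, tuple4=3,
        // tuple2=2, tuple3=1), but foreach runs in parallel, so console
        // lines may interleave; collect() would show the sorted order.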
    }

    /**
     * aggregateByKey: fold the values for each key, starting from a zero value
     * @param jsc
     */
    public static void aggregateByKey(JavaSparkContext jsc){
        List<String> list = new ArrayList<>();
        list.add("dog1");
        list.add("dog1");
        list.add("dog1");
        list.add("tuple2");
        list.add("tuple2");
        list.add("tuple3");
        list.add("tuple4");
        list.add("tuple4");
        list.add("tuple4");

        JavaRDD<String> rdd = jsc.parallelize(list, 3);
        JavaPairRDD<String, Integer> rdd2pairRDD = rdd.mapToPair(str -> new Tuple2<>(str, 1));
        /**
         * Three arguments: the zero value, a function that merges values
         * locally within each partition, and a function that merges the
         * per-partition results across partitions (this step involves a shuffle)
         */
        JavaPairRDD<String, Integer> aggregateByKeyRDD = rdd2pairRDD.aggregateByKey(0, (v1, v2) -> v1 + v2, (v1, v2) -> v1 + v2);
        aggregateByKeyRDD.foreach(tuple -> System.out.println("aggregateByKeyRDD:"+tuple._1+"="+tuple._2));
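
        // With a zero value of 0 and addition for both functions, this is
        // equivalent to reduceByKey((v1, v2) -> v1 + v2) and prints
        // dog1=3, tuple2=2, tuple3=1, tuple4=3.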
    }

    /**
     * map: apply a function to every element
     */
    public static void map(JavaSparkContext jsc){
        List<String> list = new ArrayList<>();
        list.add("dog");
        list.add("salmon");
        list.add("salmon111");
        list.add("rat");
        list.add("elephant");
        JavaRDD<String> mapRDD = jsc.parallelize(list, 3);
        mapRDD.zip(mapRDD.map(word -> word.length())).foreach(tuple -> System.out.println("map:"+tuple._1+"="+tuple._2));
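        // zip pairs each word with its length, so this prints map:dog=3,
        // map:salmon=6, map:salmon111=9, map:rat=3, map:elephant=8
        // (print order may vary).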
    }

    /**
     * flatMap: map each element to zero or more elements, then flatten
     * @param jsc
     */
    public static void flatMap(JavaSparkContext jsc){
        List<String> list = new ArrayList<>();
        list.add("do g");
        list.add("sal mon");
        list.add("sal mon1");
        list.add("ra t");
        list.add("ele phant");
        JavaRDD<String> flatMapRDD = jsc.parallelize(list, 3);
        JavaRDD<String> resultRDD = flatMapRDD.flatMap(line -> Arrays.asList(line.split(" ")).iterator());
        /**
         * Print the elements of this RDD; foreach is an action and triggers job submission
         */
        resultRDD.foreach(str -> System.out.println("flatMap:"+str));
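        // Each line is split on spaces, yielding ten fragments:
        // do, g, sal, mon, sal, mon1, ra, t, ele, phant (order may vary).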
    }

    /**
     * mapPartitions: process one whole partition per function call
     */
    public static void mapPartitions(JavaSparkContext jsc){
        List<String> list = new ArrayList<>();
        list.add("dog");
        list.add("dog1");
        list.add("dog2");
        list.add("dog3");
        list.add("salmon");
        list.add("salmon1");
        list.add("rat");
        list.add("elephant");
        JavaRDD<String> mapPartitionsRDD = jsc.parallelize(list, 3);
        JavaRDD<String> resultRDD = mapPartitionsRDD.mapPartitions(iterator -> {
            List<String> list1 = new ArrayList<>();
            while (iterator.hasNext()){
                String next = iterator.next();
                if (next.length() > 5){
                    list1.add(next);
                }
            }
            return list1.iterator();
        });
        /**
         * Print the elements of this RDD; foreach is an action and triggers job submission
         */
        resultRDD.foreach(str -> System.out.println("mapPartitions:"+str));
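        // Only strings longer than five characters survive the filter:
        // salmon, salmon1, elephant.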
    }

    /**
     * mapPartitionsWithIndex: like mapPartitions, but the function also
     * receives the partition index
     * @param jsc
     */
    public static void mapPartitionsWithIndex(JavaSparkContext jsc){
        List<String> list = new ArrayList<>();
        list.add("dog");
        list.add("dog1");
        list.add("dog2");
        list.add("dog3");
        list.add("salmon");
        list.add("salmon1");
        list.add("rat");
        list.add("elephant");
        JavaRDD<String> mapPartitionsWithIndexRDD = jsc.parallelize(list, 3);
        JavaRDD<String> resultRDD = mapPartitionsWithIndexRDD.mapPartitionsWithIndex((index, iterator) -> {
            List<String> result = new ArrayList<>();
            while (iterator.hasNext()) {
                result.add(index + "::" + iterator.next());
            }
            return result.iterator();
        }, true);
        /**
         * Print the elements of this RDD; foreach is an action and triggers job submission
         */
        resultRDD.foreach(str -> System.out.println("mapPartitionsWithIndex:"+str));
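        // Each element is prefixed with its partition index (0, 1 or 2),
        // e.g. mapPartitionsWithIndex:0::dog.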

    }

    /**
     * sample: take a random sample of an RDD
     * @param jsc
     */
    public static void sample(JavaSparkContext jsc){
        List<String> list = new ArrayList<>();
        list.add("dog");
        list.add("dog1");
        list.add("dog2");
        list.add("dog3");
        list.add("salmon");
        list.add("salmon1");
        list.add("rat");
        list.add("elephant");
        JavaRDD<String> sampleRDD = jsc.parallelize(list, 3);

        // Variant 1: withReplacement = true, so the same element can be sampled more than once
//        JavaRDD<String> resultRDD = sampleRDD.sample(true, 0.6);
        // Variant 2: withReplacement = false, so each element appears at most once
//        JavaRDD<String> resultRDD = sampleRDD.sample(false, 0.6);
        // Variant 3: a fixed seed makes the sample reproducible across runs
        JavaRDD<String> resultRDD = sampleRDD.sample(false, 0.6, 9);
        /**
         * Print the elements of this RDD; foreach is an action and triggers job submission
         */
        resultRDD.foreach(str -> System.out.println("sample:"+str));
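
        // The fraction (0.6) is an expected proportion, not an exact count,
        // so the number of sampled elements varies around 60% of the input.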

    }



    /**
     * glom: collect the elements of each partition into a single list
     * @param jsc
     */
    public static void glom(JavaSparkContext jsc){
        List<String> list = new ArrayList<>();
        list.add("dog");
        list.add("dog1");
        list.add("dog2");
        list.add("dog3");
        list.add("salmon");
        list.add("salmon1");
        list.add("rat");
        list.add("elephant");
        JavaRDD<String> glomRDD = jsc.parallelize(list, 3);
        glomRDD.glom().foreach(list1 -> System.out.println("glom:"+list1));
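        // Prints one list per partition; with the default slicing this is
        // glom:[dog, dog1], glom:[dog2, dog3, salmon], glom:[salmon1, rat, elephant].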
    }

    /**
     * union: concatenate two RDDs (duplicates are kept)
     * @param jsc
     */
    public static void union(JavaSparkContext jsc){
        List<String> list1 = new ArrayList<>();
        List<String> list2 = new ArrayList<>();
        list1.add("dog");
        list1.add("dog1");
        list1.add("dog2");
        list2.add("dog3");
        list2.add("salmon");
        list2.add("salmon1");
        list2.add("rat");
        list2.add("elephant");

        JavaRDD<String> rdd1 = jsc.parallelize(list1, 3);
        JavaRDD<String> rdd2 = jsc.parallelize(list2, 3);

        JavaRDD<String> unionRDD = rdd1.union(rdd2);

        unionRDD.foreach(str -> System.out.println("union:"+str));
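
        // union performs no deduplication, and the result keeps all input
        // partitions: 3 + 3 = 6 here.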
    }

    /**
     * intersection: elements present in both RDDs (the result is deduplicated)
     * @param jsc
     */
    public static void intersection(JavaSparkContext jsc){
        List<String> list1 = new ArrayList<>();
        List<String> list2 = new ArrayList<>();
        list1.add("dog");
        list1.add("dog1");
        list1.add("dog2");
        list2.add("dog");
        list2.add("salmon");
        list2.add("salmon1");
        list2.add("rat");
        list2.add("elephant");

        JavaRDD<String> rdd1 = jsc.parallelize(list1, 3);
        JavaRDD<String> rdd2 = jsc.parallelize(list2, 3);

        JavaRDD<String> intersectionRDD = rdd1.intersection(rdd2);

        intersectionRDD.foreach(str -> System.out.println("intersection:"+str));
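
        // Only "dog" appears in both lists, so this prints intersection:dog.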
    }

    /**
     * distinct: remove duplicate elements
     * @param jsc
     */
    public static void distinct(JavaSparkContext jsc){
        List<String> list1 = new ArrayList<>();
        list1.add("dog");
        list1.add("dog");
        list1.add("dog");
        list1.add("dog1");
        list1.add("dog1");
        list1.add("dog1");
        list1.add("dog2");
        list1.add("dog2");
        list1.add("dog2");

        JavaRDD<String> rdd1 = jsc.parallelize(list1, 3);

        /**
         * distinct takes an optional numPartitions argument; when given,
         * it sets the number of partitions in the result
         */
        JavaRDD<String> distinctRDD = rdd1.distinct();

        distinctRDD.foreach(str -> System.out.println("distinct:"+str));
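
        // Prints dog, dog1, dog2 (order may vary). A partition count can also
        // be requested explicitly, e.g. rdd1.distinct(2).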
    }

    /**
     * groupByKey: group all the values for each key
     * @param jsc
     */
    public static void groupByKey(JavaSparkContext jsc){
        List<String> list = new ArrayList<>();
        list.add("dog1");
        list.add("dog1");
        list.add("dog1");
        list.add("tuple2");
        list.add("tuple2");
        list.add("tuple3");
        list.add("tuple4");
        list.add("tuple4");
        list.add("tuple4");

        JavaRDD<String> rdd = jsc.parallelize(list, 3);

        JavaPairRDD<String, Integer> pairRDD = rdd.mapToPair(str -> new Tuple2<>(str, 1));


        /**
         * groupByKey also takes an optional numPartitions argument
         */
        JavaPairRDD<String, Iterable<Integer>> groupByKeyRDD = pairRDD.groupByKey();

        groupByKeyRDD.foreach(str -> System.out.println("groupByKeyRDD:"+str._1+":::"+str._2));
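
        // Expected groupings (order may vary): dog1:::[1, 1, 1],
        // tuple2:::[1, 1], tuple3:::[1], tuple4:::[1, 1, 1].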
    }

    /**
     * reduceByKey: merge the values for each key with the given function
     * @param jsc
     */
    public static void reduceByKey(JavaSparkContext jsc){
        List<String> list = new ArrayList<>();
        list.add("dog1");
        list.add("dog1");
        list.add("dog1");
        list.add("tuple2");
        list.add("tuple2");
        list.add("tuple3");
        list.add("tuple4");
        list.add("tuple4");
        list.add("tuple4");

        JavaRDD<String> rdd = jsc.parallelize(list, 3);

        JavaPairRDD<String, Integer> pairRDD = rdd.mapToPair(str -> new Tuple2<>(str, 1));


        /**
         * reduceByKey also takes an optional numPartitions argument
         */
        JavaPairRDD<String, Integer> reduceByKeyPairRDD = pairRDD.reduceByKey((v1, v2) -> v1 + v2);

        reduceByKeyPairRDD.foreach(str -> System.out.println("reduceByKeyPairRDD:"+str._1+":::"+str._2));
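
        // Expected counts: dog1=3, tuple2=2, tuple3=1, tuple4=3. reduceByKey
        // combines within each partition before shuffling, which is why it is
        // usually preferred over groupByKey for aggregations like this.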
    }
}