
A complete reference of Spark Core operators (Java, Scala, Python)

/**
 * Imports for the Java version
 */
import org.apache.spark.Partitioner;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.*;
import scala.Tuple2;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
/**
  * Imports for the Scala version
  */
import org.apache.spark.rdd.RDD
import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}

import scala.collection.mutable.ArrayBuffer

---------------------------Preliminaries--------------------------------------

public class TransformationOperator {
    /**
     * Shared setup for the Java examples
     */
    public static SparkConf conf = new SparkConf().setMaster("local").setAppName("test");
    public static JavaSparkContext sc = new JavaSparkContext(conf);
    public static void println(String str) {
        System.out.println(str);
    }
    //the operators below go here
    //main()
object TransformationOperator {
  /**
    * Shared setup for the Scala examples
    */
  val conf: SparkConf = new SparkConf()
  conf.setMaster("local")
  conf.setAppName("TransformationOperator")
  val sc: SparkContext = new SparkContext(conf)
  val list: List[String] = List("張無忌","趙敏","周芷若")
  val rdd: RDD[String] = sc.parallelize(list)
  //the operators below go here
  //main()
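
For Python, a matching PySpark setup (a minimal sketch, assuming a local PySpark installation; the sketches under each operator below reuse this `sc`):

# Shared setup for the PySpark sketches
from pyspark import SparkConf, SparkContext

conf = SparkConf().setMaster("local").setAppName("TransformationOperator")
sc = SparkContext(conf=conf)
# the operators below go here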

---------------------------Transformation operators--------------------------------------

map()

/**
     * map()
     */
    public static void map() {
        final List<String> list = Arrays.asList("張無忌", "趙敏", "周芷若");
        //create the RDD by parallelizing a local collection
        final JavaRDD<String> rdd = sc.parallelize(list);

        final JavaRDD<String> nameRDD = rdd.map(new Function<String, String>() {
            @Override
            public String call(String name) throws Exception {
                return "Hello " + name;
            }
        });

        nameRDD.foreach(new VoidFunction<String>() {
            @Override
            public void call(String s) throws Exception {
                println(s);
            }
        });
    }
/**
    * name:張無忌
    * name:趙敏
    * name:周芷若
    */
  def map(): Unit ={
    rdd.map("name:"+_).foreach(println(_))
  }
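
In Python, a minimal PySpark sketch of map(), using the shared `sc` from the preliminaries:

# prints name:張無忌, name:趙敏, name:周芷若
rdd = sc.parallelize(["張無忌", "趙敏", "周芷若"])
for s in rdd.map(lambda name: "name:" + name).collect():
    print(s)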

flatMap()

public static void flatMap() {
        final List<String> list = Arrays.asList("張無忌 趙敏", "宋青書 周芷若");
        final JavaRDD<String> rdd = sc.parallelize(list);
        rdd.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public Iterator<String> call(String names) throws Exception {
                return Arrays.asList(names.split(" ")).iterator();
            }
        }).map(new Function<String, String>() {
            @Override
            public String call(String name) throws Exception {
                return "Hello " + name;
            }
        }).foreach(new VoidFunction<String>() {
            @Override
            public void call(String line) throws Exception {
                println(line);
            }
        });
    }
/**
    * Hello:張無忌
    * Hello:趙敏
    * Hello:宋青書
    * Hello:周芷若
    */
  def flatMap()={
    val rdd1: RDD[String] = sc.parallelize(List("張無忌 趙敏", "宋青書 周芷若"))

    rdd1.flatMap(_.split(" ")).foreach(word=>println("Hello:"+word))
  }
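
The same in PySpark (a minimal sketch, shared `sc` assumed):

# prints Hello:張無忌, Hello:趙敏, Hello:宋青書, Hello:周芷若
rdd = sc.parallelize(["張無忌 趙敏", "宋青書 周芷若"])
for word in rdd.flatMap(lambda line: line.split(" ")).collect():
    print("Hello:" + word)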

filter()

/**
     * Filter the even numbers out of the RDD
     */
    public static void filter() {
        final List<Integer> list = Arrays.asList(1, 2, 3, 4, 5, 6, 7);
        final JavaRDD<Integer> rdd = sc.parallelize(list);
        final JavaRDD<Integer> filterRDD = rdd.filter(new Function<Integer, Boolean>() {
            //returning true means we keep the value
            @Override
            public Boolean call(Integer number) throws Exception {
                return number % 2 == 0;
            }
        });
        filterRDD.foreach(new VoidFunction<Integer>() {
            @Override
            public void call(Integer integer) throws Exception {
                println(integer + "");
            }
        });
    }
/**
    * 2,4,6
    */
  def filter()={
    val list:List[Int] = List(1,2,3,4,5,6,7)
    val rdd1: RDD[Int] = sc.parallelize(list)
    rdd1.filter(_%2==0).foreach(println(_))
  }
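
A PySpark sketch of filter(), assumptions as above:

rdd = sc.parallelize([1, 2, 3, 4, 5, 6, 7])
# keep only the even numbers
print(rdd.filter(lambda n: n % 2 == 0).collect())  # [2, 4, 6]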

groupByKey()

Does no map-side (local) combining before the shuffle.

/**
     * groupByKey()
     * Expected output:
     * 峨眉
     * 周芷若滅絕師太
     * 武當
     * 宋青書張三丰
     */
    public static void groupBykey() {
        final List<Tuple2<String, String>> list = Arrays.asList(
                new Tuple2<String, String>("峨眉", "周芷若"),
                new Tuple2<String, String>("武當", "宋青書"),
                new Tuple2<String, String>("峨眉", "滅絕師太"),
                new Tuple2<String, String>("武當", "張三丰")
        );

        final JavaPairRDD<String, String> rdd = sc.parallelizePairs(list);

        final JavaPairRDD<String, Iterable<String>> groupBykeyRDD = rdd.groupByKey();

        groupBykeyRDD.foreach(new VoidFunction<Tuple2<String, Iterable<String>>>() {
            @Override
            public void call(Tuple2<String, Iterable<String>> tuple) throws Exception {
                final String menpai = tuple._1;
                final Iterator<String> iterator = tuple._2.iterator();
                println(menpai + " ");
                while (iterator.hasNext()) {
                    final String name = iterator.next();
                    System.out.print(name);
                }
                println("");
            }
        });
    }
/**
    * A partitioner can be passed as an extra argument; if omitted, the default is hash partitioning.
    * (峨眉,CompactBuffer(周芷若, 滅絕師太))
    * (武當,CompactBuffer(宋青書, 張三丰))
    */
  def groupBykey()= {
    val list: List[(String, String)] = List(("峨眉", "周芷若"), ("武當", "宋青書"), ("峨眉", "滅絕師太"), ("武當", "張三丰"))
    val rdd: RDD[(String, String)] = sc.parallelize(list)
    rdd.groupByKey().foreach(println(_))
  }
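
A PySpark sketch of groupByKey(); in Python each key maps to a ResultIterable, shown here as a list:

rdd = sc.parallelize([("峨眉", "周芷若"), ("武當", "宋青書"), ("峨眉", "滅絕師太"), ("武當", "張三丰")])
for sect, members in rdd.groupByKey().collect():
    print(sect, list(members))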

reduceByKey()

Does map-side (local) combining, which reduces the amount of data shuffled, so prefer this operator and avoid groupByKey() where possible.

/**
     * 峨眉 100
     * 武當 129
     */
    public static void reduceBykey() {
        final List<Tuple2<String, Integer>> list = Arrays.asList(
                new Tuple2<String, Integer>("峨眉", 40),
                new Tuple2<String, Integer>("武當", 30),
                new Tuple2<String, Integer>("峨眉", 60),
                new Tuple2<String, Integer>("武當", 99)
        );
        //reduceByKey
        final JavaPairRDD<String, Integer> rdd = sc.parallelizePairs(list);

        rdd.reduceByKey(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1 + v2;
            }
        }).foreach(new VoidFunction<Tuple2<String, Integer>>() {
            @Override
            public void call(Tuple2<String, Integer> tuple) throws Exception {
                println(tuple._1 + " " + tuple._2);
            }
        });
    }
/**
    * (峨眉,100)
    * (武當,129)
    */
  def reduceBykey()={
    val list: List[(String, Int)] = List(("峨眉", 40), ("武當", 30), ("峨眉", 60), ("武當", 99))
    val rdd: RDD[(String, Int)] = sc.parallelize(list)
    rdd.reduceByKey(_+_).foreach(println(_))
  }
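
A PySpark sketch of reduceByKey():

rdd = sc.parallelize([("峨眉", 40), ("武當", 30), ("峨眉", 60), ("武當", 99)])
# local combining happens within each partition before the shuffle
print(rdd.reduceByKey(lambda a, b: a + b).collect())  # [('峨眉', 100), ('武當', 129)] (order may vary)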

sortByKey()

/**
     * 98 -> 東方不敗
     * 85 -> 令狐沖
     * 83 -> 任我行
     * 80 -> 嶽不群
     */
    public static void sortBykey() {
        final List<Tuple2<Integer, String>> list = Arrays.asList(
                new Tuple2<Integer, String>(98, "東方不敗"),
                new Tuple2<Integer, String>(80, "嶽不群"),
                new Tuple2<Integer, String>(85, "令狐沖"),
                new Tuple2<Integer, String>(83, "任我行")
        );
        final JavaPairRDD<Integer, String> rdd = sc.parallelizePairs(list);
        rdd.sortByKey(false)
                .foreach(new VoidFunction<Tuple2<Integer, String>>() {
                    @Override
                    public void call(Tuple2<Integer, String> tuple) throws Exception {
                        println(tuple._1 + " -> " + tuple._2);
                    }
                });
    }
/**
    * 98->東方不敗
    * 85->令狐沖
    * 83->任我行
    * 80->嶽不群
    */
  def sortBykey()= {
    val list: List[(Int, String)] = List((98, "東方不敗"), (80, "嶽不群"), (85, "令狐沖"), (83, "任我行"))
    val rdd: RDD[(Int, String)] = sc.parallelize(list)
    rdd.sortByKey(false).foreach(x=>println(x._1+"->"+x._2))
  }
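
A PySpark sketch of sortByKey():

rdd = sc.parallelize([(98, "東方不敗"), (80, "嶽不群"), (85, "令狐沖"), (83, "任我行")])
for score, name in rdd.sortByKey(ascending=False).collect():
    print(str(score) + "->" + name)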

join()

/**
     * 學號:1 名字:東方不敗 分數:99
     * 學號:3 名字:林平之 分數:97
     * 學號:2 名字:令狐沖 分數:98
     */
    public static void join() {
        final List<Tuple2<Integer, String>> names = Arrays.asList(
                new Tuple2<Integer, String>(1, "東方不敗"),
                new Tuple2<Integer, String>(2, "令狐沖"),
                new Tuple2<Integer, String>(3, "林平之")
        );
        final List<Tuple2<Integer, Integer>> scores = Arrays.asList(
                new Tuple2<Integer, Integer>(1, 99),
                new Tuple2<Integer, Integer>(2, 98),
                new Tuple2<Integer, Integer>(3, 97)
        );

        final JavaPairRDD<Integer, String> namesRdd = sc.parallelizePairs(names);
        final JavaPairRDD<Integer, Integer> scoresRdd = sc.parallelizePairs(scores);
        /**
         * Integer:                 student ID
         * Tuple2<String, Integer>: (name, score)
         */
        final JavaPairRDD<Integer, Tuple2<String, Integer>> joinRDD = namesRdd.join(scoresRdd);
//        final JavaPairRDD<Integer, Tuple2<Integer, String>> join = scoresRdd.join(namesRdd);
        joinRDD.foreach(new VoidFunction<Tuple2<Integer, Tuple2<String, Integer>>>() {
            @Override
            public void call(Tuple2<Integer, Tuple2<String, Integer>> tuple) throws Exception {
                println("學號:" + tuple._1 + " 名字:" + tuple._2._1 + " 分數:" + tuple._2._2);
            }
        });
    }
/**
    * (1,(東方不敗,99))
    * (3,(林平之,97))
    * (2,(令狐沖,98))
    * ----->
    * 學號:1 名字:東方不敗 分數:99
    * 學號:3 名字:林平之 分數:97
    * 學號:2 名字:令狐沖 分數:98
    */
  //joins RDD1 and RDD2 on their matching keys, like a SQL join
  def join()= {
    val list1: List[(Int, String)] = List((1, "東方不敗"), (2, "令狐沖"), (3, "林平之"))
    val list2: List[(Int, Int)]=List((1, 99), (2, 98), (3, 97))
    val rdd1: RDD[(Int, String)] = sc.parallelize(list1)
    val rdd2: RDD[(Int, Int)] = sc.parallelize(list2)
    rdd1.join(rdd2).foreach(x=>println("學號:"+x._1+" 名字:"+x._2._1+" 分數:"+x._2._2))
  }
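
A PySpark sketch of join(); it yields (key, (left value, right value)):

names = sc.parallelize([(1, "東方不敗"), (2, "令狐沖"), (3, "林平之")])
scores = sc.parallelize([(1, 99), (2, 98), (3, 97)])
for sid, (name, score) in names.join(scores).collect():
    print("學號:%d 名字:%s 分數:%d" % (sid, name, score))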

union()

    /**
     * 1
     * 2
     * 3
     * 4
     * 3
     * 4
     * 5
     * 6
     */
    public static void union() {
        final List<Integer> list1 = Arrays.asList(1, 2, 3, 4);
        final List<Integer> list2 = Arrays.asList(3, 4, 5, 6);
        final JavaRDD<Integer> rdd1 = sc.parallelize(list1);
        final JavaRDD<Integer> rdd2 = sc.parallelize(list2);
        rdd1.union(rdd2)
                .foreach(new VoidFunction<Integer>() {
                    @Override
                    public void call(Integer number) throws Exception {
                        println(number + "");
                    }
                });
    }
  /**
    * union merges multiple RDDs so their results appear together.
    * It concatenates the two RDDs without deduplicating;
    * 1
    * 2
    * 3
    * 4
    * 3
    * 4
    * 5
    * 6
    */
  def union()={
    val list1: List[Int] = List(1,2,3,4)
    val list2: List[Int] = List(3,4,5,6)
    val rdd1: RDD[Int] = sc.parallelize(list1)
    val rdd2: RDD[Int] = sc.parallelize(list2)
    rdd1.union(rdd2).foreach(println(_))
  }
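
A PySpark sketch of union():

rdd1 = sc.parallelize([1, 2, 3, 4])
rdd2 = sc.parallelize([3, 4, 5, 6])
# duplicates are kept
print(rdd1.union(rdd2).collect())  # [1, 2, 3, 4, 3, 4, 5, 6]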

leftOuterJoin()

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/** 
  * (22,(qwe,None))
  * (3,(zxc,Some(true)))
  * (2,(asd,Some(true)))
  */
object Test {
  def main(args: Array[String]): Unit = {
    val a = List((1L, true), (2L, true), (3L, true))
    val b = List((22L, "qwe"), (2L, "asd"), (3L, "zxc"))
    val conf: SparkConf = new SparkConf().setMaster("local").setAppName("Test")
    val sc: SparkContext = new SparkContext(conf)
    val rdd1: RDD[(Long, Boolean)] = sc.parallelize(a)
    val rdd2: RDD[(Long, String)] = sc.parallelize(b)
    //both RDDs must be key-value pairs; commonly used for blacklist filtering
    val rdd3: RDD[(Long, (String, Option[Boolean]))] = rdd2.leftOuterJoin(rdd1)
    rdd3.foreach(println(_))
  }
}
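
A PySpark sketch of leftOuterJoin(); keys missing on the right come back as None, the counterpart of Scala's Option/None:

rdd1 = sc.parallelize([(1, True), (2, True), (3, True)])
rdd2 = sc.parallelize([(22, "qwe"), (2, "asd"), (3, "zxc")])
print(rdd2.leftOuterJoin(rdd1).collect())
# e.g. [(22, ('qwe', None)), (2, ('asd', True)), (3, ('zxc', True))] (order may vary)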

intersection()

    /**
     * Intersection
     * 4
     * 3
     */
    public static void intersection() {
        final List<Integer> list1 = Arrays.asList(1, 2, 3, 4);
        final List<Integer> list2 = Arrays.asList(3, 4, 5, 6);
        final JavaRDD<Integer> rdd1 = sc.parallelize(list1);
        final JavaRDD<Integer> rdd2 = sc.parallelize(list2);

        rdd1.intersection(rdd2)
                .foreach(new VoidFunction<Integer>() {
                    @Override
                    public void call(Integer number) throws Exception {
                        println(number + "");
                    }
                });
    }
  /**
    * Intersection
    * RDD1.intersection(RDD2) returns the elements present in both RDDs, deduplicated.
    * intersection has to shuffle the data, which makes it relatively expensive.
    * 4
    * 3
    */
  def intersection(){
    val list1: List[Int] = List(1,2,3,4)
    val list2: List[Int] = List(3,4,5,6)
    val rdd1: RDD[Int] = sc.parallelize(list1)
    val rdd2: RDD[Int] = sc.parallelize(list2)
    rdd1.intersection(rdd2).foreach(println(_))
  }
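
A PySpark sketch of intersection():

rdd1 = sc.parallelize([1, 2, 3, 4])
rdd2 = sc.parallelize([3, 4, 5, 6])
# deduplicated intersection; requires a shuffle
print(rdd1.intersection(rdd2).collect())  # [3, 4] (order may vary)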

distinct()

    /**
     * Deduplicate
     * 4
     * 1
     * 3
     * 2
     */
    public static void distinct() {
        final List<Integer> list1 = Arrays.asList(1, 2, 3, 3, 4, 4);
        final JavaRDD<Integer> rdd1 = sc.parallelize(list1);
        rdd1.distinct()
                .foreach(new VoidFunction<Integer>() {
                    @Override
                    public void call(Integer number) throws Exception {
                        println(number + " ");
                    }
                });
    }
  /**
    * distinct removes duplicate elements from an RDD. Note that it involves a shuffle, so the operation is expensive.
    * 4
    * 1
    * 3
    * 2
    */
  def distinct()={
    val list: List[Int] = List(1,2,3,3,4,4)
    val rdd: RDD[Int] = sc.parallelize(list)
    rdd.distinct().foreach(println(_))
  }
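
A PySpark sketch of distinct():

rdd = sc.parallelize([1, 2, 3, 3, 4, 4])
print(rdd.distinct().collect())  # e.g. [4, 1, 3, 2] (order may vary)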

cartesian()

    /**
     * Cartesian product
     * a->0
     * a->1
     * a->2
     * b->0
     * b->1
     * b->2
     */
    public static void cartesian() {

        final List<String> A = Arrays.asList("a", "b");
        final List<Integer> B = Arrays.asList(0, 1, 2);

        final JavaRDD<String> rddA = sc.parallelize(A);
        final JavaRDD<Integer> rddB = sc.parallelize(B);

        rddA.cartesian(rddB)
                .foreach(new VoidFunction<Tuple2<String, Integer>>() {
                    @Override
                    public void call(Tuple2<String, Integer> tuple) throws Exception {
                        println(tuple._1 + "->" + tuple._2);
                    }
                });
    }
  /**
    * Cartesian product
    * (a,0)       a->0
    * (a,1)       a->1
    * (a,2)  ->   a->2
    * (b,0)       b->0
    * (b,1)       b->1
    * (b,2)       b->2
    */
  def cartesian()={
    val list1: List[String] = List("a", "b")
    val list2: List[Int] = List(0, 1, 2)
    val rdd1: RDD[String] = sc.parallelize(list1)
    val rdd2: RDD[Int] = sc.parallelize(list2)
    rdd1.cartesian(rdd2).foreach(x=>println(x._1+"->"+x._2))
  }
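
A PySpark sketch of cartesian():

rdd1 = sc.parallelize(["a", "b"])
rdd2 = sc.parallelize([0, 1, 2])
for a, b in rdd1.cartesian(rdd2).collect():
    print(a + "->" + str(b))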

mapPartitions()

    /**
     * map:
     * processes one record at a time (one access per record to the file system, database, etc.)
     * mapPartitions:
     * fetches one whole partition of data at a time (e.g. from HDFS)
     * Normally mapPartitions is a high-performance operator,
     * because processing a partition at a time reduces the number of data fetches.
     * But if the partitioning is set up badly, a single partition may end up holding too much data.
     * hello-1
     * hello-2
     * hello-3
     * hello-4
     * hello-5
     * hello-6
     */
    public static void mapPartitions() {
        final List<Integer> list = Arrays.asList(1, 2, 3, 4, 5, 6);
        //the second argument gives this RDD two partitions
        final JavaRDD<Integer> rdd = sc.parallelize(list, 2);

        rdd.mapPartitions(new FlatMapFunction<Iterator<Integer>, String>() {
            //each call processes one whole partition of data
            @Override
            public Iterator<String> call(Iterator<Integer> iterator) throws Exception {
                List<String> list = new ArrayList<String>();
                while (iterator.hasNext()) {
                    list.add("hello-" + iterator.next());
                }
                return list.iterator();
            }
        }).foreach(new VoidFunction<String>() {
            @Override
            public void call(String s) throws Exception {
                println(s);
            }
        });
    }
  /**
    * Fetches one whole partition of data at a time (e.g. from HDFS).
    * mapPartitions can be read backwards: partition first, then run the map function over each partition.
    * When it fits:
    * if the mapping has to create extra objects frequently (connections, buffers, ...), mapPartitions is much more efficient than map.
    * hello-1
    * hello-2
    * hello-3
    * hello-4
    * hello-5
    * hello-6
    */
  def mapPartitions()={
    val list: List[Int] = List(1,2,3,4,5,6)
    val rdd: RDD[Int] = sc.parallelize(list,2)
    rdd.mapPartitions(iter => iter.map("hello-" + _)).foreach(println(_))
  }
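
A PySpark sketch of mapPartitions(); the function receives an iterator over one whole partition and yields the transformed records:

def add_prefix(partition):
    # partition is an iterator over one partition's data
    for n in partition:
        yield "hello-" + str(n)

rdd = sc.parallelize([1, 2, 3, 4, 5, 6], 2)
print(rdd.mapPartitions(add_prefix).collect())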

repartition()

    /**
     * Repartition (always shuffles).
     * HDFS -> hello.txt has 2 file blocks (not counting replicas)
     * 2 file blocks -> 2 partitions -> when the Spark job runs, one task is launched per partition.
     * Problem it solves: too few partitions -> increase the partition count.
     * 1,3,5,2,4,6
     */
    public static void repartition() {
        final List<Integer> list = Arrays.asList(1, 2, 3, 4, 5, 6);
        final JavaRDD<Integer> rdd = sc.parallelize(list, 1);
        // repartition(numPartitions) is coalesce(numPartitions, shuffle = true)
        rdd.repartition(2)
                .foreach(new VoidFunction<Integer>() {
                    @Override
                    public void call(Integer number) throws Exception {
                        println(number + "");
                    }
                });
    }
  /**
    * Repartition (always shuffles).
    * Problem it solves: too few partitions -> increase the partition count.
    * 1
    * 3
    * 5
    * 2
    * 4
    * 6
    */
  def repartition()={
    val list: List[Int] = List(1,2,3,4,5,6)
    val rdd: RDD[Int] = sc.parallelize(list,1)
    rdd.repartition(2).foreach(println(_))
  }
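
A PySpark sketch of repartition():

rdd = sc.parallelize([1, 2, 3, 4, 5, 6], 1)
repartitioned = rdd.repartition(2)
print(repartitioned.getNumPartitions())  # 2
print(repartitioned.collect())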

aggregateByKey()

    /**
     * Word count implemented with aggregateByKey
     * you  ->1
     * jump  ->2
     * i  ->1
     */
    public static void aggregateByKey() {
        final List<String> list = Arrays.asList("you,jump", "i,jump");
        final JavaRDD<String> rdd = sc.parallelize(list);
        rdd.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public Iterator<String> call(String line) throws Exception {
                return Arrays.asList(line.split(",")).iterator();
            }
        })
                //type parameters: the input type, the output key type, the output value type
                .mapToPair(new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(String word) throws Exception {
                return new Tuple2<String, Integer>(word, 1);
            }
        })
              //the first argument is the zero value: with 10 every word would start from 10, with 0 we add up from zero;
              //the second function does the local (per-partition) aggregation, the third does the global aggregation.
              //this fine-grained control makes aggregateByKey flexible but more complex to use; it can also, e.g., concatenate strings
                .aggregateByKey(0, new Function2<Integer, Integer, Integer>() {
                    @Override
                    public Integer call(Integer v1, Integer v2) throws Exception {
                        return v1 + v2;//區域性
                    }
                }, new Function2<Integer, Integer, Integer>() {
                    @Override
                    public Integer call(Integer v1, Integer v2) throws Exception {
                        return v1 + v2;//全域性
                    }
                }
        ).foreach(new VoidFunction<Tuple2<String, Integer>>() {
            @Override
            public void call(Tuple2<String, Integer> tuple) throws Exception {
                println(tuple._1 + "  ->" + tuple._2);
            }
        });
    }
  /**
    * Word count implemented with aggregateByKey.
    * The first aggregateByKey overload lets us pass a custom Partitioner; apart from that its signature is close to aggregate's, and the other overloads all end up calling it.
    * The second overload takes the number of partitions (numPartitions) and ultimately uses a HashPartitioner.
    * The last overload first checks whether the current RDD has a partitioner defined; if so it reuses it, otherwise it falls back to a HashPartitioner.
    * aggregateByKey(zero value)((local aggregation), (global aggregation))
    * (you,1)
    * (jump,2)
    * (i,1)
    */
  def aggregateByKey() ={
    val list: List[String] = List("you,jump", "i,jump")
    val rdd: RDD[String] = sc.parallelize(list)
    rdd.flatMap(_.split(",")).map((_, 1)).aggregateByKey(0)(_ + _, _ + _).foreach(println(_))
  }
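
A PySpark sketch of the same word count; aggregateByKey takes the zero value, the local (per-partition) function, and the global function as separate arguments:

rdd = sc.parallelize(["you,jump", "i,jump"])
counts = (rdd.flatMap(lambda line: line.split(","))
             .map(lambda word: (word, 1))
             .aggregateByKey(0, lambda a, b: a + b, lambda a, b: a + b))
print(counts.collect())  # [('you', 1), ('jump', 2), ('i', 1)] (order may vary)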

coalesce()

    /**
     * Reduce the partition count: many -> few
     */
    public static void coalesce() {
        final List<Integer> list = Arrays.asList(1, 2, 3, 4, 5, 6);
        final JavaRDD<Integer> rdd = sc.parallelize(list, 3);
        rdd.coalesce(1)
                .foreach(new VoidFunction<Integer>() {
                    @Override
                    public void call(Integer integer) throws Exception {
                        println(integer + "");
                    }
                });
    }
  /**
    * Reduce the partition count: many -> few
    * 1,2,3,4,5,6
    */
  def coalesce()={
    val list:List[Int] = List(1,2,3,4,5,6)
    val rdd: RDD[Int] = sc.parallelize(list,3)
    rdd.coalesce(1).foreach(println(_))
  }
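
A PySpark sketch of coalesce():

rdd = sc.parallelize([1, 2, 3, 4, 5, 6], 3)
# coalesce shrinks the partition count, without a shuffle by default
print(rdd.coalesce(1).getNumPartitions())  # 1
print(rdd.coalesce(1).collect())  # [1, 2, 3, 4, 5, 6]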

mapPartitionsWithIndex()

    /**
     * map: fetches and processes one record at a time
     * mapPartitions: fetches and processes one whole partition at a time
     * mapPartitionsWithIndex: fetches and processes one whole partition at a time, and also knows which partition number it is processing.
     *
     * 0_1
     * 0_2
     * 0_3
     * 0_4
     * 1_5
     * 1_6
     * 1_7
     * 1_8
     */
    public static void mapPartitionsWithIndex() {
        final List<Integer> list = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8);
        final JavaRDD<Integer> rdd = sc.parallelize(list, 2);//HashPartitioner, RangePartitioner, or a custom partitioner

        rdd.mapPartitionsWithIndex(new Function2<Integer, Iterator<Integer>, Iterator<String>>() {
            @Override
            public Iterator<String> call(Integer index, Iterator<Integer> iterator) throws Exception {
                final ArrayList<String> list = new ArrayList<>();
                while (iterator.hasNext()) {
                    list.add(index + "_" + iterator.next());
                }
                return list.iterator();
            }
        }, true)
                .foreach(new VoidFunction<String>() {
                    @Override
                    public void call(String s) throws Exception {
                        println(s);
                    }
                });
    }
  /**
    * map: fetches and processes one record at a time
    * mapPartitions: fetches and processes one whole partition at a time
    * mapPartitionsWithIndex: fetches and processes one whole partition at a time, and also knows which partition number it is processing.
    *
    * 0_1
    * 0_2
    * 1_3
    * 1_4
    * 1_5
    * 2_6
    * 2_7
    * 2_8
    */

  def mapPartitionsWithIndex()={
    val list: List[Int] = List(1, 2, 3, 4, 5, 6, 7, 8)
    val rdd: RDD[Int] = sc.parallelize(list, 3)
    rdd.mapPartitionsWithIndex(
      //x is the partition index; iter iterates over that partition's data
      (x, iter) => iter.map(i => x + "_" + i)
    , true).foreach(println(_))
  }
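
A PySpark sketch of mapPartitionsWithIndex(); the function also receives the partition index:

def tag(index, partition):
    # index is the partition number, partition is an iterator over its data
    for n in partition:
        yield str(index) + "_" + str(n)

rdd = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8], 3)
print(rdd.mapPartitionsWithIndex(tag).collect())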

cogroup()

    /**
     * When called on datasets of type (K, V) and (K, W),
     * returns a dataset of (K, (Iterable<V>, Iterable<W>)) tuples.
     *
     * ID:1 Name: [東方不敗, 東方不敗] Scores: [90, 98]
     * ID:3 Name: [嶽不群, 嶽不群] Scores: [89, 67]
     * ID:2 Name: [林平之, 林平之] Scores: [91, 78]
     *
     */
    public static void cogroup() {
        final List<Tuple2<Integer, String>> list1 = Arrays.asList(
                new Tuple2<Integer, String>(1, "東方不敗"),
                new Tuple2<Integer, String>(2, "林平之"),
                new Tuple2<Integer, String>(3, "嶽不群"),
                new Tuple2<Integer, String>(1, "東方不敗"),
                new Tuple2<Integer, String>(2, "林平之"),
                new Tuple2<Integer, String>(3, "嶽不群")
        );

        final List<Tuple2<Integer, Integer>> list2 = Arrays.asList(
                new Tuple2<Integer, Integer>(1, 90),
                new Tuple2<Integer, Integer>(2, 91),
                new Tuple2<Integer, Integer>(3, 89),
                new Tuple2<Integer, Integer>(1, 98),
                new Tuple2<Integer, Integer>(2, 78),
                new Tuple2<Integer, Integer>(3, 67)
        );

        final JavaPairRDD<Integer, String> rdd1 = sc.parallelizePairs(list1);
        final JavaPairRDD<Integer, Integer> rdd2 = sc.parallelizePairs(list2);

        final JavaPairRDD<Integer, Tuple2<Iterable<String>, Iterable<Integer>>> rdd3 = rdd1.cogroup(rdd2);
        rdd3.foreach(new VoidFunction<Tuple2<Integer, Tuple2<Iterable<String>, Iterable<Integer>>>>() {
            @Override
            public void call(Tuple2<Integer, Tuple2<Iterable<String>, Iterable<Integer>>> tuple) throws Exception {
                final Integer id = tuple._1;
                final Iterable<String> names = tuple._2._1;
                final Iterable<Integer> scores = tuple._2._2;
                println("ID:" + id + " Name: " + names + " Scores: " + scores);
            }
        });
    }
  /**
    * groupByKey groups the data of a single RDD; cogroup() can group several RDDs that share the same key.
    * For example,
    * RDD1.cogroup(RDD2) groups RDD1 and RDD2 by their common keys, producing records of the form (key, (Iterable[value1], Iterable[value2])).
    * cogroup can also group more than two RDDs,
    * e.g. RDD1.cogroup(RDD2, RDD3, …RDDN) gives (key, (Iterable[value1], Iterable[value2], Iterable[value3], …, Iterable[valueN])).
    *
    * (1,(CompactBuffer(東方不敗, 東方不敗),CompactBuffer(90, 98)))
    * (3,(CompactBuffer(嶽不群, 嶽不群),CompactBuffer(89, 67)))
    * (2,(CompactBuffer(林平之, 林平之),CompactBuffer(91, 78)))
    *
    * ID:1Name:CompactBuffer(東方不敗, 東方不敗)Scores:CompactBuffer(90, 98)
    * ID:3Name:CompactBuffer(嶽不群, 嶽不群)Scores:CompactBuffer(89, 67)
    * ID:2Name:CompactBuffer(林平之, 林平之)Scores:CompactBuffer(91, 78)
    */
  def cogroup()= {
    val list1: List[(Int, String)] = List((1, "東方不敗"), (2, "林平之"), (3, "嶽不群"), (1, "東方不敗"), (2, "林平之"), (3, "嶽不群"))
    val list2: List[(Int, Int)] = List((1, 90), (2, 91), (3, 89), (1, 98), (2, 78), (3, 67))
    val rdd1: RDD[(Int, String)] = sc.parallelize(list1)
    val rdd2: RDD[(Int, Int)] = sc.parallelize(list2)
    rdd1.cogroup(rdd2).foreach(x=>println("ID:"+x._1+"Name:"+x._2._1+"Scores:"+x._2._2))
  }
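
A PySpark sketch of cogroup(); each key maps to a pair of iterables, one per source RDD:

rdd1 = sc.parallelize([(1, "東方不敗"), (2, "林平之"), (3, "嶽不群"), (1, "東方不敗"), (2, "林平之"), (3, "嶽不群")])
rdd2 = sc.parallelize([(1, 90), (2, 91), (3, 89), (1, 98), (2, 78), (3, 67)])
for sid, (names, scores) in rdd1.cogroup(rdd2).collect():
    print("ID:", sid, "Name:", list(names), "Scores:", list(scores))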

repartitionAndSortWithinPartitions()

    /**
     * few -> many
     */
    public static void repartitionAndSortWithinPartitions() {//a tuning operator
        final List<Integer> list = Arrays.asList(1, 2, 11, 3, 12, 4, 5);
        final JavaRDD<Integer> rdd = sc.parallelize(list, 1);
        final JavaPairRDD<Integer, Integer> pairRDD = rdd.mapToPair(new PairFunction<Integer, Integer, Integer>() {
            @Override
            public Tuple2<Integer, Integer> call(Integer number) throws Exception {
                return new Tuple2<>(number, number);
            }
        });
        //new HashPartitioner(2) new RangePartitioner<>()
        pairRDD.repartitionAndSortWithinPartitions(new Partitioner() {
            @Override
            public int numPartitions() {
                return 2;
            }

            @Override
            public int getPartition(Object key) {
                final Integer number = Integer.valueOf(key.toString());
                if (number % 2 == 0) {
                    return 0;
                } else {
                    return 1;
                }
            }
        }).mapPartitionsWithIndex(new Function2<Integer, Iterator<Tuple2<Integer, Integer>>,
                Iterator<String>>() {
            @Override
            public Iterator<String> call(Integer index, Iterator<Tuple2<Integer, Integer>> iterator) throws Exception {
                final ArrayList<String> list = new ArrayList<>();
                while (iterator.hasNext()) {
                    list.add(index + "_" + iterator.next());
                }
                return list.iterator();
            }
        }, false)
                .foreach(new VoidFunction<String>() {
                    @Override
                    public void call(String s) throws Exception {
                        println(s);
                    }
                });
    }
  /**
    * few -> many
    * If the data still needs to be sorted after a repartition, use the repartitionAndSortWithinPartitions operator directly: it sorts while it performs the repartitioning shuffle. Doing the shuffle and the sort at the same time is likely faster than shuffling first and sorting afterwards.
    * This method requires (K, V) data.
    * (1,3)
    * (1,2)
    * (1,4)
    * (2,3)
    * (2,4)
    * (5,4)
    */
  def repartitionAndSortWithinPartitions()= {
    val list = List((1,3),(1,2),(5,4),(1, 4),(2,3),(2,4))
    val rdd: RDD[(Int, Int)] = sc.parallelize(list, 1)
    rdd.repartitionAndSortWithinPartitions(new HashPartitioner(3)).foreach(println(_))
  }
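
A PySpark sketch of repartitionAndSortWithinPartitions(); glom() is used here only to show each partition's (sorted) contents as its own list:

rdd = sc.parallelize([(1, 3), (1, 2), (5, 4), (1, 4), (2, 3), (2, 4)], 1)
result = rdd.repartitionAndSortWithinPartitions(numPartitions=3)
print(result.glom().collect())  # each inner list is one partition, sorted by key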

sample()

    /**
     * sampling with replacement
     * sampling without replacement
     */
    public static void sample() {
        final List<Integer> list = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 9, 10);
        final JavaRDD<Integer> rdd = sc.parallelize(list);
        /**
         * withReplacement: Boolean,
         *       true: sample with replacement
         *       false: sample without replacement
         * fraction: Double:
         *      the probability of each element of the RDD being selected
         * seed: Long:
         *      the random seed
         */
        final JavaRDD<Integer> rdd2 = rdd.sample(false, 0.5);

        rdd2.foreach(new VoidFunction<Integer>() {
            @Override
            public void call(Integer integer) throws Exception {
                println(integer + "");
            }
        });
    }
  /**
    * The result varies from run to run.
    * If the third argument (the random seed) is given, the result is fixed.
    */
  def sample()={
    val list: List[Int] = List (1,2,3,4,5,6,7,9,10)
    val rdd: RDD[Int] = sc.parallelize(list)
    rdd.sample(false,0.5).foreach(println(_))
  }
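
A PySpark sketch of sample():

rdd = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 9, 10])
print(rdd.sample(False, 0.5).collect())      # varies from run to run
print(rdd.sample(False, 0.5, 42).collect())  # fixed by the seed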

pipe()

    public static void pipe() {
        final List<Integer> list = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 9, 10);
        final JavaRDD<Integer> rdd = sc.parallelize(list);

        //   final JavaRDD<String> pipe = rdd.pipe("sh wordcount.sh");
    }
  /**
    * Invokes a shell command.
    * Linux has many shell commands for processing data; the pipe transformation lets us run such commands from Spark to produce a new RDD.
    */
  def pipe()={
    val list: List[Int] = List(1,2,3,4,5,6,7,9,10)
    val rdd: RDD[Int] = sc.parallelize(list)
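    //pipe is a transformation; an action (e.g. collect()) is needed before the script actually runs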
    rdd.pipe("sh wordcount.sh")
  }
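
A PySpark sketch of pipe(); `cat` is used here instead of a word-count script so the example runs anywhere:

rdd = sc.parallelize(["1", "2", "3"])
# each element is written to the command's stdin; its stdout becomes the new RDD
print(rdd.pipe("cat").collect())  # ['1', '2', '3']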