1. 程式人生 > >sparkCore Api常用運算元使用

sparkCore Api常用運算元使用


 
package sparkjava;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.*;
import scala.Tuple2;

import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

/**
 * 熟悉api
 */
public class TestSparkApi {
    //  API介紹
  /*  1.1 transform
    l  map(func):對呼叫map的RDD資料集中的每個element都使用func,然後返回一個新的RDD,這個返回的資料集是分散式的資料集

    l  filter(func) : 對呼叫filter的RDD資料集中的每個元素都使用func,然後返回一個包含使func為true的元素構成的RDD

    l  flatMap(func):和map差不多,但是flatMap生成的是多個結果

    l  mapPartitions(func):和map很像,但是map是每個element,而mapPartitions是每個partition

    l  mapPartitionsWithSplit(func):和mapPartitions很像,但是func作用的是其中一個split上,所以func中應該有index

    l  sample(withReplacement,faction,seed):抽樣

    l  union(otherDataset):返回一個新的dataset,包含源dataset和給定dataset的元素的集合

    l  distinct([numTasks]):返回一個新的dataset,這個dataset含有的是源dataset中的distinct的element

    l  groupByKey(numTasks):返回(K,Seq[V]),也就是hadoop中reduce函式接受的key-valuelist

    l  reduceByKey(func,[numTasks]):就是用一個給定的reduce func再作用在groupByKey產生的(K,Seq[V]),比如求和,求平均數

    l  sortByKey([ascending],[numTasks]):按照key來進行排序,是升序還是降序,ascending是boolean型別

1.2 action
    l  reduce(func):說白了就是聚集,但是傳入的函式是兩個引數輸入返回一個值,這個函式必須是滿足交換律和結合律的

    l  collect():一般在filter或者足夠小的結果的時候,再用collect封裝返回一個數組

    l  count():返回的是dataset中的element的個數

    l  first():返回的是dataset中的第一個元素

    l  take(n):返回前n個elements

    l  takeSample(withReplacement,num,seed):抽樣返回一個dataset中的num個元素,隨機種子seed

    l  saveAsTextFile(path):把dataset寫到一個text file中,或者hdfs,或者hdfs支援的檔案系統中,spark把每條記錄都轉換為一行記錄,然後寫到file中

    l  saveAsSequenceFile(path):只能用在key-value對上,然後生成SequenceFile寫到本地或者hadoop檔案系統

    l  countByKey():返回的是key對應的個數的一個map,作用於一個RDD

    l  foreach(func):對dataset中的每個元素都使用func*/
   /*資料情況
 a 1
b 2
c 3
d 4
e 5*/
    public static void main(String[] args) {
//        String filepath=args[0];
        SparkConf conf = new SparkConf();
        conf.setAppName("transformAndAction");
        conf.setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);
        JavaRDD<String> rdd = sc.textFile("D://sparkAction.txt");

//        --transform

//        testSparkCoreApiMap(rdd); //本案例實現,列印所有資料
//        testSparkCoreApiFilter(rdd);
//        testSparkCoreApiFlatMap(rdd); //本案例實現:列印所有的字元

//        testSparkCoreApiUnion(rdd);//合併兩個RDD
        // testSparkCoreApiDistinct(rdd);
//         testSparkCoreApiMaptoPair(rdd);//把RDD對映為鍵值對型別的資料
//        testSparkCoreApiGroupByKey(rdd);//對鍵值對型別的資料進行按鍵值合併
//        testSparkCoreApiReduceByKey(rdd);//對鍵值對進行按鍵相同的對值進行操作
//                --action
        testSparkCoreApiReduce(rdd);//對RDD進行遞迴呼叫
    }

    /**
     * Map主要是對資料進行處理,不進行資料集的增減
     * <p>
     * 本案例實現,列印所有資料
     *
     * @param rdd
     */

    private static void testSparkCoreApiMap(JavaRDD<String> rdd) {
        JavaRDD<String> logData1 = rdd.map(new Function<String, String>() {
            public String call(String s) {
                return s;
            }
        });
        List list = logData1.collect();

        list.forEach(x->System.out.println(x));
        /*for (int i = 0; i < list.size(); i++) {
            System.out.println(list.get(i));


        }*/
    }
    /*
     * filter主要是過濾資料的功能
     * 本案例實現:過濾含有a的那行資料
     */
    private static void testSparkCoreApiFilter(JavaRDD<String> rdd){
        JavaRDD<String> logData1 = rdd.filter(new Function<String, Boolean>() {
            @Override
            public Boolean call(String s) throws Exception {
                return (s.split(" "))[0].equals("a");
            }
        });

//        JavaRDD<String> logData1=rdd.filter(new Function<String,Boolean>(){
//            public Boolean call(String s){
//
//                return (s.split(" "))[0].equals("a");
//            }
//
//        });
        List list = logData1.collect();

        list.forEach(System.out::println);
//        for (int i = 0; i < list.size(); i++) {
//            System.out.println(list.get(i));
//
//
//        }

    }

    /*
     * flatMap  使用者行轉列
     * 本案例實現:列印所有的字元
     */
    private static void testSparkCoreApiFlatMap(JavaRDD<String> rdd){
        JavaRDD<String> words = rdd.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public Iterator<String> call(String s) throws Exception {
                return Arrays.asList(s.split(" ")).iterator();
            }
        });
//        JavaRDD<String> words=rdd.flatMap(
//                new FlatMapFunction<String, String>() {
//                    public Iterable<String> call(String s) throws Exception {
//                        return Arrays.asList(s.split(" "));
//                    }
//                }
//        );
        List list = words.collect();
        list.forEach(x-> System.out.println(x));
//        for (int i = 0; i < list.size(); i++) {
//            System.out.println(list.get(i));
//        }

    }

    /**
     * testSparkCoreApiUnion
     * 合併兩個RDD
     * @param rdd
     */
    private static void testSparkCoreApiUnion(JavaRDD<String> rdd){
        JavaRDD<String> unionRdd=rdd.union(rdd);
//        unionRdd.foreach(new VoidFunction<String>(){
//            public void call(String lines){
//                System.out.println(lines);
//            }
//        });

        unionRdd.foreach(new VoidFunction<String>() {
            @Override
            public void call(String lines) throws Exception {
                System.out.println(lines);
            }
        });
    }


    /**
     * testSparkCoreApiDistinct Test
     * 對RDD去重
     * @param rdd
     */
    private static void testSparkCoreApiDistinct(JavaRDD<String> rdd){
        JavaRDD<String> unionRdd=rdd.union(rdd).distinct();
        unionRdd.foreach(new VoidFunction<String>(){
            public void call(String lines){
                System.out.println(lines);
            }
        });
    }

    /**
     * testSparkCoreApiMaptoPair Test
     * 把RDD對映為鍵值對型別的資料
     * @param rdd
     */
    private static void testSparkCoreApiMaptoPair(JavaRDD<String> rdd){

        JavaPairRDD<String,Integer> pairRDD = rdd.mapToPair(new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(String s) throws Exception {
                String[] st = s.split(" ");
                return new Tuple2(st[0],st[1]); //自動推導型別
            }
        });
//        JavaPairRDD<String, Integer> pairRDD=rdd.mapToPair(new PairFunction<String,String,Integer>(){
//            @Override
//            public Tuple2<String, Integer> call(String t) throws Exception {
//                String[] st=t.split(" ");
//                return new Tuple2(st[0], st[1]);
//            }
//
//        });
        pairRDD.foreach(new VoidFunction<Tuple2<String, Integer>>(){
            @Override
            public void call(Tuple2<String, Integer> t) throws Exception {
                System.out.println(t._2()+"=="+t._1());

            }
        });
    }
    /**
     * testSparkCoreApiGroupByKey Test
     * 對鍵值對型別的資料進行按鍵值合併
     * @param rdd
     */

    private static void testSparkCoreApiGroupByKey(JavaRDD<String> rdd){
        JavaPairRDD<String, Integer> pairRdd=rdd.mapToPair(new PairFunction<String,String,Integer>(){
            @Override
            public Tuple2<String, Integer> call(String t) throws Exception {
                String[] st=t.split(" ");
                return new Tuple2(st[0], Integer.valueOf(st[1]));
            }

        });

        JavaPairRDD<String, Iterable<Integer>> pairrdd2= pairRdd.union(pairRdd).groupByKey().sortByKey();//合併後並按照大小排序
        pairrdd2.foreach(new VoidFunction<Tuple2<String, Iterable<Integer>>>(){
            @Override
            public void call(Tuple2<String, Iterable<Integer>> t) throws Exception {
                Iterable<Integer> iter = t._2();
                for (Integer integer : iter) {
                    System.out.println(integer);
                }
            }
        });
    }

    /**
     * testSparkCoreApiReduceByKey
     * 對鍵值對進行按鍵相同的對值進行操作
     * @param rdd
     */
    private static void testSparkCoreApiReduceByKey(JavaRDD<String> rdd){

        //轉換成鍵值對
        JavaPairRDD<String, Integer> pairRdd=rdd.mapToPair(new PairFunction<String,String,Integer>(){
            @Override
            public Tuple2<String, Integer> call(String t) throws Exception {
                String[] st=t.split(" ");
                return new Tuple2(st[0], Integer.valueOf(st[1]));
            }

        });
        //相同鍵位 相加 並排序
        JavaPairRDD<String, Integer> pairrdd2 =pairRdd.union(pairRdd).reduceByKey(
                new Function2<Integer,Integer,Integer>(){
                    @Override
                    public Integer call(Integer v1, Integer v2) throws Exception {
                        return v1+v2;
                    }
                }
        ).sortByKey() ;

        //rdd輸出
        pairrdd2.foreach(new VoidFunction<Tuple2<String, Integer>>(){
            @Override
            public void call(Tuple2<String, Integer> t) throws Exception {
                System.out.println(t._2());

            }
        });
    }


    /**
     * testSparkCoreApiReduce
     * 對RDD進行遞迴呼叫
     * @param rdd
     */
    private static void testSparkCoreApiReduce(JavaRDD<String> rdd){
        //由於原資料是String,需要轉為Integer才能進行reduce遞迴
        JavaRDD<Integer> rdd1=rdd.map(new Function<String,Integer>(){
            @Override
            public Integer call(String v1) throws Exception {
                // TODO Auto-generated method stub
                return Integer.valueOf(v1.split(" ")[1]);
            }
        });

        Integer a= rdd1.reduce(new Function2<Integer,Integer,Integer>(){
            @Override
            public Integer call(Integer v1,Integer v2) throws Exception {
                return v1+v2;
            }
        });
        System.out.println(a);

    }

}