
Common Spark Action Operators in Java

  • The collect operator

package rddDemo.action;

import org.apache.spark.SparkConf ;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext ;
import org.apache.spark.api.java.function.Function;

import java.util.List ;
import java.util.ArrayList ;

/**
 * Created by asus on 2018/6/17.
 */
public class CollectDemoJava {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf() ;
        conf.setAppName("CollectDemoJava") ;
        conf.setMaster("local[2]") ;

        System.setProperty("hadoop.home.dir" , "E:\\hadoop-2.6.0") ;

        JavaSparkContext sc = new JavaSparkContext(conf) ;

        List<Integer> numbers = new ArrayList<>() ;
        for(int i = 0 ; i <= 10 ; i ++) {
            numbers.add(i) ;
        }

        JavaRDD<Integer> numRdd = sc.parallelize(numbers) ;

        JavaRDD<Integer> doubleNumRdd = numRdd.map(new Function<Integer, Integer>() {
            @Override
            public Integer call(Integer n) throws Exception {
                return n * 2 ;
            }
        }) ;

        for(Integer n : doubleNumRdd.collect()) {
            System.out.println("number : " + n);
        }

        sc.stop();
    }
}
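Because Function is a single-method interface, the same map-then-collect pipeline can also be written with Java 8 lambdas. A minimal sketch, assuming the numRdd defined above:

// Lambda form of the map + collect above (Java 8+).
JavaRDD<Integer> doubled = numRdd.map(n -> n * 2);
// collect() pulls EVERY element back to the driver, so it is only
// appropriate for small result sets; prefer take(n) for a quick peek.
List<Integer> result = doubled.collect();
result.forEach(n -> System.out.println("number : " + n));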
  • The reduce operator

package rddDemo.action;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext ;
import org.apache.spark.api.java.JavaRDD ;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.VoidFunction;

import java.util.ArrayList;
import java.util.List;

/**
 * The reduce operator
 * An action operator
 * f(U, U) => U
 * Created by asus on 2018/7/7.
 */
public class ReduceDemoJava {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf() ;
        conf.setAppName("ReduceDemoJava") ;
        conf.setMaster("local[2]") ;

        System.setProperty("hadoop.home.dir" , "E:\\hadoop-2.6.0") ;

        JavaSparkContext sc = new JavaSparkContext(conf) ;

        List<Integer> numbers = new ArrayList<>() ;
        for(int i = 0 ; i <= 10 ; i ++) {
            numbers.add(i) ;
        }
        JavaRDD<Integer> numRDD = sc.parallelize(numbers , 3) ;
        numRDD.foreach(new VoidFunction<Integer>() {
            @Override
            public void call(Integer num) throws Exception {
                System.out.println(num);
            }
        });

        // Integer, Integer, Integer: the first two are the types of the input arguments, the last is the function's return type
        int sum = numRDD.reduce(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1 + v2;
            }
        }) ;
        System.out.println("sum -> " + sum);
        sc.stop() ;
    }
}
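The function passed to reduce must be associative and commutative, because Spark reduces each partition independently and then merges the partial results in no guaranteed order. A lambda-based sketch, assuming the numRDD defined above:

// Sum and product via reduce (Java 8+ lambdas).
int total = numRDD.reduce((v1, v2) -> v1 + v2);        // 0 + 1 + ... + 10 = 55
int product = numRDD.filter(n -> n > 0)                // drop 0 so the product is meaningful
                    .reduce((v1, v2) -> v1 * v2);      // 10! = 3628800
System.out.println("total -> " + total + " , product -> " + product);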
  • The take operator

package rddDemo.action;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

import java.util.ArrayList;
import java.util.List ;

/**
 * take(n) returns the first n elements of the RDD
 * Created by asus on 2018/7/8.
 */
public class TakeDemoJava {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf() ;
        conf.setAppName("TakeDemoJava") ;
        conf.setMaster("local[2]") ;

        System.setProperty("hadoop.home.dir" , "E:\\hadoop-2.6.0") ;

        JavaSparkContext sc =  new JavaSparkContext(conf) ;

        List<Integer> numbers = new ArrayList<>() ;
        for(int i = 0 ; i < 10 ; i ++) {
            numbers.add(i) ;
        }

        JavaRDD<Integer> numbersRDD = sc.parallelize(numbers , 3) ;
        List<Integer> take3Num = numbersRDD.take(3) ;

        for(int n : take3Num) {
            System.out.println(n);
        }

        sc.stop() ;
    }
}
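take(n) follows partition order, so it returns the "first" elements without scanning the whole RDD. The related actions first(), takeOrdered(n), and top(n) behave similarly but differ in ordering. A sketch, assuming the numbersRDD defined above:

// first() is equivalent to take(1).get(0).
Integer first = numbersRDD.first();                    // 0
// takeOrdered(n) returns the n smallest elements by natural ordering.
List<Integer> smallest = numbersRDD.takeOrdered(3);    // [0, 1, 2]
// top(n) returns the n largest elements by natural ordering.
List<Integer> largest = numbersRDD.top(3);             // [9, 8, 7]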
  • The takeSample operator

package rddDemo.action;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

import java.util.ArrayList;
import java.util.List;

/**
 * Created by asus on 2018/7/8.
 */
public class TakeSampleDemoJava {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf() ;
        conf.setAppName("TakeSampleDemoJava") ;
        conf.setMaster("local[2]") ;

        System.setProperty("hadoop.home.dir" , "E:\\hadoop-2.6.0") ;

        JavaSparkContext sc = new JavaSparkContext(conf) ;

        List<String> names = new ArrayList<>() ;
        names.add("lao wang") ;
        names.add("xiao wang") ;
        names.add("lao zhang") ;
        names.add("xiao zhang") ;
        names.add("lao li") ;
        names.add("xiao li") ;

        JavaRDD<String> namesRDD = sc.parallelize(names , 3) ;

        // Sample 3 elements without replacement
        System.out.println(">>>>>>>>>>>>>>>>>> Sampling 3 elements without replacement <<<<<<<<<<<<<<<<<<");
        List<String> takeSample_1 = namesRDD.takeSample(false , 3) ;
        for(String name : takeSample_1) {
            System.out.println(name);
        }

        // Sample 3 elements with replacement
        System.out.println(">>>>>>>>>>>>>>>>>> Sampling 3 elements with replacement <<<<<<<<<<<<<<<<<<");
        List<String> takeSample_2 = namesRDD.takeSample(true , 3) ;
        for(String name : takeSample_2) {
            System.out.println(name);
        }

        // Sample 3 elements without replacement, with a fixed seed
        System.out.println(">>>>>>>>>>>>>>>>>> Sampling 3 elements without replacement, fixed seed <<<<<<<<<<<<<<<<<<");
        List<String> takeSample_3 = namesRDD.takeSample(false , 3 , 100) ;
        for(String name : takeSample_3) {
            System.out.println(name);
        }

        // Sample 3 elements with replacement, with a fixed seed
        System.out.println(">>>>>>>>>>>>>>>>>> Sampling 3 elements with replacement, fixed seed <<<<<<<<<<<<<<<<<<");
        List<String> takeSample_4 = namesRDD.takeSample(true , 3 , 100) ;
        for(String name : takeSample_4) {
            System.out.println(name);
        }

        sc.stop();
    }
}
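takeSample is an action: it collects the sampled elements into a driver-side List, so the requested count must fit in driver memory. To sample while staying distributed, use the sample() transformation, which takes a fraction instead of an exact count. A sketch, assuming the namesRDD defined above:

// sample(withReplacement, fraction, seed) returns a new RDD rather than a List.
JavaRDD<String> sampled = namesRDD.sample(false, 0.5, 100);  // roughly half the elements, seeded
sampled.foreach(name -> System.out.println(name));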
  • The reduceByKey operator

package rddDemo.action;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.VoidFunction;
import scala.Tuple2;

import java.util.ArrayList;
import java.util.List;

/**
 * The reduceByKey operator
 * Strictly speaking, reduceByKey is a transformation rather than an action;
 * the foreach calls below are the actions that trigger execution.
 * Created by asus on 2018/7/7.
 */
public class ReduceByKeyDemoJava {

    public static void main(String[] args) {
        SparkConf conf = new SparkConf() ;
        conf.setAppName("ReduceByKeyDemo") ;
        conf.setMaster("local[2]") ;

        System.setProperty("hadoop.home.dir" , "E:\\hadoop-2.6.0") ;

        JavaSparkContext sc = new JavaSparkContext(conf) ;

        List<Tuple2<String , Integer>> scores = new ArrayList<>() ;
        scores.add(new Tuple2<>("lao wang" , 10)) ;
        scores.add(new Tuple2<>("lao wang" , 10)) ;
        scores.add(new Tuple2<>("xiao wang" , 50)) ;
        scores.add(new Tuple2<>("xiao wang" , 50)) ;
        scores.add(new Tuple2<>("lao zhang" , 50)) ;
        scores.add(new Tuple2<>("lao zhang" , 50)) ;
        scores.add(new Tuple2<>("xiaozhang zhang" , 90)) ;
        scores.add(new Tuple2<>("xiaozhang zhang" , 90)) ;

        JavaPairRDD<String , Integer> scoresRDD = sc.parallelizePairs(scores , 3) ;
        scoresRDD.foreach(new VoidFunction<Tuple2<String, Integer>>() {
            @Override
            public void call(Tuple2<String, Integer> score) throws Exception {
                System.out.println("name : " + score._1 + " score : " + score._2);
            }
        });

        JavaPairRDD<String , Integer> totalScoreRDD = scoresRDD.reduceByKey(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1 + v2;
            }
        }) ;

        totalScoreRDD.foreach(new VoidFunction<Tuple2<String, Integer>>() {
            @Override
            public void call(Tuple2<String, Integer> totalScore) throws Exception {
                System.out.println("name : " + totalScore._1 + " score : " + totalScore._2);
            }
        });

        sc.stop();
    }
}
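Unlike groupByKey, reduceByKey combines values for each key on the map side before shuffling, so far less data crosses the network. The same aggregation in lambda form, assuming the scoresRDD defined above:

// Map-side combining happens automatically; only per-key partial sums are shuffled.
JavaPairRDD<String, Integer> totals = scoresRDD.reduceByKey((v1, v2) -> v1 + v2);
for (Tuple2<String, Integer> t : totals.collect()) {
    System.out.println("name : " + t._1 + " score : " + t._2);
}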
  • The countByKey operator

package rddDemo.action;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/**
 * Counts how many times each key appears in the RDD
 * Created by asus on 2018/7/15.
 */
public class CountByKeyDemoJava {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf() ;
        conf.setAppName("CountByKeyDemoJava") ;
        conf.setMaster("local[2]") ;

        System.setProperty("hadoop.home.dir" , "E:\\hadoop-2.6.0") ;

        JavaSparkContext sc =  new JavaSparkContext(conf) ;
        List<Tuple2<String , Integer>> scores = new ArrayList<>() ;
        scores.add(new Tuple2<>("lao wang" , 10)) ;
        scores.add(new Tuple2<>("lao wang" , 20)) ;
        scores.add(new Tuple2<>("lao wang" , 30)) ;
        scores.add(new Tuple2<>("xiao wang" , 50)) ;
        scores.add(new Tuple2<>("lao zhang" , 60)) ;
        scores.add(new Tuple2<>("xiao zhang" , 70)) ;
        scores.add(new Tuple2<>("xiao zhang" , 80)) ;

        JavaPairRDD<String , Integer> scoreRDD = sc.parallelizePairs(scores , 2) ;
        Map<String , Long> keyCount = scoreRDD.countByKey() ;
        for(String s : keyCount.keySet()) {
            System.out.println("key -> " + s + " , " + " count -> " + keyCount.get(s).intValue());
        }

        sc.stop();
    }
}
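countByKey returns the entire result Map to the driver, so it is only safe when the number of distinct keys is small. For large key sets, keep the counting distributed with mapValues plus reduceByKey. A sketch, assuming the scoreRDD defined above:

// Distributed equivalent of countByKey: the result stays an RDD.
JavaPairRDD<String, Long> keyCounts =
        scoreRDD.mapValues(v -> 1L).reduceByKey((a, b) -> a + b);
keyCounts.foreach(t -> System.out.println("key -> " + t._1 + " , count -> " + t._2));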