Common Spark Action Operators (Java Version)
-
collect operator
package rddDemo.action;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;

import java.util.ArrayList;
import java.util.List;

/**
 * collect: returns all elements of the RDD to the driver as a List.
 * Created by asus on 2018/6/17.
 */
public class CollectDemoJava {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf();
        conf.setAppName("CollectDemoJava");
        conf.setMaster("local[2]");
        System.setProperty("hadoop.home.dir", "E:\\hadoop-2.6.0");
        JavaSparkContext sc = new JavaSparkContext(conf);

        List<Integer> numbers = new ArrayList<>();
        for (int i = 0; i <= 10; i++) {
            numbers.add(i);
        }
        JavaRDD<Integer> numRdd = sc.parallelize(numbers);

        // map is a lazy transformation that doubles every element
        JavaRDD<Integer> doubleNumRdd = numRdd.map(new Function<Integer, Integer>() {
            @Override
            public Integer call(Integer n) throws Exception {
                return n * 2;
            }
        });

        // collect is the action that pulls all elements back to the driver
        for (Integer n : doubleNumRdd.collect()) {
            System.out.println("number : " + n);
        }

        sc.stop();
    }
}
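Since collect materializes the whole RDD in driver memory, it is best reserved for small results. With the Java 8 lambda support in Spark's Java API, the same example can be written more compactly; the following is a minimal sketch that assumes Java 8+ and reuses numRdd from the program above.

// Lambda-based sketch of the same collect example (assumes Java 8+, reusing numRdd from above)
List<Integer> doubled = numRdd.map(n -> n * 2).collect();
doubled.forEach(n -> System.out.println("number : " + n));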
-
reduce operator
package rddDemo.action;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.VoidFunction;

import java.util.ArrayList;
import java.util.List;

/**
 * reduce operator
 * action operator
 * f(U, U) => U
 * Created by asus on 2018/7/7.
 */
public class ReduceDemoJava {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf();
        conf.setAppName("ReduceDemoJava");
        conf.setMaster("local[2]");
        System.setProperty("hadoop.home.dir", "E:\\hadoop-2.6.0");
        JavaSparkContext sc = new JavaSparkContext(conf);

        List<Integer> numbers = new ArrayList<>();
        for (int i = 0; i <= 10; i++) {
            numbers.add(i);
        }
        JavaRDD<Integer> numRDD = sc.parallelize(numbers, 3);

        numRDD.foreach(new VoidFunction<Integer>() {
            @Override
            public void call(Integer num) throws Exception {
                System.out.println(num);
            }
        });

        // In Function2<Integer, Integer, Integer>, the first two type parameters are the
        // input argument types and the last one is the return type of the function
        int sum = numRDD.reduce(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1 + v2;
            }
        });
        System.out.println("sum -> " + sum);

        sc.stop();
    }
}
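reduce first combines elements within each partition and then merges the per-partition results, so the supplied function should be commutative and associative. As a quick illustration, the same action can compute a maximum instead of a sum; this is a small sketch assuming Java 8 lambdas and reusing numRDD from the program above.

// Sketch: reduce with another commutative/associative function (assumes Java 8+, reusing numRDD)
int max = numRDD.reduce((v1, v2) -> Math.max(v1, v2));
System.out.println("max -> " + max);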
-
take operator
package rddDemo.action;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

import java.util.ArrayList;
import java.util.List;

/**
 * take(n): returns the first n elements of the RDD to the driver.
 * Created by asus on 2018/7/8.
 */
public class TakeDemoJava {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf();
        conf.setAppName("TakeDemoJava");
        conf.setMaster("local[2]");
        System.setProperty("hadoop.home.dir", "E:\\hadoop-2.6.0");
        JavaSparkContext sc = new JavaSparkContext(conf);

        List<Integer> numbers = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            numbers.add(i);
        }
        JavaRDD<Integer> numbersRDD = sc.parallelize(numbers, 3);

        // take(3) returns the first 3 elements as a List on the driver
        List<Integer> take3Num = numbersRDD.take(3);
        for (int n : take3Num) {
            System.out.println(n);
        }

        sc.stop();
    }
}
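take(n) scans only as many partitions as it needs, so the elements come back in partition order rather than sorted order. Two related actions in the same API are first() and takeOrdered(n); the sketch below reuses numbersRDD from the program above.

// Related actions (reusing numbersRDD from the example above)
Integer firstElement = numbersRDD.first();           // equivalent to take(1).get(0)
List<Integer> smallest3 = numbersRDD.takeOrdered(3); // the 3 smallest elements in natural order
System.out.println("first -> " + firstElement + " , takeOrdered(3) -> " + smallest3);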
-
takeSample operator
package rddDemo.action;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

import java.util.ArrayList;
import java.util.List;

/**
 * takeSample: returns a sampled subset of the RDD to the driver as a List.
 * Created by asus on 2018/7/8.
 */
public class TakeSampleDemoJava {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf();
        conf.setAppName("TakeSampleDemoJava");
        conf.setMaster("local[2]");
        System.setProperty("hadoop.home.dir", "E:\\hadoop-2.6.0");
        JavaSparkContext sc = new JavaSparkContext(conf);

        List<String> names = new ArrayList<>();
        names.add("lao wang");
        names.add("xiao wang");
        names.add("lao zhang");
        names.add("xiao zhang");
        names.add("lao li");
        names.add("xiao li");
        JavaRDD<String> namesRDD = sc.parallelize(names, 3);

        // Sample 3 elements without replacement
        System.out.println(">>>>>>>>>>>>>>>>>> sample 3 elements without replacement <<<<<<<<<<<<<<<<<<");
        List<String> takeSample_1 = namesRDD.takeSample(false, 3);
        for (String name : takeSample_1) {
            System.out.println(name);
        }

        // Sample 3 elements with replacement
        System.out.println(">>>>>>>>>>>>>>>>>> sample 3 elements with replacement <<<<<<<<<<<<<<<<<<");
        List<String> takeSample_2 = namesRDD.takeSample(true, 3);
        for (String name : takeSample_2) {
            System.out.println(name);
        }

        // Sample 3 elements without replacement, with a fixed seed
        System.out.println(">>>>>>>>>>>>>>>>>> sample 3 elements without replacement, fixed seed <<<<<<<<<<<<<<<<<<");
        List<String> takeSample_3 = namesRDD.takeSample(false, 3, 100);
        for (String name : takeSample_3) {
            System.out.println(name);
        }

        // Sample 3 elements with replacement, with a fixed seed
        System.out.println(">>>>>>>>>>>>>>>>>> sample 3 elements with replacement, fixed seed <<<<<<<<<<<<<<<<<<");
        List<String> takeSample_4 = namesRDD.takeSample(true, 3, 100);
        for (String name : takeSample_4) {
            System.out.println(name);
        }

        sc.stop();
    }
}
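takeSample is an action that returns an exact number of elements as a List on the driver. Its transformation counterpart is sample, which takes a fraction instead of a count and returns a new RDD. A brief sketch for comparison, reusing namesRDD from the program above (fraction 0.5 and seed 100 are arbitrary values chosen for illustration):

// sample is a transformation: it keeps roughly a fraction of the elements and returns a new RDD
JavaRDD<String> sampledRDD = namesRDD.sample(false, 0.5, 100);
for (String name : sampledRDD.collect()) {
    System.out.println(name);
}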
-
reduceByKey operator
package rddDemo.action;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.VoidFunction;
import scala.Tuple2;

import java.util.ArrayList;
import java.util.List;

/**
 * reduceByKey operator
 * Note: reduceByKey itself is a transformation (it returns a new pair RDD);
 * the foreach calls below are the actions that trigger the computation.
 * Created by asus on 2018/7/7.
 */
public class ReduceByKeyDemoJava {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf();
        conf.setAppName("ReduceByKeyDemo");
        conf.setMaster("local[2]");
        System.setProperty("hadoop.home.dir", "E:\\hadoop-2.6.0");
        JavaSparkContext sc = new JavaSparkContext(conf);

        List<Tuple2<String, Integer>> scores = new ArrayList<>();
        scores.add(new Tuple2<>("lao wang", 10));
        scores.add(new Tuple2<>("lao wang", 10));
        scores.add(new Tuple2<>("xiao wang", 50));
        scores.add(new Tuple2<>("xiao wang", 50));
        scores.add(new Tuple2<>("lao zhang", 50));
        scores.add(new Tuple2<>("lao zhang", 50));
        scores.add(new Tuple2<>("xiaozhang zhang", 90));
        scores.add(new Tuple2<>("xiaozhang zhang", 90));
        JavaPairRDD<String, Integer> scoresRDD = sc.parallelizePairs(scores, 3);

        scoresRDD.foreach(new VoidFunction<Tuple2<String, Integer>>() {
            @Override
            public void call(Tuple2<String, Integer> score) throws Exception {
                System.out.println("name : " + score._1 + " score : " + score._2);
            }
        });

        // Sum the scores per key
        JavaPairRDD<String, Integer> totalScoreRDD = scoresRDD.reduceByKey(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1 + v2;
            }
        });

        totalScoreRDD.foreach(new VoidFunction<Tuple2<String, Integer>>() {
            @Override
            public void call(Tuple2<String, Integer> totalScore) throws Exception {
                System.out.println("name : " + totalScore._1 + " score : " + totalScore._2);
            }
        });

        sc.stop();
    }
}
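When the per-key totals are needed on the driver, the same aggregation can be followed by the collectAsMap action. The sketch below assumes Java 8 lambdas, reuses scoresRDD from the program above, and requires an extra import of java.util.Map; collectAsMap should only be used when the number of distinct keys is small enough to fit in driver memory.

// Per-key sums fetched to the driver as a Map (assumes Java 8+, reusing scoresRDD)
Map<String, Integer> totals = scoresRDD.reduceByKey((v1, v2) -> v1 + v2).collectAsMap();
for (Map.Entry<String, Integer> e : totals.entrySet()) {
    System.out.println("name : " + e.getKey() + " total score : " + e.getValue());
}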
-
countByKey operator
package rddDemo.action;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/**
 * countByKey: counts how many times each key appears in the RDD
 * and returns the result to the driver as a Map.
 * Created by asus on 2018/7/15.
 */
public class CountByKeyDemoJava {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf();
        conf.setAppName("CountByKeyDemoJava");
        conf.setMaster("local[2]");
        System.setProperty("hadoop.home.dir", "E:\\hadoop-2.6.0");
        JavaSparkContext sc = new JavaSparkContext(conf);

        List<Tuple2<String, Integer>> scores = new ArrayList<>();
        scores.add(new Tuple2<>("lao wang", 10));
        scores.add(new Tuple2<>("lao wang", 20));
        scores.add(new Tuple2<>("lao wang", 30));
        scores.add(new Tuple2<>("xiao wang", 50));
        scores.add(new Tuple2<>("lao zhang", 60));
        scores.add(new Tuple2<>("xiao zhang", 70));
        scores.add(new Tuple2<>("xiao zhang", 80));
        JavaPairRDD<String, Integer> scoreRDD = sc.parallelizePairs(scores, 2);

        // countByKey returns a Map of key -> occurrence count on the driver
        Map<String, Long> keyCount = scoreRDD.countByKey();
        for (String s : keyCount.keySet()) {
            System.out.println("key -> " + s + " , " + " count -> " + keyCount.get(s).intValue());
        }

        sc.stop();
    }
}
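Like collect, countByKey brings its result into driver memory, so it is only suitable when the number of distinct keys is small. For a large key space, an equivalent count can be kept distributed by mapping every value to 1 and summing with reduceByKey; a minimal sketch assuming Java 8 lambdas and reusing scoreRDD from the program above:

// Equivalent per-key count kept as an RDD instead of a driver-side Map (assumes Java 8+)
JavaPairRDD<String, Long> keyCountRDD = scoreRDD
        .mapValues(v -> 1L)
        .reduceByKey((a, b) -> a + b);
for (Tuple2<String, Long> t : keyCountRDD.collect()) {
    System.out.println("key -> " + t._1() + " , count -> " + t._2());
}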