Common Spark Transformation Operator Examples ===> Java Edition
阿新 • Published: 2019-04-04
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.*;
import scala.Tuple2;

import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class TransformationCases {

    public static void main(String[] args) {
        // Prepare the test data.
        List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
        List<String> text = Arrays.asList("cat,dog,rabbit", "apple,pear,peach", "eyes,nose,mouth");
        List<Tuple2<String, Integer>> scores = Arrays.asList(
                new Tuple2<String, Integer>("class1", 88),
                new Tuple2<String, Integer>("class2", 90),
                new Tuple2<String, Integer>("class2", 85),
                new Tuple2<String, Integer>("class1", 95),
                new Tuple2<String, Integer>("class2", 89));
        List<Tuple2<Integer, String>> students = Arrays.asList(
                new Tuple2<Integer, String>(1, "s1"),
                new Tuple2<Integer, String>(2, "s2"),
                new Tuple2<Integer, String>(3, "s3"),
                new Tuple2<Integer, String>(4, "s4"));
        List<Tuple2<Integer, Integer>> stuScores = Arrays.asList(
                new Tuple2<Integer, Integer>(1, 100),
                new Tuple2<Integer, Integer>(2, 98),
                new Tuple2<Integer, Integer>(3, 98),
                new Tuple2<Integer, Integer>(3, 99),
                new Tuple2<Integer, Integer>(2, 99));
        // Get the SparkContext.
        JavaSparkContext sc = getContext();
        // Run the Transformation demos (uncomment one at a time):
        // mapDemo(sc, numbers);
        // filterDemo(sc, numbers);
        // flatMapDemo(sc, text);
        // groupByKeyDemo(sc, scores);
        // reduceByKeyDemo(sc, scores);
        // sortByKeyDemo(sc, scores);
        // joinDemo(sc, students, stuScores);
        cogroupDemo(sc, students, stuScores);
        closeContext(sc);
    }
    // Create the SparkConf and SparkContext.
    public static JavaSparkContext getContext() {
        SparkConf conf = new SparkConf()
                .setAppName("TransformationCases")
                .setMaster("local");
        return new JavaSparkContext(conf);
    }

    // Close the SparkContext.
    public static void closeContext(JavaSparkContext sc) {
        if (sc != null) {
            sc.close();
        }
    }
    // map: multiply every element of the collection by 2.
    public static void mapDemo(JavaSparkContext sc, List<Integer> numbers) {
        JavaRDD<Integer> rdd = sc.parallelize(numbers, 1);
        JavaRDD<Integer> doubledNumbers = rdd.map(new Function<Integer, Integer>() {
            public Integer call(Integer v1) throws Exception {
                return v1 * 2;
            }
        });
        doubledNumbers.foreach(new VoidFunction<Integer>() {
            public void call(Integer number) throws Exception {
                System.out.println(number);
            }
        });
    }
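    // Expected output (a single partition, so order is preserved):
    // 2 4 6 8 10 12 14 16 18 20, one number per line.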
    // filter: return all the even numbers in the collection.
    public static void filterDemo(JavaSparkContext sc, List<Integer> numbers) {
        JavaRDD<Integer> rdd = sc.parallelize(numbers, 1);
        JavaRDD<Integer> evenNumbers = rdd.filter(new Function<Integer, Boolean>() {
            public Boolean call(Integer v1) throws Exception {
                return v1 % 2 == 0;
            }
        });
        evenNumbers.foreach(new VoidFunction<Integer>() {
            public void call(Integer number) throws Exception {
                System.out.println(number);
            }
        });
    }
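    // Expected output: the even numbers 2 4 6 8 10, one per line.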
    // flatMap: split each string into individual words.
    public static void flatMapDemo(JavaSparkContext sc, List<String> text) {
        JavaRDD<String> rdd = sc.parallelize(text);
        JavaRDD<String> words = rdd.flatMap(new FlatMapFunction<String, String>() {
            public Iterator<String> call(String line) throws Exception {
                return Arrays.asList(line.split(",")).iterator();
            }
        });
        words.foreach(new VoidFunction<String>() {
            public void call(String word) throws Exception {
                System.out.println(word);
            }
        });
    }
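    // Expected output: cat dog rabbit apple pear peach eyes nose mouth, one word per line
    // (ordering across partitions is not guaranteed if more than one partition is used).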
    // groupByKey: group by class, collecting the scores of each class into one group.
    public static void groupByKeyDemo(JavaSparkContext sc, List<Tuple2<String, Integer>> scores) {
        JavaPairRDD<String, Integer> lists = sc.parallelizePairs(scores);
        JavaPairRDD<String, Iterable<Integer>> groupedScores = lists.groupByKey();
        groupedScores.foreach(new VoidFunction<Tuple2<String, Iterable<Integer>>>() {
            public void call(Tuple2<String, Iterable<Integer>> scores) throws Exception {
                System.out.println(scores._1);
                Iterator<Integer> iterator = scores._2.iterator();
                while (iterator.hasNext()) {
                    System.out.println(iterator.next());
                }
                System.out.println("========================================");
            }
        });
    }
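    // Expected output: each class name followed by its scores and a separator line,
    // i.e. class1 -> 88, 95 and class2 -> 90, 85, 89 (key order is not guaranteed).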
    // reduceByKey: sum the scores of each class.
    public static void reduceByKeyDemo(JavaSparkContext sc, List<Tuple2<String, Integer>> scores) {
        JavaPairRDD<String, Integer> rdd = sc.parallelizePairs(scores);
        JavaPairRDD<String, Integer> reducedScores = rdd.reduceByKey(new Function2<Integer, Integer, Integer>() {
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1 + v2;
            }
        });
        reducedScores.foreach(new VoidFunction<Tuple2<String, Integer>>() {
            public void call(Tuple2<String, Integer> scores) throws Exception {
                System.out.println(scores._1 + " : " + scores._2);
            }
        });
    }
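    // Expected output (key order not guaranteed):
    // class1 : 183
    // class2 : 264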
    // sortByKey: sort the scores in ascending order.
    public static void sortByKeyDemo(JavaSparkContext sc, List<Tuple2<String, Integer>> scores) {
        JavaPairRDD<String, Integer> rdd = sc.parallelizePairs(scores);
        // We want to sort by score, but the key of the raw data is the class name,
        // so temporarily swap the key and the value.
        JavaPairRDD<Integer, String> swappedRdd = rdd.mapToPair(new PairFunction<Tuple2<String, Integer>, Integer, String>() {
            public Tuple2<Integer, String> call(Tuple2<String, Integer> pair) throws Exception {
                return new Tuple2<Integer, String>(pair._2, pair._1);
            }
        });
        // Sort ascending by the new key (the score).
        JavaPairRDD<Integer, String> sortedRdd = swappedRdd.sortByKey();
        // After sorting, restore the original key/value layout by swapping back.
        JavaPairRDD<String, Integer> result = sortedRdd.mapToPair(new PairFunction<Tuple2<Integer, String>, String, Integer>() {
            public Tuple2<String, Integer> call(Tuple2<Integer, String> pair) throws Exception {
                return new Tuple2<String, Integer>(pair._2, pair._1);
            }
        });
        result.foreach(new VoidFunction<Tuple2<String, Integer>>() {
            public void call(Tuple2<String, Integer> pairs) throws Exception {
                System.out.println(pairs._1 + " : " + pairs._2);
            }
        });
    }
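    // Expected output, ascending by score, one pair per line:
    // class2 : 85, class1 : 88, class2 : 89, class2 : 90, class1 : 95.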
    // join: inner-join the elements of two RDDs by key.
    public static void joinDemo(JavaSparkContext sc, List<Tuple2<Integer, String>> students, List<Tuple2<Integer, Integer>> stuScores) {
        JavaPairRDD<Integer, String> stuRdd = sc.parallelizePairs(students);
        JavaPairRDD<Integer, Integer> scoreRdd = sc.parallelizePairs(stuScores);
        JavaPairRDD<Integer, Tuple2<String, Integer>> lists = stuRdd.join(scoreRdd);
        lists.foreach(new VoidFunction<Tuple2<Integer, Tuple2<String, Integer>>>() {
            public void call(Tuple2<Integer, Tuple2<String, Integer>> pairs) throws Exception {
                System.out.println(pairs._1 + " : " + pairs._2._1 + " : " + pairs._2._2);
            }
        });
    }
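    // Expected output: one line per matching (student, score) pair, i.e.
    // 1 : s1 : 100, 2 : s2 : 98, 2 : s2 : 99, 3 : s3 : 98, 3 : s3 : 99.
    // Student 4 has no score and, because join is an inner join, is dropped.
    // Key order is not guaranteed.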
    // cogroup: combine two RDDs by key, like join, but for each key it returns the
    // full Iterable of values from each RDD, and keys present in only one RDD are
    // kept (paired with an empty Iterable on the other side).
    public static void cogroupDemo(JavaSparkContext sc, List<Tuple2<Integer, String>> students, List<Tuple2<Integer, Integer>> stuScores) {
        JavaPairRDD<Integer, String> stuRdd = sc.parallelizePairs(students);
        JavaPairRDD<Integer, Integer> scoreRdd = sc.parallelizePairs(stuScores);
        JavaPairRDD<Integer, Tuple2<Iterable<String>, Iterable<Integer>>> cogroupedRdd = stuRdd.cogroup(scoreRdd);
        cogroupedRdd.foreach(new VoidFunction<Tuple2<Integer, Tuple2<Iterable<String>, Iterable<Integer>>>>() {
            public void call(Tuple2<Integer, Tuple2<Iterable<String>, Iterable<Integer>>> pairs) throws Exception {
                System.out.println(pairs._1 + " : " + pairs._2._1 + " : " + pairs._2._2);
            }
        });
    }
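    // Expected output: one line per key with both value collections, i.e. key 1 with
    // [s1] and [100], key 2 with [s2] and [98, 99], key 3 with [s3] and [98, 99],
    // and key 4 with [s4] and an empty collection (the exact printed form depends on
    // Spark's internal Iterable type). Unlike join, key 4 is kept despite having no score.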
}
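
All the interfaces used above (Function, VoidFunction, Function2, FlatMapFunction, PairFunction) have a single abstract method, so on Java 8+ with Spark 2.x each anonymous inner class can be replaced by a lambda. Below is a minimal sketch showing mapDemo and reduceByKeyDemo in lambda form; the class and method names here are hypothetical, not part of the original code:

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

import java.util.List;

public class LambdaTransformations {
    // Same logic as mapDemo and reduceByKeyDemo, written with Java 8 lambdas.
    public static void run(JavaSparkContext sc, List<Integer> numbers, List<Tuple2<String, Integer>> scores) {
        // map: double every element.
        JavaRDD<Integer> doubled = sc.parallelize(numbers, 1).map(v -> v * 2);
        doubled.foreach(n -> System.out.println(n));
        // reduceByKey: sum the scores of each class.
        JavaPairRDD<String, Integer> summed = sc.parallelizePairs(scores)
                .reduceByKey((v1, v2) -> v1 + v2);
        summed.foreach(t -> System.out.println(t._1 + " : " + t._2));
    }
}

The behavior is identical to the anonymous-class versions; the lambda syntax only removes the boilerplate around each call method.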