程式人生 > Spark groupByKey、sortByKey、reduceByKey Java實現

Spark groupByKey、sortByKey、reduceByKey Java實現

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.VoidFunction;
import scala.Tuple2;

import java.util.ArrayList;
import java.util.Arrays;
import 
java.util.List; /** * Created by Knuth on 2018/2/2. */ public class TestGroupByKey { public static void main(String[] args) { //1.SparkConf SparkConf conf = new SparkConf().setAppName("TestSortByKey").setMaster("local"); //2.JavaSparkContext JavaSparkContext jsc = new JavaSparkContext(conf); //3.JavaPairRDD
List<Tuple2<String,Integer>> list = Arrays.asList( new Tuple2<String,Integer>("lmx",99), new Tuple2<String,Integer>("lt",98), new Tuple2<String,Integer>("lt",96), new Tuple2<String,Integer>("lmx"
,100), new Tuple2<String,Integer>("lmx",100), new Tuple2<String,Integer>("lt",99), new Tuple2<String,Integer>("lyx",100) ); //4.JavaPairRDD JavaPairRDD<String,Integer> javaPairRDD = jsc.parallelizePairs(list); //5.groupByKey JavaPairRDD<String,Iterable<Integer>> groupRDD = javaPairRDD.groupByKey(); //6.print groupRDD.foreach(new VoidFunction<Tuple2<String, Iterable<Integer>>>() { @Override public void call(Tuple2<String, Iterable<Integer>> tp) throws Exception { System.out.println(tp._1+":"+tp._2); } }); //7.sortByKey JavaPairRDD<String,Integer> sortRDD = javaPairRDD.sortByKey(); //8.print sortRDD.foreach(new VoidFunction<Tuple2<String, Integer>>() { @Override public void call(Tuple2<String, Integer> tp) throws Exception { System.out.println(tp._1+":"+tp._2); } }); //9.reduceByKey JavaPairRDD<String,Integer> reduceRDD = javaPairRDD.reduceByKey(new Function2<Integer, Integer, Integer>() { @Override public Integer call(Integer a, Integer b) throws Exception { return a + b; } }); //10.print reduceRDD.foreach(new VoidFunction<Tuple2<String, Integer>>() { @Override public void call(Tuple2<String, Integer> tp) throws Exception { System.out.println(tp._1+":"+tp._2); } }); } }