Spark使用小結:Java版Join操作
阿新 • 發佈:2019-02-14
實現兩組資料的Join操作。
第一組:i, i*i
第二組:i, 'a'+i
join之後的結果是:i, i*i, 'a'+i
public class SparkSample { private static final Pattern SPACE = Pattern.compile(" "); public static void main(String args[]) { SparkConf sparkConf = new SparkConf(); sparkConf.setAppName("SparkJavaWordCount"); sparkConf.setMaster("local"); JavaSparkContext context = new JavaSparkContext(sparkConf); List<Integer> data = Arrays.asList(1,2,3,4,5); JavaRDD<Integer> distData= context.parallelize(data); JavaPairRDD<Integer, Integer> firstRDD = distData.mapToPair(new PairFunction<Integer, Integer, Integer>() { @Override public Tuple2<Integer, Integer> call(Integer integer) throws Exception { return new Tuple2(integer, integer*integer); } }); JavaPairRDD<Integer, String> secondRDD = distData.mapToPair(new PairFunction<Integer, Integer, String>() { @Override public Tuple2<Integer, String> call(Integer integer) throws Exception { return new Tuple2(integer, String.valueOf((char)('a' + integer))); }; }); JavaPairRDD<Integer, Tuple2<Integer, String>> joinRDD = firstRDD.join(secondRDD); JavaRDD<String> result = joinRDD.map(new Function<Tuple2<Integer, Tuple2<Integer, String>>, String>() { @Override public String call(Tuple2<Integer, Tuple2<Integer, String>> integerTuple2Tuple2) throws Exception { int key = integerTuple2Tuple2._1(); int value1 = integerTuple2Tuple2._2()._1(); String value2 = integerTuple2Tuple2._2()._2(); return key + " " + value1 + " " + value2; } }); List<String> reslist = result.collect(); for(String str : reslist) { System.out.println(str); } context.stop(); } }