1. 程式人生 > spark join算子

spark join算子

set @override serial reac cep auth bsp 學生 call

java

 1 /** 
 2  *join算子是根據兩個rdd的key進行關聯操作,類似scala中的拉鏈操作,返回的新元素為<key,value>,一對一
 3  *@author Tele
 4  *
 5  */
 6 public class JoinDemo {
 7     private static SparkConf conf = new SparkConf().setMaster("local").setAppName("joindemo");
 8     private static JavaSparkContext jsc = new JavaSparkContext(conf);
9 public static void main(String[] args) { 10 11 //假設每個學生只有一門成績 12 List<Tuple2<Integer,String>> studentList = Arrays.asList( 13 new Tuple2<Integer,String>(1,"tele"), 14 new
Tuple2<Integer,String>(2,"yeye"), 15 new Tuple2<Integer,String>(3,"wyc") 16 ); 17 18 List<Tuple2<Integer,Integer>> scoreList = Arrays.asList( 19 new
Tuple2<Integer,Integer>(1,100), 20 new Tuple2<Integer,Integer>(1,1100), 21 new Tuple2<Integer,Integer>(2,90), 22 new Tuple2<Integer,Integer>(3,70) 23 ); 24 25 26 JavaPairRDD<Integer, String> studentRDD = jsc.parallelizePairs(studentList); 27 JavaPairRDD<Integer, Integer> scoreRDD = jsc.parallelizePairs(scoreList); 28 29 //註意此處生成的新rdd對的參數類型,第一個泛型參數為key的類型,Tuple2的String與Integer分別對應原rdd的value類型 30 JavaPairRDD<Integer, Tuple2<String, Integer>> result = studentRDD.join(scoreRDD); 31 32 result.foreach(new VoidFunction<Tuple2<Integer,Tuple2<String,Integer>>>() { 33 private static final long serialVersionUID = 1L; 34 35 @Override 36 public void call(Tuple2<Integer, Tuple2<String, Integer>> t) throws Exception { 37 System.out.println("學號:" + t._1); 38 System.out.println("姓名:" + t._2._1); 39 System.out.println("成績:" + t._2._2); 40 System.out.println("================="); 41 } 42 }); 43 44 jsc.close(); 45 46 } 47 }

技術分享圖片

scala

 1 object JoinDemo {
 2     def main(args: Array[String]): Unit = {
 3       val conf = new SparkConf().setMaster("local").setAppName("joindemo");     
 4       val sc = new SparkContext(conf);
 5       
 6       val studentArr = Array((1,"tele"),(2,"yeye"),(3,"wyc"));
 7       val scoreArr = Array((1,100),(2,80),(3,100));
 8       
 9       val studentRDD = sc.parallelize(studentArr,1);
10       val scoreRDD = sc.parallelize(scoreArr,1);
11       
12       val result = studentRDD.join(scoreRDD);
13       
14       result.foreach(t=>{
15         println("學號:" + t._1);
16         println("姓名:" + t._2._1);
17         println("成績:" + t._2._2);
18         println("============")
19       })
20       
21     }
22 }

技術分享圖片

spark join算子