Spark的三種join筆記
阿新 • 發佈:2018-12-25
準備資料: orders:(訂單t1) order_id driver_id 1000 5000 1001 5001 1002 5002 drivers:(司機表t2) driver_id car_id 5000 100 5003 103 spark三種連線join join:自然連線,輸出連線鍵匹配的記錄 測試:hive (gulfstream_test)> select * from orders t1 join drivers t2 on (t1.driver_id = t2.driver_id) ; 結果: t1.order_id t1.driver_id t2.driver_id t2.car_id 1000 5000 5000 100 left-outer-join:左外連結,輸出連線鍵匹配的記錄,左側的表無論匹配與否都輸出。 測試:hive (gulfstream_test)> select * from orders t1 left outer join drivers t2 on (t1.driver_id = t2.driver_id); 結果: t1.order_id t1.driver_id t2.driver_id t2.car_id 1000 5000 5000 100 1001 5001 NULL NULL 1002 5002 NULL NULL right-outer-join:右外連線,輸出連線鍵匹配的記錄,右側的表無論匹配與否都輸出。 測試:hive (gulfstream_test)> select * from orders t1 right outer join drivers t2 on (t1.driver_id = t2.driver_id) ; 結果: t1.order_id t1.driver_id t2.driver_id t2.car_id 1000 5000 5000 100 NULL NULL 5003 103 3、Spark描述 spark實現join的方式也是通過RDD的運算元,spark同樣提供了三個運算元join,leftOuterJoin,rightOuterJoin。 在下面給出的例子中,我們通過spark-hive讀取了Hive中orders表和drivers表中的資料,這時候資料的表現形式是DataFrame,如果要使用Join操作: 1)首先需要先將DataFrame轉化成了JavaRDD。 2)不過,JavaRDD其實是沒有join運算元的,下面還需要通過mapToPair運算元將JavaRDD轉換成JavaPairRDD,這樣就可以使用Join了。 需要指出的是 1)join運算元(join,leftOuterJoin,rightOuterJoin)只能通過PairRDD使用; 2)join運算元操作的Tuple2<Object1, Object2>型別中,Object1是連線鍵,我只試過Integer和String,Object2比較靈活,甚至可以是整個Row。 這裡我們使用driver_id作為連線鍵。 所以在輸出Tuple2的時候,我們將driver_id放在了前面。 public class Join implements Serializable { private transient JavaSparkContext javaSparkContext; private transient HiveContext hiveContext; /* * 初始化Load * 建立sparkContext, sqlContext, hiveContext * */ public Join() { initSparckContext(); initHiveContext(); } /* * 建立sparkContext * */ private void initSparckContext() { String warehouseLocation = System.getProperty("user.dir"); SparkConf sparkConf = new SparkConf() .setAppName("spark-join") .set("spark.sql.warehouse.dir", warehouseLocation) .setMaster("yarn-client"); javaSparkContext = new JavaSparkContext(sparkConf); } /* * 建立hiveContext * 用於讀取Hive中的資料 * */ private void initHiveContext() { 
hiveContext = new HiveContext(javaSparkContext); } public void join() { /* * 生成rdd1 * */ String query1 = "select * from gulfstream_test.orders"; DataFrame rows1 = hiveContext.sql(query1).select("order_id", "driver_id"); JavaPairRDD<String, String> rdd1 = rows1.toJavaRDD().mapToPair(new PairFunction<Row, String, String>() { @Override public Tuple2<String, String> call(Row row) throws Exception { String orderId = (String)row.get(0); String driverId = (String)row.get(1); return new Tuple2<String, String>(driverId, orderId); } }); /* * 生成rdd2 * */ String query2 = "select * from gulfstream_test.drivers"; DataFrame rows2 = hiveContext.sql(query2).select("driver_id", "car_id"); JavaPairRDD<String, String> rdd2 = rows2.toJavaRDD().mapToPair(new PairFunction<Row, String, String>() { @Override public Tuple2<String, String> call(Row row) throws Exception { String driverId = (String)row.get(0); String carId = (String)row.get(1); return new Tuple2<String, String>(driverId, carId); } }); /* * join * */ System.out.println(" ****************** join *******************"); JavaPairRDD<String, Tuple2<String, String>> joinRdd = rdd1.join(rdd2); Iterator<Tuple2<String, Tuple2<String, String>>> it1 = joinRdd.collect().iterator(); while (it1.hasNext()) { Tuple2<String, Tuple2<String, String>> item = it1.next(); System.out.println("driver_id:" + item._1 + ", order_id:" + item._2._1 + ", car_id:" + item._2._2 ); } /* * leftOuterJoin * */ System.out.println(" ****************** leftOuterJoin *******************"); JavaPairRDD<String, Tuple2<String, Optional<String>>> leftOuterJoinRdd = rdd1.leftOuterJoin(rdd2); Iterator<Tuple2<String, Tuple2<String, Optional<String>>>> it2 = leftOuterJoinRdd.collect().iterator(); while (it2.hasNext()) { Tuple2<String, Tuple2<String, Optional<String>>> item = it2.next(); System.out.println("driver_id:" + item._1 + ", order_id:" + item._2._1 + ", car_id:" + item._2._2 ); } /* * rightOuterJoin * */ System.out.println(" ****************** rightOuterJoin 
*******************"); JavaPairRDD<String, Tuple2<Optional<String>, String>> rightOuterJoinRdd = rdd1.rightOuterJoin(rdd2); Iterator<Tuple2<String, Tuple2<Optional<String>, String>>> it3 = rightOuterJoinRdd.collect().iterator(); while (it3.hasNext()) { Tuple2<String, Tuple2<Optional<String>, String>> item = it3.next(); System.out.println("driver_id:" + item._1 + ", order_id:" + item._2._1 + ", car_id:" + item._2._2 ); } } public static void main(String[] args) { Join sj = new Join(); sj.join(); } }