Spark and the Java API (3): Join
阿新 · Published 2021-06-10
This article walks through a join operator example built on Spark and the Java API.
Create the Project
Create a Maven project with the pom.xml below. Note that spark-core is declared with provided scope: spark-submit puts the Spark classes on the classpath at runtime, so they do not need to be bundled into the jar.
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>com.github.ralgond</groupId>
  <artifactId>spark-java-api</artifactId>
  <version>0.0.1-SNAPSHOT</version>

  <dependencies>
    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core -->
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.12</artifactId>
      <version>3.1.1</version>
      <scope>provided</scope>
    </dependency>
  </dependencies>

  <build>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <version>3.0</version>
        <configuration>
          <source>1.8</source>
          <target>1.8</target>
        </configuration>
      </plugin>
    </plugins>
  </build>
</project>
Write the Java Class JoinByItemId
Create a package com.github.ralgond.sparkjavaapi, and inside it a class named JoinByItemId with the following contents:
package com.github.ralgond.sparkjavaapi;

import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.Optional;

import scala.Tuple2;

public class JoinByItemId {
    public static void main(String[] args) {
        String userFilePath = args[0];
        String itemFilePath = args[1];

        SparkConf conf = new SparkConf().setAppName("JoinByItemId Application");
        JavaSparkContext sc = new JavaSparkContext(conf);

        // Read user.txt and split each line into (user, item_id).
        JavaRDD<String> udata1 = sc.textFile(userFilePath);
        JavaPairRDD<String, String> udata2 = udata1.mapToPair(line -> {
            String[] a = line.split("\\s+", 2);
            return new Tuple2<String, String>(a[0], a[1]);
        });

        // Swap to (item_id, user) so that item_id becomes the join key.
        JavaPairRDD<String, String> udata3 = udata2.mapToPair(t ->
            new Tuple2<String, String>(t._2, t._1));

        // Read item.txt into (item_id, item_name).
        JavaRDD<String> idata1 = sc.textFile(itemFilePath);
        JavaPairRDD<String, String> idata2 = idata1.mapToPair(line -> {
            String[] a = line.split("\\s+", 2);
            return new Tuple2<String, String>(a[0], a[1]);
        });

        // Left outer join on item_id: every user row is kept, and the
        // matching item name arrives wrapped in an Optional.
        JavaPairRDD<String, Tuple2<String, Optional<String>>> rddWithJoin =
            udata3.leftOuterJoin(idata2);

        // Rebuild (user, "item_id \t item_name"), substituting "NULL"
        // when no item matched the key.
        JavaPairRDD<String, String> res = rddWithJoin.mapToPair(t -> {
            if (t._2()._2().isPresent()) {
                return new Tuple2<String, String>(t._2()._1(), t._1() + "\t" + t._2()._2().get());
            } else {
                return new Tuple2<String, String>(t._2()._1(), t._1() + "\t" + "NULL");
            }
        });

        List<Tuple2<String, String>> res2 = res.collect();
        System.out.println(res2);

        sc.close();
    }
}
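In the sample data used below, every item_id finds a match, so the Optional is always present and the NULL branch never fires. The following standalone sketch (a hypothetical class with made-up in-memory data, run on a local[*] master, not part of the original project) illustrates how leftOuterJoin hands back an absent Optional for an unmatched key:

package com.github.ralgond.sparkjavaapi;

import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.Optional;

import scala.Tuple2;

public class LeftOuterJoinDemo {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
            .setAppName("LeftOuterJoinDemo")
            .setMaster("local[*]"); // run in-process; no cluster needed
        JavaSparkContext sc = new JavaSparkContext(conf);

        // Left side: (item_id, user). item_id "4" has no match on the right.
        JavaPairRDD<String, String> users = sc.parallelizePairs(Arrays.asList(
            new Tuple2<>("1", "A"),
            new Tuple2<>("4", "F")));

        // Right side: (item_id, item_name).
        JavaPairRDD<String, String> items = sc.parallelizePairs(Arrays.asList(
            new Tuple2<>("1", "item1")));

        // leftOuterJoin keeps every left key; a missing right value
        // shows up as an absent Optional.
        JavaPairRDD<String, Tuple2<String, Optional<String>>> joined =
            users.leftOuterJoin(items);

        for (Tuple2<String, Tuple2<String, Optional<String>>> t : joined.collect()) {
            String itemName = t._2()._2().isPresent() ? t._2()._2().get() : "NULL";
            System.out.println(t._1() + " -> " + t._2()._1() + ", " + itemName);
        }
        // Prints, in some order:
        // 1 -> A, item1
        // 4 -> F, NULL

        sc.close();
    }
}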
Prepare the Data
Go to the Spark installation directory and create the folder spark-java-api\JoinByItemId under the data folder. Then, in {SPARK_HOME}\data\spark-java-api\JoinByItemId, create two files: user.txt and item.txt.
The contents of user.txt (each line is a user followed by an item_id):
A 1
B 1
C 2
D 2
E 3
The contents of item.txt (each line is an item_id followed by an item name):
1 item1
2 item2
3 item3
Build and Run
Run mvn clean package to build the jar spark-java-api-0.0.1-SNAPSHOT.jar.
Then, from the Spark installation directory, run the following command:
bin\spark-submit --class com.github.ralgond.sparkjavaapi.JoinByItemId D:\ralgond\spark-java-api\target\spark-java-api-0.0.1-SNAPSHOT.jar data\spark-java-api\JoinByItemId\user.txt data\spark-java-api\JoinByItemId\item.txt
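The jar path above (D:\ralgond\...) is specific to the author's Windows machine; point it at wherever Maven wrote your jar. On Linux or macOS the equivalent invocation uses forward slashes, for example (with a placeholder jar path):

bin/spark-submit --class com.github.ralgond.sparkjavaapi.JoinByItemId /path/to/spark-java-api/target/spark-java-api-0.0.1-SNAPSHOT.jar data/spark-java-api/JoinByItemId/user.txt data/spark-java-api/JoinByItemId/item.txt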
The result then appears in the console output.
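Since every item_id in the sample data has a match, the collected list printed at the end should resemble the following (element order may vary, and the line will be surrounded by Spark's own log output):

[(A,1	item1), (B,1	item1), (C,2	item2), (D,2	item2), (E,3	item3)]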