spark之join操作
阿新 • • 發佈:2018-11-21
import java.util.Arrays;
import java.util.Iterator;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;

import scala.Tuple2;
public class join {
public static void main(String[] args) {
SparkConf conf = new SparkConf().setAppName("join").setMaster ("local[*]");
JavaSparkContext jsc = new JavaSparkContext(conf);
JavaRDD<String> persons = jsc.textFile("spark/input3/person.txt");
JavaRDD<String> addresses = jsc.textFile("spark/input3/address.txt");
//得到 key: 郵編號,value:學號和名字
JavaPairRDD<String, String> personkv = persons. mapToPair(new PairFunction<String, String,String>() {
private static final long serialVersionUID = 1L;
public Tuple2<String,String> call(String str) throws Exception {
String[] personsplit = str.split(" |\t"); // notice maybe \t
//System.out.println("length"+personsplit.length);
//處理預設資料
if (personsplit.length == 3)
{
//System.out.println("i am not null");
String code=personsplit[2];
String value = personsplit[0] +" "+ personsplit[1]; //number + name
return new Tuple2<String, String>(code, value);
}
else {
//System.out.println("null null null ++++++++="+personsplit.length);
return new Tuple2<String,String>(null,null);
}
}
}); // to split the person and get the form <210000,1 Aaron>
//得到key: 郵編號 value:城市
JavaPairRDD<String, String> addresskv = addresses.mapToPair(new PairFunction<String, String,String>() {
private static final long serialVersionUID = 1L;
public Tuple2<String,String> call(String str) throws Exception {
String[] addresssplit = str.split(" |\t");
String code=addresssplit[0];
String value = addresssplit[1]; //city
return new Tuple2<String, String>(code, value);
}
}); //to split the address and get the form <210000,Nanjing>
//進行join操作
JavaPairRDD<String, Tuple2<String, String>> joinres=personkv.join(addresskv);
//遍歷輸出
joinres.foreach(new VoidFunction<Tuple2<String, Tuple2<String, String>>>() {
private static final long serialVersionUID = 1L;
public void call(Tuple2<String, Tuple2<String, String>> t) throws Exception {
System.out.println(t._2()._1+" "+t._1+ " "+t._2()._2);
}
});
joinres.saveAsTextFile("./spark/output3/");
}
}
輸入:
person.txt:
1 Aaron 210000
.....
address.txt:
210000 Nanjing
.........