1. 程式人生 > spark之join操作

spark之join操作

import java.util.Arrays;
import java.util.Iterator;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.
apache.spark.api.java.function.Function2; import org.apache.spark.api.java.function.PairFunction; import org.apache.spark.api.java.function.VoidFunction; import scala.Tuple2; public class join { public static void main(String[] args) { SparkConf conf = new SparkConf().setAppName("join").setMaster
("local[*]"); JavaSparkContext jsc = new JavaSparkContext(conf); JavaRDD<String> persons = jsc.textFile("spark/input3/person.txt"); JavaRDD<String> addresses = jsc.textFile("spark/input3/address.txt"); //得到 key: 郵編號,value:學號和名字 JavaPairRDD<String, String> personkv = persons.
mapToPair(new PairFunction<String, String,String>() { private static final long serialVersionUID = 1L; public Tuple2<String,String> call(String str) throws Exception { String[] personsplit = str.split(" |\t"); // notice maybe \t //System.out.println("length"+personsplit.length); //處理預設資料 if (personsplit.length == 3) { //System.out.println("i am not null"); String code=personsplit[2]; String value = personsplit[0] +" "+ personsplit[1]; //number + name return new Tuple2<String, String>(code, value); } else { //System.out.println("null null null ++++++++="+personsplit.length); return new Tuple2<String,String>(null,null); } } }); // to split the person and get the form <210000,1 Aaron> //得到key: 郵編號 value:城市 JavaPairRDD<String, String> addresskv = addresses.mapToPair(new PairFunction<String, String,String>() { private static final long serialVersionUID = 1L; public Tuple2<String,String> call(String str) throws Exception { String[] addresssplit = str.split(" |\t"); String code=addresssplit[0]; String value = addresssplit[1]; //city return new Tuple2<String, String>(code, value); } }); //to split the address and get the form <210000,Nanjing> //進行join操作 JavaPairRDD<String, Tuple2<String, String>> joinres=personkv.join(addresskv); //遍歷輸出 joinres.foreach(new VoidFunction<Tuple2<String, Tuple2<String, String>>>() { private static final long serialVersionUID = 1L; public void call(Tuple2<String, Tuple2<String, String>> t) throws Exception { System.out.println(t._2()._1+" "+t._1+ " "+t._2()._2); } }); joinres.saveAsTextFile("./spark/output3/"); } }

輸入:

person.txt(每行:學號 名字 郵編號,以空格或 Tab 分隔):
	1 Aaron 210000
	.....
address.txt(每行:郵編號 城市,以空格或 Tab 分隔):
	210000 Nanjing
	.........