1. 程式人生 > >離線輕量級大資料平臺Spark之JavaRDD關聯join操作

離線輕量級大資料平臺Spark之JavaRDD關聯join操作

對兩個RDD進行關聯操作,如:

1)檔案post_data.txt包含:post_id\title\content

2)檔案train.txt包含:dev_id\post_id\praise\time

通過post_id關聯,提取post_id\content\praise欄位,檔案欄位用不定長多個空格分割;

參考程式碼如下:

package scs.contest;

import java.util.List;
import java.util.Properties;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.Optional;
import org.apache.spark.api.java.function.PairFunction;

import scala.Tuple2;

public class pageview {
	public static String path="/home/spark/data";
	public static void main(String[] args) {
	    //第一步:設定環境字符集,避免中文亂碼
	    Properties pps=System.getProperties(); 
	    pps.setProperty("file.encoding","UTF-8"); 
	    //第二步:txt資料匯入並分割成到訓練集和預測集
	    SparkConf sparkConf = new SparkConf().setAppName("pageview");
	    JavaSparkContext jsc = new JavaSparkContext(sparkConf);
	    JavaRDD<String> postdataLines =jsc.textFile(path+"/post_data.txt");//id\title\content
	    JavaRDD<String> trainLines =jsc.textFile(path+"/train.txt");//dev_id\post_id\praise\time
	    JavaPairRDD<String,String> pdLines=postdataLines.mapToPair(new PairFunction<String,String,String>(){
	    	public Tuple2<String, String> call(String t) throws Exception {
	    		String[] st=t.split("\\s+");//多個空格情況的分割
	    		String content="";
	    		for(int i=1;i<st.length;i++){
	    			content+=st[i];
	    		}
	    		return new Tuple2(st[0],content);//title+content都取,title一般為空的情況下
	    	}
	    });
	    JavaPairRDD<String,String> ptLines=trainLines.mapToPair(new PairFunction<String,String,String>(){
	    	public Tuple2<String, String> call(String t) throws Exception {
	    		String[] st=t.split("\\s+");
	    		return new Tuple2(st[1], st[2]);
	    	}
	    });
	    //合併兩個JavaPairRDD,得到訓練集和預測集
	    JavaPairRDD<String, Tuple2<String, String>> tLines=pdLines.join(ptLines);//訓練街
	    Tuple2<String, Tuple2<String, String>>  tpfirst=tLines.first();
	    System.out.println(tpfirst._1+"|"+tpfirst._2()._1+"|"+tpfirst._2()._2);
	    //預測集,Optional<String>=Optional.empty的記錄數
	    JavaPairRDD<String, Tuple2<String, Optional<String>>> pLines=pdLines.leftOuterJoin(ptLines); 
	    Tuple2<String, Tuple2<String, String>>  tpfirst1=tLines.first();
	    System.out.println(tpfirst1._1+"|"+tpfirst1._2()._1+"|"+tpfirst1._2()._2);    
	}
}

執行結果:
507298|無聊的一天又開始了…|1
547732|人生不可能總一帆風順,總會有一些挫折的。|Optional.empty

參考官網的JavaPariRDD的join操作。