Hadoop入門指南之表連線操作
阿新 • • 發佈:2021-02-09
Hadoop系列文章索引
Hadoop入門指南之yarn介紹
在分析大資料時,有時需要處理多個表,表與表之間通過主鍵和外來鍵關聯。在分析的時候就需要把多個表關聯起來,在sql中通常是使用join連線查詢,在hadoop中也可以實現join操作來完成需求。
假設現在有兩個資料檔案,分別為students.txt和score.txt:
s001,張三,男
s002,小梅,女
c001,s001,語文,92
c002,s001,數學,88
c003,s001,英語,90
c004,s002,語文,95
c005,s002,數學,92
c006,s002,英語,91
可以看到,學生表的主鍵是成績表的外來鍵。現在需求是,需要顯示學生這次考試每個科目的成績。這裡就要把兩個表通過學生id來連線起來,那麼K2應該是這個學生id,V2是文件每一行的內容,K3是學生、成績、分數的內容,V3就是null。我們可以在mapper裡判斷資料來自於哪個文件,然後分別解析之後寫入,在reducer時,因為同一學生id資料都被放在了V2集合中,只需要判斷來自於哪個表,最後拼起來就好了。
先把資料儲存為student.txt和score.txt。
新建包com.demo.join_job。
先寫Mapper:
package com.demo.join_job; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.lib.input.FileSplit; import java.io.IOException; public class JoinMapper extends Mapper<LongWritable, Text,Text,Text> { @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { FileSplit fileSplit = (FileSplit) context.getInputSplit();//強轉成FileSplit型別 String name = fileSplit.getPath().getName();//獲取檔名 String string = value.toString(); String[] strings = string.split(",");//分割字串 if("students.txt".equals(name)){//來自學生表 String sid = strings[0]; context.write(new Text(sid),value); } else{//來自成績表 String sid = strings[1]; context.write(new Text(sid),value); } } }
這裡從context中獲得了InputSplit,因為FileSplit是InputSplit的子類,所以強轉成FileSplit,然後獲得檔名,根據檔名判斷來自哪個檔案,並做對應處理取得學生id。
接著寫Reducer:
package com.demo.join_job;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
public class JoinReducer extends Reducer<Text,Text,Text, NullWritable> {
@Override
protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
String student=null;
List<String> scoreList = new ArrayList<>();
for (Text value : values) {
String text = value.toString();
if(text.startsWith("s")){//來自學生表
student = text;
}
else{//來自成績表
String[] strings = text.split(",");
scoreList.add(strings[2]+"\t"+strings[3]);//拼接科目和成績
}
}
String name = student.split(",")[1];//獲取學生名字
for (String s : scoreList) {
context.write(new Text(name+"\t"+s),NullWritable.get());
}
}
}
這裡就是根據是否為s開頭來判斷是否來自學生表,然後把資料取出來。因為不知道第一個是否是來自學生表,所以把科目表的資料放到集合中,待遍歷完所有V2後,再進行寫入操作。
最後是主類:
package com.demo.join_job;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import java.net.URI;
public class JoinMain extends Configured implements Tool {
@Override
public int run(String[] strings) throws Exception {
Job job = Job.getInstance(super.getConf(), "join_job");
job.setJarByClass(JoinMain.class);//指定jar的主類
job.setInputFormatClass(TextInputFormat.class);//指定輸入類
TextInputFormat.addInputPath(job,new Path("file:///D:\\hadoop_job\\join_job_input"));//指定輸入路徑
job.setMapperClass(JoinMapper.class);//指定Mapper類
job.setMapOutputKeyClass(Text.class);//指定K2
job.setMapOutputValueClass(Text.class);//指定V2
job.setReducerClass(JoinReducer.class);//指定Reducer類
job.setOutputKeyClass(Text.class);//指定K3
job.setOutputValueClass(NullWritable.class);//指定V3
job.setOutputFormatClass(TextOutputFormat.class);//指定輸出類
TextOutputFormat.setOutputPath(job,new Path("file:///D:\\hadoop_job\\join_job_output"));//設定輸出路徑
boolean b = job.waitForCompletion(true);//執行job
return b?0:1;
}
public static void main(String[] args) throws Exception {
Configuration configuration = new Configuration();//新建一個配置物件
int run = ToolRunner.run(configuration, new JoinMain(), args);//執行MapReduce
System.exit(run);//系統退出
}
}
這個主類和之前文章的沒啥區別,就是在本地運行了。
執行後可以看到結果檔案:
很好的完成了需求。