1. 程式人生 > 其它 >Hadoop入門指南之表連線操作

Hadoop入門指南之表連線操作

技術標籤:大資料hadoop

Hadoop系列文章索引

Hadoop入門指南之HDFS介紹

Hadoop入門指南之Linux環境搭建

Hadoop入門指南之Linux軟體安裝

Hadoop入門指南之Hadoop安裝

Hadoop入門指南之hdfs命令列使用.

Hadoop入門指南之MapReduce介紹

Hadoop入門指南之統計庫存實戰

Hadoop入門指南之分割槽、規約實戰

Hadoop入門指南之排序實戰

Hadoop入門指南之分組實戰

Hadoop入門指南之表連線操作

Hadoop入門指南之yarn介紹​​​​​​​

在分析大資料時,有時需要處理多個表,表與表之間通過主鍵和外來鍵關聯。在分析的時候就需要把多個表關聯起來,在sql中通常是使用join連線查詢,在hadoop中也可以實現join操作來完成需求。

假設現在有兩個資料檔案,分別為students.txt和score.txt:

s001,張三,男
s002,小梅,女
c001,s001,語文,92
c002,s001,數學,88
c003,s001,英語,90
c004,s002,語文,95
c005,s002,數學,92
c006,s002,英語,91

可以看到,學生表的主鍵是成績表的外來鍵。現在需求是,需要顯示學生這次考試每個科目的成績。這裡就要把兩個表通過學生id來連線起來,那麼K2應該是這個學生id,V2是文件每一行的內容,K3是學生、成績、分數的內容,V3就是null。我們可以在mapper裡判斷資料來自於哪個文件,然後分別解析之後寫入,在reducer時,因為同一學生id資料都被放在了V2集合中,只需要判斷來自於哪個表,最後拼起來就好了。

先把資料儲存為student.txt和score.txt。

新建包com.demo.join_job。

先寫Mapper:

package com.demo.join_job;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

import java.io.IOException;

public class JoinMapper extends Mapper<LongWritable, Text,Text,Text> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        FileSplit fileSplit = (FileSplit) context.getInputSplit();//強轉成FileSplit型別
        String name = fileSplit.getPath().getName();//獲取檔名
        String string = value.toString();
        String[] strings = string.split(",");//分割字串
        if("students.txt".equals(name)){//來自學生表
            String sid = strings[0];
            context.write(new Text(sid),value);
        }
        else{//來自成績表
            String sid = strings[1];
            context.write(new Text(sid),value);
        }
    }
}

這裡從context中獲得了InputSplit,因為FileSplit是InputSplit的子類,所以強轉成FileSplit,然後獲得檔名,根據檔名判斷來自哪個檔案,並做對應處理取得學生id。

接著寫Reducer:

package com.demo.join_job;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

public class JoinReducer extends Reducer<Text,Text,Text, NullWritable> {
    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        String student=null;
        List<String> scoreList = new ArrayList<>();
        for (Text value : values) {
            String text = value.toString();
            if(text.startsWith("s")){//來自學生表
                student = text;
            }
            else{//來自成績表
                String[] strings = text.split(",");
                scoreList.add(strings[2]+"\t"+strings[3]);//拼接科目和成績
            }
        }
        String name = student.split(",")[1];//獲取學生名字
        for (String s : scoreList) {
            context.write(new Text(name+"\t"+s),NullWritable.get());
        }
    }
}

這裡就是根據是否為s開頭來判斷是否來自學生表,然後把資料取出來。因為不知道第一個是否是來自學生表,所以把科目表的資料放到集合中,待遍歷完所有V2後,再進行寫入操作。

最後是主類:

package com.demo.join_job;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.net.URI;

public class JoinMain extends Configured implements Tool {
    @Override
    public int run(String[] strings) throws Exception {
        Job job = Job.getInstance(super.getConf(), "join_job");
        job.setJarByClass(JoinMain.class);//指定jar的主類
        job.setInputFormatClass(TextInputFormat.class);//指定輸入類
        TextInputFormat.addInputPath(job,new Path("file:///D:\\hadoop_job\\join_job_input"));//指定輸入路徑
        job.setMapperClass(JoinMapper.class);//指定Mapper類
        job.setMapOutputKeyClass(Text.class);//指定K2
        job.setMapOutputValueClass(Text.class);//指定V2
        job.setReducerClass(JoinReducer.class);//指定Reducer類
        job.setOutputKeyClass(Text.class);//指定K3
        job.setOutputValueClass(NullWritable.class);//指定V3
        job.setOutputFormatClass(TextOutputFormat.class);//指定輸出類
        TextOutputFormat.setOutputPath(job,new Path("file:///D:\\hadoop_job\\join_job_output"));//設定輸出路徑
        boolean b = job.waitForCompletion(true);//執行job
        return b?0:1;
    }

    public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration();//新建一個配置物件
        int run = ToolRunner.run(configuration, new JoinMain(), args);//執行MapReduce
        System.exit(run);//系統退出
    }
}

這個主類和之前文章的沒啥區別,就是在本地運行了。

執行後可以看到結果檔案:

很好的完成了需求。