
MapReduce Full Workflow: Partitioning and Sorting

1. The overall workflow that MapReduce performs: input splits are read by MapTasks; the map output is partitioned, sorted, and spilled to disk; ReduceTasks fetch their partitions during the shuffle, merge-sort them, and run reduce() to produce the final output files.

2. Partitioning (the Partition step)

Hands-on Partition example

1. Requirement

Write the statistics to different files (partitions) according to the province each phone number belongs to. Expected output: numbers starting with 136, 137, 138, and 139 each go into their own file (four separate files), and numbers with any other prefix go into one additional file.

The code is as follows:

package partiton;
import flow.FlowBean;
import flow.FlowMapper;
import flow.FlowReducer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.Job;

import java.io.IOException;
public class partitonDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // 1. Get the Job instance
        Job job = Job.getInstance(new Configuration());
        // 2. Set the jar class path
        job.setJarByClass(partitonDriver.class);
        // 3. Set the Mapper, Reducer and Partitioner
        job.setMapperClass(FlowMapper.class);
        job.setReducerClass(FlowReducer.class);
        // Five reduce tasks, one per partition number (0-4) returned by MyPartitioner
        job.setNumReduceTasks(5);
        job.setPartitionerClass(MyPartitioner.class);
        // 4. Set the map output and final output key/value types
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FlowBean.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);
        // 5. Set the input and output paths
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        // 6. Submit the job and wait for completion
        boolean b = job.waitForCompletion(true);
        System.exit(b ? 0 : 1);
    }
}

  

package partiton;

import flow.FlowBean;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

public class MyPartitioner extends Partitioner<Text, FlowBean> {
    // Return the partition number for each record based on the phone-number prefix
    @Override
    public int getPartition(Text text, FlowBean flowBean, int numPartitions) {
        String phone = text.toString();
        switch (phone.substring(0, 3)) {
            case "136":
                return 0;
            case "137":
                return 1;
            case "138":
                return 2;
            case "139":
                return 3;
            default:
                return 4;
        }
    }
}
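For comparison, when no custom partitioner is configured, Hadoop falls back to its built-in HashPartitioner, which assigns partitions by key hash. The sketch below reproduces that behavior (the class name HashLikePartitioner is mine; the real class is org.apache.hadoop.mapreduce.lib.partition.HashPartitioner):

package partiton;

import flow.FlowBean;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

// A sketch of the default hash-based partitioning, for comparison with MyPartitioner above.
public class HashLikePartitioner extends Partitioner<Text, FlowBean> {
    @Override
    public int getPartition(Text key, FlowBean value, int numPartitions) {
        // Mask off the sign bit so the result is always a non-negative partition number
        return (key.hashCode() & Integer.MAX_VALUE) % numPartitions;
    }
}

With hash partitioning, identical keys still meet in the same reduce task, but you cannot control which phone prefixes share an output file, which is why the requirement above calls for a custom Partitioner.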

After the job runs successfully, the results are written to five separate part files, one per partition, which shows that the custom partitioning took effect.

3. Sorting

Sorting is one of the most important operations in the MapReduce framework. Both MapTask and ReduceTask sort records by key; this is default Hadoop behavior, and the data of every application is sorted whether or not the logic requires it. The default order is dictionary (lexicographic) order, and the sort is implemented with quicksort.
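Because the default order is dictionary order, keys that look numeric can sort in a counter-intuitive way. A minimal illustration (the demo class is mine, only for clarity):

import org.apache.hadoop.io.Text;

// Dictionary-order comparison on Text keys:
// "10" sorts before "2" because '1' < '2', even though 10 > 2 numerically.
public class DictOrderDemo {
    public static void main(String[] args) {
        System.out.println(new Text("10").compareTo(new Text("2")) < 0);   // true
        System.out.println(new Text("136").compareTo(new Text("139")) < 0); // true
    }
}

This is why the sort example below puts the numeric total flow into the key bean and compares it with Long.compare instead of relying on the default Text ordering.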

(1) Partial sort
MapReduce sorts the dataset by the key of each input record, guaranteeing that every output file is internally sorted.
(2) Total sort
The final output is a single file that is sorted as a whole. The simplest implementation is to configure only one ReduceTask, but this is extremely inefficient for large datasets: a single machine processes all of the data, and the parallelism MapReduce provides is completely lost.

(3) Grouping sort (GroupingComparator)
Groups keys on the Reduce side. Use it when the key is a bean object and you want records in which one or a few fields are equal (even though the complete keys differ) to enter the same reduce() call; a sketch follows after this list.
(4) Secondary sort
In a custom sort, if compareTo decides on two conditions, it is a secondary sort.
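A minimal sketch of a grouping comparator, assuming a FlowBean key like the one defined below (the class name FlowGroupingComparator and the choice of upFlow as the grouping field are mine, not from the original post):

package writablecomparable;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

// Grouping sort: keys that differ as a whole but share upFlow are fed into one reduce() call.
public class FlowGroupingComparator extends WritableComparator {
    protected FlowGroupingComparator() {
        super(FlowBean.class, true); // true => create key instances for deserialization
    }

    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        FlowBean x = (FlowBean) a;
        FlowBean y = (FlowBean) b;
        // Compare only the field(s) used for grouping, here upFlow
        return Long.compare(x.getUpFlow(), y.getUpFlow());
    }
}

It would be registered with job.setGroupingComparatorClass(FlowGroupingComparator.class). For a secondary sort, the key bean's compareTo simply adds a second condition: for example, compare sumFlow in descending order first, and on a tie compare upFlow in ascending order.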

The code is as follows:

package writablecomparable;

import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class FlowBean implements WritableComparable<FlowBean> {
    private long upFlow;
    private long downFlow;
    private long sumFlow;
    // No-argument constructor, required so the framework can create the bean by reflection
    public FlowBean()
    {}
    public void set(long upFlow,long downFlow)
    {
        this.downFlow=downFlow;
        this.upFlow=upFlow;
        this.sumFlow=upFlow+downFlow;
    }
    @Override
    public String toString()
    {
        return upFlow+"\t"+downFlow+"\t"+sumFlow;
    }
    public long getUpFlow() {
        return upFlow;
    }

    public void setUpFlow(long upFlow) {
        this.upFlow = upFlow;
    }

    public long getDownFlow() {
        return downFlow;
    }

    public void setDownFlow(long downFlow) {
        this.downFlow = downFlow;
    }

    public long getSumFlow() {
        return sumFlow;
    }

    public void setSumFlow(long sumFlow) {
        this.sumFlow = sumFlow;
    }
    // Serialization method: writes the fields out so the framework can ship the bean
    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeLong(upFlow);
        dataOutput.writeLong(downFlow);
        dataOutput.writeLong(sumFlow);

    }
    // Deserialization method: reads the fields back from the data supplied by the framework
    @Override
    public void readFields(DataInput dataInput) throws IOException {
        upFlow=dataInput.readLong();
        downFlow=dataInput.readLong();
        sumFlow=dataInput.readLong();
    }

    @Override
    public int compareTo(FlowBean o) {
        // Sort by total flow in descending order
        return Long.compare(o.sumFlow, this.sumFlow);
    }
    // Note: the field order in write() and readFields() must be identical
}

  

package writablecomparable;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class SortReducer extends Reducer<FlowBean, Text,Text,FlowBean> {

    @Override
    protected void reduce(FlowBean key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        // The mapper swapped key and value so the framework sorts by FlowBean;
        // swap them back here, writing one line per phone number that shares this total flow
        for (Text value : values) {
            context.write(value, key);
        }
    }
}

  

package writablecomparable;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class SortDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Job job =Job.getInstance(new Configuration());
        job.setJarByClass(SortDriver.class);
        job.setMapperClass(SortMapper.class);
        job.setReducerClass(SortReducer.class);

        job.setMapOutputKeyClass(FlowBean.class);
        job.setMapOutputValueClass(Text.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);

        // The output directory must not exist before the job runs, so it cannot be
        // the same path as the input; "D:\\wev_out" below is only a placeholder
        FileInputFormat.setInputPaths(job, new Path("D:\\wev"));
        FileOutputFormat.setOutputPath(job, new Path("D:\\wev_out"));

        boolean b=job.waitForCompletion(true);
        System.exit(b?0:1);
    }
}

  

package writablecomparable;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class SortMapper extends Mapper<LongWritable,Text,FlowBean, Text> {
    private FlowBean flow=new FlowBean();
    private Text phone =new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Each input line: phone \t upFlow \t downFlow \t sumFlow
        String[] fields = value.toString().split("\t");
        phone.set(fields[0]);
        flow.setUpFlow(Long.parseLong(fields[1]));
        flow.setDownFlow(Long.parseLong(fields[2]));
        flow.setSumFlow(Long.parseLong(fields[3]));
        // Swap key and value: the FlowBean becomes the key so the shuffle sorts by flow
        context.write(flow, phone);
    }
}
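For reference, the mapper assumes each input line carries four tab-separated fields: phone number, up flow, down flow, and total flow. A tiny standalone check of that parsing, using a made-up sample line (the phone number and byte counts are hypothetical, not from the original dataset):

package writablecomparable;

// Verifies the line format SortMapper expects, with a fabricated example record.
public class SortMapperFormatDemo {
    public static void main(String[] args) {
        String line = "13800000000\t1116\t954\t2070"; // phone \t upFlow \t downFlow \t sumFlow
        String[] fields = line.split("\t");
        FlowBean flow = new FlowBean();
        flow.setUpFlow(Long.parseLong(fields[1]));
        flow.setDownFlow(Long.parseLong(fields[2]));
        flow.setSumFlow(Long.parseLong(fields[3]));
        System.out.println(fields[0] + " -> " + flow); // 13800000000 -> 1116  954  2070
    }
}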

  

The job output shows that the records have been sorted by total flow, as required.