Total Sort with a Custom Partitioner

Data Preparation

  • Create a createdatas.sh script:
#!/bin/bash
# emit 1000 random integers, one per line ($RANDOM ranges over 0..32767)
for i in {1..1000}; do
    echo $RANDOM
done
  • Generate the data:
$ sh createdatas.sh > data1
$ sh createdatas.sh > data2
$ sh createdatas.sh > data3
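
If the job will read from HDFS rather than the local filesystem, the generated files need to be uploaded to the input path used by the driver below. A minimal sketch, assuming HDFS is the default filesystem and the relative path input/totalsort resolves under the user's home directory:

$ hdfs dfs -mkdir -p input/totalsort
$ hdfs dfs -put data1 data2 data3 input/totalsort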

Custom Partitioner

Looking at the data, the values fall in [0, 32767] (the range of bash's $RANDOM), so we send keys > 20000 to partition 0, keys in (10000, 20000] to partition 1, and the rest to partition 2. Combined with the descending sort below, partition 0 receives the largest keys, so reading part-r-00000 through part-r-00002 in order yields one globally descending sequence.

package com.hadoop.totasort;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Partitioner;

/**
 * Custom partitioner: routes keys into three ranges so that, together with
 * the descending sort, the output files form one globally ordered sequence
 */
public class MyPartitioner extends Partitioner<IntWritable, IntWritable> {
    @Override
    public int getPartition(IntWritable key, IntWritable value, int numPartitions) {
        // IntWritable already wraps an int; read it directly rather than
        // round-tripping through a String
        int keyInt = key.get();
        if (keyInt > 20000) {
            return 0;   // largest keys land in part-r-00000
        } else if (keyInt > 10000) {
            return 1;
        } else {
            return 2;
        }
    }
}

Custom Sort

The shuffle phase sorts keys in ascending order by default, but we want the output in descending order.

package com.hadoop.totasort;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

/**
 * Custom comparator: reverses the default ascending shuffle sort
 */
public class MySort extends WritableComparator {
    public MySort() {
        // this constructor matters: it registers IntWritable as the key type
        // and tells the parent class to instantiate keys for comparison
        super(IntWritable.class, true);
    }

    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        IntWritable v1 = (IntWritable) a;
        IntWritable v2 = (IntWritable) b;
        // compare in reverse to get a descending order
        return v2.compareTo(v1);
    }
}

Writing the Mapper

package com.hadoop.totasort;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class TotalSortMapper extends Mapper<LongWritable, Text, IntWritable,IntWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        int v = Integer.parseInt(value.toString());
        // emit v as both key and value: the key drives partitioning and
        // sorting, and the value carries the number through to the reducer
        context.write(new IntWritable(v),new IntWritable(v));
    }
}

Writing the Reducer

package com.hadoop.totasort;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class TotalSortReducer extends Reducer<IntWritable,IntWritable, NullWritable,IntWritable> {
    @Override
    protected void reduce(IntWritable key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        // keys arrive already sorted in descending order; write each value
        // with a NullWritable key so only the number appears in the output
        for (IntWritable value : values) {
            context.write(NullWritable.get(), value);
        }
    }
}

Writing the Driver

package com.hadoop.totasort;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class TotalSortDriver {
    public static void main(String[] args) throws Exception{
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(TotalSortDriver.class);
        job.setMapperClass(TotalSortMapper.class);
        job.setReducerClass(TotalSortReducer.class);
        //set the partitioner, sort comparator, and number of reduce tasks (one per partition)
        job.setPartitionerClass(MyPartitioner.class);
        job.setSortComparatorClass(MySort.class);
        job.setNumReduceTasks(3);
        //map/reduce output types and I/O paths
        job.setMapOutputKeyClass(IntWritable.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.setInputPaths(job,new Path("input/totalsort"));
        FileOutputFormat.setOutputPath(job,new Path("output/totalsort"));
        // propagate the job status as the process exit code
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
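
With the classes compiled, the job can be packaged and submitted through the hadoop launcher. A sketch of the submission; the jar name totalsort.jar is an assumption, while the main class matches the driver above:

$ hadoop jar totalsort.jar com.hadoop.totasort.TotalSortDriver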

Results

Three output files are produced: part-r-00000, part-r-00001, and part-r-00002. part-r-00000 holds the keys greater than 20000, part-r-00001 those greater than 10000, and part-r-00002 the rest, each sorted in descending order, so concatenating the three files in order gives the complete descending sort.
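
To verify the global ordering, peek at the first and last partitions; a quick check, assuming the output stayed on HDFS at the path set in the driver:

$ hdfs dfs -cat output/totalsort/part-r-00000 | head
$ hdfs dfs -cat output/totalsort/part-r-00002 | tail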
