Hadoop Programming Tips (4): TotalOrderPartitioner, the Class for Global Key Ordering

Hadoop version used for testing: Hadoop 2.4

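How it works: before the MR job runs, a random sample of keys is drawn from the input data and sorted. During the Partition step in the middle of the MR job, split points taken from this sorted sample are used to assign each key to a range, and therefore to a reducer. Because every reducer receives one contiguous range of keys, the output as a whole ends up globally ordered.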
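To make the idea concrete, here is a minimal plain-Java sketch, not Hadoop's actual implementation. It sorts a handful of sample keys, picks split points at evenly spaced ranks, and assigns a key to a partition with a binary search. The class name SplitPointSketch, its methods, and the sample values are made up for illustration, and the sketch assumes there are at least as many samples as partitions and does not deal with duplicate split points.

```java
import java.util.Arrays;

/** Plain-Java sketch of range partitioning from sorted samples (no Hadoop dependency). */
class SplitPointSketch {

    /** Picks numPartitions - 1 split points from the samples at evenly spaced ranks. */
    static String[] splitPoints(String[] samples, int numPartitions) {
        String[] sorted = samples.clone();
        Arrays.sort(sorted);
        String[] points = new String[numPartitions - 1];
        float step = sorted.length / (float) numPartitions;
        for (int i = 1; i < numPartitions; i++) {
            points[i - 1] = sorted[Math.round(step * i)];
        }
        return points;
    }

    /** Returns the partition index for a key; keys below the first split point go to 0. */
    static int partitionFor(String key, String[] points) {
        int pos = Arrays.binarySearch(points, key);
        // binarySearch returns -(insertionPoint) - 1 when the key is absent.
        return pos >= 0 ? pos + 1 : -pos - 1;
    }

    public static void main(String[] args) {
        String[] samples = {"40", "10", "30", "20", "60", "50"};
        String[] points = splitPoints(samples, 3);
        System.out.println(Arrays.toString(points));   // [30, 50]
        System.out.println(partitionFor("15", points)); // 0
        System.out.println(partitionFor("35", points)); // 1
        System.out.println(partitionFor("99", points)); // 2
    }
}
```

With three partitions, the six samples give two split points. Every key below the first split point lands in partition 0, keys between the two land in partition 1, and the rest land in partition 2, so concatenating the partitions in order gives a sorted result.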

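The difficulty: the global ordering facility that Hadoop provides only works if the Mapper's output key is the same as its input key, because InputSampler samples the original keys of the input data, as the following source code shows (line 225):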

      for (int i = 0; i < splitsToSample ||
                     (i < splits.size() && samples.size() < numSamples); ++i) {
        TaskAttemptContext samplingContext = new TaskAttemptContextImpl(
            job.getConfiguration(), new TaskAttemptID());
        RecordReader<K,V> reader = inf.createRecordReader(
            splits.get(i), samplingContext);
        reader.initialize(splits.get(i), samplingContext);
        while (reader.nextKeyValue()) {
          if (r.nextDouble() <= freq) {
            if (samples.size() < numSamples) {
              samples.add(ReflectionUtils.copy(job.getConfiguration(),//  here is line 225
                                               reader.getCurrentKey(), null));
            } else {
              // When exceeding the maximum number of samples, replace a
              // random element with this one, then adjust the frequency
              // to reflect the possibility of existing elements being
              // pushed out
              int ind = r.nextInt(numSamples);
              if (ind != numSamples) {
                samples.set(ind, ReflectionUtils.copy(job.getConfiguration(),
                                 reader.getCurrentKey(), null));
              }
              freq *= (numSamples - 1) / (double) numSamples;
            }
          }
        }
        reader.close();
      }
The samples.add(...) call is where a sample is added, and this is the part that needs to be adjusted.

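A very practical example: my input key is usually a LongWritable (the byte offset of the line from the start of the file), but my Mapper's output key may be a Text. In that case building the samples runs into problems, because the sampled keys are not the keys the Mapper emits.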

How can this be solved?

The sample-adding part of the code above can be changed so that it applies the Mapper's map logic (see the example below).
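One way to keep the sampler and the Mapper from drifting apart is to put the key-derivation rule in a single helper that both could call. The sketch below is only an illustration in plain Java: the class UnderscoreKey and its extract method are hypothetical names, and the "name_number" record format is taken from the example used in this post.

```java
/** Hypothetical helper that holds the key-extraction rule in one place. */
final class UnderscoreKey {
    private UnderscoreKey() {}

    /**
     * Returns the part after the single "_" in a line such as "abc_42",
     * or null when the line does not split into exactly two fields.
     */
    static String extract(String line) {
        String[] fields = line.split("_");
        return fields.length == 2 ? fields[1] : null;
    }

    public static void main(String[] args) {
        System.out.println(extract("abc_42"));        // 42
        System.out.println(extract("no-underscore")); // null
    }
}
```

Returning null for malformed lines lets a caller skip them, in the same way the Mapper in the example skips lines that do not split into two fields.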

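Use case: when an MR job has several reducers, it produces several output files. Each output file is sorted internally, but there is no ordering between the files. With TotalOrderPartitioner, the files are ordered relative to one another as well.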
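The difference can be simulated without Hadoop. The following plain-Java sketch, with made-up class and method names and small non-negative integer keys, sends keys to two "reducers" either by hash or by contiguous key range, sorts each reducer's keys, and then concatenates the resulting "files" in order.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

/** Simulation of hash versus range partitioning; not Hadoop code. */
class PartitionOrderSketch {

    /** Partitions the keys, sorts each partition, and concatenates the partitions in order. */
    static List<Integer> run(List<Integer> keys, int reducers, boolean rangePartition, int maxKey) {
        List<List<Integer>> files = new ArrayList<>();
        for (int i = 0; i < reducers; i++) {
            files.add(new ArrayList<>());
        }
        for (int key : keys) {
            int p = rangePartition
                ? Math.min(reducers - 1, key * reducers / (maxKey + 1)) // contiguous key ranges
                : Math.floorMod(Integer.hashCode(key), reducers);        // hash-style spread
            files.get(p).add(key);
        }
        List<Integer> concatenated = new ArrayList<>();
        for (List<Integer> file : files) {
            Collections.sort(file); // each "file" is sorted internally
            concatenated.addAll(file);
        }
        return concatenated;
    }

    /** Returns true when the list is in non-decreasing order. */
    static boolean isSorted(List<Integer> xs) {
        for (int i = 1; i < xs.size(); i++) {
            if (xs.get(i - 1) > xs.get(i)) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        List<Integer> keys = Arrays.asList(7, 2, 9, 4, 1, 8);
        System.out.println(run(keys, 2, false, 9)); // [2, 4, 8, 1, 7, 9]
        System.out.println(run(keys, 2, true, 9));  // [1, 2, 4, 7, 8, 9]
    }
}
```

In both runs each "file" is sorted on its own, but only the range-partitioned run stays sorted after the files are concatenated.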

Example:

Test data: three test files are used (so there are three splits; by default, each small file becomes its own split).

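The test driver splits each record of the data above at "_", then emits the number after the "_" as the key and the string before it as the value, which makes it easy to see whether the output keys are globally ordered.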
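One thing to keep in mind when reading the output: the key is emitted as Text, and Text keys are compared byte by byte, so numeric strings sort lexicographically rather than by value. The plain-Java sketch below uses String ordering as a stand-in, which behaves the same way for ASCII digits. The class name and sample keys are made up for illustration.

```java
import java.util.Arrays;
import java.util.Comparator;

/** Shows how numeric strings order as text versus as numbers. */
class KeyOrderSketch {

    /** Sorts the keys the way string keys compare: character by character. */
    static String[] sortedAsText(String[] keys) {
        String[] copy = keys.clone();
        Arrays.sort(copy);
        return copy;
    }

    /** Sorts the keys by their numeric value. */
    static String[] sortedAsNumbers(String[] keys) {
        String[] copy = keys.clone();
        Arrays.sort(copy, Comparator.comparingLong(Long::parseLong));
        return copy;
    }

    public static void main(String[] args) {
        String[] keys = {"9", "10", "100", "2"};
        System.out.println(Arrays.toString(sortedAsText(keys)));    // [10, 100, 2, 9]
        System.out.println(Arrays.toString(sortedAsNumbers(keys))); // [2, 9, 10, 100]
    }
}
```

If numeric order is what you want, options include zero-padding the numbers to a fixed width or using a numeric key type such as IntWritable or LongWritable instead of Text.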

Test driver:

package fz.totalorder.partitioner;

import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.lib.partition.MyInputSampler;
import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

//import fz.Utils;

public class PartitionerDriver extends Configured implements Tool {

	@Override
	public int run(String[] arg0) throws Exception {
		Configuration conf = getConf();
		if(arg0.length!=3){
			System.err.println("Usage:\nfz.totalorder.partitioner.PartitionerDriver <in> <out> <useTotalOrder>");
			return -1;
		}
//		System.out.println(conf.get("fs.defaultFS"));
		Path in = new Path(arg0[0]);
		Path out= new Path(arg0[1]);
		out.getFileSystem(conf).delete(out, true);
		Job job = Job.getInstance(conf,"total order partitioner");
		job.setJarByClass(getClass());
		
		job.setInputFormatClass(TextInputFormat.class);
		job.setOutputFormatClass(TextOutputFormat.class);
		job.setMapperClass(PartitionerMapper.class);
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(Text.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(Text.class);
		job.setReducerClass(Reducer.class);
		job.setNumReduceTasks(2);
//		System.out.println(job.getConfiguration().get("mapreduce.job.reduces"));
//		System.out.println(conf.get("mapreduce.job.reduces"));
		FileInputFormat.setInputPaths(job, in);
		FileOutputFormat.setOutputPath(job, out);
		
		// global ordering across reducers
		if(arg0[2]!=null&&"true".equals(arg0[2])){
			job.setPartitionerClass(TotalOrderPartitioner.class);
//			InputSampler.Sampler<Text, Text> sampler = new 
//					InputSampler.RandomSampler<Text, Text>(0.1,20,3);
//			InputSampler.writePartitionFile(job, sampler);
			
			MyInputSampler.Sampler<Text, Text> sampler = new 
					MyInputSampler.RandomSampler<Text, Text>(0.1,20,3);
			MyInputSampler.writePartitionFile(job, sampler);
			
			String partitionFile = TotalOrderPartitioner.getPartitionFile(getConf());
			URI partitionUri= new URI(partitionFile+"#"+TotalOrderPartitioner.DEFAULT_PATH);
			job.addCacheArchive(partitionUri);
		}
		
		return job.waitForCompletion(true)?0:-1;
	}

	
	public static void main(String[] args) throws Exception {
		ToolRunner.run(new Configuration(), new PartitionerDriver(),args);
		
//		String[] arg = new String[]{
//				"hdfs://node33:8020/user/root/partition",
//				"hdfs://node33:8020/user/Administrator/partition",
//				"true"
//		};
//		ToolRunner.run(Utils.getConf(), new PartitionerDriver(),arg);
	}
}


The PartitionerMapper class:

package fz.totalorder.partitioner;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class PartitionerMapper extends Mapper<LongWritable,Text,Text ,Text>{

	private Text newKey= new Text();
	private Text newValue = new Text();
	public void map(LongWritable key, Text value, Context cxt) throws IOException,InterruptedException{
		String [] line =value.toString().split("_");
		if(line.length!=2){
			return ;
		}
		newKey.set(line[1]);
		newValue.set(line[0]);
		cxt.write(newKey, newValue);
	}
}

Here the Mapper's input key and output key differ, so we need a custom InputSampler class that adds the processed key to the samples. Its full code is:

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.mapreduce.lib.partition;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Random;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * Utility for collecting samples and writing a partition file for
 * {@link TotalOrderPartitioner}.
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class MyInputSampler<K,V> extends Configured implements Tool  {

  private static final Log LOG = LogFactory.getLog(MyInputSampler.class);

  static int printUsage() {
    System.out.println("sampler -r <reduces>\n" +
      "      [-inFormat <input format class>]\n" +
      "      [-keyClass <map input & output key class>]\n" +
      "      [-splitRandom <double pcnt> <numSamples> <maxsplits> | " +
      "             // Sample from random splits at random (general)\n" +
      "       -splitSample <numSamples> <maxsplits> | " +
      "             // Sample from first records in splits (random data)\n"+
      "       -splitInterval <double pcnt> <maxsplits>]" +
      "             // Sample from splits at intervals (sorted data)");
    System.out.println("Default sampler: -splitRandom 0.1 10000 10");
    ToolRunner.printGenericCommandUsage(System.out);
    return -1;
  }

  public MyInputSampler(Configuration conf) {
    setConf(conf);
  }

  /**
   * Interface to sample using an 
   * {@link org.apache.hadoop.mapreduce.InputFormat}.
   */
  public interface Sampler<K,V> {
    /**
     * For a given job, collect and return a subset of the keys from the
     * input data.
     */
    K[] getSample(InputFormat<K,V> inf, Job job) 
    throws IOException, InterruptedException;
  }

  /**
   * Samples the first n records from s splits.
   * Inexpensive way to sample random data.
   */
  public static class SplitSampler<K,V> implements Sampler<K,V> {

    protected final int numSamples;
    protected final int maxSplitsSampled;

    /**
     * Create a SplitSampler sampling <em>all</em> splits.
     * Takes the first numSamples / numSplits records from each split.
     * @param numSamples Total number of samples to obtain from all selected
     *                   splits.
     */
    public SplitSampler(int numSamples) {
      this(numSamples, Integer.MAX_VALUE);
    }

    /**
     * Create a new SplitSampler.
     * @param numSamples Total number of samples to obtain from all selected
     *                   splits.
     * @param maxSplitsSampled The maximum number of splits to examine.
     */
    public SplitSampler(int numSamples, int maxSplitsSampled) {
      this.numSamples = numSamples;
      this.maxSplitsSampled = maxSplitsSampled;
    }

    /**
     * From each split sampled, take the first numSamples / numSplits records.
     */
    @SuppressWarnings("unchecked") // ArrayList::toArray doesn't preserve type
    public K[] getSample(InputFormat<K,V> inf, Job job) 
        throws IOException, InterruptedException {
      List<InputSplit> splits = inf.getSplits(job);
      ArrayList<K> samples = new ArrayList<K>(numSamples);
      int splitsToSample = Math.min(maxSplitsSampled, splits.size());
      int samplesPerSplit = numSamples / splitsToSample;
      long records = 0;
      for (int i = 0; i < splitsToSample; ++i) {
        TaskAttemptContext samplingContext = new TaskAttemptContextImpl(
            job.getConfiguration(), new TaskAttemptID());
        RecordReader<K,V> reader = inf.createRecordReader(
            splits.get(i), samplingContext);
        reader.initialize(splits.get(i), samplingContext);
        while (reader.nextKeyValue()) {
          samples.add(ReflectionUtils.copy(job.getConfiguration(),
                                           reader.getCurrentKey(), null));
          ++records;
          if ((i+1) * samplesPerSplit <= records) {
            break;
          }
        }
        reader.close();
      }
      return (K[])samples.toArray();
    }
  }

  /**
   * Sample from random points in the input.
   * General-purpose sampler. Takes numSamples / maxSplitsSampled inputs from
   * each split.
   */
  public static class RandomSampler<K,V> implements Sampler<K,V> {
    protected double freq;
    protected final int numSamples;
    protected final int maxSplitsSampled;

    /**
     * Create a new RandomSampler sampling <em>all</em> splits.
     * This will read every split at the client, which is very expensive.
     * @param freq Probability with which a key will be chosen.
     * @param numSamples Total number of samples to obtain from all selected
     *                   splits.
     */
    public RandomSampler(double freq, int numSamples) {
      this(freq, numSamples, Integer.MAX_VALUE);
    }

    /**
     * Create a new RandomSampler.
     * @param freq Probability with which a key will be chosen.
     * @param numSamples Total number of samples to obtain from all selected
     *                   splits.
     * @param maxSplitsSampled The maximum number of splits to examine.
     */
    public RandomSampler(double freq, int numSamples, int maxSplitsSampled) {
      this.freq = freq;
      this.numSamples = numSamples;
      this.maxSplitsSampled = maxSplitsSampled;
    }

    /**
     * Randomize the split order, then take the specified number of keys from
     * each split sampled, where each key is selected with the specified
     * probability and possibly replaced by a subsequently selected key when
     * the quota of keys from that split is satisfied.
     */
    @SuppressWarnings("unchecked") // ArrayList::toArray doesn't preserve type
    public K[] getSample(InputFormat<K,V> inf, Job job) 
        throws IOException, InterruptedException {
      List<InputSplit> splits = inf.getSplits(job);
      ArrayList<K> samples = new ArrayList<K>(numSamples);
      int splitsToSample = Math.min(maxSplitsSampled, splits.size());

      Random r = new Random();
      long seed = r.nextLong();
      r.setSeed(seed);
      LOG.debug("seed: " + seed);
      // shuffle splits
      for (int i = 0; i < splits.size(); ++i) {
        InputSplit tmp = splits.get(i);
        int j = r.nextInt(splits.size());
        splits.set(i, splits.get(j));
        splits.set(j, tmp);
      }
      // our target rate is in terms of the maximum number of sample splits,
      // but we accept the possibility of sampling additional splits to hit
      // the target sample keyset
      for (int i = 0; i < splitsToSample ||
                     (i < splits.size() && samples.size() < numSamples); ++i) {
        TaskAttemptContext samplingContext = new TaskAttemptContextImpl(
            job.getConfiguration(), new TaskAttemptID());
        RecordReader<K,V> reader = inf.createRecordReader(
            splits.get(i), samplingContext);
        reader.initialize(splits.get(i), samplingContext);
        while (reader.nextKeyValue()) {
          if (r.nextDouble() <= freq) {
            if (samples.size() < numSamples) {
              samples.add(ReflectionUtils.copy(job.getConfiguration(),
                                               getFixedKey(reader), null)); // add here
            } else {
              // When exceeding the maximum number of samples, replace a
              // random element with this one, then adjust the frequency
              // to reflect the possibility of existing elements being
              // pushed out
              int ind = r.nextInt(numSamples);
              if (ind != numSamples) {
                samples.set(ind, ReflectionUtils.copy(job.getConfiguration(),
                		getFixedKey(reader), null)); // add here
              }
              freq *= (numSamples - 1) / (double) numSamples;
            }
          }
        }
        reader.close();
      }
      return (K[])samples.toArray();
    }
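	/**
	 * Derives the sample key from the record's value instead of its key,
	 * mirroring PartitionerMapper.map: the key is the part after "_".
	 * Assumes K is Text and that the value has the form "name_number".
	 * @param reader reader positioned on the current record
	 * @return the derived key, or null if reading the value fails
	 */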
	private K getFixedKey(RecordReader<K, V> reader) {
		K newKey =null;
		String[] line;
		try {
			line = reader.getCurrentValue().toString().split("_");
			Text newTmpKey = new Text(line[1]);
			newKey =(K) newTmpKey;
		} catch (IOException e) {
			e.printStackTrace();
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
		
		return newKey;
	}
  }

  /**
   * Sample from s splits at regular intervals.
   * Useful for sorted data.
   */
  public static class IntervalSampler<K,V> implements Sampler<K,V> {
    protected final double freq;
    protected final int maxSplitsSampled;

    /**
     * Create a new IntervalSampler sampling <em>all</em> splits.
     * @param freq The frequency with which records will be emitted.
     */
    public IntervalSampler(double freq) {
      this(freq, Integer.MAX_VALUE);
    }

    /**
     * Create a new IntervalSampler.
     * @param freq The frequency with which records will be emitted.
     * @param maxSplitsSampled The maximum number of splits to examine.
     * @see #getSample
     */
    public IntervalSampler(double freq, int maxSplitsSampled) {
      this.freq = freq;
      this.maxSplitsSampled = maxSplitsSampled;
    }

    /**
     * For each split sampled, emit when the ratio of the number of records
     * retained to the total record count is less than the specified
     * frequency.
     */
    @SuppressWarnings("unchecked") // ArrayList::toArray doesn't preserve type
    public K[] getSample(InputFormat<K,V> inf, Job job) 
        throws IOException, InterruptedException {
      List<InputSplit> splits = inf.getSplits(job);
      ArrayList<K> samples = new ArrayList<K>();
      int splitsToSample = Math.min(maxSplitsSampled, splits.size());
      long records = 0;
      long kept = 0;
      for (int i = 0; i < splitsToSample; ++i) {
        TaskAttemptContext samplingContext = new TaskAttemptContextImpl(
            job.getConfiguration(), new TaskAttemptID());
        RecordReader<K,V> reader = inf.createRecordReader(
            splits.get(i), samplingContext);
        reader.initialize(splits.get(i), samplingContext);
        while (reader.nextKeyValue()) {
          ++records;
          if ((double) kept / records < freq) {
            samples.add(ReflectionUtils.copy(job.getConfiguration(),
                                 reader.getCurrentKey(), null));
            ++kept;
          }
        }
        reader.close();
      }
      return (K[])samples.toArray();
    }
  }

  /**
   * Write a partition file for the given job, using the Sampler provided.
   * Queries the sampler for a sample keyset, sorts by the output key
   * comparator, selects the keys for each rank, and writes to the destination
   * returned from {@link TotalOrderPartitioner#getPartitionFile}.
   */
  @SuppressWarnings("unchecked") // getInputFormat, getOutputKeyComparator
  public static <K,V> void writePartitionFile(Job job, Sampler<K,V> sampler) 
      throws IOException, ClassNotFoundException, InterruptedException {
    Configuration conf = job.getConfiguration();
    final InputFormat inf = 
        ReflectionUtils.newInstance(job.getInputFormatClass(), conf);
    int numPartitions = job.getNumReduceTasks();
    K[] samples = (K[])sampler.getSample(inf, job);
    LOG.info("Using " + samples.length + " samples");
    RawComparator<K> comparator =
      (RawComparator<K>) job.getSortComparator();
    Arrays.sort(samples, comparator);
    Path dst = new Path(TotalOrderPartitioner.getPartitionFile(conf));
    FileSystem fs = dst.getFileSystem(conf);
    if (fs.exists(dst)) {
      fs.delete(dst, false);
    }
    SequenceFile.Writer writer = SequenceFile.createWriter(fs, 
      conf, dst, job.getMapOutputKeyClass(), NullWritable.class);
    NullWritable nullValue = NullWritable.get();
    float stepSize = samples.length / (float) numPartitions;
    int last = -1;
    for(int i = 1; i < numPartitions; ++i) {
      int k = Math.round(stepSize * i);
      while (last >= k && comparator.compare(samples[last], samples[k]) == 0) {
        ++k;
      }
      writer.append(samples[k], nullValue);
      last = k;
    }
    writer.close();
  }

  /**
   * Driver for MyInputSampler from the command line.
   * Configures a JobConf instance and calls {@link #writePartitionFile}.
   */
  public int run(String[] args) throws Exception {
    Job job = new Job(getConf());
    ArrayList<String> otherArgs = new ArrayList<String>();
    Sampler<K,V> sampler = null;
    for(int i=0; i < args.length; ++i) {
      try {
        if ("-r".equals(args[i])) {
          job.setNumReduceTasks(Integer.parseInt(args[++i]));
        } else if ("-inFormat".equals(args[i])) {
          job.setInputFormatClass(
              Class.forName(args[++i]).asSubclass(InputFormat.class));
        } else if ("-keyClass".equals(args[i])) {
          job.setMapOutputKeyClass(
              Class.forName(args[++i]).asSubclass(WritableComparable.class));
        } else if ("-splitSample".equals(args[i])) {
          int numSamples = Integer.parseInt(args[++i]);
          int maxSplits = Integer.parseInt(args[++i]);
          if (0 >= maxSplits) maxSplits = Integer.MAX_VALUE;
          sampler = new SplitSampler<K,V>(numSamples, maxSplits);
        } else if ("-splitRandom".equals(args[i])) {
          double pcnt = Double.parseDouble(args[++i]);
          int numSamples = Integer.parseInt(args[++i]);
          int maxSplits = Integer.parseInt(args[++i]);
          if (0 >= maxSplits) maxSplits = Integer.MAX_VALUE;
          sampler = new RandomSampler<K,V>(pcnt, numSamples, maxSplits);
        } else if ("-splitInterval".equals(args[i])) {
          double pcnt = Double.parseDouble(args[++i]);
          int maxSplits = Integer.parseInt(args[++i]);
          if (0 >= maxSplits) maxSplits = Integer.MAX_VALUE;
          sampler = new IntervalSampler<K,V>(pcnt, maxSplits);
        } else {
          otherArgs.add(args[i]);
        }
      } catch (NumberFormatException except) {
        System.out.println("ERROR: Integer expected instead of " + args[i]);
        return printUsage();
      } catch (ArrayIndexOutOfBoundsException except) {
        System.out.println("ERROR: Required parameter missing from " +
            args[i-1]);
        return printUsage();
      }
    }
    if (job.getNumReduceTasks() <= 1) {
      System.err.println("Sampler requires more than one reducer");
      return printUsage();
    }
    if (otherArgs.size() < 2) {
      System.out.println("ERROR: Wrong number of parameters: ");
      return printUsage();
    }
    if (null == sampler) {
      sampler = new RandomSampler<K,V>(0.1, 10000, 10);
    }

    Path outf = new Path(otherArgs.remove(otherArgs.size() - 1));
    TotalOrderPartitioner.setPartitionFile(getConf(), outf);
    for (String s : otherArgs) {
      FileInputFormat.addInputPath(job, new Path(s));
    }
    MyInputSampler.<K,V>writePartitionFile(job, sampler);

    return 0;
  }

  public static void main(String[] args) throws Exception {
    MyInputSampler<?,?> sampler = new MyInputSampler(new Configuration());
    int res = ToolRunner.run(sampler, args);
    System.exit(res);
  }
}

The key part of the code is:
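	/**
	 * Derives the sample key from the record's value instead of its key,
	 * mirroring PartitionerMapper.map: the key is the part after "_".
	 * Assumes K is Text and that the value has the form "name_number".
	 * @param reader reader positioned on the current record
	 * @return the derived key, or null if reading the value fails
	 */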
	private K getFixedKey(RecordReader<K, V> reader) {
		K newKey =null;
		String[] line;
		try {
			line = reader.getCurrentValue().toString().split("_");
			Text newTmpKey = new Text(line[1]);
			newKey =(K) newTmpKey;
		} catch (IOException e) {
			e.printStackTrace();
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
		
		return newKey;
	}
  }
It simply reuses the Mapper's logic.

Checking the output:

First, the output without total ordering:



As you can see, there is no ordering between the files.

The output with total ordering:

As you can see, the files are now globally ordered.

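Summary: the global ordering support that Hadoop provides out of the box is limited. A custom sampler class works around this, but each MR job may then need its own custom class, which is cumbersome. Overall, use cases for global ordering are fairly rare.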
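One possible way to reduce the per-job boilerplate is to pass the key-derivation rule into the sampler as a function instead of hard-coding it. The sketch below shows the idea in plain Java over an in-memory list. It is not part of Hadoop's InputSampler API, and the names PluggableSamplerSketch, sample, and afterUnderscore are hypothetical. The sampling loop follows the same pattern as RandomSampler: accept a record with probability freq, and once the sample list is full, replace a random element and lower freq.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Random;
import java.util.function.Function;

/** Sketch of a sampler whose key derivation is supplied by the caller. */
class PluggableSamplerSketch {

    /** Samples up to numSamples keys, deriving each key from a record with keyOf. */
    static <V, K> List<K> sample(Iterable<V> records, Function<V, K> keyOf,
                                 double freq, int numSamples, Random r) {
        List<K> samples = new ArrayList<>(numSamples);
        for (V record : records) {
            K key = keyOf.apply(record);
            if (key == null || r.nextDouble() > freq) {
                continue; // malformed record, or not selected this time
            }
            if (samples.size() < numSamples) {
                samples.add(key);
            } else {
                // Sample list is full: overwrite a random slot and lower the rate.
                samples.set(r.nextInt(numSamples), key);
                freq *= (numSamples - 1) / (double) numSamples;
            }
        }
        return samples;
    }

    /** Example key rule for "name_number" records; returns null for other lines. */
    static String afterUnderscore(String line) {
        String[] fields = line.split("_");
        return fields.length == 2 ? fields[1] : null;
    }

    public static void main(String[] args) {
        List<String> records = Arrays.asList("a_3", "bad", "b_1", "c_2");
        // With freq = 1.0 every well-formed record is selected.
        System.out.println(sample(records, PluggableSamplerSketch::afterUnderscore,
                                  1.0, 10, new Random()));
    }
}
```

With this shape, a different job would only need to supply a different function, not a different sampler class. Wiring such a function into a real Hadoop sampler is left open here.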
