
Custom multi-directory / file-name output to HDFS in MapReduce

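I recently ran into the following requirement:

Raw log files need to be cleaned with Hadoop and then written to separate directories by business line, so that each department can consume its own data.

This calls for MultipleOutputFormat and MultipleOutputs, which let a job write to custom directories and file names.

Note that the usage differs before and after Hadoop 0.21.x:

Before 0.21, the API provides org.apache.hadoop.mapred.lib.MultipleOutputFormat and org.apache.hadoop.mapred.lib.MultipleOutputs. From 0.21 onward, the API is org.apache.hadoop.mapreduce.lib.output.MultipleOutputs.

The new API merges the functionality of the two old classes, so there is no longer a MultipleOutputFormat.

This article gives code for both the old and the new API.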

1. Old API (versions before 0.21.x):

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class MultiFile extends Configured implements Tool {

	public static class MapClass extends MapReduceBase implements
			Mapper<LongWritable, Text, NullWritable, Text> {

		@Override
		public void map(LongWritable key, Text value,
				OutputCollector<NullWritable, Text> output, Reporter reporter)
				throws IOException {
			output.collect(NullWritable.get(), value);
		}

	}

	// MultipleTextOutputFormat extends MultipleOutputFormat; overriding
	// generateFileNameForKeyValue decides which file each record is written to.

	public static class PartitionByCountryMTOF extends
			MultipleTextOutputFormat<NullWritable, Text> { // key is NullWritable, value is Text
		@Override
		protected String generateFileNameForKeyValue(NullWritable key,
				Text value, String filename) {
			String[] arr = value.toString().split(",", -1);
			// Field 4 looks like "AD" (quotes included); take the two-letter country code
			String country = arr[4].substring(1, 3);
			return country + "/" + filename;
		}
	}

	// No reducer is used here
	/*
	 * public static class Reducer extends MapReduceBase implements
	 * org.apache.hadoop.mapred.Reducer<LongWritable, Text, NullWritable, Text>
	 * {
	 * 
	 * @Override public void reduce(LongWritable key, Iterator<Text> values,
	 * OutputCollector<NullWritable, Text> output, Reporter reporter) throws
	 * IOException { // TODO Auto-generated method stub
	 * 
	 * }
	 * 
	 * }
	 */
	@Override
	public int run(String[] args) throws Exception {
		Configuration conf = getConf();
		JobConf job = new JobConf(conf, MultiFile.class);

		Path in = new Path(args[0]);
		Path out = new Path(args[1]);

		FileInputFormat.setInputPaths(job, in);
		FileOutputFormat.setOutputPath(job, out);

		job.setJobName("MultiFile");
		job.setMapperClass(MapClass.class);
		job.setInputFormat(TextInputFormat.class);
		job.setOutputFormat(PartitionByCountryMTOF.class);
		job.setOutputKeyClass(NullWritable.class);
		job.setOutputValueClass(Text.class);

		job.setNumReduceTasks(0);
		JobClient.runJob(job);
		return 0;
	}

	public static void main(String[] args) throws Exception {
		int res = ToolRunner.run(new Configuration(), new MultiFile(), args);
		System.exit(res);
	}

}

Test data and results:

hadoop fs -cat /tmp/multiTest.txt
5765303,1998,14046,1996,"AD","",,1,12,42,5,59,11,1,0.4545,0,0,1,67.3636,,,,
5785566,1998,14088,1996,"AD","",,1,9,441,6,69,3,0,1,,0.6667,,4.3333,,,,
5894770,1999,14354,1997,"AD","",,1,,82,5,51,4,0,1,,0.625,,7.5,,,,
5765303,1998,14046,1996,"CN","",,1,12,42,5,59,11,1,0.4545,0,0,1,67.3636,,,,
5785566,1998,14088,1996,"CN","",,1,9,441,6,69,3,0,1,,0.6667,,4.3333,,,,
5894770,1999,14354,1997,"CN","",,1,,82,5,51,4,0,1,,0.625,,7.5,,,,
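
To make the routing concrete, here is a minimal plain-Java sketch (no Hadoop dependency) of the same string logic that generateFileNameForKeyValue applies to each record. The class name CountryPathSketch and the leaf name part-00000 are assumptions made for illustration; in a real job the leaf name is supplied by the framework.

```java
final class CountryPathSketch {

    /** Same logic as generateFileNameForKeyValue above: prefix the leaf name with the country. */
    static String outputPathFor(String record, String leafName) {
        String[] fields = record.split(",", -1);  // -1 keeps trailing empty fields
        String country = fields[4].substring(1, 3); // "\"AD\"" becomes AD
        return country + "/" + leafName;
    }

    public static void main(String[] args) {
        String ad = "5765303,1998,14046,1996,\"AD\",\"\",,1,12,42,5,59,11,1,0.4545,0,0,1,67.3636,,,,";
        String cn = "5765303,1998,14046,1996,\"CN\",\"\",,1,12,42,5,59,11,1,0.4545,0,0,1,67.3636,,,,";
        // "part-00000" is a hypothetical leaf name used only for this sketch
        System.out.println(outputPathFor(ad, "part-00000"));
        System.out.println(outputPathFor(cn, "part-00000"));
    }
}
```

With the sample data above, the AD records and the CN records would therefore end up under two separate subdirectories of the job's output path.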

from:

MultipleOutputFormat Example

http://mazd1002.blog.163.com/blog/static/665749652011102553947492/

2. New API (0.21.x and later):

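import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class TestwithMultipleOutputs extends Configured implements Tool {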

  public static class MapClass extends Mapper<LongWritable,Text,Text,IntWritable> {

    private MultipleOutputs<Text,IntWritable> mos;

    protected void setup(Context context) throws IOException,InterruptedException {
      mos = new MultipleOutputs<Text,IntWritable>(context);
    }

    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException{
      String line = value.toString();
      String[] tokens = line.split("-");

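      mos.write("MOSInt",new Text(tokens[0]), new IntWritable(Integer.parseInt(tokens[1])));  // (1) key and int value go to the "MOSInt" named output
      mos.write("MOSText", new Text(tokens[0]), new Text(tokens[2]));     // (2) key and text value go to the "MOSText" named output
      mos.write("MOSText", new Text(tokens[0]), new Text(line), tokens[0]+"/");  // (3) a base output path can also be given to write into a specific file or directory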
    }

    protected void cleanup(Context context) throws IOException,InterruptedException {
      mos.close();
    }

  }
  public int run(String[] args) throws Exception {

    Configuration conf = getConf();

    Job job = new Job(conf,"word count with MultipleOutputs");

    job.setJarByClass(TestwithMultipleOutputs.class);

    Path in = new Path(args[0]);
    Path out = new Path(args[1]);

    FileInputFormat.setInputPaths(job, in);
    FileOutputFormat.setOutputPath(job, out);

    job.setMapperClass(MapClass.class);
    job.setNumReduceTasks(0);  

    MultipleOutputs.addNamedOutput(job,"MOSInt",TextOutputFormat.class,Text.class,IntWritable.class);
    MultipleOutputs.addNamedOutput(job,"MOSText",TextOutputFormat.class,Text.class,Text.class);

    // Return the status and let main() call System.exit, as Tool implementations normally do
    return job.waitForCompletion(true) ? 0 : 1;
  }

  public static void main(String[] args) throws Exception {

    int res = ToolRunner.run(new Configuration(), new TestwithMultipleOutputs(), args);
    System.exit(res); 
  }

}

Test data:

abc-1232-hdf
abc-123-rtd
ioj-234-grjth
ntg-653-sdgfvd
kju-876-btyun
bhm-530-bhyt
hfter-45642-bhgf
bgrfg-8956-fmgh
jnhdf-8734-adfbgf
ntg-68763-nfhsdf
ntg-98634-dehuy
hfter-84567-drhuk

Result screenshot: (the output was written to /test/testMOSout)
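
As a rough guide to what appears in the output directory, the sketch below imitates the file-naming pattern as I understand it: base output path, then "-m-" or "-r-" for a map or reduce task, then a five-digit partition number. This is an approximation written in plain Java, not Hadoop's own code; the class name NamedOutputFileNameSketch and the method fileNameFor are hypothetical, and exact names may vary between Hadoop versions.

```java
import java.util.Locale;

final class NamedOutputFileNameSketch {

    /** Approximates the file name produced for a given base output path (assumed pattern). */
    static String fileNameFor(String baseOutputPath, boolean mapTask, int partition) {
        String taskChar = mapTask ? "m" : "r";
        return String.format(Locale.ROOT, "%s-%s-%05d", baseOutputPath, taskChar, partition);
    }

    public static void main(String[] args) {
        // write(namedOutput, key, value): the named output doubles as the base path
        System.out.println(fileNameFor("MOSInt", true, 0));
        System.out.println(fileNameFor("MOSText", true, 0));
        // write(namedOutput, key, value, tokens[0] + "/"): the base path ends in a slash,
        // so the record lands in a per-key subdirectory
        System.out.println(fileNameFor("abc/", true, 0));
    }
}
```

Under that assumption, the mapper in this article would produce files such as MOSInt-m-00000 and MOSText-m-00000 at the top level, plus one subdirectory per distinct first token.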

PS: one problem I ran into:

  Without mos.close(), the program throws an exception at run time:

  12/05/21 20:12:47 WARN hdfs.DFSClient: DataStreamer Exception:

  org.apache.hadoop.ipc.RemoteException:org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException: No lease on   /test/mosreduce/_temporary/_attempt_local_0001_r_000000_0/h-r-00000 File does not exist. [Lease. Holder: DFSClient_-352105532, pendingcreates: 5]
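
MultipleOutputs opens its record writers lazily and holds them until close() is called, which is why forgetting the call leaves files unfinished. The sketch below is a simplified plain-Java stand-in for that pattern, not Hadoop's implementation: the class LazyWritersSketch is hypothetical, and it uses local BufferedWriter instances in a temporary directory instead of HDFS record writers.

```java
import java.io.BufferedWriter;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.HashMap;
import java.util.Map;

final class LazyWritersSketch implements AutoCloseable {
    private final Path dir;
    private final Map<String, BufferedWriter> writers = new HashMap<>();

    LazyWritersSketch(Path dir) {
        this.dir = dir;
    }

    /** Opens the writer for this name on first use, then appends one line. */
    void write(String name, String line) throws IOException {
        BufferedWriter w = writers.get(name);
        if (w == null) {
            w = Files.newBufferedWriter(dir.resolve(name));
            writers.put(name, w);
        }
        w.write(line);
        w.newLine();
    }

    /** Flushes and closes every writer opened so far; plays the role of mos.close(). */
    @Override
    public void close() throws IOException {
        for (BufferedWriter w : writers.values()) {
            w.close();
        }
        writers.clear();
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("mos-sketch");
        LazyWritersSketch out = new LazyWritersSketch(dir);
        out.write("MOSInt", "abc\t1232");
        long before = Files.size(dir.resolve("MOSInt")); // data is still buffered
        out.close();
        long after = Files.size(dir.resolve("MOSInt"));  // data reaches the file on close
        System.out.println("bytes before close: " + before + ", after close: " + after);
    }
}
```

The same reasoning is why the mapper in this article calls mos.close() in cleanup(): that is the last hook where the buffered output can be finalized before the task ends.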

from:

MultipleOutputFormat and MultipleOutputs

http://www.cnblogs.com/liangzh/archive/2012/05/22/2512264.html

Using a Partitioner in Hadoop to classify output files (override partition to route records to specific files)

http://superlxw1234.iteye.com/blog/1495465

http://ghost-face.iteye.com/blog/1869926

Further references and recommended reading:

1. [Hadoop] Using MultipleOutputs and MultiOutputFormat to write to multiple files in different formats

http://www.cnblogs.com/iDonal/archive/2012/08/07/2626588.html

2. cdh3u3 hadoop 0.20.2 MultipleOutputs: a first look at multi-file output

http://my.oschina.net/wangjiankui/blog/49521

3. Using MultipleOutputs

http://blog.163.com/ecy_fu/blog/static/444512620101274344951/

4. Multiple outputs from a Hadoop reduce

http://blog.csdn.net/inte_sleeper/article/details/7042020

5. How to use MultipleOutputFormat in Hadoop 0.20.2 for multi-file output and fully custom file names

http://www.cnblogs.com/flying5/archive/2011/05/04/2078407.html

6. A brief analysis of Hadoop OutputFormat

http://zhb-mccoy.iteye.com/blog/1591635

7. Others:

https://sites.google.com/site/hadoopandhive/home/how-to-write-output-to-multiple-named-files-in-hadoop-using-multipletextoutputformat

https://issues.apache.org/jira/browse/HADOOP-3149

http://grokbase.com/t/hadoop/common-user/112ewx7s15/could-i-write-outputs-in-multiple-directories

8. Official MultipleOutputs example

http://hadoop.apache.org/docs/stable/api/org/apache/hadoop/mapreduce/lib/output/MultipleOutputs.html

9. Multiple input sources: MultipleInputs

http://stackoverflow.com/questions/17456369/mapreduce-job-with-mixed-data-sources-hbase-table-and-hdfs-files

https://groups.google.com/forum/#!topic/nosql-databases/SH61smOV-mo

http://bigdataprocessing.wordpress.com/2012/07/27/hadoop-hbase-mapreduce-examples/

http://hbase.apache.org/book/mapreduce.example.html

10. Hadoop multi-file output: MultipleOutputFormat and MultipleOutputs in depth

http://www.iteblog.com/archives/842 (part 1)

http://www.iteblog.com/archives/848 (part 2)