1. 程式人生 > >Hadoop多檔案輸出之MultipleOutputFormat和MultipleOutputs

Hadoop多檔案輸出之MultipleOutputFormat和MultipleOutputs

直到目前,我們看到的所有MapReduce作業都輸出一組檔案。但是,在一些場合下,經常要求我們輸出多組檔案或者把一個數據集分為多個數據集更為方便;比如將一個log裡面屬於不同業務線的日誌分開來輸出,並且交給相關的業務線。

用過舊API的人應該知道,舊API中有org.apache.hadoop.mapred.lib.MultipleOutputFormat和org.apache.hadoop.mapred.lib.MultipleOutputs兩個重要的類,但是由於舊版本的MultipleOutputFormat是基於行的劃分而MultipleOutputs是基於列的劃分。所以在新的API中就剩下了MultipleOutputs(mapreduce包中)類

,這個類合併了舊API中的MultipleOutputFormat和MultipleOutputs的功能,同時新版的類庫中已經不存在MultipleOutputFormat類了,因為MultipleOutputs都有它的功能了,還要它幹嘛。

下面我們通過一個例子來深入體會新版API中的MultipleOutputs的功能。

測試資料:

3070818,1963,1096,,"US","IN",,1,,441,6,69,,4,,0.625,,,,,,,
3070819,1963,1096,,"US","TN",,4,,12,6,63,,0,,,,,,,,,
3070820,1963,1096,,"GB","",,2,,12,6,63,,0,,,,,,,,,
3070821,1963,1096,,"US","IL",,2,,15,6,69,,1,,0,,,,,,,
3070822,1963,1096,,"US","NY",,2,,401,1,12,,4,,0.375,,,,,,,
3070823,1963,1096,,"US","MI",,1,,401,1,12,,8,,0.6563,,,,,,,
3070824,1963,1096,,"US","IL",,1,,401,1,12,,5,,0.48,,,,,,,
3070825,1963,1096,,"US","IL",,1,,401,1,12,,7,,0.6531,,,,,,,
3070826,1963,1096,,"US","IA",,1,,401,1,12,,1,,0,,,,,,,
3070827,1963,1096,,"US","CA",,4,,401,1,12,,2,,0.5,,,,,,,
3070828,1963,1096,,"US","CT",,2,,16,5,59,,4,,0.625,,,,,,,
3070829,1963,1096,,"FR","",,3,,16,5,59,,5,,0.48,,,,,,,
3070830,1963,1096,,"US","NH",,2,,16,5,59,,0,,,,,,,,,
3070831,1963,1096,,"US","CT",,2,,16,5,59,,0,,,,,,,,,

#需求:按第四列(國家)將這些資訊輸出到不同的資料夾中。

實現程式碼:

public class MultipleOutputsTest {
	// 定義輸入路徑
		private static final String INPUT_PATH = "hdfs://liaozhongmin:9000/multipleOutput/multipleOutput_data";
		// 定義輸出路徑
		private static final String OUT_PATH = "hdfs://liaozhongmin:9000/out";

		public static void main(String[] args) {

			try {
				// 建立配置資訊
				Configuration conf = new Configuration();
				

				// 建立檔案系統
				FileSystem fileSystem = FileSystem.get(new URI(OUT_PATH), conf);
				// 如果輸出目錄存在,我們就刪除
				if (fileSystem.exists(new Path(OUT_PATH))) {
					fileSystem.delete(new Path(OUT_PATH), true);
				}

				// 建立任務
				Job job = new Job(conf, MultipleOutputsTest.class.getName());

				//1.1	設定輸入目錄和設定輸入資料格式化的類
				FileInputFormat.setInputPaths(job, INPUT_PATH);
				job.setInputFormatClass(TextInputFormat.class);

				//1.2	設定自定義Mapper類和設定map函式輸出資料的key和value的型別
				job.setMapperClass(MultipleOutputsMapper.class);
				job.setMapOutputKeyClass(NullWritable.class);
				job.setMapOutputValueClass(Text.class);

				//1.3	設定分割槽和reduce數量(reduce的數量,和分割槽的數量對應)
				job.setPartitionerClass(HashPartitioner.class);
				job.setNumReduceTasks(0);

				//1.4	排序
				//1.5	歸約
				//2.1	Shuffle把資料從Map端拷貝到Reduce端。
				//2.2	指定Reducer類和輸出key和value的型別
				//job.setReducerClass(MyReducer.class);
				job.setOutputKeyClass(NullWritable.class);
				job.setOutputValueClass(Text.class);

				//2.3	指定輸出的路徑和設定輸出的格式化類
				FileOutputFormat.setOutputPath(job, new Path(OUT_PATH));
				job.setOutputFormatClass(TextOutputFormat.class);


				// 提交作業 退出
				System.exit(job.waitForCompletion(true) ? 0 : 1);
			
			} catch (Exception e) {
				e.printStackTrace();
			}
		}
	public static class MultipleOutputsMapper extends Mapper<LongWritable, Text, NullWritable, Text>{
		//定義MultipleOutputs
		private MultipleOutputs<NullWritable,Text> multipleOutputs = null;
		
		@Override
		protected void setup(Mapper<LongWritable, Text, NullWritable, Text>.Context context) throws IOException, InterruptedException {
			//初始化MultipleOutputs物件
			multipleOutputs = new MultipleOutputs<NullWritable,Text>(context);
		}
		
		protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, NullWritable, Text>.Context context) throws IOException,
				InterruptedException {
			//把結果直接寫出去(注意這裡不是context而是MultipleOutps的物件)
			multipleOutputs.write(NullWritable.get(), value, generateFileName(value));
		}
		
		/**
		 * 自定義一個產生檔名的方法
		 * @param value
		 * @return
		 */
		private String generateFileName(Text value){
			//對字串進行切分
			String[] splits = value.toString().split(",");
			//擷取國家的字串
			String country = splits[4].substring(1, 3);
			
			//返回一個包含國家的檔名
			return country + "/";
		}
		
		/**
		 * 用完MultipleOutputs之後一定要關閉
		 */
		@Override
		protected void cleanup(Mapper<LongWritable, Text, NullWritable, Text>.Context context) throws IOException, InterruptedException {
			multipleOutputs.close();
		}
	}
}
程式執行結果:

如上圖所示,我們的結果按國家寫在了不同的目錄下,但是有個奇怪的問題就是,在輸出結果中還有兩個以part開頭的檔案,裡面什麼內容也沒有,這是怎麼回事呢?原因是他們都是程式的預設輸出檔案,而我們自定義的輸出格式不能以part開頭,那麼我們如何去掉這兩個不太和諧的檔案呢?其實很簡單,在main()函式中加入以下一行程式碼:

LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class);
如果加入了上面一行程式碼,請同時註釋掉你程式碼中下面一行程式碼(如果有)
job.setOutputFormatClass(TextOutputFormat.class);

我們再來看看程式執行結果:


另外附上兩篇文章: