Processing XML Files with MapReduce (Using the Old API)
1) Add the required jar to the MapReduce project: hadoop-streaming-2.6.5.jar
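For reference, if the project is built with Maven, the same jar can be declared as a dependency. This is a minimal sketch; the article does not show the build setup, and on a real cluster the Hadoop core jars are usually supplied by the runtime classpath:

<!-- hadoop-streaming supplies StreamInputFormat and StreamXmlRecordReader -->
<dependency>
  <groupId>org.apache.hadoop</groupId>
  <artifactId>hadoop-streaming</artifactId>
  <version>2.6.5</version>
</dependency>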
2) Key code from the main function:
// Required imports (old mapred API plus the streaming classes):
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.mapred.lib.MultipleInputs;
import org.apache.hadoop.streaming.StreamInputFormat;
import org.apache.hadoop.streaming.StreamXmlRecordReader;

JobConf jobconf = new JobConf(new Configuration(), MreMroParser.class);
jobconf.setJobName("xmlParser");
// use the streaming XML record reader to frame the input records
jobconf.set("stream.recordreader.class", StreamXmlRecordReader.class.getName());
// a record begins at <bulkPmMrDataFile>
jobconf.set("stream.recordreader.begin", "<bulkPmMrDataFile>");
// and ends at </bulkPmMrDataFile>
jobconf.set("stream.recordreader.end", "</bulkPmMrDataFile>");
// separate the reduce output key and value with a comma instead of the default tab
jobconf.set("mapred.textoutputformat.ignoreseparator", "true");
jobconf.set("mapred.textoutputformat.separator", ",");
jobconf.setMapperClass(MreMroParserMapper.class);
jobconf.setReducerClass(xmlParserReducer.class);
// set the input format
jobconf.setInputFormat(StreamInputFormat.class);
jobconf.setOutputFormat(TextOutputFormat.class);
jobconf.setOutputKeyClass(Text.class);
jobconf.setOutputValueClass(Text.class);
// MultipleInputs binds the input path to StreamInputFormat and the mapper,
// superseding the setInputFormat/setMapperClass calls above
MultipleInputs.addInputPath(jobconf, new Path(args[0]), StreamInputFormat.class, MreMroParserMapper.class);
FileOutputFormat.setOutputPath(jobconf, new Path(args[1]));
JobClient.runJob(jobconf);
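To make the record framing concrete: StreamXmlRecordReader scans each input split for the configured begin and end markers and hands everything between them, markers included, to the mapper as one record (it arrives as the map key). A minimal input file could look like the following; the inner measurement elements are hypothetical placeholders, since the real bulkPmMrDataFile schema is not shown in this article:

<bulkPmMrDataFile>
  <measurement id="1">...</measurement>
</bulkPmMrDataFile>
<bulkPmMrDataFile>
  <measurement id="2">...</measurement>
</bulkPmMrDataFile>

Each <bulkPmMrDataFile>...</bulkPmMrDataFile> chunk above becomes one map() call. Once packaged, the driver is launched like any other MapReduce job, e.g. hadoop jar xml-parser.jar MreMroParser /input/path /output/path (the jar name here is hypothetical).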
3) Core code of the map function, MreMroParserMapper.class:
import java.io.IOException;
import java.util.List;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.dom4j.Document;
import org.dom4j.DocumentException;
import org.dom4j.DocumentHelper;
import org.dom4j.Element;

public class MreMroParserMapper extends MapReduceBase implements Mapper<Text, Text, Text, Text> {

    /*
     * With StreamInputFormat/StreamXmlRecordReader each matched XML record is
     * delivered as the map key (the value is empty); output pairs are written
     * through the OutputCollector of the old mapred API.
     */
    @Override
    public void map(Text key, Text value, OutputCollector<Text, Text> output, Reporter reporter)
            throws IOException {
        String xmlContent = key.toString();
        System.out.println("'" + xmlContent + "'");
        /* feed xmlContent into a custom XML parsing routine */
        // ………………
        // I use dom4j:
        try {
            Document document = DocumentHelper.parseText(xmlContent);
            Element elementRoot = document.getRootElement();
            // ……………… parsing walks the tree and returns a multi-record List, resultDatas
            // (resultDatas and the dataFormater helper come from the elided code)
            // emit one output pair per parsed record:
            for (int i = 0; i < resultDatas.size(); i++) {
                String data = dataFormater.formatResultData(resultDatas.get(i));
                Text text = new Text();
                text.set(data);
                output.collect(new Text(resultDatas.get(i).getId()), text);
            }
        } catch (DocumentException e) {
            // DocumentHelper.parseText throws the checked DocumentException
            throw new IOException(e);
        }
    }
}
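The parsing logic elided above is application-specific. Purely as an illustration, a dom4j traversal that fills resultDatas might look roughly like this; the ResultData type, the measurement element name, and the attribute names are hypothetical placeholders for the elided schema:

// Hypothetical sketch of the elided dom4j traversal.
List<ResultData> resultDatas = new ArrayList<ResultData>();
for (Object obj : elementRoot.elements("measurement")) {   // placeholder element name
    Element e = (Element) obj;
    ResultData rd = new ResultData();                      // placeholder record type
    rd.setId(e.attributeValue("id"));                      // placeholder attribute
    rd.setValue(e.getTextTrim());
    resultDatas.add(rd);
}

dom4j 1.x returns a raw List from elements(), which is why each entry is cast to Element.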
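The driver configures xmlParserReducer but the article never shows it. A minimal pass-through version under the old API could look like the sketch below, assuming each formatted record should simply be written out under its id (this is not the author's actual reducer):

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;

public class xmlParserReducer extends MapReduceBase implements Reducer<Text, Text, Text, Text> {
    @Override
    public void reduce(Text key, Iterator<Text> values, OutputCollector<Text, Text> output, Reporter reporter)
            throws IOException {
        // Pass every formatted record through unchanged; key and value are
        // joined by the comma separator configured in the driver.
        while (values.hasNext()) {
            output.collect(key, values.next());
        }
    }
}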