
Processing XML files with MapReduce (using the old API)

1) Add this jar to the MapReduce project: hadoop-streaming-2.6.5.jar
2) Main code of the main function:

// Old-API classes (JobConf, JobClient, ...) come from org.apache.hadoop.mapred;
// StreamInputFormat and StreamXmlRecordReader come from org.apache.hadoop.streaming.
JobConf jobconf = new JobConf(new Configuration(), MreMroParser.class);
jobconf.setJobName("xmlParser");
// Use the streaming XML record reader for the input
jobconf.set("stream.recordreader.class", StreamXmlRecordReader.class.getName());
// A record starts at <bulkPmMrDataFile>
jobconf.set("stream.recordreader.begin", "<bulkPmMrDataFile>");
// A record ends at </bulkPmMrDataFile>
jobconf.set("stream.recordreader.end", "</bulkPmMrDataFile>");
// Separate the key and value of the reduce output with a comma
jobconf.set("mapred.textoutputformat.ignoreseparator", "true");
jobconf.set("mapred.textoutputformat.separator", ",");
jobconf.setMapperClass(xmlParserMapper.class);
jobconf.setReducerClass(xmlParserReducer.class);
// Set the input format
jobconf.setInputFormat(StreamInputFormat.class);
jobconf.setOutputFormat(TextOutputFormat.class);
jobconf.setOutputKeyClass(Text.class);
jobconf.setOutputValueClass(Text.class);
MultipleInputs.addInputPath(jobconf, new Path(args[0]), StreamInputFormat.class, MreMroParserMapper.class);
FileOutputFormat.setOutputPath(jobconf, new Path(args[1]));
JobClient.runJob(jobconf);
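
To make the begin/end settings above more concrete, here is a minimal sketch of the idea behind them: cut the input into records that start at the begin marker and end at the next end marker. This is not the Hadoop implementation. The class name XmlRecordSplitter and the method extractRecords are hypothetical names used only for illustration, and the sample input is made up.

```java
import java.util.ArrayList;
import java.util.List;

// Illustration only: mimics begin/end marker matching on a plain String.
// XmlRecordSplitter and extractRecords are hypothetical names, not Hadoop APIs.
class XmlRecordSplitter {

    // Returns every substring that starts with `begin` and ends with the next `end`,
    // markers included, in the order they appear in the input.
    static List<String> extractRecords(String input, String begin, String end) {
        List<String> records = new ArrayList<>();
        int from = 0;
        while (true) {
            int start = input.indexOf(begin, from);
            if (start < 0) {
                break; // no further begin marker
            }
            int stop = input.indexOf(end, start + begin.length());
            if (stop < 0) {
                break; // unterminated record: skip it
            }
            records.add(input.substring(start, stop + end.length()));
            from = stop + end.length();
        }
        return records;
    }

    public static void main(String[] args) {
        // Made-up sample input with two records
        String input = "<?xml version=\"1.0\"?>\n"
                + "<bulkPmMrDataFile><a>1</a></bulkPmMrDataFile>\n"
                + "<bulkPmMrDataFile><a>2</a></bulkPmMrDataFile>\n";
        for (String record : extractRecords(input, "<bulkPmMrDataFile>", "</bulkPmMrDataFile>")) {
            System.out.println(record);
        }
    }
}
```

Because the markers are matched as literal text in this sketch, a start tag that carries attributes would not match the plain `<bulkPmMrDataFile>` marker, so the begin marker should be written exactly as it appears in the data.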

3) Core code of the map function, xmlParserMapper.class:

public class MreMroParserMapper extends MapReduceBase implements Mapper<Text, Text, Text, Text> {

  @Override
  /*
   * The OutputCollector instance is used to write the output records.
   * (non-Javadoc)
   * @see org.apache.hadoop.mapred.Mapper#map(java.lang.Object, java.lang.Object, org.apache.hadoop.mapred.OutputCollector, org.apache.hadoop.mapred.Reporter)
   */
  public void map(Text key, Text value, OutputCollector<Text, Text> output, Reporter reporter)
      throws IOException {
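    // StreamXmlRecordReader delivers the matched XML record as the key
    String xmlContent = key.toString();
    System.out.println("'" + xmlContent + "'");
    /* Pass xmlContent to a custom XML parsing function */
    ………………
    // I use dom4j here: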

Document document = DocumentHelper.parseText(xmlContent); 
Element elementRoot = document.getRootElement();
    // After parsing, the records are returned in the List resultDatas
    ………………
    // Emit each record:
    for (int i = 0; i < resultDatas.size(); i++) {
      String data = dataFormater.formatResultData(resultDatas.get(i));
      Text text = new Text();
      text.set(data);
      output.collect(new Text(resultDatas.get(i).getId()), text);
    }
  }
}
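
The parsing step elided above depends on the actual XML layout, which is not shown here. As a rough sketch of the overall flow (parse one record, turn it into several id/value pairs, emit one comma-separated line per pair), the example below uses the JDK's built-in DOM parser instead of dom4j so that it runs without extra jars. The element name object, the attribute id, and the class RecordFlattener are all assumptions made for illustration; they are not taken from the real data or from the original parser.

```java
import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import javax.xml.parsers.DocumentBuilderFactory;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.NodeList;

// Illustration only: RecordFlattener and the <object id="..."> layout are hypothetical.
class RecordFlattener {

    // Parses one XML record and returns one "id,value" line per <object> element.
    static List<String> toLines(String xmlContent) {
        Document doc;
        try {
            doc = DocumentBuilderFactory.newInstance()
                    .newDocumentBuilder()
                    .parse(new ByteArrayInputStream(xmlContent.getBytes(StandardCharsets.UTF_8)));
        } catch (Exception e) {
            // Parser errors are checked exceptions; wrap them so callers need no throws clause
            throw new IllegalArgumentException("invalid XML record", e);
        }
        NodeList objects = doc.getDocumentElement().getElementsByTagName("object");
        List<String> lines = new ArrayList<>();
        for (int i = 0; i < objects.getLength(); i++) {
            Element object = (Element) objects.item(i);
            String id = object.getAttribute("id");           // plays the role of the output key
            String value = object.getTextContent().trim();   // plays the role of the output value
            lines.add(id + "," + value);                     // key, separator, value
        }
        return lines;
    }

    public static void main(String[] args) {
        // Made-up sample record
        String record = "<bulkPmMrDataFile>"
                + "<object id=\"a\">1 2</object>"
                + "<object id=\"b\">3</object>"
                + "</bulkPmMrDataFile>";
        for (String line : toLines(record)) {
            System.out.println(line);
        }
    }
}
```

In the real mapper the same concern about checked exceptions applies: dom4j's DocumentHelper.parseText declares DocumentException, while map only declares IOException, so the parse call needs a try/catch or a wrapping exception.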