1. 程式人生 > >MapReduce處理CSV格式檔案的的一個例項

MapReduce處理CSV格式檔案的的一個例項

我們從網上下載一個cvs格式的資料檔案:
http://earthquake.usgs.gov/research/data/pager/EXPO_CAT_2007_12.csv
cvs是以逗號進行列分割的資料檔案。
使用opencvs可以很方便的處理cvs格式的資料。
opencvs可以從sourceforge上下載。
opencvs可以把一個string以逗號進行分割成一個string陣列
只擴充套件 Hadoop 的 Mapper 類。然後我可以使用泛型來為傳出鍵和值指定顯式類。型別子句也指定了傳入鍵和值,這對於讀取檔案分別是位元組數和文字行數。

EarthQuakesPerDateMapper 類擴充套件了 Hadoop 的 Mapper 物件。它顯式地將其輸出鍵指定為一個 Text 物件,將其值指定為一個 IntWritable,這是一個 Hadoop 特定類,實質上是一個整數。還要注意,class 子句的前兩個型別是 LongWritable 和 Text,分別是位元組數和文字行數。

由於類定義中的型別子句,我將傳入 map 方法的引數型別設定為在 context.write 子句內帶有該方法的輸出。如果我想指定其他內容,將會出現一個編譯器問題,或 Hadoop 將輸出一個錯誤訊息,描述型別不匹配的訊息。

一個mapper的實現:

public class EarthQuakesPerDateMapper extends
  Mapper<LongWritable, Text, Text, IntWritable> {
 @Override
 protected void map(LongWritable key, Text value, Context context)
   throws IOException, InterruptedException {

  if (key.get() > 0) {
   try {
    CSVParser parser = new CSVParser();
    String[] lines = parser.parseLine(value.toString());
    lines = new CSVParser().parseLine(lines[0]);
    SimpleDateFormat formatter = new SimpleDateFormat("yyyyMMddHHmm");
    Date dt = formatter.parse(lines[0]);
    formatter.applyPattern("dd-MM-yyyy");

    String dtstr = formatter.format(dt);
    context.write(new Text(dtstr), new IntWritable(1));
   } catch (java.text.ParseException e) {
    // TODO Auto-generated catch block
    //e.printStackTrace();
   }
  }
 }
}
reduce 實現如下 所示。與 Hadoop 的 Mapper 一樣,Reducer 被引數化了:前兩個引數是傳入的鍵型別(Text)和值型別(IntWritable),後兩個引數是輸出型別:鍵和值,這在本例中是相同的。


public class EarthQuakesPerDateReducer extends
  Reducer<Text, IntWritable, Text, IntWritable> {
 @Override
 protected void reduce(Text key, Iterable<IntWritable> values,
   Context context) throws IOException, InterruptedException {
  int count = 0;
  for (IntWritable value : values) {
   count++;
  }
  context.write(key, new IntWritable(count));
 }
}

寫好mapper和reducer之後,就可以定義一個hadoop job了。

public class EarthQuakesPerDayJob {
 public static void main(String[] args) throws Throwable {
  Job job = new Job();
  job.setJarByClass(EarthQuakesPerDayJob.class);
  FileInputFormat.addInputPath(job, new Path(args[0]));
  FileOutputFormat.setOutputPath(job, new Path(args[1]));

  job.setMapperClass(EarthQuakesPerDateMapper.class);
  job.setReducerClass(EarthQuakesPerDateReducer.class);
  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(IntWritable.class);

  System.exit(job.waitForCompletion(true) ? 0 : 1);
 }
}

在linux上執行hadoop:
$> export HADOOP_CLASSPATH=lib/opencsv-2.3.jar
$> hadoop jar hadoop.jar in out
在程式所在目錄定義一個子目錄in,把剛才所下載的cvs檔案放到in目錄下。
in就是程式資料的輸入目錄,out是輸出目錄,注意這個out資料夾是程式建立的,不可以手動建立。
執行是會看到:
11/09/05 08:47:26 INFO jvm.JvmMetrics: Initializing JVM Metrics with processName=JobTracker, sessionId=
11/09/05 08:47:26 WARN mapred.JobClient: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
11/09/05 08:47:26 INFO input.FileInputFormat: Total input paths to process : 1
11/09/05 08:47:26 INFO mapred.JobClient: Running job: job_local_0001
11/09/05 08:47:26 INFO input.FileInputFormat: Total input paths to process : 1
11/09/05 08:47:26 INFO mapred.MapTask: io.sort.mb = 100
11/09/05 08:47:27 INFO mapred.MapTask: data buffer = 79691776/99614720
11/09/05 08:47:27 INFO mapred.MapTask: record buffer = 262144/327680
11/09/05 08:47:27 INFO mapred.JobClient:  map 0% reduce 0%
11/09/05 08:47:28 INFO mapred.MapTask: Starting flush of map output
11/09/05 08:47:28 INFO mapred.MapTask: Finished spill 0
11/09/05 08:47:28 INFO mapred.TaskRunner: Task:attempt_local_0001_m_000000_0 is done. And is in the process of commiting
11/09/05 08:47:28 INFO mapred.LocalJobRunner: 
11/09/05 08:47:28 INFO mapred.TaskRunner: Task 'attempt_local_0001_m_000000_0' done.
11/09/05 08:47:29 INFO mapred.LocalJobRunner: 
11/09/05 08:47:29 INFO mapred.Merger: Merging 1 sorted segments
11/09/05 08:47:29 INFO mapred.Merger: Down to the last merge-pass, with 1 segments left of total size: 97887 bytes
11/09/05 08:47:29 INFO mapred.LocalJobRunner: 
11/09/05 08:47:29 INFO mapred.TaskRunner: Task:attempt_local_0001_r_000000_0 is done. And is in the process of commiting
11/09/05 08:47:29 INFO mapred.LocalJobRunner: 
11/09/05 08:47:29 INFO mapred.TaskRunner: Task attempt_local_0001_r_000000_0 is allowed to commit now
11/09/05 08:47:29 INFO output.FileOutputCommitter: Saved output of task 'attempt_local_0001_r_000000_0' to out1
11/09/05 08:47:29 INFO mapred.LocalJobRunner: reduce > reduce
11/09/05 08:47:29 INFO mapred.TaskRunner: Task 'attempt_local_0001_r_000000_0' done.
11/09/05 08:47:29 INFO mapred.JobClient:  map 100% reduce 100%
11/09/05 08:47:29 INFO mapred.JobClient: Job complete: job_local_0001
11/09/05 08:47:29 INFO mapred.JobClient: Counters: 12
11/09/05 08:47:29 INFO mapred.JobClient:   FileSystemCounters
11/09/05 08:47:29 INFO mapred.JobClient:     FILE_BYTES_READ=11961631
11/09/05 08:47:29 INFO mapred.JobClient:     FILE_BYTES_WRITTEN=9370383
11/09/05 08:47:29 INFO mapred.JobClient:   Map-Reduce Framework
11/09/05 08:47:29 INFO mapred.JobClient:     Reduce input groups=142
11/09/05 08:47:29 INFO mapred.JobClient:     Combine output records=0
11/09/05 08:47:29 INFO mapred.JobClient:     Map input records=5639
11/09/05 08:47:29 INFO mapred.JobClient:     Reduce shuffle bytes=0
11/09/05 08:47:29 INFO mapred.JobClient:     Reduce output records=142
11/09/05 08:47:29 INFO mapred.JobClient:     Spilled Records=11274
11/09/05 08:47:29 INFO mapred.JobClient:     Map output bytes=86611
11/09/05 08:47:29 INFO mapred.JobClient:     Combine input records=0
11/09/05 08:47:29 INFO mapred.JobClient:     Map output records=5637
11/09/05 08:47:29 INFO mapred.JobClient:     Reduce input records=5637

執行完成後:
cd到out目錄下,會看到一個part-r-00000檔案。
輸入命令:cat part-r-00000
可以看到hadoopjob的執行結果。