1. 程式人生 > >hadoop任務提交過程

hadoop任務提交過程

WordCountMapper:

private final static IntWritable one = new IntWritable(1);
@Override
public void map(LongWritable key, Text value, 
        Context context) throws IOException, InterruptedException {
    StringTokenizer tokenizer = new StringTokenizer(value.toString());
    while (tokenizer.hasMoreTokens()) {
        String str = tokenizer.nextToken();
        context.write(new Text(StringUtils.trim(str.replaceAll("\\W", ""))), one);
    }
}

WordCountReduce:

@Override
public void reduce(Text key, Iterable

 

 
  
 
  
  values,
        Context context) throws IOException, InterruptedException {
    int sum = 0;
    for (IntWritable val : values) {
            sum += val.get();
    }
    context.write(key, new IntWritable(sum));
    }


 

 

WordCount:

final Configuration conf = new Configuration();
Job job = new Job(conf);
job.setJobName("wordcount");
job.setJarByClass(WordCount.class);

job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);

job.setMapperClass(WordCountMapper.class);
job.setReducerClass(WordCountReduce.class);

job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);

FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));

boolean success = job.waitForCompletion(true);

如你所見, 這是個Hadoop基礎的入門例子, 如果你瞭解Hadoop, 你已經對這些程式碼熟記於心了.這篇文章我想說明Hadoop提交Job到底提交了那些東西,提交了哪些類,不需要提交那些東西.

Hadoop任務提交

傳統的Hadoop任務提交

把上面的程式碼 WordCountMapper, WordCountReduce, WordCount打包成jar,放到hadoop目錄下, 使用hadoop jar wordcount.jar WordCount執行任務. 這樣方式我稱為傳統的方式,也是《hadoop權威指南》上一貫的方法.

Eclipse的hadoop外掛的Hadoop任務提交

如果你開發過Hadoop的Job, 那麼對這個應該很熟悉.大多數開發測試都是用這個提交任務的,如果每次都是打包成jar, 再用hadoop jar 這還不把人搞瘋.

如果你還細心,你會發現,你選好Hadoop的jobtracker,提交任務的前一刻,Eclipse會彈出一個浮動視窗,上面跳動著顯示很多jar名.為什麼會這樣?它做了什麼?

在此輸入圖片描述

在Eclipse中當做Java Application執行為什麼不可以?

Hadoop的job專案都有main方法,這個是符合JavaApplication執行條件的,那麼我們是不是可以使用Eclipse中直接執行呢?當我們嘗試執行的時候,程式是可以執行的,但總當執行一會兒(幾秒鐘)後丟擲WordCountMapper ClassNotFount的錯誤.

那麼為什麼程式不是直接丟擲錯誤而是過了一會兒才丟擲?為什麼用Eclipse Hadoop外掛執行不會發生這個錯誤.

背景

寫這篇文章前,我們已經正在開發一個Hadoop的任務排程的系統. 也就是一個專案中提前寫好很多個Hadoop Job繫結到裡邊,如果想要執行哪個Job,我們就從前臺配置好引數,並把這個Job提交到Hadoop叢集. 並從前臺不斷的得到Job的執行任務資訊,取得Job的執行進度. 不關心進度的話直接去喝茶就可以,完了來看結果.

如果每次都是hadoop jar wordcount.jar WordCount未免太弱智. 如果每次從Eclispe中執行,那專業性太強,也不可取.是不是有更好的方法提交任務?

答案肯定是有的.

用JVisualVM監視Eclipse hadoop外掛的Hadoop任務提交

開啟JVisualVM準備著,執行一個Job. 執行後立即就可以看到一個Java程序. 用JVisualVM開啟這個程序檢視,如圖:

在此輸入圖片描述

在此輸入圖片描述

我開啟我電腦上的目錄F:\Eclipse\workspace.metadata.plugins\org.apache.hadoop.eclipse\hadoop-conf-1007657720166395816並且查看了它的上級目錄,頓時一些皆明朗了.Eclipse Hadoop外掛竟然把我的專案下所有的類,資原始檔打包成jar然後執行的.

在此輸入圖片描述

在此輸入圖片描述

使用Hadoop Api提交Job,完美解決方案

其實我也是從這篇文章(http://luliangy.iteye.com/blog/1401453)中找到靈感的.為什麼把專案打包,以Java Application的方式就正常運行了.

((JobConf) job.getConfiguration()).setJar("wordcount.jar");  
job.setJarByClass(WordCount.class);

衝著這股勁我看了很多Hadoop API,終於找到為什麼.

通過job.setJarByClass(WordCount.class); 這條路檢視原始碼, 你會找到如下的兩個方法.

private static String findContainingJar(Class my_class) {
    ClassLoader loader = my_class.getClassLoader();
    String class_file = my_class.getName().replaceAll("\\.", "/") + ".class";
    try {
      for(Enumeration itr = loader.getResources(class_file);
          itr.hasMoreElements();) {
        URL url = (URL) itr.nextElement();
        if ("jar".equals(url.getProtocol())) {
          String toReturn = url.getPath();
          if (toReturn.startsWith("file:")) {
            toReturn = toReturn.substring("file:".length());
          }
          toReturn = URLDecoder.decode(toReturn, "UTF-8");
          return toReturn.replaceAll("!.*$", "");
        }
      }
    } catch (IOException e) {
      throw new RuntimeException(e);
    }
    return null;
  }

public void setJarByClass(Class cls) {
    String jar = findContainingJar(cls);
    if (jar != null) {
      setJar(jar);
    }   
}

我說說findContainingJar有什麼作用? 當使用job.setJarByClass(WordCount.class);設定類的時候, Hadoop Client能從你的classpath中取得WordCount.class所在的jar包的jar File絕對路徑.如果找不到jar, setJar(jar);方法沒有執行,jar肯定是個空值.

我們在Eclipse中直接以Java Application執行的時候,classpath是一個本地資料夾, findContainingJar肯定找不到專案的jar.也就是Mapper和Reduce所在的jar. 這樣在提交任務時候Configuration中mapper.jar屬性是一個空值.這也就解釋了為什麼在Eclipse中當做Java Application執行時總是過一段時間後才發生ClassNotFound的錯誤原因.

其實到這裡Hadoop提交了什麼也好解釋了.

Hadoop向叢集中提交了一個xml和一個攜帶Mapper/Reduce的jar. xml就是Configuration物件序列化的結果.

說到這裡也許你已經發現,這是一個開發上的架構問題.既然Hadoop Job需要Map/Reduce的jar.我們應該把所有的Map/Reduce單獨在一個專案中開發.然後打包放入排程系統專案的ClassPath就好了.然後在排程系統中構造Job,並把job.setJarByClass(class);中的class設定為該Job的map clas或者reduce class就行了.

哪些是在Client執行的?哪些是在Hadoop叢集中執行?

一個Hadoop 任務一般都有3個類(Map/Reduce/Job).WordCountMapper, WordCountReduce, WordCount你認為這三個類都會提交到叢集中執行嗎?

不是! 只有Mapper和Reduce這2個類會提交到Hadoop叢集, MapReduce執行也是這2個類. WordCount只是充當一個配置Job的客戶端,並且提交任務,之後又定時輪詢Job的執行狀態輸出簡單的日誌,直到任務完成,WordCount的這個程序會自動退出.

Hadoop分散式快取

講到這裡你也許能順利的實現一個和我相同思路的系統了.

但是我還是想說一個常見錯誤. 不是Mapper Class NotFound,而是Mapper中使用的Class NotFound. 而你又不想往hadoop叢集中新增jar包,也不想重啟Hadoop叢集. 你可以使用Hadoop提供的一個類:DistributedCache

  1. DistributedCache.addArchiveToClassPath() 新增hdfs上的jar到MapReduce的classpath
  2. DistributedCache.addCacheFile(new URI(“/myapp/lookup.dat#lookup.dat”), job);
  3. DistributedCache.addCacheArchive(new URI(“/myapp/map.zip”, job);
  4. DistributedCache.addFileToClassPath(new Path(“/myapp/mylib.jar”), job);
  5. DistributedCache.addCacheArchive(new URI(“/myapp/mytar.tar”, job);
  6. DistributedCache.addCacheArchive(new URI(“/myapp/mytgz.tgz”, job);
  7. DistributedCache.addCacheArchive(new URI(“/myapp/mytargz.tar.gz”, job);