1. 程式人生 > >Hadoop map reduce 過程獲取環境變數

Hadoop map reduce 過程獲取環境變數

Hadoop任務執行過程中,在每一個map節點或者reduce節點能獲取一下環境變數,利用這些變數可以為特殊的需求服務,例如:獲取當前map節點處理的資料檔案的路徑。

hadoop是java實現的,利用java可以很方便的獲取相關環境變數,其內部包含在Context和MRJobConfig中(hadoop版本不一樣,可能會有區別,我的hadoop是0.21)。

舉例:

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;


public class MergeDaysMapper extends Mapper<LongWritable, Text, Text, Text> {

private String inputFile = null;

public void setup(Context context)
{
System.err.println("[STARTS TO GET PARAMETERS OF THIS JOB]");
Path input = ((FileSplit)context.getInputSplit()).getPath();
inputFile = input.toString();
System.err.println("Input: "+ input.toString());
System.out.println("Input: "+ input.getName());
System.out.println("MAP_INPUT_FILE: " + MRJobConfig.MAP_INPUT_FILE);
System.out.println("MAP_INPUT_PATH:"+ MRJobConfig.MAP_INPUT_PATH);
System.out.println("MAP_INPUT_START:"+ MRJobConfig.MAP_INPUT_START);
System.err.println("JOB_NAME" + MRJobConfig.JOB_NAME);
System.out.println("[FINISHED GETTING PARAMETERS OF THIS JOB]");
}
    
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException
{
if (null == inputFile)
   context.write(new Text("key"), new Text("inputFile"));
else
context.write(new Text("key"), new Text(inputFile));
}
}

同時,在streaming任務中也有同樣的需求,需要獲取相關環境變數,查過別人的資料,如下:

{{

streaming框架通過設定環境變數的方式給mapper、reducer程式傳遞配置資訊。常用的環境變數如下:

HADOOP_HOME

計算節點上配置的Hadoop路徑

LD_LIBRARY_PATH

計算節點上載入庫檔案的路徑列表

PWD

當前工作目錄

dfs_block_size

當前設定的HDFS檔案塊大小

map_input_file

mapper正在處理的輸入檔案路徑

mapred_job_id

作業ID

mapred_job_name

作業名

mapred_tip_id

當前任務的第幾次重試

mapred_task_id

任務ID

mapred_task_is_map

當前任務是否為map

mapred_output_dir

計算輸出路徑

mapred_map_tasks

計算的map任務數

mapred_reduce_tasks

計算的reduce任務數

}}

自己測試了一下,不對,又是版本問題,查了http://hadoop.apache.org/mapreduce/docs/r0.21.0/streaming.html#How+do+I+get+the+JobConf+variables+in+a+streaming+job%27s+mapper%2Freducer%3F

解決如下:

Name Type Description
mapreduce.job.id String The job id
mapreduce.job.jar String job.jar location in job directory
mapreduce.job.local.dir String The job specific shared scratch space
mapreduce.task.id String The task id
mapreduce.task.attempt.id String The task attempt id
mapreduce.task.ismap boolean Is this a map task
mapreduce.task.partition int The id of the task within the job
mapreduce.map.input.file String The filename that the map is reading from
mapreduce.map.input.start long The offset of the start of the map input split
mapreduce.map.input.length long The number of bytes in the map input split
mapreduce.task.output.dir String The task's temporary output directory
相關引數在streaming中“."用”_"代替即可。

例子:

#!/bin/sh


while read line
do
  echo "$line"
  echo $mapreduce_map_input_file
done

測試通過

linux