Getting environment variables in Hadoop MapReduce
While a Hadoop job is running, each map or reduce task can read a set of environment variables. These are handy for special requirements, for example getting the path of the data file the current map task is processing.
Hadoop is implemented in Java, so from Java code these values are easy to get at; they are exposed through Context and the constants in MRJobConfig (this differs between Hadoop versions; mine is 0.21).
Example:

```java
import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

public class MergeDaysMapper extends Mapper<LongWritable, Text, Text, Text> {

    private String inputFile = null;

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        System.err.println("[STARTS TO GET PARAMETERS OF THIS JOB]");
        // The input split tells us which file this map task is reading.
        Path input = ((FileSplit) context.getInputSplit()).getPath();
        inputFile = input.toString();
        System.err.println("Input: " + input.toString());
        System.out.println("Input: " + input.getName());
        // Note: the MRJobConfig constants are the configuration *key names*,
        // not their values; to read a value, use
        // context.getConfiguration().get(MRJobConfig.MAP_INPUT_FILE).
        System.out.println("MAP_INPUT_FILE: " + MRJobConfig.MAP_INPUT_FILE);
        System.out.println("MAP_INPUT_PATH: " + MRJobConfig.MAP_INPUT_PATH);
        System.out.println("MAP_INPUT_START: " + MRJobConfig.MAP_INPUT_START);
        System.err.println("JOB_NAME: " + MRJobConfig.JOB_NAME);
        System.out.println("[FINISHED GETTING PARAMETERS OF THIS JOB]");
    }

    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        if (null == inputFile)
            context.write(new Text("key"), new Text("inputFile"));
        else
            context.write(new Text("key"), new Text(inputFile));
    }
}
```
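For completeness, a minimal sketch of a driver that runs this mapper as a map-only job. The class name MergeDaysDriver, the job name, and taking the input/output paths from the command line are my own placeholders, not part of the original:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MergeDaysDriver {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = new Job(conf, "merge-days");    // hypothetical job name
        job.setJarByClass(MergeDaysDriver.class);
        job.setMapperClass(MergeDaysMapper.class);
        job.setNumReduceTasks(0);                 // map-only job, no reducer
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
```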
The same need comes up in streaming jobs, where the mapper and reducer also have to read these variables. I found the following in someone else's write-up:

The streaming framework passes configuration to the mapper and reducer programs by setting environment variables. Commonly used ones:

| Variable | Description |
|---|---|
| HADOOP_HOME | The Hadoop path configured on the compute node |
| LD_LIBRARY_PATH | The list of library load paths on the compute node |
| PWD | The current working directory |
| dfs_block_size | The currently configured HDFS block size |
| map_input_file | The path of the input file the mapper is processing |
| mapred_job_id | The job ID |
| mapred_job_name | The job name |
| mapred_tip_id | The retry number of the current task |
| mapred_task_id | The task ID |
| mapred_task_is_map | Whether the current task is a map task |
| mapred_output_dir | The job output path |
| mapred_map_tasks | The number of map tasks |
| mapred_reduce_tasks | The number of reduce tasks |
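Before trusting any such list, it is easy to check what the framework actually sets on your own cluster: run a throwaway streaming job whose mapper just dumps its environment. A minimal sketch (stderr ends up in the task logs, so the dump does not pollute the job output):

```sh
#!/bin/sh
# Mapper that dumps every environment variable set for this task.
# stderr is captured in the task logs; the job's real output stays clean.
env | sort 1>&2
# Pass the input through unchanged so the job still produces output.
cat
```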
I tested these myself and they did not work; it was a version problem again. The answer is in the 0.21 streaming documentation, http://hadoop.apache.org/mapreduce/docs/r0.21.0/streaming.html#How+do+I+get+the+JobConf+variables+in+a+streaming+job%27s+mapper%2Freducer%3F, which resolves it as follows:
| Name | Type | Description |
|---|---|---|
| mapreduce.job.id | String | The job id |
| mapreduce.job.jar | String | job.jar location in job directory |
| mapreduce.job.local.dir | String | The job specific shared scratch space |
| mapreduce.task.id | String | The task id |
| mapreduce.task.attempt.id | String | The task attempt id |
| mapreduce.task.ismap | boolean | Is this a map task |
| mapreduce.task.partition | int | The id of the task within the job |
| mapreduce.map.input.file | String | The filename that the map is reading from |
| mapreduce.map.input.start | long | The offset of the start of the map input split |
| mapreduce.map.input.length | long | The number of bytes in the map input split |
| mapreduce.task.output.dir | String | The task's temporary output directory |
During a streaming job these properties are exported as environment variables, with the dots ( . ) replaced by underscores ( _ ); mapreduce.map.input.file, for example, is read as $mapreduce_map_input_file. Example:
```sh
#!/bin/sh
# Echo each input line, followed by the path of the file this mapper is reading.
while read line
do
  echo "$line"
  echo "$mapreduce_map_input_file"
done
```
Tested and working.
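For reference, a sketch of how a streaming job like this might be launched. The location of the streaming jar varies by Hadoop version and layout, and the input/output paths here are placeholders:

```sh
hadoop jar $HADOOP_HOME/path/to/hadoop-streaming.jar \
    -input /path/to/input \
    -output /path/to/output \
    -mapper mapper.sh \
    -file mapper.sh
```

The -file option ships mapper.sh to the compute nodes along with the job.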