1. 程式人生 > >Hadoop MapReduce Job 相關引數設定 概念介紹與理解

Hadoop MapReduce Job 相關引數設定 概念介紹與理解

Hadoop MapReduce  Job 相關引數設定 概念介紹與理解

InputFormat:

作用:將輸入的檔案分成 一個個split,並且將split 拆分成一個個<key,value> 作為 map的輸入。

使用:通過job.setInputFormatClass() 進行設定

原理:

InputFormat 僅有兩個抽象方法: 

1、List<InputSplit> getSplits()  獲取檔案計算出的輸入分片,解決資料分割成片的問題。

2、RecordReader<K,V>createRecordReader()  從InputSplit 中讀取,解決從分片中讀取資料地方問題。將讀入Map的資料拆分成<key,value>類

public abstract class InputFormat<K, V> {
 
    /**
     * 僅僅是邏輯分片,並沒有物理分片,所以每一個分片類似於這樣一個元組 <input-file-path, start, offset>
     */
    public abstract List<InputSplit> getSplits(JobContext context)
            throws IOException, InterruptedException;
 
    /**
     * Create a record reader for a given split.
     */
    public abstract RecordReader<K, V> createRecordReader(InputSplit split,
            TaskAttemptContext context) throws IOException,
            InterruptedException;
 
}
例項運用:
import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.io.Text;

/**
 * 類說明
 * 
 * <pre>
 * Modify Information:
 * Author        Date          Description
 * ============ =========== ============================
 * DELL          2017年3月15日    Create this file
 * </pre>
 * 
 */

public class TextArrayWritable extends ArrayWritable {
    public TextArrayWritable() {
        super(Text.class);
    }

    public TextArrayWritable(Text[] strings) {
        super(Text.class, strings);
    }
}

import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

/**
 * 類說明
 * 
 * <pre>
 * Modify Information:
 * Author        Date          Description
 * ============ =========== ============================
 * DELL          2017年3月15日    Create this file
 * </pre>
 * 
 */

public class CSVInputFormat extends FileInputFormat<LongWritable, TextArrayWritable> {
    public static final String CSV_TOKEN_SEPARATOR_CONFIG = "csvinputformat.token.delimiter";

    @Override
    protected boolean isSplitable(JobContext context, Path filename) {
        CompressionCodec codec = new CompressionCodecFactory(context.getConfiguration()).getCodec(filename);
        return codec == null;
    }

    @Override
    public RecordReader<LongWritable, TextArrayWritable> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException,
            InterruptedException {
        String csvDelimiter = context.getConfiguration().get(CSV_TOKEN_SEPARATOR_CONFIG);
        Character separator = null;
        if (csvDelimiter != null && csvDelimiter.length() == 1) {
            separator = csvDelimiter.charAt(0);
        }
        return new CSVRecordReader(separator);
    }
 
}



Mapper :

作用:實現map函式,根據輸入<key,value> 對生成中間結果。

使用:可以通過job.setMapperClass() 進行設定

原始碼:

public class Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
 
    public class Context extends MapContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
        public Context(Configuration conf, TaskAttemptID taskid,
                RecordReader<KEYIN, VALUEIN> reader,
                RecordWriter<KEYOUT, VALUEOUT> writer,
                OutputCommitter committer, StatusReporter reporter,
                InputSplit split) throws IOException, InterruptedException {
            super(conf, taskid, reader, writer, committer, reporter, split);
        }
    }
 
    /**
     * 預處理,僅在map task啟動時執行一次
     */
    protected void setup(Context context) throws IOException,
            InterruptedException {
    }
 
    /**
     * 對於InputSplit中的每一對<key, value>都會執行一次
     */
    @SuppressWarnings("unchecked")
    protected void map(KEYIN key, VALUEIN value, Context context)
            throws IOException, InterruptedException {
        context.write((KEYOUT) key, (VALUEOUT) value);
    }
 
    /**
     * 掃尾工作,比如關閉流等
     */
    protected void cleanup(Context context) throws IOException,
            InterruptedException {
    }
 
    /**
     * map task的驅動器
     */
    public void run(Context context) throws IOException, InterruptedException {
        setup(context);
        while (context.nextKeyValue()) {
            map(context.getCurrentKey(), context.getCurrentValue(), context);
        }
        cleanup(context);
    }
}
原理:輸入時一個key -value 輸出一個key-value 主要寫個類重寫Mapper的map方法

例項:

 public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {

        private final static IntWritable one = new IntWritable(1);

        private Text word = new Text();

        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            System.out.println("key: "+key.toString());
            System.out.println("value:"+value.toString());
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens()) {
                word.set(itr.nextToken());
                context.write(word, one);
            }
        }
    }
Reducer :

作用:實現reduce 類,將中間結果合併,得到最終結果。

使用:job.setReducerClass()方法進行設定

使用例項:

  public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        private IntWritable result = new IntWritable();
        public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            result.set(sum);
            context.write(key, result);
        }
    }


Combiner :

作用: 實現map函式 合併中間結果中有相同key的鍵值對。

使用: job.setCombinerClass()方法進行設定, 預設為null 不合並, 輸出是reduce的輸入

例項使用:

    public static class Combine extends Reducer<Text,Text,Text,Text> {  
          
        // Reduce Method  
        public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {  
            double sum = 0;  
            int count = 0;  
            for (Text value : values) {  
                String fields[] = value.toString().split(",");  
                sum += Double.parseDouble(fields[0]);  
                count += Integer.parseInt(fields[1]);  
            }  
            context.write(key, new Text(sum+","+count));  
        }  
    }  


partioner類:

作用: 實現getPartition()函式用來在Shuffle 過程中,按照key 將中間資料分成R份, 每一份由一個Reducer 負責。

使用:job.setPartitioner() 進行設定,預設是HashPartitoner

Shuffle 作用: 將Map 階段的輸出,Copy Reduce節點本地

使用例項:

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

/**
 * @author xadil
 *
 */
public class StatementsPartioner extends Partitioner<StatementsKeyType, Text> {

	@Override
	public int getPartition(StatementsKeyType key, Text value, int numPartitions) {
		
		return Math.abs(key.getAccountId().hashCode()*127)%numPartitions;
	}

}
OutputFormat :

作用: 負責輸出最終結果

使用:job.setOutputFormatClass() 進行設定, 預設是TextOutputFormat 類,TextOutputFormat 將結果寫成檔案。每行一個key -value 之間以製表符分開。

使用例項:

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * @author xadil
 *
 */
public class StatementPDFOutputFormat<K,V> extends FileOutputFormat<K,V> {
	public static String SEPERATOR = "mapreduce.output.textoutputformat.separator";

	/* (non-Javadoc)
	 * @see org.apache.hadoop.mapreduce.lib.output.FileOutputFormat#getRecordWriter(org.apache.hadoop.mapreduce.TaskAttemptContext)
	 */
	@Override
	public RecordWriter<K, V> getRecordWriter(TaskAttemptContext job)
			throws IOException, InterruptedException {
		Configuration conf = job.getConfiguration();
		Path file = getDefaultWorkFile(job, "");
		
		return new PDFStatementWriter<K,V>(file,job.getConfiguration());
	}
	
	/*@Override
	public RecordWriter<K, V> getRecordWriter(
			TaskAttemptContext job) throws IOException, InterruptedException {
		Configuration conf = job.getConfiguration();
	    boolean isCompressed = getCompressOutput(job);
	    String keyValueSeparator= conf.get(SEPERATOR, "\t");
	    CompressionCodec codec = null;
	    String extension = "";
	    if (isCompressed) {
	      Class<? extends CompressionCodec> codecClass = 
	        getOutputCompressorClass(job, GzipCodec.class);
	      codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, conf);
	      extension = codec.getDefaultExtension();
	    }
	    Path file = getDefaultWorkFile(job, extension);
	    Path fullPath = new Path(file, "result.txt");
	    FileSystem fs = file.getFileSystem(conf);
	    if (!isCompressed) {
	      FSDataOutputStream fileOut = fs.create(fullPath, false);
	      return new PDFWriter<K, V>(fileOut, keyValueSeparator);
	    } else {
	      FSDataOutputStream fileOut = fs.create(fullPath, false);
	      return new PDFWriter<K, V>(new DataOutputStream
	                                        (codec.createOutputStream(fileOut)),
	                                        keyValueSeparator);
	    }
	}*/
	
	
	
}

OutputKey:

作用:設定最終結果key的型別

OutputValue:

作用:設定最終結果value的型別。

MapOutputKey:

作用:設定中間結果(Mapper)key的型別

使用:方法類似

MapOutputValue:

作用:設定中間結果(Mapper)value的型別。

使用:job.setMapOutputValueClass()

GroupingComparator:

作用: 設定分組函式類,相同key放到一起,value 放到同一個函式類。

相關推薦

Hadoop MapReduce Job 相關引數設定 概念介紹理解

Hadoop MapReduce  Job 相關引數設定 概念介紹與理解 InputFormat: 作用:將輸入的檔案分成 一個個split,並且將split 拆分成一個個<key,value

Hadoop hadoop yarn 指令相關引數詳解

[Plain Text] 純文字檢視 複製程式碼 ? 1 2 [[email protected] bin]$ yarn classpath /home/hadoop/apache/hadoop-2.4.1/etc/hadoop:

講解ORACLE JOB INTERVAL引數設定

  1:每分鐘執行   Interval => TRUNC(sysdate,'mi') + 1/ (24*60)   或   Interval => sysdate+1/1440   2:每天定時執行   例如:每天的凌晨1點執行   Interval => TRUNC(s

Hadoop MapReduce Job提交後的互動日誌

通過混合 NamdeNode, DataNode, ResourceManager,NodeManager的日誌輸出, 和 提交MapReduce Job的console輸出日誌,形成時間序列上的日誌輸出。 這樣可以方便檢視從client端提交job,整個hadoop的內

Hadoop MapReduce Job效能調優——修改Map和Reduce個數

MapReduce框架將檔案分為多個splits,併為每個splits建立一個Mapper,所以Mappers的個數直接由splits的數目決定。而Reducers的數目可以通過job.setNumReduceTasks()函式設定,預設情況只有一個Reducer。在真正的叢集環境下,如果預設,那麼所有的中

Flume1.4 相關引數設定,將收集的資料彙總到hdfs,解決許多小檔案問題

參照官方文件,將 flume client 收集的日誌檔案 彙總到  flume sink收集端,然後儲存到hdfs 中,預設會按生成許多小檔案,如圖所示 實際中可能只需要生成一個檔案即可,這就涉及到了幾個相關引數設定如下即可 需要修改的檔案位於 flume/con

opencv2-攝像頭獲取影象相關引數設定

Get和Set方法的引數如下 C++: boolVideoCapture::set(int propId, double value)和 double VideoCapture::get(intpro

Hadoop MapReduce Job效能調優——Map和Reduce個數

 map task的數量即mapred.map.tasks的引數值,使用者不能直接設定這個引數。Input Split的大小,決定了一個Job擁有多少個map。預設input split的大小是64M(與dfs.block.size的預設值相同)。然而,如果輸入的資料量

Python相關常用庫概念介紹

關於一些常用到的python庫,在這裡收集整理一下概念,大致熟悉一下,等到遇到的時候,心裡就有個底了 time,bson,hashlib ,os,random,Image,ImageEnhance,math,smtplib,email,uuid,urllib,re,sysl

hadoop命令 -- job相關

hadoop命令列 與job相關的: 命令列工具 • 1.檢視 Job 資訊: hadoop job -list 2.殺掉 Job: hadoop  job –kill  job_id 3.指定路徑下檢視歷史日誌彙總: hadoop job -history outpu

【原創】關於Visual Studio相關插件的介紹下載

ron 原來 bsp 推薦 blog 需要 地址 操作 說明 免責說明:本頁面插件僅供學習使用,切勿用於商業用途,插件來源均來自互聯網。 原文地址:http://www.cnblogs.com/cokefenta/p/7987766.html 在使用VS進行Unity的開發

觀察者模式介紹理解

觀察者模式介紹與理解: 觀察者模式原理:類似於定牛奶業務 1. 奶站,subject:登記註冊,移除,通知(register,remove,notify) 2. 使用者,observer:接收輸入 觀察者模式:物件之間多對一依賴的一種設計模式,被依賴的物件稱為subject,依賴的物

java 大資料以及Hadoop相關概念介紹

一、大資料的基本概念1.1、什麼是大資料大資料指的就是要處理的資料是TB級別以上的資料。大資料是以TB級別起步的。在計算機當中,存放到硬碟上面的檔案都會佔用一定的儲存空間,例如:​ 檔案佔用的儲存空間代表的就是該檔案的大小,在計算機當中,檔案的大小可以採用以下單位來表示,各個

【嵌入式開發】時鐘初始化 ( 時鐘相關概念 | 嵌入式時鐘體系 | Lock Time | 分頻引數設定 | CPU 非同步模式設定 | APLL MPLL 時鐘頻率設定 )

本部落格的參考文章及相關資料下載 : 一. 時鐘相關概念解析 1. 相關概念術語 (1) 時鐘脈衝訊號 時鐘脈衝訊號 : 1.概念 : 按照 一定的電壓幅度 和 一定的時間間隔 , 連續發出的 脈衝

webservice學習總結(一)-- WebService相關概念介紹

IT strong 資源 fire 求和 log AC service服務 為什麽 一、WebService是什麽? 基於Web的服務:服務器端整出一些資源讓客戶端應用訪問(獲取數據) 一個跨語言、跨平臺的規範(抽象) 多個跨平臺、跨語言的應用間通信整合的方案(實際)

Hive快捷查詢:不啟用Mapreduce job啟用Fetch task三種方式介紹

should ima only cts 直接 mapr 問題 rom conf 如果查詢表的某一列,Hive中默認會啟用MapReduce job來完成這個任務,如下: hive>select id,name from m limit 10;--執行時hive會啟用

【完全分散式Hadoop】(二)HDFS、YARN以及HA高可用概念介紹

一、HDFS-Hadoop分散式檔案系統 HDFS 採用Master/Slave的架構來儲存資料,這種架構主要由四個部分組成,分別為HDFS Client、NameNode、DataNode和Secondary NameNode。下面我們分別介紹這四個組成部分 1、Client:就

MySQL的常見儲存引擎介紹引數設定調優(轉載)

原文地址:http://www.cnblogs.com/demon89/p/8490229.html MySQL常用儲存引擎之MyISAM 特性: 1、併發性與鎖級別 2、表損壞修復 check table tablename repair table tabl

Activiti - 工作流相關概念介紹

工作流是什麼? 工作流(Workflow)是對工作流程及各操作步驟之間業務規則的抽象、概括描述 工作流建模: 將工作流程中的工作如何前後組織在一起的邏輯和規則,在計算機中以恰當的模型表達並對其實施計算 要解決的問題是為實現某個業務目標,利用計算機在多個參與者之間按某種預定規則自動傳遞

Hadoop MapReduce中map任務數量設定詳解

首先注意的是在Hadoop Streaming 中可以通過-D mapred.map.tasks=(你想要設定的map數量) 來確定map任務的個數, goal_num = mapred.map.tasks 但是這裡需要注意的是,只有在這個值大於hadoop中計算的默認個