Hadoop MapReduce Job 相關引數設定 概念介紹與理解
Hadoop MapReduce Job 相關引數設定 概念介紹與理解
InputFormat:
作用:將輸入的檔案分成 一個個split,並且將split 拆分成一個個<key,value> 作為 map的輸入。
使用:通過job.setInputFormatClass() 進行設定
原理:
InputFormat 僅有兩個抽象方法:
1、List<InputSplit> getSplits() 獲取檔案計算出的輸入分片,解決資料分割成片的問題。
2、RecordReader<K,V>createRecordReader()
從InputSplit 中讀取,解決從分片中讀取資料地方問題。將讀入Map的資料拆分成<key,value>類
例項運用:public abstract class InputFormat<K, V> { /** * 僅僅是邏輯分片,並沒有物理分片,所以每一個分片類似於這樣一個元組 <input-file-path, start, offset> */ public abstract List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException; /** * Create a record reader for a given split. */ public abstract RecordReader<K, V> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException; }
import org.apache.hadoop.io.ArrayWritable; import org.apache.hadoop.io.Text; /** * 類說明 * * <pre> * Modify Information: * Author Date Description * ============ =========== ============================ * DELL 2017年3月15日 Create this file * </pre> * */ public class TextArrayWritable extends ArrayWritable { public TextArrayWritable() { super(Text.class); } public TextArrayWritable(Text[] strings) { super(Text.class, strings); } }
import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
/**
* 類說明
*
* <pre>
* Modify Information:
* Author Date Description
* ============ =========== ============================
* DELL 2017年3月15日 Create this file
* </pre>
*
*/
public class CSVInputFormat extends FileInputFormat<LongWritable, TextArrayWritable> {
public static final String CSV_TOKEN_SEPARATOR_CONFIG = "csvinputformat.token.delimiter";
@Override
protected boolean isSplitable(JobContext context, Path filename) {
CompressionCodec codec = new CompressionCodecFactory(context.getConfiguration()).getCodec(filename);
return codec == null;
}
@Override
public RecordReader<LongWritable, TextArrayWritable> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException,
InterruptedException {
String csvDelimiter = context.getConfiguration().get(CSV_TOKEN_SEPARATOR_CONFIG);
Character separator = null;
if (csvDelimiter != null && csvDelimiter.length() == 1) {
separator = csvDelimiter.charAt(0);
}
return new CSVRecordReader(separator);
}
}
Mapper :
作用:實現map函式,根據輸入<key,value> 對生成中間結果。
使用:可以通過job.setMapperClass() 進行設定
原始碼:
public class Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
public class Context extends MapContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
public Context(Configuration conf, TaskAttemptID taskid,
RecordReader<KEYIN, VALUEIN> reader,
RecordWriter<KEYOUT, VALUEOUT> writer,
OutputCommitter committer, StatusReporter reporter,
InputSplit split) throws IOException, InterruptedException {
super(conf, taskid, reader, writer, committer, reporter, split);
}
}
/**
* 預處理,僅在map task啟動時執行一次
*/
protected void setup(Context context) throws IOException,
InterruptedException {
}
/**
* 對於InputSplit中的每一對<key, value>都會執行一次
*/
@SuppressWarnings("unchecked")
protected void map(KEYIN key, VALUEIN value, Context context)
throws IOException, InterruptedException {
context.write((KEYOUT) key, (VALUEOUT) value);
}
/**
* 掃尾工作,比如關閉流等
*/
protected void cleanup(Context context) throws IOException,
InterruptedException {
}
/**
* map task的驅動器
*/
public void run(Context context) throws IOException, InterruptedException {
setup(context);
while (context.nextKeyValue()) {
map(context.getCurrentKey(), context.getCurrentValue(), context);
}
cleanup(context);
}
}
原理:輸入時一個key -value 輸出一個key-value 主要寫個類重寫Mapper的map方法
例項:
public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
System.out.println("key: "+key.toString());
System.out.println("value:"+value.toString());
StringTokenizer itr = new StringTokenizer(value.toString());
while (itr.hasMoreTokens()) {
word.set(itr.nextToken());
context.write(word, one);
}
}
}
Reducer :
作用:實現reduce 類,將中間結果合併,得到最終結果。
使用:job.setReducerClass()方法進行設定
使用例項:
public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
private IntWritable result = new IntWritable();
public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
result.set(sum);
context.write(key, result);
}
}
Combiner :
作用: 實現map函式 合併中間結果中有相同key的鍵值對。
使用: job.setCombinerClass()方法進行設定, 預設為null 不合並, 輸出是reduce的輸入
例項使用:
public static class Combine extends Reducer<Text,Text,Text,Text> {
// Reduce Method
public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
double sum = 0;
int count = 0;
for (Text value : values) {
String fields[] = value.toString().split(",");
sum += Double.parseDouble(fields[0]);
count += Integer.parseInt(fields[1]);
}
context.write(key, new Text(sum+","+count));
}
}
partioner類:
作用: 實現getPartition()函式用來在Shuffle 過程中,按照key 將中間資料分成R份, 每一份由一個Reducer 負責。
使用:job.setPartitioner() 進行設定,預設是HashPartitoner
Shuffle 作用: 將Map 階段的輸出,Copy Reduce節點本地
使用例項:
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;
/**
* @author xadil
*
*/
public class StatementsPartioner extends Partitioner<StatementsKeyType, Text> {
@Override
public int getPartition(StatementsKeyType key, Text value, int numPartitions) {
return Math.abs(key.getAccountId().hashCode()*127)%numPartitions;
}
}
OutputFormat :
作用: 負責輸出最終結果
使用:job.setOutputFormatClass() 進行設定, 預設是TextOutputFormat 類,TextOutputFormat 將結果寫成檔案。每行一個key -value 之間以製表符分開。
使用例項:
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
/**
* @author xadil
*
*/
public class StatementPDFOutputFormat<K,V> extends FileOutputFormat<K,V> {
public static String SEPERATOR = "mapreduce.output.textoutputformat.separator";
/* (non-Javadoc)
* @see org.apache.hadoop.mapreduce.lib.output.FileOutputFormat#getRecordWriter(org.apache.hadoop.mapreduce.TaskAttemptContext)
*/
@Override
public RecordWriter<K, V> getRecordWriter(TaskAttemptContext job)
throws IOException, InterruptedException {
Configuration conf = job.getConfiguration();
Path file = getDefaultWorkFile(job, "");
return new PDFStatementWriter<K,V>(file,job.getConfiguration());
}
/*@Override
public RecordWriter<K, V> getRecordWriter(
TaskAttemptContext job) throws IOException, InterruptedException {
Configuration conf = job.getConfiguration();
boolean isCompressed = getCompressOutput(job);
String keyValueSeparator= conf.get(SEPERATOR, "\t");
CompressionCodec codec = null;
String extension = "";
if (isCompressed) {
Class<? extends CompressionCodec> codecClass =
getOutputCompressorClass(job, GzipCodec.class);
codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, conf);
extension = codec.getDefaultExtension();
}
Path file = getDefaultWorkFile(job, extension);
Path fullPath = new Path(file, "result.txt");
FileSystem fs = file.getFileSystem(conf);
if (!isCompressed) {
FSDataOutputStream fileOut = fs.create(fullPath, false);
return new PDFWriter<K, V>(fileOut, keyValueSeparator);
} else {
FSDataOutputStream fileOut = fs.create(fullPath, false);
return new PDFWriter<K, V>(new DataOutputStream
(codec.createOutputStream(fileOut)),
keyValueSeparator);
}
}*/
}
OutputKey:
作用:設定最終結果key的型別
OutputValue:
作用:設定最終結果value的型別。
MapOutputKey:
作用:設定中間結果(Mapper)key的型別
使用:方法類似
MapOutputValue:
作用:設定中間結果(Mapper)value的型別。
使用:job.setMapOutputValueClass()GroupingComparator:
作用: 設定分組函式類,相同key放到一起,value 放到同一個函式類。
相關推薦
Hadoop MapReduce Job 相關引數設定 概念介紹與理解
Hadoop MapReduce Job 相關引數設定 概念介紹與理解 InputFormat: 作用:將輸入的檔案分成 一個個split,並且將split 拆分成一個個<key,value
Hadoop hadoop yarn 指令相關引數詳解
[Plain Text] 純文字檢視 複製程式碼 ? 1 2 [[email protected] bin]$ yarn classpath /home/hadoop/apache/hadoop-2.4.1/etc/hadoop:
講解ORACLE JOB INTERVAL引數設定
1:每分鐘執行 Interval => TRUNC(sysdate,'mi') + 1/ (24*60) 或 Interval => sysdate+1/1440 2:每天定時執行 例如:每天的凌晨1點執行 Interval => TRUNC(s
Hadoop MapReduce Job提交後的互動日誌
通過混合 NamdeNode, DataNode, ResourceManager,NodeManager的日誌輸出, 和 提交MapReduce Job的console輸出日誌,形成時間序列上的日誌輸出。 這樣可以方便檢視從client端提交job,整個hadoop的內
Hadoop MapReduce Job效能調優——修改Map和Reduce個數
MapReduce框架將檔案分為多個splits,併為每個splits建立一個Mapper,所以Mappers的個數直接由splits的數目決定。而Reducers的數目可以通過job.setNumReduceTasks()函式設定,預設情況只有一個Reducer。在真正的叢集環境下,如果預設,那麼所有的中
Flume1.4 相關引數設定,將收集的資料彙總到hdfs,解決許多小檔案問題
參照官方文件,將 flume client 收集的日誌檔案 彙總到 flume sink收集端,然後儲存到hdfs 中,預設會按生成許多小檔案,如圖所示 實際中可能只需要生成一個檔案即可,這就涉及到了幾個相關引數設定如下即可 需要修改的檔案位於 flume/con
opencv2-攝像頭獲取影象相關引數設定
Get和Set方法的引數如下 C++: boolVideoCapture::set(int propId, double value)和 double VideoCapture::get(intpro
Hadoop MapReduce Job效能調優——Map和Reduce個數
map task的數量即mapred.map.tasks的引數值,使用者不能直接設定這個引數。Input Split的大小,決定了一個Job擁有多少個map。預設input split的大小是64M(與dfs.block.size的預設值相同)。然而,如果輸入的資料量
Python相關常用庫概念介紹
關於一些常用到的python庫,在這裡收集整理一下概念,大致熟悉一下,等到遇到的時候,心裡就有個底了 time,bson,hashlib ,os,random,Image,ImageEnhance,math,smtplib,email,uuid,urllib,re,sysl
hadoop命令 -- job相關
hadoop命令列 與job相關的: 命令列工具 • 1.檢視 Job 資訊: hadoop job -list 2.殺掉 Job: hadoop job –kill job_id 3.指定路徑下檢視歷史日誌彙總: hadoop job -history outpu
【原創】關於Visual Studio相關插件的介紹與下載
ron 原來 bsp 推薦 blog 需要 地址 操作 說明 免責說明:本頁面插件僅供學習使用,切勿用於商業用途,插件來源均來自互聯網。 原文地址:http://www.cnblogs.com/cokefenta/p/7987766.html 在使用VS進行Unity的開發
觀察者模式介紹與理解
觀察者模式介紹與理解: 觀察者模式原理:類似於定牛奶業務 1. 奶站,subject:登記註冊,移除,通知(register,remove,notify) 2. 使用者,observer:接收輸入 觀察者模式:物件之間多對一依賴的一種設計模式,被依賴的物件稱為subject,依賴的物
java 大資料以及Hadoop相關概念介紹
一、大資料的基本概念1.1、什麼是大資料大資料指的就是要處理的資料是TB級別以上的資料。大資料是以TB級別起步的。在計算機當中,存放到硬碟上面的檔案都會佔用一定的儲存空間,例如: 檔案佔用的儲存空間代表的就是該檔案的大小,在計算機當中,檔案的大小可以採用以下單位來表示,各個
【嵌入式開發】時鐘初始化 ( 時鐘相關概念 | 嵌入式時鐘體系 | Lock Time | 分頻引數設定 | CPU 非同步模式設定 | APLL MPLL 時鐘頻率設定 )
本部落格的參考文章及相關資料下載 : 一. 時鐘相關概念解析 1. 相關概念術語 (1) 時鐘脈衝訊號 時鐘脈衝訊號 : 1.概念 : 按照 一定的電壓幅度 和 一定的時間間隔 , 連續發出的 脈衝
webservice學習總結(一)-- WebService相關概念介紹
IT strong 資源 fire 求和 log AC service服務 為什麽 一、WebService是什麽? 基於Web的服務:服務器端整出一些資源讓客戶端應用訪問(獲取數據) 一個跨語言、跨平臺的規範(抽象) 多個跨平臺、跨語言的應用間通信整合的方案(實際)
Hive快捷查詢:不啟用Mapreduce job啟用Fetch task三種方式介紹
should ima only cts 直接 mapr 問題 rom conf 如果查詢表的某一列,Hive中默認會啟用MapReduce job來完成這個任務,如下: hive>select id,name from m limit 10;--執行時hive會啟用
【完全分散式Hadoop】(二)HDFS、YARN以及HA高可用概念介紹
一、HDFS-Hadoop分散式檔案系統 HDFS 採用Master/Slave的架構來儲存資料,這種架構主要由四個部分組成,分別為HDFS Client、NameNode、DataNode和Secondary NameNode。下面我們分別介紹這四個組成部分 1、Client:就
MySQL的常見儲存引擎介紹與引數設定調優(轉載)
原文地址:http://www.cnblogs.com/demon89/p/8490229.html MySQL常用儲存引擎之MyISAM 特性: 1、併發性與鎖級別 2、表損壞修復 check table tablename repair table tabl
Activiti - 工作流相關概念介紹
工作流是什麼? 工作流(Workflow)是對工作流程及各操作步驟之間業務規則的抽象、概括描述 工作流建模: 將工作流程中的工作如何前後組織在一起的邏輯和規則,在計算機中以恰當的模型表達並對其實施計算 要解決的問題是為實現某個業務目標,利用計算機在多個參與者之間按某種預定規則自動傳遞
Hadoop MapReduce中map任務數量設定詳解
首先注意的是在Hadoop Streaming 中可以通過-D mapred.map.tasks=(你想要設定的map數量) 來確定map任務的個數, goal_num = mapred.map.tasks 但是這裡需要注意的是,只有在這個值大於hadoop中計算的默認個