Hadoop的JVM重用機制和小檔案解決
Hadoop的JVM重用機制和小檔案解決
一、hadoop2.0 uber功能
1) uber的原理:Yarn的預設配置會禁用uber元件,即不允許JVM重用。我們先看看在這種情況下,Yarn是如何執行一個MapReduce job的。首先,Resource Manager裡的Applications Manager會為每一個application(比如一個使用者提交的MapReduce Job)在NodeManager裡面申請一個container,然後在該container裡面啟動一個Application Master。container在Yarn中是分配資源的容器(記憶體、cpu、硬碟等),它啟動時便會相應啟動一個JVM。此時,Application Master便陸續為application包含的每一個task(一個Map task或Reduce task)向Resource Manager申請一個container。等每得到一個container後,便要求該container所屬的NodeManager將此container啟動,然後就在這個container裡面執行相應的task。等這個task執行完後,這個container便會被NodeManager收回,而container所擁有的JVM也相應地被退出。在這種情況下,可以看出每一個JVM僅會執行一Task, JVM並未被重用。
2)使用者可以通過啟用uber元件來允許JVM重用——即在同一個container裡面依次執行多個task。在yarn-site.xml檔案中,改變一下幾個引數的配置即可啟用uber的方法:引數| 預設值 | 描述- mapreduce.job.ubertask.enable | (false) | 是否啟用user功能。如果啟用了該功能,則會將一個“小的application”的所有子task在同一個JVM裡面執行,達到JVM重用的目的。這個JVM便是負責該application的ApplicationMaster所用的JVM(執行在其container裡)。那具體什麼樣的application算是“小的application"呢?下面幾個引數便是用來定義何謂一個“小的application"- mapreduce.job.ubertask.maxmaps | 9 | map任務數的閥值,如果一個application包含的map數小於該值的定義,那麼該application就會被認為是一個小的application。- mapreduce.job.ubertask.maxreduces | 1 | reduce任務數的閥值,如果一個application包含的reduce數小於該值的定義,那麼該application就會被認為是一個小的application。不過目前Yarn不支援該值大於1的情況。- mapreduce.job.ubertask.maxbytes | | application的輸入大小的閥值。預設為dfs.block.size的值。當實際的輸入大小不超過該值的設定,便會認為該application為一個小的application。最後,我們來看當uber功能被啟用的時候,Yarn是如何執行一個application的。首先,Resource Manager裡的Applications Manager會為每一個application在NodeManager裡面申請一個container,然後在該container裡面啟動一個Application Master。containe啟動時便會相應啟動一個JVM。此時,如果uber功能被啟用,並且該application被認為是一個“小的application”,那麼Application Master便會將該application包含的每一個task依次在這個container裡的JVM裡順序執行,直到所有task被執行完。這樣Application Master便不用再為每一個task向Resource Manager去申請一個單獨的container,最終達到了 JVM重用(資源重用)的目的。
3)在yarn-site.xml裡的配置示例:
<!-- 開啟uber模式(針對小作業的優化) --> <property> <name>mapreduce.job.ubertask.enable</name> <value>true</value> </property> <!-- 配置啟動uber模式的最大map數 --> <property> <name>mapreduce.job.ubertask.maxmaps</name> <value>9</value> </property> <!-- 配置啟動uber模式的最大reduce數 --> <property> <name>mapreduce.job.ubertask.maxreduces</name> <value>1</value> </property>
2.0的uber模式開啟之後,JVM重用也一定生效。生效的條件是Map任務數量小於配置的任務數量,則認為是一個小任務,如果是小任務則JVM生效。
比如配置的Map任務=9 ,實際的map=5。如果實際的12,則認為不是一個小任務,則不開啟JVM重用機制。
補充:uber的規定,reduce的數量必須是1,如果reduce>1,則不認為是小任務。JVM重用機制,是針對任務為單位的,即不同Task是不能共用JVM重用機制的。使用JVM重用機制,一定要注意全域性變數的使用問題。
二、Hadoop小檔案問題
小檔案的定義:小檔案指的是那些size比HDFS 的block size(預設64M/1.0版本,128M/2.0版本)小的多的檔案。如果在HDFS中儲存海量的小檔案,會產生很多問題。
大量小檔案在HDFS中的問題:任何一個檔案,目錄和block,在HDFS中都會被表示為元資料資訊,每一個元資料資訊佔用150 bytes的記憶體空間。所以,如果有10million個檔案,每一個檔案對應一個block,那麼就將要消耗namenode 3G的記憶體來儲存這些block的資訊。如果規模再大一些,那麼將會超出現階段計算機硬體所能滿足的極限。不僅如此,HDFS並不是為了有效的處理大量小檔案而存在的。它主要是為了流式的訪問大檔案而設計的。對小檔案的讀取通常會造成大量從datanode到datanode的seeks和hopping來retrieve檔案,而這樣是非常的低效的一種訪問方式。
大量小檔案在mapreduce中的問題:Map tasks通常是每次處理一個block的input(預設使用FileInputFormat)。如果檔案非常的
小,並且擁有大量的這種小檔案,那麼每一個map task都僅僅處理了非常小的input資料,並且會產生大量的map tasks,每一個map task都會消耗一定量的bookkeeping的資源。比較一個1GB的檔案,預設block size為64M,和1Gb的檔案,沒一個檔案100KB,那麼後者沒一個小檔案使用一個map task,那麼job的時間將會十倍甚至百倍慢於前者。
hadoop中有一些特性可以用來減輕這種問題:可以在一個JVM中允許task reuse,以支援在一個JVM中執行多個map task,以此來減少一些JVM的啟動消耗。另一種方法是將多個小檔案合成一個spilt,即用一個map任務來處理。
三、Hadoop小檔案解決方案
在使用Hadoop處理海量小檔案的應用場景中,如果你選擇使用CombineFileInputFormat,而且你是第一次使用,可能你會感到有點迷惑。雖然,從這個處理方案的思想上很容易理解,但是可能會遇到這樣那樣的問題。使用CombineFileInputFormat作為Map任務的輸入規格描述,首先需要實現一個自定義的RecordReader。CombineFileInputFormat的大致原理是,他會將輸入多個數據檔案(小檔案)的元資料全部包裝到CombineFileSplit類裡面。也就是說,因為小檔案的情況下,在HDFS中都是單Block的檔案,即一個檔案一個Block,一個CombineFileSplit包含了一組檔案Block,包括每個檔案的起始偏移(offset),長度(length),Block位置(localtions)等元資料。如果想要處理一個CombineFileSplit,很容易想到,對其包含的每個InputSplit(實際上這裡面沒有這個,你需要讀取一個小檔案塊的時候,需要構造一個FileInputSplit物件)。在執行MapReduce任務的時候,需要讀取檔案的文字行(簡單一點是文字行,也可能是其他格式資料)。那麼對於CombineFileSplit來說,你需要處理其包含的小檔案Block,就要對應設定一個RecordReader,才能正確讀取檔案資料內容。通常情況下,我們有一批小檔案,格式通常是相同的,只需要在為CombineFileSplit實現一個RecordReader的時候,內建另一個用來讀取小檔案Block的RecordReader,這樣就能保證讀取CombineFileSplit內部聚積的小檔案。
為CombineFileSplit實現一個RecordReader,並在內部使用Hadoop自帶的
LineRecordReader來讀取小檔案的文字行資料程式碼實現:
import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.LineRecordReader;
public class CombineSmallfileRecordReader extends RecordReader<LongWritable,BytesWritable>{
private CombineFileSplit combineFileSplit;
private LineRecordReader lineRecordReader=new LineRecordReader();
private Path[] paths;
private int totalLength;
private int currentIndex;
private float currentProgress=0;
private LongWritable currentKey;
private BytesWritable currentValue=new BytesWritable();
public CombineSmallfileRecordReader(CombineFileSplit combineFileSplit,TaskAttemptContext context,Integer index) {
super();
this.combineFileSplit=combineFileSplit;
this.currentIndex=index;//當前要處理的小檔案Block在CombineFileSpilt中的索引
}
@Override
public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
this.combineFileSplit = (CombineFileSplit) split;
// 處理CombineFileSplit中的一個小檔案Block,因為使用LineRecordReader,需要構造一個FileSplit物件,然後才能夠讀取資料
FileSplit fileSplit =new FileSplit(combineFileSplit.getPath(currentIndex),
combineFileSplit.getOffset(currentIndex),
combineFileSplit.getLength(currentIndex),
combineFileSplit.getLocations());
lineRecordReader.initialize(fileSplit, context);
this.paths = combineFileSplit.getPaths();
totalLength = paths.length;
context.getConfiguration().set("map.input.file.name",combineFileSplit.getPath(currentIndex).getName());
}
@Override
public boolean nextKeyValue() throws IOException, InterruptedException {
if (currentIndex >=0&¤tIndex< totalLength){
return lineRecordReader.nextKeyValue();
}else{
return false;
}
}
@Override
public BytesWritable getCurrentValue() throws IOException,InterruptedException {
byte[] content = lineRecordReader.getCurrentValue().getBytes();
currentValue.set(content,0,content.length);
return currentValue;
}
@Override
public float getProgress() throws IOException, InterruptedException {
if(currentIndex >=0&& currentIndex < totalLength){
currentProgress = (float) currentIndex / totalLength;
return currentProgress;
}
return currentProgress;
}
@Override
public void close() throws IOException {
lineRecordReader.close();
}
如果存在這樣的應用場景,你的小檔案具有不同的格式,那麼就需要考慮對不同型別的小檔案,使用不同的內建RecordReader,具體邏輯也是在上面的類中實現。
我們已經為CombineFileSplit實現了一個RecordReader,然後需要在一個
CombineFileInputFormat中注入這個RecordReader類實現類
CombineSmallfileRecordReader的物件。這時,需要實現一個CombineFileInputFormat的子類,可以重寫createRecordReader方法。我們實現的CombineSmallfileInputFormat,程式碼如下所示:
CombineSmallfileInputFormat類
import java.io.IOException;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader;
import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;
public class CombineSmallfileInputFormat extends CombineFileInputFormat<LongWritable, BytesWritable>{
@Override
public RecordReader<LongWritable, BytesWritable> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException {
CombineFileSplit combineFileSplit = (CombineFileSplit) split;
//這裡比較重要的是,一定要通過CombineFileRecordReader來建立一個RecordReader
//而且它的構造方法的引數必須是上面的定義的型別和順序。
//構造方法包含3個引數:第一個是CombineFileSplit型別,第二個是TaskAttemptContext型別,第三個是Class<? extends RecordReader>型別。
CombineFileRecordReader<LongWritable, BytesWritable> recordReader =new CombineFileRecordReader<LongWritable, BytesWritable>(combineFileSplit, context, CombineSmallfileRecordReader.class);
try {
recordReader.initialize(combineFileSplit, context);
} catch (InterruptedException e) {
e.printStackTrace();
}
return recordReader;
}
}
CombineSmallfileMapper類
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class CombineSmallfileMapper extends Mapper<LongWritable, BytesWritable, Text,Text>{
private Text file=new Text();
@Override
Mapper<LongWritable, BytesWritable, Text, Text>.Context context)throws IOException,InterruptedException {
String fileName=context.getConfiguration().get("map.input.file.name");
file.set(fileName);
context.write(file, new Text(new String(value.getBytes()).trim()));
}
}
CombineSmallfiles類
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
public class CombineSmallfiles {
public static void main(String[] args) throws Exception {
Configuration conf=new Configuration();
Job job =Job.getInstance(conf);
job.setJarByClass(CombineSmallfiles.class);
job.setMapperClass(CombineSmallfileMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(BytesWritable.class);
job.setInputFormatClass(CombineSmallfileInputFormat.class);
FileInputFormat.setInputPaths(job, new
Path("hdfs://192.168.234.21:9000/score"));
FileOutputFormat.setOutputPath(job, new
Path("hdfs://192.168.234.21:9000/score/result"));
job.waitForCompletion(true);
}
}