hadoop怎麼分割寫入的檔案為多個塊的,一個map對應一個split分片嗎?split與block的關係
1,在介紹hadoop寫檔案的時候我們經常會說首先分割檔案為多個塊;那麼是怎麼分割的呢?
這裡其實不要有過的糾結,這裡的塊是block,是hdfs中切塊的大小,屬於物理劃分,預設64M,在hadoop-default.xml配置中有體現:
<property> <name>dfs.block.size</name> <value>67108864</value> <description>The default block size for new files.</description> </property>
當然如果檔案沒有64M也不會佔據整塊空間。
將檔案分割成多個塊後,形成一個數據佇列,然後依次寫入datanode列表。
再者,如果寫入的是個資料夾,而且每個檔案的都不大,這樣在hdfs中是預設每個檔案一個塊的,即使沒有64m,當然也可做優化處理,不過hbase更便利於處理把小檔案合併到一個塊中,這個我會在其他博文中介紹。
2,下面我們說說split,並與block的關係
首先,split是mapreduce中的概念,而block是hdfs中切塊的大小。
如下:
//設定要處理的文字資料所存放的路徑 FileInputFormat.setInputPaths(wordCountJob, "hdfs://ubuntu:9000/input/aa.txt"); FileOutputFormat.setOutputPath(wordCountJob, new Path("hdfs://ubuntu:9000/output/"));
我們設定要處理的檔案路徑時都會用到fileInputFormat類, 不過我們更多看到的是inputFormat,其實fileInputFormat這個類的也是實現inputFomat介面,
下面我們接著看原始碼,說明為什麼需要分片?
以hadoop自帶的wordCount的原始碼為例:
for (int i = 0; i < otherArgs.length - 1; ++i) { FileInputFormat.addInputPath(job, new Path(otherArgs[i])); } FileOutputFormat.setOutputPath(job, new Path(otherArgs[otherArgs.length - 1])); System.exit(job.waitForCompletion(true) ? 0 : 1);
我們看到使用的InputFormat是FileOutputFormat,任務執行呼叫了Job的waitForCompletion方法。waitForCompletion方法中真正提交job的程式碼如下:
public boolean waitForCompletion(boolean verbose
) throws IOException, InterruptedException,
ClassNotFoundException {
if (state == JobState.DEFINE) {
submit();
}
// 省略本文不關心的程式碼
return isSuccessful();
}
這裡的submit方法的實現如下:
public void submit()
throws IOException, InterruptedException, ClassNotFoundException {
// 省略本文不關心的程式碼</span>
final JobSubmitter submitter =
getJobSubmitter(cluster.getFileSystem(), cluster.getClient());
status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {
public JobStatus run() throws IOException, InterruptedException,
ClassNotFoundException {
return submitter.submitJobInternal(Job.this, cluster);
}
});
state = JobState.RUNNING;
LOG.info("The url to track the job: " + getTrackingURL());
}
submit方法首先建立了JobSubmitter例項,然後非同步呼叫了JobSubmitter的submitJobInternal方法。JobSubmitter的submitJobInternal方法有關劃分任務的程式碼如下:
// Create the splits for the job
LOG.debug("Creating splits at " + jtFs.makeQualified(submitJobDir));
int maps = writeSplits(job, submitJobDir);
conf.setInt(MRJobConfig.NUM_MAPS, maps);
LOG.info("number of splits:" + maps);
writeSplits方法的實現如下:
private int writeSplits(org.apache.hadoop.mapreduce.JobContext job,
Path jobSubmitDir) throws IOException,
InterruptedException, ClassNotFoundException {
JobConf jConf = (JobConf)job.getConfiguration();
int maps;
if (jConf.getUseNewMapper()) {
maps = writeNewSplits(job, jobSubmitDir);
} else {
maps = writeOldSplits(jConf, jobSubmitDir);
}
return maps;
}
由於WordCount使用的是新的mapreduce API,所以最終會呼叫writeNewSplits方法。writeNewSplits的實現如下:
private <T extends InputSplit>
int writeNewSplits(JobContext job, Path jobSubmitDir) throws IOException,
InterruptedException, ClassNotFoundException {
Configuration conf = job.getConfiguration();
InputFormat<?, ?> input =
ReflectionUtils.newInstance(job.getInputFormatClass(), conf);
List<InputSplit> splits = input.getSplits(job);
T[] array = (T[]) splits.toArray(new InputSplit[splits.size()]);
// sort the splits into order based on size, so that the biggest
// go first
Arrays.sort(array, new SplitComparator());
JobSplitWriter.createSplitFiles(jobSubmitDir, conf,
jobSubmitDir.getFileSystem(conf), array);
return array.length;
}
writeNewSplits方法中,劃分任務數量最關鍵的程式碼即為InputFormat的getSplits方法(提示:大家可以直接通過此處的呼叫,檢視不同InputFormat的劃分任務實現)。根據前面的分析我們知道此時的InputFormat即為FileOutputFormat,其getSplits方法的實現如下:
public List<InputSplit> getSplits(JobContext job) throws IOException {
Stopwatch sw = new Stopwatch().start();
long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
long maxSize = getMaxSplitSize(job);
// generate splits
List<InputSplit> splits = new ArrayList<InputSplit>();
List<FileStatus> files = listStatus(job);
for (FileStatus file: files) {
Path path = file.getPath();
long length = file.getLen();
if (length != 0) {
BlockLocation[] blkLocations;
if (file instanceof LocatedFileStatus) {
blkLocations = ((LocatedFileStatus) file).getBlockLocations();
} else {
FileSystem fs = path.getFileSystem(job.getConfiguration());
blkLocations = fs.getFileBlockLocations(file, 0, length);
}
if (isSplitable(job, path)) {
long blockSize = file.getBlockSize();
long splitSize = computeSplitSize(blockSize, minSize, maxSize);
long bytesRemaining = length;
while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
splits.add(makeSplit(path, length-bytesRemaining, splitSize,
blkLocations[blkIndex].getHosts(),
blkLocations[blkIndex].getCachedHosts()));
bytesRemaining -= splitSize;
}
if (bytesRemaining != 0) {
int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining,
blkLocations[blkIndex].getHosts(),
blkLocations[blkIndex].getCachedHosts()));
}
} else { // not splitable
splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts(),
blkLocations[0].getCachedHosts()));
}
} else {
//Create empty hosts array for zero length files
splits.add(makeSplit(path, 0, length, new String[0]));
}
}
// Save the number of input files for metrics/loadgen
job.getConfiguration().setLong(NUM_INPUT_FILES, files.size());
sw.stop();
if (LOG.isDebugEnabled()) {
LOG.debug("Total # of splits generated by getSplits: " + splits.size()
+ ", TimeTaken: " + sw.elapsedMillis());
}
return splits;
}
totalSize:是整個Map-Reduce job所有輸入的總大小。
numSplits:來自job.getNumMapTasks(),即在job啟動時用org.apache.hadoop.mapred.JobConf.setNumMapTasks(int n)設定的值,給M-R框架的Map數量的提示。
goalSize:是輸入總大小與提示Map task數量的比值,即期望每個Mapper處理多少的資料,僅僅是期望,具體處理的資料數由下面的computeSplitSize決定。
minSplitSize:預設為1,可由子類複寫函式protected void setMinSplitSize(long minSplitSize) 重新設定。一般情況下,都為1,特殊情況除外。
minSize:取的1和mapred.min.split.size中較大的一個。
blockSize:HDFS的塊大小,預設為64M,一般大的HDFS都設定成128M。
splitSize:就是最終每個Split的大小,那麼Map的數量基本上就是totalSize/splitSize。
接下來看看computeSplitSize的邏輯:首先在goalSize(期望每個Mapper處理的資料量)和HDFS的block size中取較小的,然後與mapred.min.split.size相比取較大的。
一個片為一個splits,即一個map,只要搞清楚片的大小,就能計算出執行時的map數。而一個split的大小是由goalSize, minSize, blockSize這三個值決定的。computeSplitSize的邏輯是,先從goalSize和blockSize兩個值中選出最小的那個(比如一般不設定map數,這時blockSize為當前檔案的塊size,而goalSize是檔案大小除以使用者設定的map數得到的,如果沒設定的話,預設是1),在預設的大多數情況下,blockSize比較小。然後再取blockSize和minSize中最大的那個。而minSize如果不通過”mapred.min.split.size”設定的話(”mapred.min.split.size”預設為0),minSize為1,可理解為一個block塊,這樣得出的一個splits的size就是blockSize,即一個塊一個map,有多少塊就有多少map。
split的大小時預設和hdfs的block塊大小一致,但是可以通過配置檔案自己設定:
其中有倆個配置檔案(如下):
--minsize 預設大小為1
mapreduce.input.fileinputformat.split.minsize
--maxsize 預設大小為Long.MAXValue
mapreduce.input.fileinputformat.split.maxsize
舉例:
比如說我問寫入一個資料夾,裡面有10個只有幾k的檔案,所以List<FileStatus> files = listStatus(job);方法返回的files列表的大小為10。在遍歷files列表的過程中,會獲取每個檔案的blockSize,最終呼叫computeSplitSize方法計算每個輸入檔案應當劃分的任務數。computeSplitSize方法的實現如下:
protected long computeSplitSize(long blockSize, long minSize,
long maxSize) {
return Math.max(minSize, Math.min(maxSize, blockSize));
}
然後根據
long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
很明顯取split預設值,也就是一個塊,那麼10個就要分為10塊,這也說明為什麼處理小檔案時,block的大小小於split的 大小。同時我們看到了一個split分配一個map任務。
這裡我們可以總結下split大小與block的關係:
(1)block塊的小於split分片的最小值,那split的值就是split分片的大小
(2)block塊的小大介於split分片配置的最小值和最大值之間,block的大小就是split的大小。
(3)block塊的大小大於split分片的最大值,split的大小就是split配置的最大值。但會增加map執行的併發度,但是會造成在節點之間拉取資料
也有公式可以計算split也就是map任務數,這裡就不做討論了。
一個map對應一個split分片嗎?
經過上面的討論,答案是顯而易見的:
map個數:由任務切片spilt決定的,預設情況下一個split的大小就是block
由參與任務的檔案個數決定的
相關推薦
hadoop怎麼分割寫入的檔案為多個塊的,一個map對應一個split分片嗎?split與block的關係
1,在介紹hadoop寫檔案的時候我們經常會說首先分割檔案為多個塊;那麼是怎麼分割的呢?這裡其實不要有過的糾結,這裡的塊是block,是hdfs中切塊的大小,屬於物理劃分,預設64M,在hadoop-default.xml配置中有體現:<property>
Hadoop 檢視某個檔案分成幾個塊,分別在那臺機架的哪個機器上
hadoop fsck /usr/input/a.txt -files -blocks -locations -racks [[email protected] sbin]# hadoop
博士問,如何分割一個大檔案為多個檔案
參考:http://www.cnblogs.com/waynechen/archive/2010/07/26/1785097.html split -l 300 large_file.txt new_file_prefix 解決 (adsbygoogle = windo
涉及多個Fragment,點選其中一個fragment中的“點選登入”按鈕,跳轉到“登入”介面時,出現程式閃退現象
問題說明:我想實現從當前fragment_me中點選一下登入按鈕,跳轉到登入介面,所以我在MeFragment.java裡的onCreate()方法中為TextView控制元件新增監聽事件。但在實際執行中,我點選“Me”之後,會出現閃退現象。 錯誤:空指標異常 出現閃
一個jar包裡有多個main,指定執行某一個main
如果一個jar中含有多個主程式,而你沒有配置預設主程式,或者想要執行指定主程式,則可以通過如下命令執行:java -cp example03-1.0-SNAPSHOT.jar com.alan.HelloWorld-cp <目錄和 zip/jar 檔案的類搜尋路徑>
linux系統一個tomcat配置兩個域名,每個域名對應一個專案
由於專案的需要,我們公司有兩個域名,每個域名對應一個專案,這樣我們就可以使用不同的域名訪問不同的專案了, 我們的網站使用的是web伺服器Tomcat,框架是自己封裝的簡易版jsp-servlet,域名
hadoop劃分為多個輸出檔案
現在我們見到的MapReduce作業的輸出都是一組檔案,那如果我想輸出多組檔案怎麼辦,比如說我想統計每個國家的專利情況,想以國家名作為檔名來輸出。我們可以使用MultipleOutputFormat,它內部有一個方法generateFileNameForKeyV
將一個檔案分割為多個檔案
實現效果: 知識運用: FileStream BinaryReader類 //用特定的編碼將基元資料型別讀作二進位制值 其建構函式主要使用UTF8Encoding初始化類例項 public BinaryReader(Stream input) B
Python將一個大檔案按段落分隔為多個小檔案的簡單方法
今天幫同學處理一點語料。語料檔案有點大,並且是以連續兩個換行符作為段落標誌,他想把它按段落分隔成多個小檔案,即每3個段落組成一個新檔案。由於以前沒有遇到過類似的操作,在網上找了一些相似的方法,看起來都有點複雜。所以經嘗試,自己寫了一段程式碼,完美解決問題。 基本思路是,先讀原檔案內容,
pdf檔案怎麼拆分頁面為多個pdf
雖然pdf檔案已經得到了越來越多的應用,但是還是有許多朋友對pdf檔案並不熟悉當。比如如何將pdf檔案頁面拆分,相信就有不少朋友不知道應該如何操作。不過沒有關係,閱讀這篇文章,大家就能夠掌握pdf檔案頁面拆分的方法了 1、在進行拆分pdf檔案頁面之前,我們需要準備一款能夠幫助我們拆分pdf檔案頁面的軟體
編寫裝飾器, 為多個函式加上認證功能(使用者的賬戶密碼來源於檔案, 使用者有三次登入的機會), # 要求, 如果使用者登入成功了, 後續就不需要再次登入了.
# flag = False # 一開始沒有登入## def login(): # 函式:對功能或者動作的封裝# global flag# username = input("請輸入你的使用者名稱:")# password = input("請輸入你的密碼:")## with
SQL 中拆分由逗號分割的欄位為多個欄位
測試準備建立表並填充資料USE [zws]GO/****** Object: Table [dbo].[a] Script Date: 2018/5/3 16:28:28 ******/SET ANSI_NULLS ONGOSET QUOTED_IDENTIFIER
Python(74)_編寫裝飾器,為多個函數加上記錄調用功能,要求每次調用函數都將被調用的函數名寫入文件
png span write nbsp tools log 刪除 ner turn #-*-coding:utf-8-*- import os import time from functools import wraps ‘‘‘ 1、編寫裝飾器,為多個函數加上記錄調用功
如何將一個PDF檔案頁面進行的拆分為多個PDF檔案
想要將PDF檔案進行拆分其實也算比較簡單的一件事,但是如果我們沒有一個好的工具的話,再簡單的是我們也操作不好,想要將PDF檔案進行頁面的拆分一般都是利用比較專業的PDF編輯器去對它進行操作,小編一般的候想要對
python——將大檔案切分為多個小檔案
切分檔案 最近遇到需要切分檔案的需求,當然首選用python來解決,網上搜了下感覺都太複雜了,其實用python自帶函式即可解決。 f = open('path&filename','r') #開啟檔案 i = 0 #設定計數器 while i&l
使用PDF分割器將PDF拆分為多個文件
利用PDF拆分軟體您可以將一個或多個文件拆分為多個更小的文件,當拆分文件時,當拆分文件時,可以指定根據最大頁面數量、最大檔案大小,或頂層書籤等進行拆分。操作方法如下: ①首先開啟pdf分割軟體,在顯示的視窗中點選“新增檔案”按鈕,開啟拆分文件,此時介面上就會顯示文字上
迅捷PDF分割器如何將PDF拆分為多個文件
最近小Q打算分享一個PDF格式的電子書,結果在釋出連結時,居然超出了150M只有註冊的會員才可以分享150M以上的文件或者軟體,哎,只怪小Q身無分文 ,無法升級成會員,所以只能將這個完整的電子書拆分成三份,特把拆分方法拿出來與大家分享: (1)網上教程講解很多,小編就來
如何將字符串分割賦值給多個shell變量
lock ext shell變量 宋體 for prev spa str -a 如何將字符串分割賦值給多個shell變量shellTarget Target 比如字符串"111|222|333"分割分別賦值給三個shell變量 $ a=‘111|222|333
Python將一個大文件按段落分隔為多個小文件的簡單方法
解決 list 之前 一點 open ews 切片 compile popu 今天幫同學處理一點語料。語料文件有點大,而且是以連續兩個換行符作為段落標誌,他想把它按段落分隔成多個小文件。即每3個段落組成一個新文件。因為曾經沒有遇到過類似的操作,在網上找了
Fast Flux技術——本質就是跳板,控制多個機器,同一域名指向極多的IP(TTL修改為0),以逃避追蹤
using sea 地址 3.2 網站 nslookup 不知道 維基百科 run 轉自:http://ytuwlg.iteye.com/blog/355718 通過病毒郵件和欺詐網站學到的對付網絡封鎖的好東西:Fast Flux技術 收到一封郵件,引起我的好奇了: 郵件標