1. 程式人生 > >hadoop怎麼分割寫入的檔案為多個塊的,一個map對應一個split分片嗎?split與block的關係

hadoop怎麼分割寫入的檔案為多個塊的,一個map對應一個split分片嗎?split與block的關係

1,在介紹hadoop寫檔案的時候我們經常會說首先分割檔案為多個塊;那麼是怎麼分割的呢?

這裡其實不要有過的糾結,這裡的塊是block,是hdfs中切塊的大小,屬於物理劃分,預設64M,在hadoop-default.xml配置中有體現:

<property>  
  <name>dfs.block.size</name>  
  <value>67108864</value>  
  <description>The default block size for new files.</description>  
</property>  

當然如果檔案沒有64M也不會佔據整塊空間。

將檔案分割成多個塊後,形成一個數據佇列,然後依次寫入datanode列表。

再者,如果寫入的是個資料夾,而且每個檔案的都不大,這樣在hdfs中是預設每個檔案一個塊的,即使沒有64m,當然也可做優化處理,不過hbase更便利於處理把小檔案合併到一個塊中,這個我會在其他博文中介紹。

2,下面我們說說split,並與block的關係

首先,split是mapreduce中的概念,而block是hdfs中切塊的大小。

如下:

//設定要處理的文字資料所存放的路徑  
        FileInputFormat.setInputPaths(wordCountJob, "hdfs://ubuntu:9000/input/aa.txt");  
        FileOutputFormat.setOutputPath(wordCountJob, new Path("hdfs://ubuntu:9000/output/"));  

我們設定要處理的檔案路徑時都會用到fileInputFormat類, 不過我們更多看到的是inputFormat,其實fileInputFormat這個類的也是實現inputFomat介面,

下面我們接著看原始碼,說明為什麼需要分片?

以hadoop自帶的wordCount的原始碼為例:

for (int i = 0; i < otherArgs.length - 1; ++i) {  
  FileInputFormat.addInputPath(job, new Path(otherArgs[i]));  
}  
FileOutputFormat.setOutputPath(job,  
  new Path(otherArgs[otherArgs.length - 1]));  
System.exit(job.waitForCompletion(true) ? 0 : 1);  

我們看到使用的InputFormat是FileOutputFormat,任務執行呼叫了Job的waitForCompletion方法。waitForCompletion方法中真正提交job的程式碼如下:

public boolean waitForCompletion(boolean verbose  
                                 ) throws IOException, InterruptedException,  
                                          ClassNotFoundException {  
  if (state == JobState.DEFINE) {  
    submit();  
  }  
  // 省略本文不關心的程式碼  
  return isSuccessful();  
}  

這裡的submit方法的實現如下:

public void submit()   
         throws IOException, InterruptedException, ClassNotFoundException {  
    // 省略本文不關心的程式碼</span>  
    final JobSubmitter submitter =   
        getJobSubmitter(cluster.getFileSystem(), cluster.getClient());  
    status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {  
      public JobStatus run() throws IOException, InterruptedException,   
      ClassNotFoundException {  
        return submitter.submitJobInternal(Job.this, cluster);  
      }  
    });  
    state = JobState.RUNNING;  
    LOG.info("The url to track the job: " + getTrackingURL());  
   }  

submit方法首先建立了JobSubmitter例項,然後非同步呼叫了JobSubmitter的submitJobInternal方法。JobSubmitter的submitJobInternal方法有關劃分任務的程式碼如下:

// Create the splits for the job  
LOG.debug("Creating splits at " + jtFs.makeQualified(submitJobDir));  
int maps = writeSplits(job, submitJobDir);  
conf.setInt(MRJobConfig.NUM_MAPS, maps);  
LOG.info("number of splits:" + maps); 

writeSplits方法的實現如下:

private int writeSplits(org.apache.hadoop.mapreduce.JobContext job,  
    Path jobSubmitDir) throws IOException,  
    InterruptedException, ClassNotFoundException {  
  JobConf jConf = (JobConf)job.getConfiguration();  
  int maps;  
  if (jConf.getUseNewMapper()) {  
    maps = writeNewSplits(job, jobSubmitDir);  
  } else {  
    maps = writeOldSplits(jConf, jobSubmitDir);  
  }  
  return maps;  
}  

由於WordCount使用的是新的mapreduce API,所以最終會呼叫writeNewSplits方法。writeNewSplits的實現如下:

private <T extends InputSplit>  
int writeNewSplits(JobContext job, Path jobSubmitDir) throws IOException,  
    InterruptedException, ClassNotFoundException {  
  Configuration conf = job.getConfiguration();  
  InputFormat<?, ?> input =  
    ReflectionUtils.newInstance(job.getInputFormatClass(), conf);  
  
  List<InputSplit> splits = input.getSplits(job);  
  T[] array = (T[]) splits.toArray(new InputSplit[splits.size()]);  
  
  // sort the splits into order based on size, so that the biggest  
  // go first  
  Arrays.sort(array, new SplitComparator());  
  JobSplitWriter.createSplitFiles(jobSubmitDir, conf,   
      jobSubmitDir.getFileSystem(conf), array);  
  return array.length;  
}  

writeNewSplits方法中,劃分任務數量最關鍵的程式碼即為InputFormat的getSplits方法(提示:大家可以直接通過此處的呼叫,檢視不同InputFormat的劃分任務實現)。根據前面的分析我們知道此時的InputFormat即為FileOutputFormat,其getSplits方法的實現如下:

public List<InputSplit> getSplits(JobContext job) throws IOException {  
  Stopwatch sw = new Stopwatch().start();  
  long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));  
  long maxSize = getMaxSplitSize(job);  
  
  // generate splits  
  List<InputSplit> splits = new ArrayList<InputSplit>();  
  List<FileStatus> files = listStatus(job);  
  for (FileStatus file: files) {  
    Path path = file.getPath();  
    long length = file.getLen();  
    if (length != 0) {  
      BlockLocation[] blkLocations;  
      if (file instanceof LocatedFileStatus) {  
        blkLocations = ((LocatedFileStatus) file).getBlockLocations();  
      } else {  
        FileSystem fs = path.getFileSystem(job.getConfiguration());  
        blkLocations = fs.getFileBlockLocations(file, 0, length);  
      }  
      if (isSplitable(job, path)) {  
        long blockSize = file.getBlockSize();  
        long splitSize = computeSplitSize(blockSize, minSize, maxSize);  
  
        long bytesRemaining = length;  
        while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {  
          int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);  
          splits.add(makeSplit(path, length-bytesRemaining, splitSize,  
                      blkLocations[blkIndex].getHosts(),  
                      blkLocations[blkIndex].getCachedHosts()));  
          bytesRemaining -= splitSize;  
        }  
  
        if (bytesRemaining != 0) {  
          int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);  
          splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining,  
                     blkLocations[blkIndex].getHosts(),  
                     blkLocations[blkIndex].getCachedHosts()));  
        }  
      } else { // not splitable  
        splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts(),  
                    blkLocations[0].getCachedHosts()));  
      }  
    } else {   
      //Create empty hosts array for zero length files  
      splits.add(makeSplit(path, 0, length, new String[0]));  
    }  
  }  
  // Save the number of input files for metrics/loadgen  
  job.getConfiguration().setLong(NUM_INPUT_FILES, files.size());  
  sw.stop();  
  if (LOG.isDebugEnabled()) {  
    LOG.debug("Total # of splits generated by getSplits: " + splits.size()  
        + ", TimeTaken: " + sw.elapsedMillis());  
  }  
  return splits;  
}  

totalSize:是整個Map-Reduce job所有輸入的總大小。

numSplits:來自job.getNumMapTasks(),即在job啟動時用org.apache.hadoop.mapred.JobConf.setNumMapTasks(int n)設定的值,給M-R框架的Map數量的提示。

goalSize:是輸入總大小與提示Map task數量的比值,即期望每個Mapper處理多少的資料,僅僅是期望,具體處理的資料數由下面的computeSplitSize決定。

minSplitSize:預設為1,可由子類複寫函式protected void setMinSplitSize(long minSplitSize) 重新設定。一般情況下,都為1,特殊情況除外

minSize:取的1和mapred.min.split.size中較大的一個。

blockSize:HDFS的塊大小,預設為64M,一般大的HDFS都設定成128M。

splitSize:就是最終每個Split的大小,那麼Map的數量基本上就是totalSize/splitSize。

接下來看看computeSplitSize的邏輯:首先在goalSize(期望每個Mapper處理的資料量)和HDFS的block size中取較小的,然後與mapred.min.split.size相比取較大的

  一個片為一個splits,即一個map,只要搞清楚片的大小,就能計算出執行時的map數。而一個split的大小是由goalSize, minSize, blockSize這三個值決定的。computeSplitSize的邏輯是,先從goalSize和blockSize兩個值中選出最小的那個(比如一般不設定map數,這時blockSize為當前檔案的塊size,而goalSize是檔案大小除以使用者設定的map數得到的,如果沒設定的話,預設是1),在預設的大多數情況下,blockSize比較小。然後再取blockSize和minSize中最大的那個。而minSize如果不通過”mapred.min.split.size”設定的話(”mapred.min.split.size”預設為0),minSize為1,可理解為一個block塊,這樣得出的一個splits的size就是blockSize,即一個塊一個map,有多少塊就有多少map。

split的大小時預設和hdfs的block塊大小一致,但是可以通過配置檔案自己設定: 
其中有倆個配置檔案(如下):

--minsize   預設大小為1
mapreduce.input.fileinputformat.split.minsize  

--maxsize   預設大小為Long.MAXValue 
mapreduce.input.fileinputformat.split.maxsize

舉例:

比如說我問寫入一個資料夾,裡面有10個只有幾k的檔案,所以List<FileStatus> files = listStatus(job);方法返回的files列表的大小為10。在遍歷files列表的過程中,會獲取每個檔案的blockSize,最終呼叫computeSplitSize方法計算每個輸入檔案應當劃分的任務數。computeSplitSize方法的實現如下:

protected long computeSplitSize(long blockSize, long minSize,  
                                long maxSize) {  
  return Math.max(minSize, Math.min(maxSize, blockSize));  
}  

然後根據

  long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));  

很明顯取split預設值,也就是一個塊,那麼10個就要分為10塊,這也說明為什麼處理小檔案時,block的大小小於split的 大小。同時我們看到了一個split分配一個map任務。

這裡我們可以總結下split大小與block的關係:

(1)block塊的小於split分片的最小值,那split的值就是split分片的大小

(2)block塊的小大介於split分片配置的最小值和最大值之間,block的大小就是split的大小。

(3)block塊的大小大於split分片的最大值,split的大小就是split配置的最大值。但會增加map執行的併發度,但是會造成在節點之間拉取資料

也有公式可以計算split也就是map任務數,這裡就不做討論了。

一個map對應一個split分片嗎?

經過上面的討論,答案是顯而易見的:

map個數:由任務切片spilt決定的,預設情況下一個split的大小就是block
由參與任務的檔案個數決定的 

相關推薦

hadoop怎麼分割寫入檔案一個map對應一個split分片splitblock關係

1,在介紹hadoop寫檔案的時候我們經常會說首先分割檔案為多個塊;那麼是怎麼分割的呢?這裡其實不要有過的糾結,這裡的塊是block,是hdfs中切塊的大小,屬於物理劃分,預設64M,在hadoop-default.xml配置中有體現:<property>

Hadoop 檢視某個檔案分成幾分別在那臺機架的哪個機器上

hadoop fsck /usr/input/a.txt -files -blocks -locations -racks [[email protected] sbin]# hadoop

博士問如何分割一個檔案檔案

參考:http://www.cnblogs.com/waynechen/archive/2010/07/26/1785097.html split -l 300 large_file.txt new_file_prefix 解決  (adsbygoogle = windo

涉及Fragment點選其中一個fragment中的“點選登入”按鈕跳轉到“登入”介面時出現程式閃退現象

問題說明:我想實現從當前fragment_me中點選一下登入按鈕,跳轉到登入介面,所以我在MeFragment.java裡的onCreate()方法中為TextView控制元件新增監聽事件。但在實際執行中,我點選“Me”之後,會出現閃退現象。 錯誤:空指標異常 出現閃

一個jar包裡有main指定執行某一個main

如果一個jar中含有多個主程式,而你沒有配置預設主程式,或者想要執行指定主程式,則可以通過如下命令執行:java -cp example03-1.0-SNAPSHOT.jar com.alan.HelloWorld-cp <目錄和 zip/jar 檔案的類搜尋路徑>

linux系統一個tomcat配置兩域名每個域名對應一個專案

由於專案的需要,我們公司有兩個域名,每個域名對應一個專案,這樣我們就可以使用不同的域名訪問不同的專案了, 我們的網站使用的是web伺服器Tomcat,框架是自己封裝的簡易版jsp-servlet,域名

hadoop劃分輸出檔案

現在我們見到的MapReduce作業的輸出都是一組檔案,那如果我想輸出多組檔案怎麼辦,比如說我想統計每個國家的專利情況,想以國家名作為檔名來輸出。我們可以使用MultipleOutputFormat,它內部有一個方法generateFileNameForKeyV

一個檔案分割檔案

實現效果:    知識運用:   FileStream    BinaryReader類   //用特定的編碼將基元資料型別讀作二進位制值   其建構函式主要使用UTF8Encoding初始化類例項    public BinaryReader(Stream input)       B

Python將一個檔案按段落分隔檔案的簡單方法

今天幫同學處理一點語料。語料檔案有點大,並且是以連續兩個換行符作為段落標誌,他想把它按段落分隔成多個小檔案,即每3個段落組成一個新檔案。由於以前沒有遇到過類似的操作,在網上找了一些相似的方法,看起來都有點複雜。所以經嘗試,自己寫了一段程式碼,完美解決問題。 基本思路是,先讀原檔案內容,

pdf檔案怎麼拆分頁面pdf

  雖然pdf檔案已經得到了越來越多的應用,但是還是有許多朋友對pdf檔案並不熟悉當。比如如何將pdf檔案頁面拆分,相信就有不少朋友不知道應該如何操作。不過沒有關係,閱讀這篇文章,大家就能夠掌握pdf檔案頁面拆分的方法了  1、在進行拆分pdf檔案頁面之前,我們需要準備一款能夠幫助我們拆分pdf檔案頁面的軟體

編寫裝飾器, 函式加上認證功能(使用者的賬戶密碼來源於檔案, 使用者有三次登入的機會), # 要求, 如果使用者登入成功了, 後續就不需要再次登入了.

# flag = False # 一開始沒有登入## def login(): # 函式:對功能或者動作的封裝# global flag# username = input("請輸入你的使用者名稱:")# password = input("請輸入你的密碼:")## with

SQL 中拆分由逗號分割的欄位欄位

測試準備建立表並填充資料USE [zws]GO/****** Object:  Table [dbo].[a]    Script Date: 2018/5/3 16:28:28 ******/SET ANSI_NULLS ONGOSET QUOTED_IDENTIFIER

Python(74)_編寫裝飾器函數加上記錄調用功能要求每次調用函數都將被調用的函數名寫入文件

png span write nbsp tools log 刪除 ner turn #-*-coding:utf-8-*- import os import time from functools import wraps ‘‘‘ 1、編寫裝飾器,為多個函數加上記錄調用功

如何將一個PDF檔案頁面進行的拆分PDF檔案

     想要將PDF檔案進行拆分其實也算比較簡單的一件事,但是如果我們沒有一個好的工具的話,再簡單的是我們也操作不好,想要將PDF檔案進行頁面的拆分一般都是利用比較專業的PDF編輯器去對它進行操作,小編一般的候想要對

python——將大檔案切分檔案

切分檔案 最近遇到需要切分檔案的需求,當然首選用python來解決,網上搜了下感覺都太複雜了,其實用python自帶函式即可解決。 f = open('path&filename','r') #開啟檔案 i = 0 #設定計數器 while i&l

使用PDF分割器將PDF拆分文件

  利用PDF拆分軟體您可以將一個或多個文件拆分為多個更小的文件,當拆分文件時,當拆分文件時,可以指定根據最大頁面數量、最大檔案大小,或頂層書籤等進行拆分。操作方法如下:  ①首先開啟pdf分割軟體,在顯示的視窗中點選“新增檔案”按鈕,開啟拆分文件,此時介面上就會顯示文字上

迅捷PDF分割器如何將PDF拆分文件

最近小Q打算分享一個PDF格式的電子書,結果在釋出連結時,居然超出了150M只有註冊的會員才可以分享150M以上的文件或者軟體,哎,只怪小Q身無分文 ,無法升級成會員,所以只能將這個完整的電子書拆分成三份,特把拆分方法拿出來與大家分享:   (1)網上教程講解很多,小編就來

如何將字符串分割賦值給shell變量

lock ext shell變量 宋體 for prev spa str -a 如何將字符串分割賦值給多個shell變量shellTarget Target 比如字符串"111|222|333"分割分別賦值給三個shell變量 $ a=‘111|222|333

Python將一個大文件按段落分隔小文件的簡單方法

解決 list 之前 一點 open ews 切片 compile popu 今天幫同學處理一點語料。語料文件有點大,而且是以連續兩個換行符作為段落標誌,他想把它按段落分隔成多個小文件。即每3個段落組成一個新文件。因為曾經沒有遇到過類似的操作,在網上找了

Fast Flux技術——本質就是跳板控制機器同一域名指向極的IP(TTL修改0)以逃避追蹤

using sea 地址 3.2 網站 nslookup 不知道 維基百科 run 轉自:http://ytuwlg.iteye.com/blog/355718 通過病毒郵件和欺詐網站學到的對付網絡封鎖的好東西:Fast Flux技術 收到一封郵件,引起我的好奇了: 郵件標