1. 程式人生 > >Hadoop小檔案處理

Hadoop小檔案處理

導讀

        HDFS作為Hadoop生態系統的分散式檔案系統,設計是用來儲存海量資料,特別適合儲存TB、PB量級別的資料。但是隨著時間的推移或者處理程式的問題,HDFS上可能會存在大量的小檔案,進而消耗NameNode大量的記憶體,並且延長程式的執行時間。下面我就把對小檔案的處理經驗總結一下,供大家參考。

引言

       先來了解一下Hadoop中何為小檔案:小檔案指的是那些檔案大小要比HDFS的塊大小(在Hadoop1.x的時候預設塊大小64MB,可以通過dfs.blocksize來設定;但是到了Hadoop 2.x的時候預設塊大小為128MB了,可以通過dfs.block.size設定)小的多的檔案。而HDFS的問題在於無法很有效的處理大量小檔案。在HDFS中,任何一個檔案、目錄和block,在HDFS中都會被表示為一個object儲存在Namenode的記憶體中,每一個object佔用150 bytes的記憶體空間。所以,如果有10million個檔案,每一個檔案對應一個block,那麼就將要消耗Namenode 3G的記憶體來儲存這些block的資訊。如果規模再大一些,那麼將會超出現階段計算機硬體所能滿足的極限。不僅如此,HDFS並不是為了有效的處理大量小檔案而存在的。它主要是為了流式的訪問大檔案而設計的。對小檔案的讀取通常會造成大量從Datanode到Datanode的seeks和hopping來retrieve檔案,而這樣是非常的低效的一種訪問方式。

一、概述

HDFS儲存特點:
(1)流式讀取方式,主要是針對一次寫入,多次讀出的使用模式。寫入的過程使用的是append的方式。
(2)設計目的是為了儲存超大檔案,主要是針對幾百MB,GB,TB,甚至PB的檔案。
(3)該分散式系統構建在普通PC機組成的叢集上,大大降低了構建成本,並遮蔽了系統故障,使得使用者可以專注於自身的操作運算。
(4)HDFS適用於高吞吐量,而不適合低時間延遲的訪問。如果同時存入1million的files,那麼HDFS 將花費幾個小時的時間。
(5)流式讀取的方式,不適合多使用者寫入,以及任意位置寫入。如果訪問小檔案,則必須從一個Datanode跳轉到另外一個Datanode,這樣大大降低了讀取效能。

二、HDFS檔案操作流程

HDFS體系結構

    HDFS採用master/slave架構。一個HDFS叢集是由一個Namenode和一定數目的Datanodes組成。Namenode是一箇中心伺服器,負責管理檔案系統的名字空間(namespace)以及客戶端對檔案的訪問。叢集中的Datanode一般是一個節點一個,負責管理它所在節點上的儲存。HDFS暴露了檔案系統的名字空間,使用者能夠以檔案的形式在上面儲存資料。從內部看,一個檔案其實被分成一個或多個數據塊,這些塊儲存在一組Datanode上。Namenode執行檔案系統的名字空間操作,比如開啟、關閉、重新命名檔案或目錄。它也負責確定資料塊到具體Datanode節點的對映。Datanode負責處理檔案系統客戶端的讀寫請求。在Namenode的統一排程下進行資料塊的建立、刪除和複製。

HDFS檔案的讀取

 

(1)client端傳送讀檔案請求給Namenode,如果檔案不存在,返回錯誤資訊,否則,將該檔案對應的block及其所在Datanode位置傳送給client

(2)client收到檔案位置資訊後,與不同Datanode建立socket連線並行獲取資料。

HDFS檔案的寫入

 

(1) client端傳送寫檔案請求,Namenode檢查檔案是否存在,如果已存在,直接返回錯誤資訊,否則,傳送給client一些可用Datanode節點

(2)client將檔案分塊,並行儲存到不同節點上Datanode上,傳送完成後,client同時傳送資訊給Namenode和Datanode

(3)Namenode收到的client資訊後,傳送確信資訊給Datanode

(4)Datanode同時收到Namenode和Datanode的確認資訊後,提交寫操作。

三、HDFS小檔案解決方案

1、 HDFS上的小檔案問題

   現象:在現在的叢集上已經存在了大量的小檔案和目錄。

   方案:檔案是許多記錄(Records)組成的,那麼可以通過呼叫HDFS的sync()方法和append方法結合使用,每隔一定時間生成一個大檔案。或者可以通過寫一個程式來來合併這些小檔案。

2、 MapReduce上的小檔案問題

   現象:

   Map任務(task)一般一次處理一個塊大小的輸入(input)(預設使用FileInputFormat)。如果檔案非常小,並且擁有大量的這種小檔案,那麼每一個map task都僅僅處理非常小的input資料,因此會產生大量的map tasks,每一個map task都會額外增加bookkeeping開銷。一個1GB的檔案,拆分成16個塊大小檔案(預設block size為64M),相對於拆分成10000個100KB的小檔案,後者每一個小檔案啟動一個map task,那麼job的時間將會十倍甚至百倍慢於前者。 

  方案:

     I、Hadoop Archive:

     Haddop Archive是一個高效地將小檔案放入HDFS塊中的檔案存檔工具,它能夠將多個小檔案打包成一個HAR檔案,這樣同時在減少Namenode的記憶體使用。
     II、Sequence file:
     sequence file由一系列的二進位制key/value組成。key為小檔名,value為檔案內容,可以將大批小檔案合併成一個大檔案。

     I、II  這裡不做介紹可以參考(http://blog.cloudera.com/blog/2009/02/the-small-files-problem

     III、CombineFileInputFormat:

      Hadoop內建提供了一個 CombineFileInputFormat 類來專門處理小檔案,其核心思想是:根據一定的規則,將HDFS上多個小檔案合併到一個 InputSplit中,然後會啟用一個Map來處理這裡面的檔案,以此減少MR整體作業的執行時間。CombineFileInputFormat類繼承自FileInputFormat,主要重寫了List  getSplits(JobContext job)方法;這個方法會根據資料的分佈,mapreduce.input.fileinputformat.split.minsize.per.node、mapreduce.input.fileinputformat.split.minsize.per.rack以及mapreduce.input.fileinputformat.split.maxsize 引數的設定來合併小檔案,並生成List。其中mapreduce.input.fileinputformat.split.maxsize引數至關重要,如果使用者沒有設定這個引數(預設就是沒設定),那麼同一個機架上的所有小檔案將組成一個InputSplit,最終由一個Map Task來處理。如果使用者設定了這個引數,那麼同一個節點(node)上的檔案將會組成一個InputSplit。同一個 InputSplit 包含了多個HDFS塊檔案,這些資訊儲存在 CombineFileSplit 類中,它主要包含以下資訊:

 摺疊原始碼

1

2

3

4

5

private Path[] paths;

private long[] startoffset;

private long[] lengths;

private String[] locations;

private long totLength;

       從上面的定義可以看出,CombineFileSplit類包含了每個塊檔案的路徑、起始偏移量、相對於原始偏移量的大小以及這個檔案的儲存節點。因為一個CombineFileSplit包含了多個小檔案,所以需要使用陣列來儲存這些資訊。CombineFileInputFormat是抽象類,如果我們要使用它,需要實現createRecordReader方法,告訴MR程式如何讀取組合的InputSplit。內建實現了兩種用於解析組合InputSplit的類:org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat 和 org.apache.hadoop.mapreduce.lib.input.CombineSequenceFileInputFormat,我們可以把這兩個類理解是 TextInputFormat 和 SequenceFileInputFormat。為了簡便,這裡主要來介紹CombineTextInputFormat。

       在 CombineTextInputFormat 中建立了 org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader,具體如何解析CombineFileSplit中的檔案主要在CombineFileRecordReader中實現。CombineFileRecordReader類中其實封裝了 TextInputFormat的RecordReader,並對CombineFileSplit中的多個檔案迴圈遍歷並讀取其中的內容,初始化每個檔案的RecordReader主要在initNextRecordReader裡面實現;每次初始化新檔案的RecordReader都會設定mapreduce.map.input.file、mapreduce.map.input.length以及mapreduce.map.input.start引數,這樣我們可以在Map程式裡面獲取到當前正在處理哪個檔案。

樣例程式碼如下:

 摺疊原始碼

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

package com.analysys.test;

 

import org.apache.commons.logging.Log;

import org.apache.commons.logging.LogFactory;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.conf.Configured;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.LongWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;

import org.apache.hadoop.mapreduce.MRJobConfig;

import org.apache.hadoop.mapreduce.Mapper;

import org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

import org.apache.hadoop.util.Tool;

import org.apache.hadoop.util.ToolRunner;

import java.io.IOException;

 

public class TestCombineInput extends Configured implements Tool {

    private static final Log LOG = LogFactory.getLog(TestCombineInput.class);

    private static final long ONE_MB = 1024 * 1024L;

 

    static class TextFileMapper extends Mapper<LongWritable , Text, Text, Text> {

        @Override

        protected void map(LongWritable key, Text value, Context context)

                throws IOException, InterruptedException {

            Configuration configuration = context.getConfiguration();

            LOG.info("MAP_INPUT_FILE    " + configuration.get(MRJobConfig.MAP_INPUT_FILE));

            Text filenameKey = new Text(configuration.get(MRJobConfig.MAP_INPUT_FILE));

            context.write(filenameKey, value);

        }

    }

 

    public static void main(String[] args) throws Exception {

        int exitCode = ToolRunner.run(new TestCombineInput(), args);

        System.exit(exitCode);

    }

 

    @Override

    public int run(String[] args) throws Exception {

        Configuration conf = new Configuration(getConf());

        // 設定splitFile檔案大小

        conf.setLong("mapreduce.input.fileinputformat.split.maxsize", ONE_MB * 32);

        Job job = Job.getInstance(conf);

        FileInputFormat.setInputPaths(job, args[0]);

        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.setJarByClass(TestCombineInput.class);

        job.setInputFormatClass(CombineTextInputFormat.class);

        job.setOutputFormatClass(TextOutputFormat.class);

        job.setOutputKeyClass(Text.class);

        job.setOutputValueClass(Text.class);

        job.setMapperClass(TextFileMapper.class);

        return job.waitForCompletion(true) ? 0 1;

    }

}

日誌輸出:

 摺疊原始碼

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

18/09/21 15:05:23 INFO client.RequestHedgingRMFailoverProxyProvider: Looking for the active RM in [rm1, rm2]...

18/09/21 15:05:23 INFO client.RequestHedgingRMFailoverProxyProvider: Found active RM [rm2]

18/09/21 15:05:24 INFO input.FileInputFormat: Total input paths to process : 152

18/09/21 15:05:24 INFO input.CombineFileInputFormat: DEBUG: Terminated node allocation with : CompletedNodes: 57, size left: 651281

18/09/21 15:05:24 INFO mapreduce.JobSubmitter: number of splits:1

18/09/21 15:05:32 INFO mapreduce.Job:  map 0% reduce 0%

18/09/21 15:05:42 INFO mapreduce.Job:  map 100% reduce 0%

18/09/21 15:05:48 INFO mapreduce.Job:  map 100% reduce 100%

18/09/21 15:05:49 INFO mapreduce.Job: Counters: 49

    File System Counters

        FILE: Number of bytes read=7177482

        FILE: Number of bytes written=14656921

        FILE: Number of read operations=0

        FILE: Number of large read operations=0

        FILE: Number of write operations=0

        HDFS: Number of bytes read=667468

        HDFS: Number of bytes written=7163032

        HDFS: Number of read operations=157

        HDFS: Number of large read operations=0

        HDFS: Number of write operations=2

    Job Counters

        Launched map tasks=1

        Launched reduce tasks=1

        Other local map tasks=1

        Total time spent by all maps in occupied slots (ms)=61968

        Total time spent by all reduces in occupied slots (ms)=72656

        Total time spent by all map tasks (ms)=7746

        Total time spent by all reduce tasks (ms)=4541

        Total vcore-milliseconds taken by all map tasks=7746

        Total vcore-milliseconds taken by all reduce tasks=4541

        Total megabyte-milliseconds taken by all map tasks=63455232

        Total megabyte-milliseconds taken by all reduce tasks=74399744

    Map-Reduce Framework

        Map input records=2407

        Map output records=2407

        Map output bytes=7167847

        Map output materialized bytes=7177482

        Input split bytes=16187

        Combine input records=0

        Combine output records=0

        Reduce input groups=152

        Reduce shuffle bytes=7177482

        Reduce input records=2407

        Reduce output records=2407

        Spilled Records=4814

        Shuffled Maps =1

        Failed Shuffles=0

        Merged Map outputs=1

        GC time elapsed (ms)=614

        CPU time spent (ms)=18970

        Physical memory (bytes) snapshot=2913640448

        Virtual memory (bytes) snapshot=25466003456

        Total committed heap usage (bytes)=3401056256

    File Input Format Counters

        Bytes Read=0

    File Output Format Counters

        Bytes Written=7163032

    可以從日誌中很清楚的看出input檔案為Total input paths to process : 152,通過 CombineFileInputFormat處理後splits為 mapreduce.JobSubmitter: number of splits:1,Map數為 Launched map tasks=1。注意體會mapreduce.input.fileinputformat.split.maxsize引數的設定,大家可以不設定這個引數並且和設定這個引數執行情況對比,觀察Map Task的個數變化。

3、Hive上的小檔案問題

    現象1:

    輸入檔案過多,而Hive對檔案建立的總數是有限制的,這個限制取決於引數:hive.exec.max.created.files,預設值是10000。如果現在你的表有60個分割槽,然後你總共有2000個map,在執行的時候,每一個mapper都會建立60個檔案,對應著每一個分割槽,所以60*2000> 120000,就會報錯:exceeds 100000.Killing the job 。最簡單的解決辦法就是調大hive.exec.max.created.files引數。但是如果說資料檔案只有400G,那麼你調整這個引數比如說40000。平均下來也就差不多每一個檔案10.24MB,這樣的話就有40000多個小檔案,不是一件很好的事情。 

    方案1:

    設定 mapper 輸入檔案合併引數 

 摺疊原始碼

1

2

3

4

5

6

7

8

-- mapper執行前進行小檔案的合併

hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;

-- 每一個mapper最大的輸入大小

mapred.max.split.size = 256000000

-- 一個節點上split的至少的大小(這個值決定了多個DataNode上的檔案是否需要合併)

mapred.min.split.size.per.node = 100000000

-- 一個機架下split的至少的大小(這個值決定了該機架下的檔案是否需要合併)

mapred.min.split.size.per.rack = 100000000

    現象2:

    hive執行中間過程生成的檔案過多 

    方案2:

    設定中間過程合併引數 

 摺疊原始碼

1

2

3

4

5

6

7

8

-- 在Map-only的任務結束時合併小檔案

hive.merge.mapfiles = true

-- 在Map-Reduce的任務結束時合併小檔案

hive.merge.mapredfiles = true

-- 合併檔案的大小

hive.merge.size.per.task = 25610001000

-- 當輸出檔案的平均大小小於該值時,啟動一個獨立的map-reduce任務進行檔案merge

hive.merge.smallfiles.avgsize=16000000

     現象3:

     hive結果檔案過多 

     方案3:

     設定 reducer 引數 (一種是調整reducer個數,另一種是調整reducer大小

 摺疊原始碼

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

-- 設定reduce的個數

set mapred.reduce.tasks=50;

insert into table xxx

select

  *

from

 xxx

distribute by rand();

-- 設定reduce的大小

set hive.exec.reducers.bytes.per.reducer=5120000000;

insert into table xxx

select

  *

from

 xxx

distribute by rand();

-- distribute by rand()保證了reduce中的資料隨機分配,大小大致相同

 

            

參考文章:

    1、https://hadoop.apache.org/docs/r2.7.1/hadoop-project-dist/hadoop-hdfs/HdfsDesign.html

    2、https://cwiki.apache.org/confluence/display/Hive/Configuration+Properties

    3、http://blog.cloudera.com/blog/2009/02/the-small-files-problem/