1. 程式人生 > >MapReduce系列之MapReduce的輸入

MapReduce系列之MapReduce的輸入

檔案是 MapReduce 任務資料的初始儲存地。正常情況下,輸入檔案一般是儲存在 HDFS 裡面。這些檔案的格式可以是任意的:我們可以使用基於行的日誌檔案, 也可以使用二進位制格式,多行輸入記錄或者其它一些格式。這些檔案一般會很大,達到數十GB,甚至更大。那麼 MapReduce 是如何讀取這些資料的呢?下面我們來學習 InputFormat 介面

1、InputFormat介面

InputFormat介面決定了輸入檔案如何被 Hadoop分塊(split up)與接受。InputFormat 能夠從一個 job 中得到一個 split 集合(InputSplit[]),然後再為這個 split 集合配上一個合適的 RecordReader(getRecordReader)來讀取每個split中的資料。 下面我們來看一下 InputFormat 介面由哪些抽象方法組成

2、InputFormat的抽象類方法

InputFormat 包含兩個抽象方法,如下所示

public abstract class InputFormat< K, V> {

 

    public abstract List<InputSplit> getSplits(JobContext context) throws IOException,InterruptedException;

 

    public abstract RecordReader<K,V> createRecordReader(InputSplit split,TaskAttemptContext context) throws IOException,InterruptedException;

}

 

1)getSplits(JobContext context) 方 法負責將一個大資料邏輯分成許多片。比如資料庫表有 100 條資料,按照主鍵ID升序儲存。 假設每20條分成一片,這個List的大小就是5,然後每個InputSplit記錄兩個引數,第一個為這個分片的起始 ID,第二個為這個分片資料的大小,這裡是20.很明顯 InputSplit 並沒有真正儲存資料。只是提供了一個如何將資料分片的方法。

2)createRecordReader(InputSplit split,TaskAttemptContext context)方 法根據 InputSplit 定義的方法,返回一個能夠讀取分片記錄的 RecordReader。getSplit 用來獲取由輸入檔案計算出來的 InputSplit, 後面會看到計算 InputSplit 時,會考慮輸入檔案是否可分割、檔案儲存時分塊的大小和檔案大小等因素;而createRecordReader() 提供了前面說的 RecordReader 的實現, 將Key-Value 對從 InputSplit 中正確讀出來,比如LineRecordReader,它是以偏移值為Key,每行的資料為 Value,這使所有 createRecordReader() 返回 LineRecordReader 的 InputFormat 都是以偏移值為Key,每行資料為 Value 的形式讀取輸入分片的。

  其實很多時候並不需要我們實現 InputFormat 來讀取資料,Hadoop 自帶有很多資料輸入格式,已經實現了 InputFormat介面

3、InputFormat介面實現類

InputFormat 介面實現類有很多,其層次結構如下圖所示

1、FileInputFormat

FileInputFormat是所有使用檔案作為其資料來源的 InputFormat 實現的基類,它的主要作用是指出作業的輸入檔案位置。因為作業的輸入被設定為一組路徑, 這對指定作業輸入提供了很強的靈活性。FileInputFormat 提供了四種靜態方法來設定 Job 的輸入路徑:

public static void addInputPath(Job job,Path path);

public static void addInputPaths(Job job,String commaSeparatedPaths);

public static void setInputPaths(Job job,Path... inputPaths);

public static void setInputPaths(Job job,String commaSeparatedPaths);

addInputPath()、addInputPaths()方法可以將一 個或多個路徑加入路徑列表,可以分別呼叫這兩種方法來建立路徑列表;setInputPaths()方法一次設定完整的路徑列表,替換前面呼叫中在 Job 上所設定的所有路徑。它們具體的使用方法,看如下示例

// 設定一個源路徑

FileInputFormat.addInputPath(job,new Path("hdfs://master:9000/input/inputPath1"));

// 設定多個源路徑,多個源路徑之間用逗號分開

FileInputFormat.addInputPaths(job,"hdfs://master:9000/input/inputPath1, hdfs://master:9000/input/inputPath2,...");

// inputPaths是一個Path型別的陣列,可以包含多個源路徑,比如hdfs://ljc:9000/input/inputPath1,hdfs://ljc:9000/input/inputPath2,     FileInputFormat.setInputPaths(job, inputPaths);

// 設定多個源路徑,多個源路徑之間用逗號分開

FileInputFormat.setInputPaths(job, " hdfs://master:9000/input/inputPath1, hdfs://master:9000/input/inputPath2,...");

add 方法、set 方法允許指定包含的檔案。如果需要排除特定檔案,可以使用 FileInputFormat 的 setInputPathFilter()方法設定一個過濾器

 

public static void setInputPathFilter(Job job,Class<? extends PathFilter filter);

 

關於過濾器,這裡不再深入討論。即使不設定過濾 器,FileInputFormat 也會使用一個預設的過濾器來排除隱藏檔案。 如果通過呼叫 setInputPathFilter()設定了過濾器,它會在預設過濾器的基礎上進行過濾。換句話說,自定義的過濾器只能看到非隱藏檔案

對於輸入的資料來源是檔案型別的情況下,Hadoop 不僅擅長處理非結構化文字資料,而且可以處理二進位制格式的資料, 但它們的基類都是FileInputFormat。下面我們介紹的幾種常用輸入格式,都實現了FileInputFormat基類

1、TextInputFormat

TextInputFormat 是預設的 InputFormat。每條記錄是一行輸入。鍵是LongWritable 型別,儲存該行在整個檔案中的位元組偏移量。 值是這行的內容,不包括任何行終止符(換行符、回車符),它被打包成一個 Text 物件。

    比如,一個分片包含了如下5條文字記錄,記錄之間使用tab(水平製表符)分割

1    22

2    17

3    17

4    11

5    11

    每條記錄表示為以下鍵/值對:

(0, 1    22)

(5, 2    17)

(10,3    17)

(15,4    11)

(20,5    11)

  很明顯,鍵並不是行號。一般情況下,很難取得行號,因為檔案按位元組而不是按行切分為分片。

2、KeyValueTextInputFormat

  每一行均為一條記錄, 被分隔符(預設是tab(\t))分割為key(Text),value(Text)。可以通過 mapreduce.input.keyvaluelinerecordreader.key.value,separator屬性(或者舊版本 API 中的 key.value.separator.in.input.line)來設定分隔符。 它的預設值是一個製表符。

    比如,一個分片包含了如下5條文字記錄,記錄之間使用tab(水平製表符)分割。

1    22

2    17

3    17

4    11

5    11

    每條記錄表示為以下鍵/值對:

(1,22)

(2,17)

(3,17)

(4,11)

(5,11)

  此時的鍵是每行排在製表符之前的 Text 序列。

3、NLineInputFormat

  通過 TextInputFormat 和 KeyValueTextInputFormat,每個 Mapper 收到的輸入行數不同。行數取決於輸入分片的大小和行的長度。 如果希望 Mapper 收到固定行數的輸入,需要將 NLineInputFormat 作為 InputFormat。與 TextInputFormat 一樣, 鍵是檔案中行的位元組偏移量,值是行本身。N 是每個 Mapper 收到的輸入行數。N 設定為1(預設值)時,每個 Mapper 正好收到一行輸入。 mapreduce.input.lineinputformat.linespermap 屬性(在舊版本 API 中的 mapred.line.input.format.linespermap 屬性)實現 N 值的設定。

    以下是一個示例,仍然以上面的4行輸入為例。

1    22

2    17

3    17

4    11

5    11

    例如,如果 N 是3,則每個輸入分片包含三行。一個 mapper 收到三行鍵值對:

1    22

2    17

3    17

   另一個 mapper 則收到後兩行(因為總共才5行,所有另一個mapper只能收到兩行)

4    11

5    11

   這裡的鍵和值與 TextInputFormat 生成的一樣。

  4、SequenceFileInputFormat

  用於讀取 sequence file。鍵和值由使用者定義。序列檔案為 Hadoop專用的壓縮二進位制檔案格式。它專用於一個 MapReduce作業和其它 MapReduce作業之間的傳送資料(適用與多個 MapReduce 連結操作)。

2、多個輸入

  雖然一個 MapReduce 作業的輸入可以包含多個輸入檔案,但所有檔案都由同一個 InputFormat 和 同一個 Mapper 來解釋。 然而,資料格式往往會隨時間演變,所以必須寫自己的 Mapper 來處理應用中的遺留資料格式問題。或者,有些資料來源會提供相同的資料, 但是格式不同。

  這些問題可以使用 MultipleInputs 類來妥善處理,它允許為每條輸入路徑指定 InputFormat 和 Mapper。例如,我們想把英國 Met Office 的氣象站資料和 NCDC 的氣象站資料放在一起來統計平均氣溫,則可以按照下面的方式來設定輸入路徑。

MultipleInputs.addInputPath(job,ncdcInputPath,TextInputFormat.class,NCDCTemperatureMapper.class);

MultipleInputs.addInputPath(job,metofficeInputPath,TextInputFormat.class,MetofficeTemperatureMapper.class);

 

這段程式碼取代了對 FileInputFormat.addInputPath()和job.setMapperClass() 的常規呼叫。Met Office 和 NCDC 的資料都是文字檔案,所以對兩者都使用 TextInputFormat 資料型別。 但這兩個資料來源的行格式不同,所以我們使用了兩個不一樣的 Mapper,分別為NCDCTemperatureMapper和MetofficeTemperatureMapper。重要的是兩個 Mapper 的輸出型別一樣,因此,reducer 看到的是聚集後的 map 輸出,並不知道這些輸入是由不同的 Mapper 產生的。

MultipleInputs 類還有一個過載版本的 addInputPath() 方法,它沒有 Mapper引數。如果有多種輸入格式而只有一個 Mapper(通過 Job 的 setMapperClass()方法設定),這種方法很有用。其具體方法如下所示。

public static void addInputPath(Job job,Path path,class< ? extends InputFormat> inputFormatClass);

  一個案例:https://github.com/taowenjun/MapReduce/tree/master/cn/tao/multipleinputs

 

3、DBInputFormat

    這 種輸入格式用於使用 JDBC 從關係資料庫中讀取資料。因為它沒有任何共享能力,所以在訪問資料庫的時候必須非常小心,在資料庫中執行太多的 mapper 讀資料可能會使資料庫受不了。 正是由於這個原因,DBInputFormat 最好用於載入少量的資料集。與之相對應的輸出格式是DBOutputFormat,它適用於將作業輸出資料(中等規模的資料)轉存到資料庫

4、自定義 InputFormat

  有時候 Hadoop 自帶的輸入格式,並不能完全滿足業務的需求,所以需要我們根據實際情況自定義 InputFormat 類。而資料來源一般都是檔案資料,那麼自定義 InputFormat時繼承 FileInputFormat 類會更為方便,從而不必考慮如何分片等複雜操作。 自定義輸入格式我們分為以下幾步:

1、繼承 FileInputFormat 基類。

2、重寫 FileInputFormat 裡面的 isSplitable() 方法。

3、重寫 FileInputFormat 裡面的 createRecordReader()方法。

  按照上述步驟如何自定義輸入格式呢?下面我們通過一個示例加強理解。

  我們取有一份學生五門課程的期末考試成績資料,現在我們希望統計每個學生的總成績和平均成績。 樣本資料如下所示,每行資料的資料格式為:學號、姓名、語文成績、數學成績、英語成績、物理成績、化學成績

19020090040 秦心芯 123 131 100 95 100

19020090006 李磊 99 92 100 90 100

    。。。。。

  下面我們就編寫程式,實現自定義輸入並求出每個學生的總成績和平均成績。分為以下幾個步驟,這裡只給出步驟,程式碼見下

    第一步:為了便於每個學生學習成績的計算,這裡我們需要自定義一個 ScoreWritable類實現 WritableComparable 介面,將學生各門成績封裝起來

    第二步:自定義輸入格式 ScoreInputFormat類,首先繼承 FileInputFormat,然後分別重寫 isSplitable() 方法和 createRecordReader() 方法。 需要注意的是,重寫createRecordReader()方法,其實也就是重寫其返回的物件ScoreRecordReader。 ScoreRecordReader 類繼承 RecordReader,實現資料的讀取

    第三步:編寫 MapReduce 程式,統計學生總成績和平均成績。需要注意的是,上面我們自定義的輸入格式ScoreInputFormat,需要在 MapReduce 程式中做如下設定,job.setInputFormatClass(ScoreInputFormat.class);//設定自定義輸入格式

一個典型的例項: https://github.com/taowenjun/MapReduce/blob/master/cn/tao/wholefiles/WholeFileInputFormat.java