1. 程式人生 > >Hadoop 學習研究(十): 自定義輸入輸出

Hadoop 學習研究(十): 自定義輸入輸出

自定義輸入輸出應用:在對資料需要進行一定條件的過濾和簡單處理的時候可以使用自定義輸入檔案格式類。

hadoop內建的輸入檔案格式類有:

1)FileInputFormat<K,V>這個是基本的父類,我們自定義就直接使用它作為父類;

2)TextInputFormat<LongWritable,Text>這個是預設的資料格式類,我們一般程式設計,如果沒有特別指定的話,一般都使用的是這個;key代表當前行資料距離檔案開始的距離,value程式碼當前行字串;

3)SequenceFileInputFormat<K,V>這個是序列檔案輸入格式,使用序列檔案可以提高效率,但是不利於檢視結果,建議在過程中使用序列檔案,最後展示可以使用視覺化輸出;

4)KeyValueTextInputFormat<Text,Text>這個是讀取以Tab(也即是\t)分隔的資料,每行資料如果以\t分隔,那麼使用這個讀入,就可以自動把\t前面的當做key,後面的當做value;

5)CombineFileInputFormat<K,V>合併大量小資料是使用;

6)MultipleInputs,多種輸入,可以為每個輸入指定邏輯處理的Mapper;

原理:

InputFormat介面有兩個重要的函式:

1)getInputSplits,用於確定輸入分片,當我們繼承FileInputFormat時,就可以忽略此函式,而使用FileInputFormat的此函式即可;

2)createRecordReader ,針對資料如何讀取的類,定義輸入檔案格式,其實也就是定義此類;

在每個map函式中,最開始呼叫的都是nextKeyValue()函式,這個函式就是在RecordReader中定義的(我們自定義RecordReader就是使用不同的實現而已),所以這裡會呼叫我們指定的RecordReader中的nextKeyValue函式。這個函式就會處理或者說是初始化key和value,然後返回true,告知已經處理好了。接著就會呼叫getCurrentKey 和getCurrentValue獲取當前的key和value值。最後,返回map,繼續執行map邏輯。

自定義輸入檔案格式類:

  1. package fz.inputformat;  
  2. import java.io.IOException;  
  3. import org.apache.hadoop.io.Text;  
  4. import org.apache.hadoop.mapreduce.InputSplit;  
  5. import org.apache.hadoop.mapreduce.RecordReader;  
  6. import org.apache.hadoop.mapreduce.TaskAttemptContext;  
  7. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;  
  8. /** 
  9.  * 自定義輸入檔案讀取類 
  10.  *  
  11.  * @author fansy 
  12.  * 
  13.  */
  14. publicclass CustomInputFormat extends FileInputFormat<Text, Text> {  
  15.     @Override
  16.     public RecordReader<Text, Text> createRecordReader(InputSplit split,  
  17.             TaskAttemptContext context) throws IOException,  
  18.             InterruptedException {  
  19.         // TODO Auto-generated method stub
  20.         returnnew CustomReader();  
  21.     }  
  22. }  
這裡看到如果繼承了FileInputFormat 後,就不需要關心getInputSplits了,而只需要定義RecordReader即可。

自定義RecordReader

  1. package fz.inputformat;  
  2. //import java.io.BufferedReader;
  3. import java.io.IOException;  
  4. import org.apache.hadoop.conf.Configuration;  
  5. import org.apache.hadoop.fs.FSDataInputStream;  
  6. import org.apache.hadoop.fs.FileSystem;  
  7. import org.apache.hadoop.fs.Path;  
  8. import org.apache.hadoop.io.Text;  
  9. import org.apache.hadoop.mapreduce.InputSplit;  
  10. import org.apache.hadoop.mapreduce.RecordReader;  
  11. import org.apache.hadoop.mapreduce.TaskAttemptContext;  
  12. import org.apache.hadoop.mapreduce.lib.input.FileSplit;  
  13. import org.apache.hadoop.util.LineReader;  
  14. publicclass CustomReader extends RecordReader<Text ,Text>{  
  15. //  private BufferedReader in;
  16.     private LineReader lr ;  
  17.     private Text key = new Text();  
  18.     private Text value = new Text();  
  19.     privatelong start ;  
  20.     privatelong end;  
  21.     privatelong currentPos;  
  22.     private Text line = new Text();  
  23.     @Override
  24.     publicvoid initialize(InputSplit inputSplit, TaskAttemptContext cxt)  
  25.             throws IOException, InterruptedException {  
  26.         FileSplit split =(FileSplit) inputSplit;  
  27.         Configuration conf = cxt.getConfiguration();  
  28.         Path path = split.getPath();  
  29.         FileSystem fs = path.getFileSystem(conf);  
  30.         FSDataInputStream is = fs.open(path);  
  31.         lr = new LineReader(is,conf);  
  32.         // 處理起始點和終止點
  33.         start =split.getStart();  
  34.         end = start + split.getLength();  
  35.         is.seek(start);  
  36.         if(start!=0){  
  37.             start += lr.readLine(new Text(),0,  
  38.                     (int)Math.min(Integer.MAX_VALUE, end-start));  
  39.         }  
  40.         currentPos = start;  
  41.     }  
  42.     // 針對每行資料進行處理
  43.     @Override
  44.     publicboolean nextKeyValue() throws IOException, InterruptedException {  
  45.         if(currentPos > end){  
  46.             returnfalse;  
  47.         }  
  48.         currentPos += lr.readLine(line);  
  49.         if(line.getLength()==0){  
  50.             returnfalse;  
  51.         }  
  52.         if(line.toString().startsWith("ignore")){  
  53.             currentPos += lr.readLine(line);  
  54.         }  
  55.         String [] words = line.toString().split(",");  
  56.         // 異常處理
  57.         if(words.length<2){  
  58.             System.err.println("line:"+line.toString()+".");  
  59.             returnfalse;  
  60.         }  
  61.         key.set(words[0]);  
  62.         value.set(words[1]);  
  63.         returntrue;  
  64.     }  
  65.     @Override
  66.     public Text getCurrentKey() throws IOException, InterruptedException {  
  67.         return key;  
  68.     }  
  69.     @Override
  70.     public Text getCurrentValue() throws IOException, InterruptedException {  
  71.         return value;  
  72.     }  
  73.     @Override
  74.     publicfloat getProgress() throws IOException, InterruptedException {  
  75.         if (start == end) {  
  76.             return0.0f;  
  77.         } else {  
  78.             return Math.min(1.0f, (currentPos - start) / (float) (end - start));  
  79.         }  
  80.     }  
  81.     @Override
  82.     publicvoid close() throws IOException {  
  83.         // TODO Auto-generated method stub
  84.         lr.close();  
  85.     }  
  86. }  
這裡主要是兩個函式,initial和nextKeyValue。

initial主要用於初始化,包括開啟和讀取檔案,定義讀取的進度等;

nextKeyValue則是針對每行資料(由於這裡使用的是LineReader,所以每次讀取的是一行,這裡定義不同的讀取方式,可以讀取不同的內容),產生對應的key和value對,如果沒有報錯,則返回true。這裡可以看到設定了一條規則,如果輸入資料是以ignore開始的話就忽略,同時每行只取得逗號前後的資料分別作為key和value。

實戰:

輸入資料:

  1. ignore,2  
  2. a,3  
  3. ignore,4  
  4. c,1  
  5. c,2,3,2  
  6. 4,3,2  
  7. ignore,34,2  
定義主類,主類的Mapper是預設的Mapper,沒有reducer。
  1. package fz.inputformat;  
  2. import org.apache.hadoop.conf.Configuration;  
  3. import org.apache.hadoop.conf.Configured;  
  4. import org.apache.hadoop.fs.Path;  
  5. import org.apache.hadoop.io.Text;  
  6. 相關推薦

    Hadoop 學習研究(): 定義輸入輸出

    自定義輸入輸出應用:在對資料需要進行一定條件的過濾和簡單處理的時候可以使用自定義輸入檔案格式類。 hadoop內建的輸入檔案格式類有: 1)FileInputFormat<K,V>這個是基本的父類,我們自定義就直接使用它作為父類; 2)TextInputForm

    Shell 腳本學習筆記:Shell輸入輸出重定向

    描述符 獨立 文件重定向 eof 輸出 合並 包含 span 輸出重定向 command > file 將輸出重定向到 file。 command < file 將輸入重定向到 file。 command >> file

    Hadoop學習筆記—5.定義類型處理手機上網日誌

    clas stat 基本 手機上網 oop interrupt pil 依然 手機號碼 一、測試數據:手機上網日誌 1.1 關於這個日誌   假設我們如下一個日誌文件,這個文件的內容是來自某個電信運營商的手機上網日誌,文件的內容已經經過了優化,格式比較規整,便於學習研究。

    Hadoop學習筆記—5.定義型別處理手機上網日誌

    一、測試資料:手機上網日誌 1.1 關於這個日誌   假設我們如下一個日誌檔案,這個檔案的內容是來自某個電信運營商的手機上網日誌,檔案的內容已經經過了優化,格式比較規整,便於學習研究。   該檔案的內容如下(這裡我只截取了三行): 1363157993044 18211575961 94-71-

    hadoop深入研究:(二)——定義Writable

    自定義Writablehadoop雖然已經實現了一些非常有用的Writable,而且你可以使用他們的組合做很多事情,但是如果你想構造一些更加複雜的結果,你可以自定義Writable來達到你的目的,我們以註釋的方式對自定義Writable進行講解(不許說我只帖程式碼佔篇幅哦,姿

    Hadoop實戰-Flume之定義Sink(九)

    current ioe back urn oop print out java try import java.io.File; import java.io.FileNotFoundException; import java.io.FileOutputStream;

    hadoop程式設計小技巧(5)---定義輸入檔案格式類InputFormat

    Hadoop程式碼測試環境:Hadoop2.4應用:在對資料需要進行一定條件的過濾和簡單處理的時候可以使用自定義輸入檔案格式類。Hadoop內建的輸入檔案格式類有:1)FileInputFormat<K,V>這個是基本的父類,我們自定義就直接使用它作為父類;2)T

    大數據學習定義輸入 13

    table 輸入 配置信息 tst float 分享 ado 自定義 throws 一:自定義輸出 需求:將多個小文件合並為SequenceFile(存儲了多個小文件) 存儲格式:文件路徑+文件的內容 c:/a.txt i am hunter henshu

    python學習筆記5-定義函數

    函數調用 筆記 取值 修改 args pytho class 名稱 func 1 自定義函數   (1)函數代碼塊以def關鍵字開頭,然後函數標識符名稱和圓括號   (2)任何傳入參數和自變量必須放在圓括號中間。圓括號之間可以用於定義參數   (3)函數的第一行語句可以選擇

    學習筆記19_定義錯誤頁

    默認 errors acc .html 調試 error redirect nbsp edi 在WebConfig中,可以配置當服務器發生哪些錯誤時,能跳轉到那個頁面: <customErrors mode ="On" defaultRedirect = " defa

    轉:C#制作ORM映射學習筆記一 定義Attribute類

    技術 sage 其中 username pac ont 學習 collect reat 之前在做unity項目時發現只能用odbc連接數據庫,感覺非常的麻煩,因為之前做web開發的時候用慣了ORM映射,所以我想在unity中也用一下ORM(雖然我知道出於性能的考慮這樣做事不

    vue2.0學習筆記之定義組件

    2.0 sco ron 自定義組件 定義 temp use 使用 imp step one: 推薦結構 step two: Loading.vue <template> <h3>{{msg}}<

    MapReduce實戰:定義輸入格式實現成績管理

    stat app 註意 false exce 考試成績 fileinput collect 劃分 1. 項目需求   我們取有一份學生五門課程的期末考試成績數據,現在我們希望統計每個學生的總成績和平均成績。 樣本數據如下所示,每行數據的數據格式為:學號、

    dubbo源碼學習(二) : spring 定義標簽

    java dubbo spring 自定義標簽 做dubbo的配置時很容易發現,dubbo有一套自己的標簽,提供給開發者配置,其實每一個標簽對應著一個 實體,在容器啟動的時候,dubbo會對所有的配置進行解析然後將解析後的內容設置到實體裏,最終dubbo會根據實體中的值生成貫穿全局的統一URL

    ASP.NET MVC 學習筆記-7.定義配置信息(後續)

    字符串 return abstract 新的 work 生成 value DC 連接字符串加密 自定義配置信息的高級應用 通過上篇博文對簡單的自定義配置信息的學習,使得更加靈活的控制系統配置信息。實際項目中,這種配置的靈活度往往無法滿足項目的靈活度和擴展性。 比如,一個

    node學習筆記6——定義模塊

    例子 學習筆記 log 2個 模塊 而且 nodejs 說明 分享 自定義模塊三大關鍵詞: require——引入模塊; exports——單個輸出; module——批量輸出。 從例子下手: 1.創建module.js: exports.a=22; exports.

    <C++學習一>關於C++輸入輸出

    cin.get 技術分享 測試結果 分享 line 遇到 關於 分享圖片 .com 本質:輸入輸出時以字節流進行抽取字節    ·cin 輸入流,遇到空白、制表等,輸入會停止。     重點把握的函數:     (1)cin.get(); //從流中讀取一個字符     (

    性能測試Jmeter擴展學習-添加定義函數

    默認 coder 技術分享 runtime rfi charset ons default 性能測試   我們在使用jmeter的時候有時候會碰到jmeter現有插件或功能也無法支持的場景,比如前端加密,此時我們就需要自己手動編寫函數並導入了,下面就是手動修改並導入的過程。

    Django學習手冊 - 初識定義分頁

    request shortcut ren 字符 span turn 翻頁 info char 核心: <a href=‘http://127.0.0.1:8000/index-%s‘>%s<a> 自定義分頁 1.前端處理字符   後端的字符

    Django 中使用 logging 配置 logger 定義日誌輸出

    講解 handler ati dmi ase set file 過程 require 在使用 django 開發過程中,默認的日誌輸出是不能滿足我們去排查問題的,往往需要自定義的日誌輸出,幫助我們去排查程序BUG,定位問題原因。 在使用 django 的開發過程中,我使用的