1. 程式人生 > >自定義hadoop map/reduce輸入檔案切割InputFormat 更改輸入value的分隔符

自定義hadoop map/reduce輸入檔案切割InputFormat 更改輸入value的分隔符

本文轉載自:http://hi.baidu.com/lzpsky/blog/item/99d58738b08a68e7b311c70d.html

hadoop會對原始輸入檔案進行檔案切割,然後把每個split傳入mapper程式中進行處理,FileInputFormat是所有以檔案作 為資料來源的InputFormat實現的基類,FileInputFormat儲存作為job輸入的所有檔案,並實現了對輸入檔案計算splits的方 法。至於獲得記錄的方法是有不同的子類進行實現的。

        那麼,FileInputFormat是怎樣將他們劃分成splits的呢?FileInputFormat只劃分比HDFS block大的檔案,所以如果一個檔案的大小比block小,將不會被劃分,這也是Hadoop處理大檔案的效率要比處理很多小檔案的效率高的原因。 

       hadoop預設的InputFormat是TextInputFormat,重寫了FileInputFormat中的createRecordReader和isSplitable方法。該類使用的reader是LineRecordReader,即以回車鍵(CR = 13)或換行符(LF = 10)為行分隔符。

    start += in.readLine(new Text(), 0,  
                                 (int)Math.min((long)Integer.MAX_VALUE, end - start));  
          } //這個按行讀取就是響應的回車及換行  

      但大多數情況下,回車鍵或換行符作為輸入檔案的行分隔符並不能滿足我們的需求,通常使用者很有可能會輸入回車鍵、換行符,所以通常我們會定義不可見字元(即使用者無法輸入的字元)為行分隔符,這種情況下,就需要新寫一個InputFormat

      又或者,一條記錄的分隔符不是字元,而是字串,這種情況相對麻煩;還有一種情況,輸入檔案的主鍵key已經是排好序的了,需要hadoop做的只是把相 同的key作為一個數據塊進行邏輯處理,這種情況更麻煩,相當於免去了mapper的過程,直接進去reduce,那麼InputFormat的邏輯就相 對較為複雜了,但並不是不能實現。

1、改變一條記錄的分隔符,不用預設的回車或換行符作為記錄分隔符,甚至可以採用字串作為記錄分隔符


     1)自定義一個InputFormat,繼承FileInputFormat,重寫createRecordReader方法,如果不需要分片或者需要改變分片的方式,則重寫isSplitable方法,具體程式碼如下:

  1. publicclass FileInputFormatB extends FileInputFormat<LongWritable, Text> {  
  2.    @Override
  3.    public RecordReader<LongWritable, Text> createRecordReader( InputSplit split, TaskAttemptContext context) {  
  4.         <span style="color:#ff0000;">returnnew SearchRecordReader("\b");</span>  
  5.     }  
  6.     @Override
  7.     protectedboolean isSplitable(FileSystem fs, Path filename) {  
  8.         <span style="color:#ff6666;"// 輸入檔案不分片
  9.         returnfalse;</span>  
  10.      }  
  11. }  
這裡的“\b“應該就是指定的分隔符,並要求不對單個檔案進行分片處理。
2)關鍵在於定義一個新的SearchRecordReader繼承RecordReader,支援自定義的行分隔符,即一條記錄的分隔符。標紅的地方為與hadoop預設的LineRecordReader不同的地方。
  1. private CompressionCodecFactory compressionCodecs = null;  
  2.  privatelong start;  
  3.  privatelong pos;  
  4.  privatelong end;  
  5.  private LineReader in;  
  6.  privateint maxLineLength;  
  7.  private LongWritable key = null;  
  8.  private Text value = null;  
  9. <span style="color:#ff0000;"//行分隔符,即一條記錄的分隔符
  10.  privatebyte[] separator = {'\b'};  
  11.  privateint sepLength = 1;  
  12. ‍ public IsearchRecordReader(){  
  13.  }  
  14.  public IsearchRecordReader(String seps){//賦值的分隔符
  15.   this.separator = seps.getBytes();   
  16.   sepLength = separator.length;  
  17.  }</span>  
  18.  publicvoid initialize(InputSplit genericSplit, TaskAttemptContext context) throws IOException {  
  19.   FileSplit split = (FileSplit) genericSplit;  
  20.   Configuration job = context.getConfiguration();<span style="color:#ff6666;">//獲取對應的配置檔案</span>
  21.   this.maxLineLength = job.getInt("mapred.linerecordreader.maxlength", Integer.MAX_VALUE);  
  22.   this.start = split.getStart();<span style="color:#ff6666;">//分片的起始位置</span>
  23.   this.end = (this.start + split.getLength());<span style="color:#ff6666;">//split的結尾</span>
  24.   Path file = split.getPath();<span style="color:#ff6666;">//獲得路徑</span>
  25.   this.compressionCodecs = new CompressionCodecFactory(job);  
  26.   CompressionCodec codec = this.compressionCodecs.getCodec(file);  
  27.   // open the file and seek to the start of the split
  28.   FileSystem fs = file.getFileSystem(job);  
  29.   FSDataInputStream fileIn = fs.open(split.getPath())<span style="color:#ff6666;">;//開啟分片資料</span>
  30.   boolean skipFirstLine = false;  
  31.   if (codec != null) {  
  32.    this.in = new LineReader(codec.createInputStream(fileIn), job);  
  33.    this.end = Long.MAX_VALUE;  
  34.   } else {  
  35.    if (this.start != 0L) {  
  36.     skipFirstLine = true;  
  37.    <span style="color:#ff6666;"this.start -= sepLength;//改變分隔符需要修改的地方</span>
  38.     fileIn.seek(this.start);  
  39.    }  
  40.    this.in = new LineReader(fileIn, job);  
  41.   }  
  42.   if (skipFirstLine) { // skip first line and re-establish "start".
  43.    int newSize = in.readLine(new Text(), 0, (int) Math.min( (long) Integer.MAX_VALUE, end - start));  
  44.    if(newSize > 0){  
  45.     start += newSize;  
  46.    }  
  47.   }  
  48.   this.pos = this.start;  
  49.  }  
  50.  publicboolean nextKeyValue() throws IOException {  
  51.   if (this.key == null) {  
  52.    this.key = new LongWritable();  
  53.   }  
  54.   this.key.set(this.pos);  
  55.   if (this.value == null) {  
  56.    this.value = new Text();  
  57.   }  
  58.   int newSize = 0;  
  59.   while (this.pos < this.end) {  
  60.    newSize = this.in.readLine(this.value, this.maxLineLength, Math.max(  
  61.  (int) Math.min(Integer.MAX_VALUE, this.end - this.pos), this.maxLineLength));  
  62.    if (newSize == 0) {  
  63.     break;  
  64.    }  
  65.    this.pos += newSize;  
  66.    if (newSize < this.maxLineLength) {  
  67.     break;  
  68.    }  
  69.    LOG.info("Skipped line of size " + newSize + " at pos " + (this.pos - newSize));  
  70.   }  
  71.   if (newSize == 0) {  
  72.    <span style="color:#ff6666;">//讀下一個buffer</span>
  73.    this.key = null;  
  74.    this.value = null;  
  75.    returnfalse;  
  76.   }  
  77.   //讀同一個buffer的下一個記錄
  78.   returntrue;  
  79.  }  
  80.  public LongWritable getCurrentKey() {  
  81.   returnthis.key;  
  82.  }  
  83.  public Text getCurrentValue() {  
  84.   returnthis.value;  
  85.  }  
  86.  publicfloat getProgress() {  
  87.   if (this.start == this.end) {  
  88.    return0.0F;  
  89.   }  
  90.   return Math.min(1.0F, (float) (this.pos - this.start) / (float) (this.end - this.start));  
  91.  }  
  92.  publicsynchronizedvoid close() throws IOException {  
  93.   if (this.in != null)  
  94.    this.in.close();  
  95.  }  
  96. }  

 3)重寫SearchRecordReader需要的LineReader,可作為SearchRecordReader內部類。特別需要注意的地方就 是,讀取檔案的方式是按指定大小的buffer來讀,必定就會遇到一條完整的記錄被切成兩半,甚至如果分隔符大於1個字元時分隔符也會被切成兩半的情況, 這種情況一定要加以拼接處理。
  1. publicclass LineReader {  
  2.   //回車鍵(hadoop預設)
  3.   //private static final byte CR = 13;
  4.   //換行符(hadoop預設)
  5.   //private static final byte LF = 10;
  6.   //按buffer進行檔案讀取
  7.   privatestaticfinalint DEFAULT_BUFFER_SIZE = 32 * 1024 * 1024;<span style="color:#ff6666;">//32M</span>  
  8.   privateint bufferSize = DEFAULT_BUFFER_SIZE;  
  9.   private InputStream in;  
  10.   privatebyte[] buffer;  
  11.   privateint bufferLength = 0;  
  12.   privateint bufferPosn = 0;  
  13.   LineReader(InputStream in, int bufferSize) {  
  14.    this.bufferLength = 0;  
  15.     this.bufferPosn = 0;  
  16.    this.in = in;  
  17.    this.bufferSize = bufferSize;  
  18.    this.buffer = newbyte[this.bufferSize];  
  19.   }  
  20.   public LineReader(InputStream in, Configuration conf) throws IOException {  
  21.    this(in, conf.getInt("io.file.buffer.size", DEFAULT_BUFFER_SIZE));  
  22.   }  
  23.   publicvoid close() throws IOException {  
  24.    in.close();  
  25.   }  
  26.  publicint readLine(Text str, int maxLineLength) throws IOException {  
  27.    return readLine(str, maxLineLength, Integer.MAX_VALUE);  
  28.   }  
  29.   publicint readLine(Text str) throws IOException {  
  30.    return readLine(str, Integer.MAX_VALUE, Integer.MAX_VALUE);  
  31.   }  
  32.   <span style="color:#ff6666;">//以下是需要改寫的部分_start,核心程式碼</span>
  33. <span style="color:#ff6666;">  publicint readLine(Text str, int maxLineLength, int maxBytesToConsume) throws IOException{//map函式中使用到得readline</span>
  34.    str.clear();  
  35.    Text record = new Text();  
  36.    int txtLength = 0;  
  37.    long bytesConsumed = 0L;  
  38.    boolean newline = false;  
  39.    int sepPosn = 0;  
  40.    do {  
  41.     //已經讀到buffer的末尾了,讀下一個buffer
  42.     if (this.bufferPosn >= this.bufferLength) {  
  43.      bufferPosn = 0;  
  44.      bufferLength = in.read(buffer);  
  45.     <span style="color:#ff6666;"//讀到檔案末尾了,則跳出,進行下一個檔案的讀取,</span><span style="color:#009900;">那這個時候算是新的splits嗎?</span>
  46.      if (bufferLength <= 0) {  
  47.       break;  
  48.      }  
  49.     }  
  50.     int startPosn = this.bufferPosn;  
  51.     for (; bufferPosn < bufferLength; bufferPosn ++) {  
  52.     <span style="color:#ff6666;"//處理上一個buffer的尾巴被切成了兩半的分隔符(如果分隔符中重複字元過多在這裡會有問題)</span>
  53.      if(sepPosn > 0 && buffer[bufferPosn] != separator[sepPosn]){  
  54.       sepPosn = 0;  
  55.      }  
  56.      //遇到行分隔符的第一個字元
  57.      if (buffer[bufferPosn] == separator[sepPosn]) {  
  58.       bufferPosn ++;  
  59.       int i = 0;  
  60.       //判斷接下來的字元是否也是行分隔符中的字元
  61.       for(++ sepPosn; sepPosn < sepLength; i ++, sepPosn ++){  
  62.        //buffer的最後剛好是分隔符,且分隔符被不幸地切成了兩半
  63.        if(bufferPosn + i >= bufferLength){  
  64.         bufferPosn += i - 1;  
  65.         break;  
  66.        }  
  67.        //一旦其中有一個字元不相同,就判定為不是分隔符
  68.        if(this.buffer[this.bufferPosn + i] != separator[sepPosn]){  
  69.         sepPosn = 0;  
  70.         break;  
  71.        }  
  72.       }  
  73.       //的確遇到了行分隔符
  74.       if(sepPosn == sepLength){  
  75.        bufferPosn += i;  
  76.        newline = true;  
  77.        sepPosn = 0;  
  78.        break;  
  79.       }  
  80.      }  
  81.     }  
  82.     int readLength = this.bufferPosn - startPosn;  
  83.     bytesConsumed += readLength;  
  84.     //行分隔符不放入塊中
  85.     //int appendLength = readLength - newlineLength;
  86.     if (readLength > maxLineLength - txtLength) {  
  87.      readLength = maxLineLength - txtLength;  
  88.     }  
  89.     if (readLength > 0) {  
  90.      record.append(this.buffer, startPosn, readLength);  
  91.      txtLength += readLength;  
  92.      //去掉記錄的分隔符
  93.      if(newline){  
  94.       str.set(record.getBytes(), 0, record.getLength() - sepLength);  
  95.      }  
  96.     }  
  97.    } while (!newline && (bytesConsumed < maxBytesToConsume));  
  98.    if (bytesConsumed > (long)Integer.MAX_VALUE) {  
  99.     thrownew IOException("Too many bytes before newline: " + bytesConsumed);  
  100.    }  
  101.    return (int) bytesConsumed;  
  102.   }  
  103.   //以下是需要改寫的部分_end
  104. //以下是hadoop-core中LineReader的原始碼_start
  105. publicint readLine(Text str, int maxLineLength, int maxBytesToConsume) throws IOException{  
  106.     str.clear();  
  107.     int txtLength = 0;  
  108.     int newlineLength = 0;  
  109.     boolean prevCharCR = false;  
  110.     long bytesConsumed = 0L;  
  111.     do {  
  112.       int startPosn = this.bufferPosn;  
  113.       if (this.bufferPosn >= this.bufferLength) {  
  114.         startPosn = this.bufferPosn = 0;  
  115.         if (prevCharCR)  bytesConsumed ++;  
  116.         this.bufferLength = this.in.read(this.buffer);  
  117.         if (this.bufferLength <= 0)  break;  
  118.       }  
  119.       for (; this.bufferPosn < this.bufferLength; this.bufferPosn ++) {  
  120.         if (this.buffer[this.bufferPosn] == LF) {  
  121.           newlineLength = (prevCharCR) ? 2 : 1;  
  122.           this.bufferPosn ++;  
  123.           break;  
  124.         }  
  125.         if (prevCharCR) {  
  126.           newlineLength = 1;  
  127.           break;  
  128.         }  
  129.         prevCharCR = this.buffer[this.bufferPosn] == CR;  
  130.       }  
  131.       int readLength = this.bufferPosn - startPosn;  
  132.       if ((prevCharCR) && (newlineLength == 0))  
  133.         --readLength;  
  134.       bytesConsumed += readLength;  
  135.       int appendLength = readLength - newlineLength;  
  136.       if (appendLength > maxLineLength - txtLength) {  
  137.         appendLength = maxLineLength - txtLength;  
  138.       }  
  139.       if (appendLength > 0) {  
  140.         str.append(this.buffer, startPosn, appendLength);  
  141.         txtLength += appendLength; }  
  142.     }  
  143.     while ((newlineLength == 0) && (bytesConsumed < maxBytesToConsume));  
  144.     if (bytesConsumed > (long)Integer.MAX_VALUE) thrownew IOException("Too many bytes before newline: " + bytesConsumed);  
  145.     return (int)bytesConsumed;  
  146.   }  
  147. //以下是hadoop-core中LineReader的原始碼_end
  148. }  

2、已經按主鍵key排好序了,並保證相同主鍵key一定是在一起的,假設每條記錄的第一個欄位為主鍵,那麼如 果沿用上面的LineReader,需要在核心方法readLine中對前後兩條記錄的id進行equals判斷,如果不同才進行split,如果相同繼 續下一條記錄的判斷。程式碼就不再貼了,但需要注意的地方,依舊是前後兩個buffer進行交接的時候,非常有可能一條記錄被切成了兩半,一半在前一個buffer中,一半在後一個buffer中。

     這種方式的好處在於少去了reduce操作,會大大地提高效率,其實mapper的過程相當的快,費時的通常是reduce。