自定義hadoop map/reduce輸入檔案切割InputFormat 更改輸入value的分隔符
本文轉載自:http://hi.baidu.com/lzpsky/blog/item/99d58738b08a68e7b311c70d.html
hadoop會對原始輸入檔案進行檔案切割,然後把每個split傳入mapper程式中進行處理,FileInputFormat是所有以檔案作為資料來源的InputFormat實現的基類,FileInputFormat儲存作為job輸入的所有檔案,並實現了對輸入檔案計算splits的方法。至於獲得記錄的方法是由不同的子類進行實現的。
那麼,FileInputFormat是怎樣將他們劃分成splits的呢?FileInputFormat只劃分比HDFS block大的檔案,所以如果一個檔案的大小比block小,將不會被劃分,這也是Hadoop處理大檔案的效率要比處理很多小檔案的效率高的原因。
hadoop預設的InputFormat是TextInputFormat,重寫了FileInputFormat中的createRecordReader和isSplitable方法。該類使用的reader是LineRecordReader,即以回車鍵(CR = 13)或換行符(LF = 10)為行分隔符。
start += in.readLine(new Text(), 0, (int)Math.min((long)Integer.MAX_VALUE, end - start)); } //這個按行讀取就是響應的回車及換行
但大多數情況下,回車鍵或換行符作為輸入檔案的行分隔符並不能滿足我們的需求,通常使用者很有可能會輸入回車鍵、換行符,所以通常我們會定義不可見字元(即使用者無法輸入的字元)為行分隔符,這種情況下,就需要新寫一個InputFormat。
又或者,一條記錄的分隔符不是字元,而是字串,這種情況相對麻煩;還有一種情況,輸入檔案的主鍵key已經是排好序的了,需要hadoop做的只是把相同的key作為一個數據塊進行邏輯處理,這種情況更麻煩,相當於免去了mapper的過程,直接進入reduce,那麼InputFormat的邏輯就相對較為複雜了,但並不是不能實現。
1、改變一條記錄的分隔符,不用預設的回車或換行符作為記錄分隔符,甚至可以採用字串作為記錄分隔符
1)自定義一個InputFormat,繼承FileInputFormat,重寫createRecordReader方法,如果不需要分片或者需要改變分片的方式,則重寫isSplitable方法,具體程式碼如下:
- publicclass FileInputFormatB extends FileInputFormat<LongWritable, Text> {
- @Override
- public RecordReader<LongWritable, Text> createRecordReader( InputSplit split, TaskAttemptContext context) {
- <span style="color:#ff0000;">returnnew SearchRecordReader("\b");</span>
- }
- @Override
- protectedboolean isSplitable(FileSystem fs, Path filename) {
- <span style="color:#ff6666;"> // 輸入檔案不分片
- returnfalse;</span>
- }
- }
2)關鍵在於定義一個新的SearchRecordReader繼承RecordReader,支援自定義的行分隔符,即一條記錄的分隔符。標紅的地方為與hadoop預設的LineRecordReader不同的地方。
- private CompressionCodecFactory compressionCodecs = null;
- privatelong start;
- privatelong pos;
- privatelong end;
- private LineReader in;
- privateint maxLineLength;
- private LongWritable key = null;
- private Text value = null;
- <span style="color:#ff0000;"> //行分隔符,即一條記錄的分隔符
- privatebyte[] separator = {'\b'};
- privateint sepLength = 1;
- public IsearchRecordReader(){
- }
- public IsearchRecordReader(String seps){//賦值的分隔符
- this.separator = seps.getBytes();
- sepLength = separator.length;
- }</span>
- publicvoid initialize(InputSplit genericSplit, TaskAttemptContext context) throws IOException {
- FileSplit split = (FileSplit) genericSplit;
- Configuration job = context.getConfiguration();<span style="color:#ff6666;">//獲取對應的配置檔案</span>
- this.maxLineLength = job.getInt("mapred.linerecordreader.maxlength", Integer.MAX_VALUE);
- this.start = split.getStart();<span style="color:#ff6666;">//分片的起始位置</span>
- this.end = (this.start + split.getLength());<span style="color:#ff6666;">//split的結尾</span>
- Path file = split.getPath();<span style="color:#ff6666;">//獲得路徑</span>
- this.compressionCodecs = new CompressionCodecFactory(job);
- CompressionCodec codec = this.compressionCodecs.getCodec(file);
- // open the file and seek to the start of the split
- FileSystem fs = file.getFileSystem(job);
- FSDataInputStream fileIn = fs.open(split.getPath())<span style="color:#ff6666;">;//開啟分片資料</span>
- boolean skipFirstLine = false;
- if (codec != null) {
- this.in = new LineReader(codec.createInputStream(fileIn), job);
- this.end = Long.MAX_VALUE;
- } else {
- if (this.start != 0L) {
- skipFirstLine = true;
- <span style="color:#ff6666;"> this.start -= sepLength;//改變分隔符需要修改的地方</span>
- fileIn.seek(this.start);
- }
- this.in = new LineReader(fileIn, job);
- }
- if (skipFirstLine) { // skip first line and re-establish "start".
- int newSize = in.readLine(new Text(), 0, (int) Math.min( (long) Integer.MAX_VALUE, end - start));
- if(newSize > 0){
- start += newSize;
- }
- }
- this.pos = this.start;
- }
- publicboolean nextKeyValue() throws IOException {
- if (this.key == null) {
- this.key = new LongWritable();
- }
- this.key.set(this.pos);
- if (this.value == null) {
- this.value = new Text();
- }
- int newSize = 0;
- while (this.pos < this.end) {
- newSize = this.in.readLine(this.value, this.maxLineLength, Math.max(
- (int) Math.min(Integer.MAX_VALUE, this.end - this.pos), this.maxLineLength));
- if (newSize == 0) {
- break;
- }
- this.pos += newSize;
- if (newSize < this.maxLineLength) {
- break;
- }
- LOG.info("Skipped line of size " + newSize + " at pos " + (this.pos - newSize));
- }
- if (newSize == 0) {
- <span style="color:#ff6666;">//讀下一個buffer</span>
- this.key = null;
- this.value = null;
- returnfalse;
- }
- //讀同一個buffer的下一個記錄
- returntrue;
- }
- public LongWritable getCurrentKey() {
- returnthis.key;
- }
- public Text getCurrentValue() {
- returnthis.value;
- }
- publicfloat getProgress() {
- if (this.start == this.end) {
- return0.0F;
- }
- return Math.min(1.0F, (float) (this.pos - this.start) / (float) (this.end - this.start));
- }
- publicsynchronizedvoid close() throws IOException {
- if (this.in != null)
- this.in.close();
- }
- }
3)重寫SearchRecordReader需要的LineReader,可作為SearchRecordReader內部類。特別需要注意的地方就是,讀取檔案的方式是按指定大小的buffer來讀,必定就會遇到一條完整的記錄被切成兩半,甚至如果分隔符大於1個字元時分隔符也會被切成兩半的情況,這種情況一定要加以拼接處理。
- publicclass LineReader {
- //回車鍵(hadoop預設)
- //private static final byte CR = 13;
- //換行符(hadoop預設)
- //private static final byte LF = 10;
- //按buffer進行檔案讀取
- privatestaticfinalint DEFAULT_BUFFER_SIZE = 32 * 1024 * 1024;<span style="color:#ff6666;">//32M</span>
- privateint bufferSize = DEFAULT_BUFFER_SIZE;
- private InputStream in;
- privatebyte[] buffer;
- privateint bufferLength = 0;
- privateint bufferPosn = 0;
- LineReader(InputStream in, int bufferSize) {
- this.bufferLength = 0;
- this.bufferPosn = 0;
- this.in = in;
- this.bufferSize = bufferSize;
- this.buffer = newbyte[this.bufferSize];
- }
- public LineReader(InputStream in, Configuration conf) throws IOException {
- this(in, conf.getInt("io.file.buffer.size", DEFAULT_BUFFER_SIZE));
- }
- publicvoid close() throws IOException {
- in.close();
- }
- publicint readLine(Text str, int maxLineLength) throws IOException {
- return readLine(str, maxLineLength, Integer.MAX_VALUE);
- }
- publicint readLine(Text str) throws IOException {
- return readLine(str, Integer.MAX_VALUE, Integer.MAX_VALUE);
- }
- <span style="color:#ff6666;">//以下是需要改寫的部分_start,核心程式碼</span>
- <span style="color:#ff6666;"> publicint readLine(Text str, int maxLineLength, int maxBytesToConsume) throws IOException{//map函式中使用到得readline</span>
- str.clear();
- Text record = new Text();
- int txtLength = 0;
- long bytesConsumed = 0L;
- boolean newline = false;
- int sepPosn = 0;
- do {
- //已經讀到buffer的末尾了,讀下一個buffer
- if (this.bufferPosn >= this.bufferLength) {
- bufferPosn = 0;
- bufferLength = in.read(buffer);
- <span style="color:#ff6666;"> //讀到檔案末尾了,則跳出,進行下一個檔案的讀取,</span><span style="color:#009900;">那這個時候算是新的splits嗎?</span>
- if (bufferLength <= 0) {
- break;
- }
- }
- int startPosn = this.bufferPosn;
- for (; bufferPosn < bufferLength; bufferPosn ++) {
- <span style="color:#ff6666;"> //處理上一個buffer的尾巴被切成了兩半的分隔符(如果分隔符中重複字元過多在這裡會有問題)</span>
- if(sepPosn > 0 && buffer[bufferPosn] != separator[sepPosn]){
- sepPosn = 0;
- }
- //遇到行分隔符的第一個字元
- if (buffer[bufferPosn] == separator[sepPosn]) {
- bufferPosn ++;
- int i = 0;
- //判斷接下來的字元是否也是行分隔符中的字元
- for(++ sepPosn; sepPosn < sepLength; i ++, sepPosn ++){
- //buffer的最後剛好是分隔符,且分隔符被不幸地切成了兩半
- if(bufferPosn + i >= bufferLength){
- bufferPosn += i - 1;
- break;
- }
- //一旦其中有一個字元不相同,就判定為不是分隔符
- if(this.buffer[this.bufferPosn + i] != separator[sepPosn]){
- sepPosn = 0;
- break;
- }
- }
- //的確遇到了行分隔符
- if(sepPosn == sepLength){
- bufferPosn += i;
- newline = true;
- sepPosn = 0;
- break;
- }
- }
- }
- int readLength = this.bufferPosn - startPosn;
- bytesConsumed += readLength;
- //行分隔符不放入塊中
- //int appendLength = readLength - newlineLength;
- if (readLength > maxLineLength - txtLength) {
- readLength = maxLineLength - txtLength;
- }
- if (readLength > 0) {
- record.append(this.buffer, startPosn, readLength);
- txtLength += readLength;
- //去掉記錄的分隔符
- if(newline){
- str.set(record.getBytes(), 0, record.getLength() - sepLength);
- }
- }
- } while (!newline && (bytesConsumed < maxBytesToConsume));
- if (bytesConsumed > (long)Integer.MAX_VALUE) {
- thrownew IOException("Too many bytes before newline: " + bytesConsumed);
- }
- return (int) bytesConsumed;
- }
- //以下是需要改寫的部分_end
- //以下是hadoop-core中LineReader的原始碼_start
- publicint readLine(Text str, int maxLineLength, int maxBytesToConsume) throws IOException{
- str.clear();
- int txtLength = 0;
- int newlineLength = 0;
- boolean prevCharCR = false;
- long bytesConsumed = 0L;
- do {
- int startPosn = this.bufferPosn;
- if (this.bufferPosn >= this.bufferLength) {
- startPosn = this.bufferPosn = 0;
- if (prevCharCR) bytesConsumed ++;
- this.bufferLength = this.in.read(this.buffer);
- if (this.bufferLength <= 0) break;
- }
- for (; this.bufferPosn < this.bufferLength; this.bufferPosn ++) {
- if (this.buffer[this.bufferPosn] == LF) {
- newlineLength = (prevCharCR) ? 2 : 1;
- this.bufferPosn ++;
- break;
- }
- if (prevCharCR) {
- newlineLength = 1;
- break;
- }
- prevCharCR = this.buffer[this.bufferPosn] == CR;
- }
- int readLength = this.bufferPosn - startPosn;
- if ((prevCharCR) && (newlineLength == 0))
- --readLength;
- bytesConsumed += readLength;
- int appendLength = readLength - newlineLength;
- if (appendLength > maxLineLength - txtLength) {
- appendLength = maxLineLength - txtLength;
- }
- if (appendLength > 0) {
- str.append(this.buffer, startPosn, appendLength);
- txtLength += appendLength; }
- }
- while ((newlineLength == 0) && (bytesConsumed < maxBytesToConsume));
- if (bytesConsumed > (long)Integer.MAX_VALUE) thrownew IOException("Too many bytes before newline: " + bytesConsumed);
- return (int)bytesConsumed;
- }
- //以下是hadoop-core中LineReader的原始碼_end
- }
2、已經按主鍵key排好序了,並保證相同主鍵key一定是在一起的,假設每條記錄的第一個欄位為主鍵,那麼如果沿用上面的LineReader,需要在核心方法readLine中對前後兩條記錄的id進行equals判斷,如果不同才進行split,如果相同繼續下一條記錄的判斷。程式碼就不再貼了,但需要注意的地方,依舊是前後兩個buffer進行交接的時候,非常有可能一條記錄被切成了兩半,一半在前一個buffer中,一半在後一個buffer中。
這種方式的好處在於少去了reduce操作,會大大地提高效率,其實mapper的過程相當的快,費時的通常是reduce。