Hadoop自定義linereader,實現按行分塊
最近想用Hadoop實現一個A的轉置乘以A的矩陣運算,假設A是100w*100的矩陣,想把100w行特徵分成100個map,每個map處理1w行,每個map一次性處理1w行,而不是一行一行處理。
hadoop0.21.0這個版本已經實現了NLineInputFormat這個方法
job.setInputFormatClass(NLineInputFormat.class);
NLineInputFormat.addInputPath(job, in);
NLineInputFormat.setNumLinesPerSplit(job,10000);
這樣可以實現每個map處理10000行的需求,100萬行就是100個map,而不是預設的按物理塊大小分配map,但是這個介面內部呼叫map方法的時候,仍是一行一行處理的,map方法會被執行10000次,現在改寫linereader,讓map方法只執行一次,每次處理10000行。
首先定義myLineInputFormat類,將job的讀入方式設成myLineInputFormat。
job.setInputFormatClass(myLineInputFormat.class);
myLineInputFormat.addInputPath(job, in);
myLineInputFormat.setNumLinesPerSplit(job,10000);
myLineInputFormat的原始碼直接拷貝NLineInputFormat的原始碼,myLineInputFormat這個類裡面的RecordReader返回的LongWritable, Text返回的是每個map方法裡的檔案偏移量和文字內容,每次讀10000行,偏移量就是10000行的偏移量,text自然是10000行的文字內容。
重寫myLineInputFormat類裡的RecordReader方法:
public RecordReader<LongWritable, Text> createRecordReader(
InputSplit genericSplit, TaskAttemptContext context)
throws IOException {
context.setStatus(genericSplit.toString());
return new mylinereader();
}
定義myLineReader類,先原始碼拷貝LineRecordReader類,getFilePosition方法就是返回的每次文字的偏移量,每次的偏移量根據LineReader的readLine方法來確定,所以myLineReader的LineReader引用自己的LineReader,把import的類包改一下就成。
定義LineReader類,放在自定義的package裡面,原始碼拷貝原先hadoop裡的LineReader類,修改readLine方法:
if(appendLength > 0)
appendLength = (appendLength+1) * 10000 ;
if (appendLength > 0) {
str.append(buffer, startPosn, appendLength);
txtLength += appendLength;
}
第一個if是修改檔案的長度,自己新增,第二個if是原來方法裡的,如果10000行內容太多,這裡append會報陣列越界,所以修改LineReader的構造方法,如下,DEFAULT_BUFFER_SIZE是類屬性,值也可以自己修改
public LineReader(InputStream in, Configuration conf) throws IOException {
//this(in, conf.getInt("io.file.buffer.size", DEFAULT_BUFFER_SIZE));
this(in, DEFAULT_BUFFER_SIZE);
}
同時修改readLine方法的返回值,原始是返回一行的偏移量,現在改成返回10000行的偏移量
return (int)bytesConsumed * 10000 ;