1. 程式人生 > 其它 >10.Mapreduce例項——MapReduce自定義輸入格式小

10.Mapreduce例項——MapReduce自定義輸入格式小

10Mapreduce例項——MapReduce自定義輸入格式

實驗原理

1.輸入格式:InputFormat類定義瞭如何分割和讀取輸入檔案,它提供有下面的幾個功能:

(1)選擇作為輸入的檔案或物件;

(2) 定義把檔案劃分到任務的InputSplits;

(3)為RecordReader讀取檔案提供了一個工廠方法;

Hadoop自帶了好幾個輸入格式。其中有一個抽象類叫FileInputFormat,所有操作檔案的InputFormat類都是從它那裡繼承功能和屬性。當開啟Hadoop作業時,FileInputFormat會得到一個路徑引數,這個路徑內包含了所需要處理的檔案,FileInputFormat會讀取這個資料夾內的所有檔案(譯註:預設不包括子資料夾內的),然後它會把這些檔案拆分成一個或多個的InputSplit。你可以通過JobConf物件的setInputFormat()方法來設定應用到你的作業輸入檔案上的輸入格式。下表給出了一些

預設的輸入格式是TextInputFormat,它把輸入檔案每一行作為單獨的一個記錄,但不做解析處理。這對那些沒有被格式化的資料或是基於行的記錄來說是很有用的,比如日誌檔案。更有趣的一個輸入格式是KeyValueInputFormat,這個格式也是把輸入檔案每一行作為單獨的一個記錄。然而不同的是TextInputFormat把整個檔案行當做值資料,KeyValueInputFormat則是通過搜尋tab字元來把行拆分為鍵值對。這在把一個MapReduce的作業輸出作為下一個作業的輸入時顯得特別有用,因為預設輸出格式(下面有更詳細的描述)正是按KeyValueInputFormat格式輸出資料。最後來講講SequenceFileInputFormat,它會讀取特殊的特定於Hadoop的二進位制檔案,這些檔案包含了很多能讓Hadoop的mapper快速讀取資料的特性。Sequence檔案是塊壓縮的並提供了對幾種資料型別(不僅僅是文字型別)直接的序列化與反序列化操作。Squence檔案可以作為MapReduce任務的輸出資料,並且用它做一個MapReduce作業到另一個作業的中間資料是很高效的。

輸入塊(InputSplit):一個輸入塊描述了構成MapReduce程式中單個map任務的一個單元。把一個MapReduce程式應用到一個數據集上,即是指一個作業,會由幾個(也可能幾百個)任務組成。Map任務可能會讀取整個檔案,但一般是讀取檔案的一部分。預設情況下,FileInputFormat及其子類會以64MB(與HDFS的Block預設大小相同,譯註:Hadoop建議Split大小與此相同)為基數來拆分檔案。你可以在hadoop-site.xml(譯註:0.20.*以後是在mapred-default.xml裡)檔案內設定mapred.min.split.size引數來控制具體劃分大小,或者在具體MapReduce作業的JobConf物件中重寫這個引數。通過以塊形式處理檔案,我們可以讓多個map任務並行的操作一個檔案。如果檔案非常大的話,這個特性可以通過並行處理大幅的提升效能。更重要的是,因為多個塊(Block)組成的檔案可能會分散在叢集內的好幾個節點上(譯註:事實上就是這樣),這樣就可以把任務排程在不同的節點上;因此所有的單個塊都是本地處理的,而不是把資料從一個節點傳輸到另外一個節點。當然,日誌檔案可以以明智的塊處理方式進行處理,但是有些檔案格式不支援塊處理方式。針對這種情況,你可以寫一個自定義的InputFormat,這樣你就可以控制你檔案是如何被拆分(或不拆分)成檔案塊的。

輸入格式定義了組成mapping階段的map任務列表,每一個任務對應一個輸入塊。接著根據輸入檔案塊所在的實體地址,這些任務會被分派到對應的系統節點上,可能會有多個map任務被分派到同一個節點上。任務分派好後,節點開始執行任務,嘗試去最大並行化執行。節點上的最大任務並行數由mapred.tasktracker.map.tasks.maximum引數控制。

記錄讀取器(RecordReader):InputSplit定義瞭如何切分工作,但是沒有描述如何去訪問它。 RecordReader類則是實際的用來載入資料並把資料轉換為適合mapper讀取的鍵值對。RecordReader例項是由輸入格式定義的,預設的輸入格式,TextInputFormat,提供了一個LineRecordReader,這個類的會把輸入檔案的每一行作為一個新的值,關聯到每一行的鍵則是該行在檔案中的位元組偏移量。RecordReader會在輸入塊上被重複的呼叫直到整個輸入塊被處理完畢,每一次呼叫RecordReader都會呼叫Mapper的map()方法。

2.當面對一些特殊的<key,value>鍵值對時,如key是由一個檔名和記錄位置組成的鍵值時,這時hadoop本身提供的TextInputFormat、CombineInputFormat、NLineInputFormat等肯定是無法滿足我們的需求的,所以這裡需要重寫自己的輸入分隔。MapReduce定義了介面InputFormat,它提供了兩個方法,getSplits()和createRecordRead(),其中getSplits()負責對輸入檔案進行切割,切割之後便是一個個split,比如hadoop預設提供的幾個InputFormat都是對大於BlockSize的檔案進行切割,小於它的不切割,我們這裡可以直接按照這種特性。而createRecordRead()則負責將一個split按照一定格式切割成一個個<K,V>對,以便MapReduce在map時呼叫。所以,我們的關鍵就是去定義這個<K,V>的切割。就要求開發人員繼承FileInputFormat,用於實現一種新的輸入格式,同時還需要繼承RecordReader,用於實現基於新輸入格式Key和Value值的讀取方法。

FileInputFormat實現了InputFormat這個介面,實現了只對大於BlockSize的檔案進行切割,並且保留了createRecordRead()這個方法讓我們自己去實現。所以我們可以寫一個類FileKeyInputFormat來extends這個FileInputFormat類,然後Override這個createRecordRead()方法。

參考TextInputFormat發現,它也是繼承FileInputFormat,然後重寫了createRecordRead(),在createRecordRead()裡面call了LineRecordReader,由它來實現輸入分隔。好吧,重點就來到了,那就是自己寫一個類似於LineRecordReader的FileKeyRecordReader類,然後給我們的FileKeyInputForma來呼叫。LineRecordReader 繼承 RecordReader時,重寫了它的六個方法,分別是initialize()、getCurrentKey()、getCurrentValue()、getProgress()、Close()、nextKeyValue(),這裡也一樣需要重寫這幾個方法。

實驗步驟

1.建表,逗號分隔

2.本地新建/data/mapreduce11目錄。

mkdir-p/data/mapreduce11

3.將表上傳到虛擬機器中

4.上傳並解壓hadoop2lib檔案

5.在HDFS上新建/mymapreduce11/in目錄,然後將Linux本地/data/mapreduce11目錄下的cat1檔案匯入到HDFS的/mymapreduce11/in目錄中。

hadoopfs-mkdir-p/mymapreduce11/in

hadoopfs-put/data/mapreduce11/cat1/mymapreduce11/in

6.IDEA中編寫Java程式碼

package mapreduce10;
import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
public class FileKeyMR{
    public static class Map extends Mapper<Object,Text,Text,Text>{
        public void map(Text key,Text value,Context context) throws IOException, InterruptedException{
            String line=value.toString();
            System.out.println(line);
            String str[]=line.split(",");
            for(String st:str){
                context.write(key,new Text(st));
            }
            System.out.println(line);
        }
    }
    public static class Reduce extends Reducer<Text,Text,Text,Text>{
        public void reduce(Text key,Iterable<Text> values,Context context) throws IOException, InterruptedException{
            String s=":";
            for(Text val:values){
                s+=val;
            }
            context.write(key,new Text(s));
        }
    }
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException{
        Configuration conf=new Configuration();
        Job job=new Job(conf,"FileKeyMR");
        job.setJarByClass(FileKeyMR.class);
        job.setMapperClass(Map.class);
        job.setReducerClass(Reduce.class);
        job.setInputFormatClass(FileKeyInputFormat.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job, new Path("hdfs://192.168.149.10:9000/mymapreduce11/in/cat1"));
        FileOutputFormat.setOutputPath(job, new Path("hdfs://192.168.149.10:9000/mymapreduce11/out"));
        System.exit(job.waitForCompletion(true)?0:1);
    }
}
package mapreduce10;
import java.io.IOException;
import java.util.List;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
public class FileKeyInputFormat extends FileInputFormat<Text,Text>{
    public FileKeyInputFormat(){}
    public RecordReader<Text,Text> createRecordReader(InputSplit split,TaskAttemptContext tac)
            throws IOException,InterruptedException{
        FileKeyRecordReader fkrr=new FileKeyRecordReader();
        try {
            fkrr.initialize(split,tac);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return fkrr;
    }
    protected long computeSplitSize(long blockSize,long minSize,long maxSize){
        return super.computeSplitSize(blockSize,minSize,maxSize);
    }
    public List<InputSplit> getSplits(JobContext arg0)throws IOException{
        return super.getSplits(arg0);
    }
    protected boolean isSplitable(JobContext context,Path filename){
        return true;
    }
    protected List<FileStatus> listStatus(JobContext arg0)throws IOException{
        return super.listStatus(arg0);
    }
}
package mapreduce10;
import java.io.IOException;
import java.io.InterruptedIOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.LineRecordReader;
public class FileKeyRecordReader extends RecordReader<Text,Text> {
    public FileKeyRecordReader(){
    }
    String fn;
    LineRecordReader lrr=new LineRecordReader();
    public void initialize(InputSplit arg0,TaskAttemptContext arg1)
            throws IOException,InterruptedException{
        lrr.initialize(arg0, arg1);
        this.fn=((FileSplit)arg0).getPath().getName();
    }
    public void close()throws IOException{
        lrr.close();
    }
    public Text getCurrentKey()throws IOException,InterruptedException{
        System.out.println("CurrentKey");
        LongWritable lw=lrr.getCurrentKey();
        Text key =new Text("("+fn+"@"+lw+")");
        System.out.println("key--"+key);
        return key;
    }
    public Text getCurrentValue()throws IOException,InterruptedException{
        return lrr.getCurrentValue();
    }
    public float getProgress()throws IOException,InterruptedException{
        return 0;
    }
    public boolean nextKeyValue() throws IOException,InterruptedIOException{
        return lrr.nextKeyValue();
    }
}

7.將hadoop2lib目錄中的jar包,拷貝到hadoop2lib目錄下。

8.拷貝log4j.properties檔案

9.執行結果

hadoopfs-ls/mymapreduce11/out

hadoopfs-cat/mymapreduce11/out/part-r-00000