The workflow for writing MapReduce code, and the superclasses that need to be extended
package tq;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

// MyInputFormat, MyOutFormat, MyMapper, MySortComparator, MyPartition, MyCombiner,
// MyGroup and MyReducer are the user-defined classes this note is about. They are
// assumed to live in this same package and to extend the superclasses noted in the
// comments below; the stray imports in the original (Hadoop test classes such as
// ScanPerformanceEvaluation.MyMapper) are what produced the IDE errors.
public class TianQi {
    public static void main(String[] args)
            throws IOException, InterruptedException, ClassNotFoundException {
        // Set up the configuration and the job
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        // Set the jar class and the job name
        job.setJarByClass(TianQi.class);
        job.setJobName("sdfjsk");

        // Set the input path
        Path filein = new Path("jk");
        FileInputFormat.addInputPath(job, filein);

        // Set the output path, deleting it first if it already exists
        Path fileout = new Path("fdsd");
        if (fileout.getFileSystem(conf).exists(fileout)) {
            fileout.getFileSystem(conf).delete(fileout, true);
        }
        FileOutputFormat.setOutputPath(job, fileout);

        // Set the input format: MyInputFormat extends InputFormat
        job.setInputFormatClass(MyInputFormat.class);

        // Set the output format: MyOutFormat extends OutputFormat
        job.setOutputFormatClass(MyOutFormat.class);

        // Set the map side: MyMapper extends Mapper
        job.setMapperClass(MyMapper.class);

        // Set the map output key/value types
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        // Set the sort comparator: MySortComparator extends RawComparator
        job.setSortComparatorClass(MySortComparator.class);

        // Set the partitioner: MyPartition extends Partitioner
        job.setPartitionerClass(MyPartition.class);

        // Set the map-side pre-aggregation (combiner): MyCombiner extends Reducer
        job.setCombinerClass(MyCombiner.class);

        // Set the grouping comparator: MyGroup extends RawComparator
        job.setGroupingComparatorClass(MyGroup.class);

        // Set the reduce side: MyReducer extends Reducer
        job.setReducerClass(MyReducer.class);

        // Set the reduce output key/value types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // Set the number of reduce tasks
        job.setNumReduceTasks(2);

        // Submit the job and wait for it to finish
        job.waitForCompletion(true);
    }
}
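The driver above only wires the job together; the actual work happens in the user-defined classes that extend Hadoop's superclasses. The sketch below is a minimal, hypothetical example of what MyMapper and MyReducer could look like for this job: the class names and the Text/IntWritable output types come from the driver, while the record layout, the max-per-key aggregation, and the LongWritable/Text input types (plain text lines) are assumptions.

package tq;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

// Map side: extends Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>. The output types
// match the driver's setMapOutputKeyClass(Text)/setMapOutputValueClass(IntWritable).
class MyMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    private final Text outKey = new Text();
    private final IntWritable outValue = new IntWritable();

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Assumed record layout: "<station> <temperature>" per line.
        String[] parts = value.toString().split("\\s+");
        if (parts.length >= 2) {
            outKey.set(parts[0]);
            outValue.set(Integer.parseInt(parts[1]));
            context.write(outKey, outValue);
        }
    }
}

// Reduce side: extends Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.
// A combiner (map-side pre-aggregation) extends Reducer in exactly the same way.
class MyReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    private final IntWritable outValue = new IntWritable();

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        // Example aggregation: keep the maximum value per key.
        int max = Integer.MIN_VALUE;
        for (IntWritable v : values) {
            max = Math.max(max, v.get());
        }
        outValue.set(max);
        context.write(key, outValue);
    }
}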
Summary:
1. Set up the Configuration: Configuration conf = new Configuration(); then Job job = Job.getInstance(conf);
2. Set the jar class and the job name: job.setJarByClass(...); job.setJobName(...)
3. Set the input and output paths: FileInputFormat.addInputPath(...); FileOutputFormat.setOutputPath(...); if fileout.getFileSystem(conf).exists(fileout), call fileout.getFileSystem(conf).delete(fileout, true) first
4. Set the input format: job.setInputFormatClass(...), where the class extends InputFormat
5. Set the output format: job.setOutputFormatClass(...), where the class extends OutputFormat
6. Set the map side: job.setMapperClass(...), where the class extends Mapper
7. Set the map output key type: job.setMapOutputKeyClass(...)
8. Set the map output value type: job.setMapOutputValueClass(...)
9. Set the sort order: job.setSortComparatorClass(...), where the class extends RawComparator
10. Set the grouping: job.setGroupingComparatorClass(...), where the class extends RawComparator
11. Set the partitioner: job.setPartitionerClass(...), where the class extends Partitioner (a minimal sketch of such a partitioner follows this summary)
12. Set the reduce side: job.setReducerClass(...), where the class extends Reducer
13. Set the reduce output key type: job.setOutputKeyClass(...)
14. Set the reduce output value type: job.setOutputValueClass(...)
15. Set the number of reduce tasks: job.setNumReduceTasks(...)
16. Finally, submit the job with job.waitForCompletion(true)
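Step 11 only names the hook. As a minimal sketch of what MyPartition might look like (the class name and key/value types follow the driver above; the hash-based split is an assumption), a partitioner extends Partitioner and implements getPartition:

package tq;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

// Decides which reduce task each map output record goes to.
// With job.setNumReduceTasks(2) in the driver, numPartitions will be 2.
class MyPartition extends Partitioner<Text, IntWritable> {
    @Override
    public int getPartition(Text key, IntWritable value, int numPartitions) {
        // Hash-style split on the key, kept non-negative before the modulo.
        return (key.hashCode() & Integer.MAX_VALUE) % numPartitions;
    }
}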
The MapReduce coding workflow:
It breaks down into the following groups:
I. Common settings (four items):
  1. Set up the Configuration and the Job:
     Configuration conf = new Configuration();
     Job job = Job.getInstance(conf);
  2. Set the jar class and the job name:
     job.setJarByClass(TianQi.class);
     job.setJobName("sdfds");
  3. Set the input path and the output path:
     Path filein = new Path("sdfs");
     FileInputFormat.addInputPath(job, filein);
     Path fileout = new Path("dfdjs");
     if (fileout.getFileSystem(conf).exists(fileout)) {
         fileout.getFileSystem(conf).delete(fileout, true);
     }
     FileOutputFormat.setOutputPath(job, fileout);
  4. Set the input format and the output format:
     job.setInputFormatClass(MyInputFormat.class);   // MyInputFormat extends InputFormat
     job.setOutputFormatClass(MyOutFormat.class);     // MyOutFormat extends OutputFormat
II. Configure the map side:
  1. Set the mapper:
     job.setMapperClass(MyMapper.class);  // MyMapper extends Mapper
  2. Set the map output key and value types:
     job.setMapOutputKeyClass(Text.class);
     job.setMapOutputValueClass(IntWritable.class);
III. Configure what happens after the map output:
  1. Set the sorting and grouping (a minimal sketch of both comparators follows this outline):
     job.setSortComparatorClass(MySortComparator.class);   // MySortComparator extends RawComparator
     job.setGroupingComparatorClass(MyGroup.class);        // MyGroup extends RawComparator
  2. Set the partitioner:
     job.setPartitionerClass(MyPartition.class);  // MyPartition extends Partitioner
  3. Set the map-side pre-aggregation (combiner):
     job.setCombinerClass(MyCombiner.class);      // MyCombiner extends Reducer
IV. Configure the reduce side:
  1. Set the reducer:
     job.setReducerClass(MyReducer.class);  // MyReducer extends Reducer
  2. Set the reduce output key and value types:
     job.setOutputKeyClass(Text.class);
     job.setOutputValueClass(IntWritable.class);
V. After everything else is configured:
  1. Set the number of reduce tasks:
     job.setNumReduceTasks(2);
  2. Submit the job and wait for it to finish:
     job.waitForCompletion(true);
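Part III, item 1 above only names the two comparator hooks. Both setSortComparatorClass and setGroupingComparatorClass expect a Class<? extends RawComparator>; a common way to provide one is to extend WritableComparator, which implements RawComparator. The sketch below is an assumption of what MySortComparator and MyGroup could look like for a Text key (the class names are taken from the driver above; the comparison logic is illustrative only):

package tq;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

// Sort comparator: controls the order in which keys reach the reduce side.
// WritableComparator implements RawComparator, so it satisfies
// job.setSortComparatorClass(Class<? extends RawComparator>).
class MySortComparator extends WritableComparator {
    public MySortComparator() {
        super(Text.class, true); // true = create key instances for compare()
    }

    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        // Example: plain ascending order on the Text key.
        return ((Text) a).compareTo((Text) b);
    }
}

// Grouping comparator: decides which adjacent keys are fed to the same
// reduce() call. Returning 0 means "treat these keys as one group".
class MyGroup extends WritableComparator {
    public MyGroup() {
        super(Text.class, true);
    }

    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        // Example: group on the whole key (the same behaviour as the default).
        return ((Text) a).compareTo((Text) b);
    }
}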