在eclipse中實現MapReduce
阿新 • • 發佈:2018-12-16
1.準備環境
- Windows下的Hadoop的mapred-site.xml 和 yarn-site.xml配置檔案更新為和虛擬機器中的一樣。
- 將mapred-site.xml和yarn-site.xml配置檔案拷貝到工程下。
- 新增依賴包。
2.執行模式
- 本地執行(在本地的eclipse中啟動多個執行緒來模擬map task,和reduce task執行)。主要用於測試環境。 需要修改mapred-site.xml配置檔案中的 mapreduce.framework.name,修改為local。
- 提交到叢集中執行。主要用於生產環境。 需要先將工程打成一個jar包,拷貝到虛擬機器中。使用hadoop jar命令執行。
- 在本機上的eclipse中直接提交任務到叢集中執行。 需要先將工程達成一個jar包,放在本地一個地方。比如d盤下。然後在程式中設定job.setJar(“jar包的路徑”)。最後還要修改mapred-site.xml配置檔案為
<property> <name>mapreduce.framework.name</name> <value>yarn</value> </property> <property> <name>mapreduce.app-submission.cross-platform</name> <value>true</value> </property>
3.一個簡單的wordcount例項,用於統計一篇文章中某個單詞出現的次數
- 主函式
public class WC { public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { //設定配置物件 Configuration conf = new Configuration(true); //設定一個job物件 Job job = Job.getInstance(conf); //設定當前main函式所在類 job.setJarByClass(WC.class); //需要使用到第三種執行模式時要設定本地jar包的位置 job.setJar("d:/wc.jar"); //設定輸入路徑 FileInputFormat.setInputPaths(job, "/input/wc"); //設定輸出路徑 Path outputPath = new Path("/output/"); FileSystem fs = outputPath.getFileSystem(conf); //判斷這個輸出路徑是否存在,存在就把它刪除 if(fs.exists(outputPath)){ fs.delete(outputPath,true); } FileOutputFormat.setOutputPath(job, outputPath); //設定Map class job.setMapperClass(WCMapper.class); //設定map輸出key、value的型別 key是單詞,value是1 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(IntWritable.class); //設定reduce class job.setReducerClass(WCReduce.class); //設定reduce task的個數 job.setNumReduceTasks(2); //列印資訊 job.waitForCompletion(true); } }
- Map class
public class WCMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
Text myKey = new Text();
IntWritable myValue = new IntWritable(1);
@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
//根據空格將單詞切割出來
String[] words = StringUtils.split(value.toString(), ' ');
for (String word : words) {
myKey.set(word);
context.write(myKey,myValue);
}
}
}
- Reduce class
public class WCReduce extends Reducer<Text, IntWritable, Text, IntWritable> {
@Override
protected void reduce(Text key, Iterable<IntWritable> values,Context context) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable value : values) {
sum += value.get();
}
context.write(key, new IntWritable(sum));
}
}