並行作業3:在eclipse中開發MapReduce程式
阿新 • • 發佈:2018-12-03
在eclipse中開發MapReduce程式
系統採用vm下ubuntu16.04
一、eclipse安裝(參考我的其它部落格)
二、eclipse配置
1、下載hadoop-eclipse-plugin-2.7.3.jar外掛,並將其拖到虛擬機器桌面
2、將其移動到/usr/local/java/ide/eclipse/plugins目錄下
cd ~/桌面
sudo mv hadoop-eclipse-plugin-2.7.3.jar /usr/local/java/ide/eclipse/plugins
3、重啟eclipse
4、切換檢視,點選右上角小象
5、點選右下帶加號小象
6、點選配置檔案進行配置
Localtion name:hadoopTest
Map/Reduce Master:
host:localhost
Port:9001
//這裡選中Use M/R Master host按鈕
DFS Master:
host:localhost
Port:9000
username:hk
7、左側專案欄
DFS Locations
|--hadoopTest
|--資料夾(0)
8、命令列輸入,離開安全模式,以便在eclipse內可以直接懟hdfs目錄下檔案進行操作。
hadoop dfsadmin -safemode leave
9、新建上傳程式碼相關檔案
DFS Locations
|--hadoopTest
|--(1)
|--user(1)
|--hk(1)
|--sort_in(3)
|--file1.txt
|--file2.txt
|--file3.txt
三、MapReduce程式
1、例項描述
對輸入檔案中資料進行排序。輸入檔案中的每行內容均為一個數字,即一個數據。要求在輸出中每行有兩個間隔的數字,其中,第一個代表原始資料在原始資料集中的位次,第二個代表原始資料。
(1)file1.txt:
2
32
654
32
15
756
65223
(2)file2.txt:
5956
22
650
92
(3)file3.txt:
26
54
6
期望輸出
1 2
2 6
3 15
4 22
5 26
6 32
7 32
8 54
9 92
10 650
11 654
12 756
13 5956
14 65223
2、設計思路
這個例項僅僅要求對輸入資料進行排序,熟悉MapReduce過程的讀者會很快想到在MapReduce過程中就有排序,是否可以利用這個預設的排序,而不需要自己再實現具體的排序呢?答案是肯定的。
但是在使用之前首先需要了解它的預設排序規則。它是按照key值進行排序的,如果key為封裝int的IntWritable型別,那麼MapReduce按照數字大小對key排序,如果key為封裝為String的Text型別,那麼MapReduce按照字典順序對字串排序。
瞭解了這個細節,我們就知道應該使用封裝int的IntWritable型資料結構了。也就是在map中將讀入的資料轉化成 IntWritable型,然後作為key值輸出(value任意)。reduce拿到<key,value-list>之後,將輸入的 key作為value輸出,並根據value-list中元素的個數決定輸出的次數。輸出的key(即程式碼中的linenum)是一個全域性變數,它統計當前key的位次。需要注意的是這個程式中沒有配置Combiner,也就是在MapReduce過程中不使用Combiner。這主要是因為使用map和reduce就已經能夠完成任務了。
3、程式程式碼
package edu.hk.sort;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
public class Sort {
//map將輸入中的value化成IntWritable型別,作為輸出的key
public static class Map extends
Mapper<Object,Text,IntWritable,IntWritable>{
private static IntWritable data=new IntWritable();
//實現map函式
public void map(Object key,Text value,Context context)
throws IOException,InterruptedException{
String line=value.toString();
data.set(Integer.parseInt(line));
context.write(data, new IntWritable(1));
}
}
//reduce將輸入中的key複製到輸出資料的key上,
//然後根據輸入的value-list中元素的個數決定key的輸出次數
//用全域性linenum來代表key的位次
public static class Reduce extends
Reducer<IntWritable,IntWritable,IntWritable,IntWritable>{
private static IntWritable linenum = new IntWritable(1);
//實現reduce函式
public void reduce(IntWritable key,Iterable<IntWritable> values,Context context)
throws IOException,InterruptedException{
for(IntWritable val:values){
context.write(linenum, key);
linenum = new IntWritable(linenum.get()+1);
}
}
}
public static void main(String[] args) throws Exception{
Configuration conf = new Configuration();
//這句話很關鍵
conf.set("fs.default.name", "hdfs://localhost:9000");
String[] ioArgs=new String[]{"sort_in","sort_out"};
String[] otherArgs = new GenericOptionsParser(conf, ioArgs).getRemainingArgs();
if (otherArgs.length != 2) {
System.err.println("Usage: Data Sort <in> <out>");
System.exit(2);
}
Job job = new Job(conf, "Data Sort");
job.setJarByClass(Sort.class);
//設定Map和Reduce處理類
job.setMapperClass(Map.class);
job.setReducerClass(Reduce.class);
//設定輸出型別
job.setOutputKeyClass(IntWritable.class);
job.setOutputValueClass(IntWritable.class);
//設定輸入和輸出目錄
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
4、結果
1 2
2 6
3 15
4 22
5 26
6 32
7 32
8 54
9 92
10 650
11 654
12 756
13 5956
14 65223