word count的reduce過程以及專案打包部署
map過程已經寫完了,上面那個流程我們涉及到了泛型以及序列化,我們要知道每個引數代表的含義,這樣有助於我們理解整個流程。
下面我們開始reduce,這個過程我們要把map輸出的鍵值對把key值相同的放在一起,具體的流程我們看程式碼:
package MR.wc;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
/**
* <KEYIN, VALUEIN, KEYOUT, VALUEOUT>
* KEYIN, VALUEIN這兩個引數是map段輸出的值,就是之前的key,value鍵值對(Text, IntWritable)
* KEYOUT, VALUEOUT這兩個引數是我們要輸出的資料格式,比如(“hello",5),("Hadoop",1)等等等等
* Reduce類中有一個迭代器,會迴圈獲取資料
* */
//繼承Reducer類
public class wcReduce extends Reducer<Text, IntWritable,Text,IntWritable> {
/**
* 重寫reduce方法
* Text key:就是讀取進來的每一個單詞(比如:hello)
* Iterable<IntWritable> values hello這個key裡面所有的value值(1,1,1,1,1)
* */
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {
int sum=0;
//對values做迭代累加
for (IntWritable value : values) {
//把IntWritable轉成int值累加
sum+=value.get();
}
//通過上下文輸出
context.write(key,new IntWritable(sum));
}
}
然後我們再寫驅動類,這個類基本是一些固定的寫法:
package MR.wc;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class wcDriver {
public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
//設定檔案輸入路徑,也是就我們要讀取的那個文字檔案路徑(第一個引數)
//第二個引數是我們的輸出路徑
String[] path=new String[]{"/word.txt","/output007"};
//設定配置檔案
//獲取配置檔案物件
Configuration conf = new Configuration();
//這裡我們可以使用Windows環境裡面的hadoop去執行,但是現在我們先把它放到叢集上,所以要先配置叢集資源
/**
* 配置conf物件,要根據/opt/module/hadoop-2.8.4/etc/hadoop/core-site.xml 這個檔案裡面的去配置
* <property>
* <name>fs.defaultFS</name>
* <value>hdfs://bigdata101:9000</value>
* </property>
* */
conf.set("fs.defaultFS","hdfs://bigdata101:9000");//設定叢集資源地址
//1,載入任務
Job job = Job.getInstance(conf);
//2,設定任務的驅動類(jar載入路徑),通過反射獲取
job.setJarByClass(wcDriver.class);
//3,設定map程式資訊
job.setMapperClass(wcMapper.class);
job.setMapOutputKeyClass(Text.class);//設定輸出的key型別,map階段的輸出key型別
job.setMapOutputValueClass(IntWritable.class);//設定輸出的value型別,map階段輸出的value型別
//4,設定reduce程式資訊
job.setReducerClass(wcReduce.class);
job.setOutputKeyClass(Text.class);//reduce階段輸出的key型別
job.setMapOutputValueClass(IntWritable.class);// reduce階段的value型別
//5,設定輸入路徑和輸出路徑
FileInputFormat.setInputPaths(job,new Path(path[0]));
FileOutputFormat.setOutputPath(job,new Path(path[1]));
// 6,提交任務
boolean res=job.waitForCompletion(true);
System.exit(res?0:1);
System.out.println(res?"執行成功":"執行失敗");
}
}
現在整個流程就寫完了。寫完以後我們先在叢集上跑一下試試效果。先打一個jar包:
IDEA 右邊:
執行完以後我們可以在左邊看見打好的jar包:
把這個jar包拖到桌面上,改一下名字:wordDemo.jar
然後開啟三臺虛擬機器,在namenode上啟動hdfs:start-dfs.sh
在102上啟動yarn:start-yarn.sh
啟動完畢以後我們先手動建一個檔案:vim word.txt
寫入資料:
hello world
hello scala
hello spark
hello hadoop
hello mr
儲存推出。然後把這個檔案放到hdfs根目錄上:hdfs dfs -put word.txt /
現在我們把jar包上傳到Linux上:
上傳完畢以後是這樣:
這個檔案我們沒有執行許可權,給他授權:chmod 755 wordDemo.jar
然後就綠了:
綠了以後就可以在使用Hadoop命令執行這個檔案了,執行的時候我們要用到這個檔案驅動的全類名,我們提前把路徑複製下來:
然後執行:hadoop jar wordDemo.jar MR.wc.wcDriver
我們開啟瀏覽器,輸入yarn的地址:http://192.168.53.102:8088/,可以看見作業執行的資訊:
執行完以後,我們再開啟一個網頁視窗,輸入hdfs地址:http://192.168.53.101:50070/,可以看見我們指定的那個檔案已經生成了(我們每次執行的時候都會新生成一個檔案):
我們給他開啟:
點選part開頭的檔案:
可以把這個檔案下載下來,用notepad開啟:
ok,搞定。其實這裡就是兩個步驟,一個map,一個reduce,當然了,細心的童鞋有可能發現了這個結果還被排序了,我們在程式碼裡面沒有看見,不著急,後面會慢慢展開來說。現在對mapreduce過程應該有一個大致的瞭解了。現在再回去看看那個mapreduce的流程圖,會稍微清晰一些。