Mapreduce之排序&規約&實戰案例
MapReduce 排序和序列化
簡單介紹
①序列化 (Serialization) 是指把結構化物件轉化為位元組流
②反序列化 (Deserialization) 是序列化的逆過程. 把位元組流轉為結構化物件. 當要在程序間傳
遞物件或持久化物件的時候, 就需要序列化物件成位元組流, 反之當要將接收到或從磁碟讀取
的位元組流轉換為物件, 就要進行反序列化
③Java 的序列化 (Serializable) 是一個重量級序列化框架, 一個物件被序列化後, 會附帶很多額
外的資訊 (各種校驗資訊, header, 繼承體系等), 不便於在網路中高效傳輸. 所以, Hadoop
自己開發了一套序列化機制(Writable), 精簡高效. 不用像 Java 物件類一樣傳輸多層的父子
關係, 需要哪個屬性就傳輸哪個屬性值, 大大的減少網路傳輸的開銷
④Writable 是 Hadoop 的序列化格式, Hadoop 定義了這樣一個 Writable 介面. 一個類要支援可
序列化只需實現這個介面即可
⑤另外 Writable 有一個子介面是 WritableComparable, WritableComparable 是既可實現序列
化, 也可以對key進行比較, 我們這裡可以通過自定義 Key 實現 WritableComparable 來實現
我們的排序功能
實戰案例
資料格式如下
要求:
第一列按照字典順序進行排列
第一列相同的時候, 第二列按照升序進行排列
解決思路:
將 Map 端輸出的 <key,value> 中的 key 和 value 組合成一個新的 key (newKey), value值
不變
這裡就變成 <(key,value),value> , 在針對 newKey 排序的時候, 如果 key 相同, 就再對
value進行排序
Step 1. 自定義型別和比較器
package cn.itcast.mapreduce.sort;
import org.apache.hadoop.io.WritableComparable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
public class SortBean implements WritableComparable<SortBean> {
private String word;
private int num;
public String getWord() {
return word;
}
public void setWord(String word) {
this.word = word;
}
public int getNum() {
return num;
}
public void setNum(int num) {
this.num = num;
}
@Override
public String toString() {
return word + '\t' + num ;
}
//實現比較器,指定排序規則
/*
規則:
第一列:按照字典順序進行排列
第二列:當第一列相同,num按照升序進行排列
*/
@Override
public int compareTo(SortBean o) {
//先對第一列排序
int result = this.word.compareTo(o.word);
//如果第一列相同,則按照第二列排序
if(result==0)
{
return this.num-o.num;
}
return result;
}
//實現序列化
@Override
public void write(DataOutput dataOutput) throws IOException {
dataOutput.writeUTF(word);
dataOutput.writeInt(num);
}
//實現反序列化
@Override
public void readFields(DataInput dataInput) throws IOException {
this.word=dataInput.readUTF();
this.num=dataInput.readInt();
}
}
Step 2. Mapper
package cn.itcast.mapreduce.sort;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class SortMapper extends Mapper<LongWritable, Text,SortBean, NullWritable> {
/*
將我們的K1和V1轉為K2和V2
K1 v1
0 a 3
5 b 7
-----------------
K2 V2
SortBean(a 3) Nullwritable
SortBean(b 7) NullWritable
*/
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
//1.將V1行文字資料拆分,並將資料封裝到SortBean物件,就可以得到K2
String[] split = value.toString().split("\t");
SortBean sortBean = new SortBean();
sortBean.setWord(split[0]);
sortBean.setNum(Integer.parseInt(split[1]));
//2.將K2和V2寫入上下文中
context.write(sortBean,NullWritable.get());
}
}
Step 3. Reducer
package cn.itcast.mapreduce.sort;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class SortReducer extends Reducer<SortBean, NullWritable,SortBean,NullWritable> {
//reduce方法將新的K2和V2轉為K3和V3
@Override
protected void reduce(SortBean key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
context.write(key,NullWritable.get());
}
}
Step 4. Main 入口
package cn.itcast.mapreduce.sort;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class JobMain extends Configured implements Tool {
@Override
public int run(String[] strings) throws Exception {
//1.建立job物件
Job job = Job.getInstance(super.getConf(), "mapreduce_sort");
//job.setJarByClass(JobMain.class);
//2.配置job任務(八個步驟)
//第一步:設定輸入類和路徑
job.setInputFormatClass(TextInputFormat.class);
//TextInputFormat.addInputPath(job,new Path("hdfs://hadoop101:8020/input/sort_input"));
TextInputFormat.addInputPath(job,new Path("file:///E:\\input\\sort_input"));
//第二步:設定Mapper型別
job.setMapperClass(SortMapper.class);
job.setMapOutputKeyClass(SortBean.class);
job.setMapOutputValueClass(NullWritable.class);
//第三,四,五,六,排序不需要設定,只要制定好排序規則即可
//第七步:設定Reducer類和型別
job.setReducerClass(SortReducer.class);
job.setOutputKeyClass(SortBean.class);
job.setOutputValueClass(NullWritable.class);
//第八步:設定輸出類和輸出路徑
job.setOutputFormatClass(TextOutputFormat.class);
//TextOutputFormat.setOutputPath(job,new Path("hdfs://hadoop101:8020/out/sort_out"));
TextOutputFormat.setOutputPath(job,new Path("file:///E:\\out\\sort_out"));
//3.等待任務結束
boolean b = job.waitForCompletion(true);
return b?0:1;
}
public static void main(String[] args) throws Exception {
Configuration configuration = new Configuration();
int run = ToolRunner.run(configuration, new JobMain(), args);
System.exit(run);
}
}
輸出結果:
規約Combiner
概念
每一個 map 都可能會產生大量的本地輸出,Combiner 的作用就是對 map 端的輸出先做一次
合併,以減少在 map 和 reduce 節點之間的資料傳輸量,以提高網路IO 效能,是 MapReduce
的一種優化手段之一
- combiner 是 MR 程式中 Mapper 和 Reducer 之外的一種元件
- combiner 元件的父類就是 Reducer
- combiner 和 reducer 的區別在於執行的位置
- Combiner 是在每一個 maptask 所在的節點執行
- Reducer 是接收全域性所有 Mapper 的輸出結果
- combiner 的意義就是對每一個 maptask 的輸出進行區域性彙總,以減小網路傳輸量
實現步驟
1. 自定義一個 combiner 繼承 Reducer,重寫 reduce 方法
2. 在 job 中設定 job.setCombinerClass(CustomCombiner.class)
combiner 能夠應用的前提是不能影響最終的業務邏輯,而且,combiner 的輸出 kv 應該跟
reducer 的輸入 kv 型別要對應起來
統計單詞的出現次數:
MyCombiner類:
package cn.itcast.mapreduce.combiner;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class MyCombiner extends Reducer<Text, LongWritable,Text,LongWritable> {
@Override
protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
//1.便利結合,將集合中的數字相加,得到V3
long count=0;
for (LongWritable value : values) {
count+=value.get();
}
//2.將K3和V3寫入上下文中
context.write(key,new LongWritable(count));
}
}
無Combiner:
有Combiner:
實戰案例
案例一
統計每個手機號的上行資料包總和,下行資料包總和,上行總流量之和,下行總流量之和 分
析:以手機號碼作為key值,上行流量,下行流量,上行總流量,下行總流量四個欄位作為
value值,然後以這個key,和value作為map階段的輸出,reduce階段的輸入
Step 1: 自定義map的輸出value物件FlowBean
package cn.itcast.mapreduce.floow_count_demo1;
import org.apache.hadoop.io.Writable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
public class FloowBean implements Writable {
private Integer upFloow;//上行資料包數
private Integer downFlow;
private Integer upCountFlow;//上行流量總和
private Integer downCountFlow;
public FloowBean(Integer upFloow, Integer downFlow, Integer upCountFlow, Integer downCountFlow) {
this.upFloow = upFloow;
this.downFlow = downFlow;
this.upCountFlow = upCountFlow;
this.downCountFlow = downCountFlow;
}
public FloowBean() {
}
public Integer getUpFloow() {
return upFloow;
}
public void setUpFloow(Integer upFloow) {
this.upFloow = upFloow;
}
public Integer getDownFlow() {
return downFlow;
}
public void setDownFlow(Integer downFlow) {
this.downFlow = downFlow;
}
public Integer getUpCountFlow() {
return upCountFlow;
}
public void setUpCountFlow(Integer upCountFlow) {
this.upCountFlow = upCountFlow;
}
public Integer getDownCountFlow() {
return downCountFlow;
}
public void setDownCountFlow(Integer downCountFlow) {
this.downCountFlow = downCountFlow;
}
@Override
public String toString() {
return upFloow +
"\t" + downFlow +
"\t" + upCountFlow +
"\t" + downCountFlow;
}
//序列化方法
@Override
public void write(DataOutput dataOutput) throws IOException {
dataOutput.writeInt(upFloow);
dataOutput.writeInt(downFlow);
dataOutput.writeInt(upCountFlow);
dataOutput.writeInt(downCountFlow);
}
//反序列化方法
@Override
public void readFields(DataInput dataInput) throws IOException {
this.upFloow=dataInput.readInt();
this.downFlow=dataInput.readInt();
this.upCountFlow=dataInput.readInt();
this.downCountFlow=dataInput.readInt();
}
}
Step 2: 定義FlowMapper類
package cn.itcast.mapreduce.floow_count_demo1;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class FloowCountMapper extends Mapper<LongWritable, Text,Text,FloowBean> {
/*
將K1和V1轉為K2和V2
*/
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
//1.拆分行文字資料,得到手機號-->K2
String[] split = value.toString().split("\t");
String phoneNumber=split[1];
//2.建立FloowBean物件,並從行文資料中拆分流量的四個階段,並將四個流量的欄位值賦值給FlowBean物件
FloowBean floowBean=new FloowBean();
floowBean.setUpFloow(Integer.parseInt(split[6]));
floowBean.setDownFlow(Integer.parseInt(split[7]));
floowBean.setUpCountFlow(Integer.parseInt(split[8]));
floowBean.setDownCountFlow(Integer.parseInt(split[9]));
//3.將K2和V2寫入上下文中
context.write(new Text(phoneNumber),floowBean);
}
}
Step 3: 定義FlowReducer類
package cn.itcast.mapreduce.floow_count_demo1;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class FloowCountReducer extends Reducer<Text,FloowBean,Text,FloowBean> {
@Override
protected void reduce(Text key, Iterable<FloowBean> values, Context context) throws IOException, InterruptedException {
//1.遍歷集合,並將集合中對應的四個欄位累加
Integer upFloow=0;//上行資料包數
Integer downFlow=0;
Integer upCountFlow=0;//上行流量總和
Integer downCountFlow=0;
for (FloowBean value : values) {
upFloow+=value.getUpFloow();
downFlow+=value.getDownFlow();
upCountFlow+=value.getUpCountFlow();
downCountFlow+=value.getDownCountFlow();
}
//2.建立FloowBean物件,並給物件賦值
FloowBean floowBean = new FloowBean(upFloow,downFlow,upCountFlow,downCountFlow);
//3.將K3和V3寫入上下文中
context.write(key,floowBean);
}
}
Step 4: 程式main函式入口FlowMain
package cn.itcast.mapreduce.floow_count_demo1;
import cn.itcast.mapreduce.combiner.MyCombiner;
import cn.itcast.mapreduce.combiner.WordCountMapper;
import cn.itcast.mapreduce.combiner.WordCountReduce;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class JobMain extends Configured implements Tool {
//該方法用於指定一個job任務
@Override
public int run(String[] strings) throws Exception {
//1.建立一個job任務物件
Job job = Job.getInstance(super.getConf(),"mapreduce_floowcount");
//2.配置job任務物件(八個步驟)
//打包jar路徑主類
job.setJarByClass(JobMain.class);
//第一步:指定檔案的讀取方式和讀取路徑
job.setInputFormatClass(TextInputFormat.class);
//TextInputFormat.addInputPath(job,new Path("hdfs://hadoop101:8020/wordcount"));
//本地測試元資料
TextInputFormat.addInputPath(job,new Path("file:///E:\\input\\flowcount_input"));
//第二步:指定map階段的處理方式和資料型別
job.setMapperClass(FloowCountMapper.class);
//設定Map階段K2的型別
job.setMapOutputKeyClass(Text.class);
//設定Map階段V2的型別
job.setMapOutputValueClass(FloowBean.class);
//第三<分割槽>,四(排序)
//第五步(規約)
// 第六步預設(分組)
//第七步:指定reduce階段的處理方式和資料型別
job.setReducerClass(FloowCountReducer.class);
//設定K3的型別
job.setOutputKeyClass(Text.class);
//設定V3的型別
job.setOutputValueClass(FloowBean.class);
//第八步:設定輸出型別
job.setOutputFormatClass(TextOutputFormat.class);
//本地測試輸出
TextOutputFormat.setOutputPath(job,new Path("file:///E:\\out\\flowcount_out"));
//等待任務結束
boolean bl=job.waitForCompletion(true);
return bl?0:1;
}
public static void main(String[] args) throws Exception {
Configuration configuration = new Configuration();
//1.啟動job任務
int run = ToolRunner.run(configuration, new JobMain(), args);
System.exit(run);
}
}
輸出:
案例二
需求二: 上行流量倒序排序(遞減排序)
分析,以需求一的輸出資料作為排序的輸入資料,自定義FlowBean,以FlowBean為map輸出的
key,以手機號作為Map輸出的value,因為MapReduce程式會對Map階段輸出的key進行排序
Step 1: 定義FlowBean實現WritableComparable實現比較排序
Java 的 compareTo 方法說明:
- compareTo 方法用於將當前物件與方法的引數進行比較。
- 如果指定的數與引數相等返回 0。
- 如果指定的數小於引數返回 -1。
- 如果指定的數大於引數返回 1。
package cn.itcast.mapreduce.flow_sort_demo2;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
public class FlowBean implements WritableComparable<FlowBean> {
private Integer upFloow;//上行資料包數
private Integer downFlow;
private Integer upCountFlow;//上行流量總和
private Integer downCountFlow;
public FlowBean(Integer upFloow, Integer downFlow, Integer upCountFlow, Integer downCountFlow) {
this.upFloow = upFloow;
this.downFlow = downFlow;
this.upCountFlow = upCountFlow;
this.downCountFlow = downCountFlow;
}
public FlowBean() {
}
public Integer getUpFloow() {
return upFloow;
}
public void setUpFloow(Integer upFloow) {
this.upFloow = upFloow;
}
public Integer getDownFlow() {
return downFlow;
}
public void setDownFlow(Integer downFlow) {
this.downFlow = downFlow;
}
public Integer getUpCountFlow() {
return upCountFlow;
}
public void setUpCountFlow(Integer upCountFlow) {
this.upCountFlow = upCountFlow;
}
public Integer getDownCountFlow() {
return downCountFlow;
}
public void setDownCountFlow(Integer downCountFlow) {
this.downCountFlow = downCountFlow;
}
@Override
public String toString() {
return upFloow +
"\t" + downFlow +
"\t" + upCountFlow +
"\t" + downCountFlow;
}
//序列化方法
@Override
public void write(DataOutput dataOutput) throws IOException {
dataOutput.writeInt(upFloow);
dataOutput.writeInt(downFlow);
dataOutput.writeInt(upCountFlow);
dataOutput.writeInt(downCountFlow);
}
//反序列化方法
@Override
public void readFields(DataInput dataInput) throws IOException {
this.upFloow=dataInput.readInt();
this.downFlow=dataInput.readInt();
this.upCountFlow=dataInput.readInt();
this.downCountFlow=dataInput.readInt();
}
//指定排序規則
@Override
public int compareTo(FlowBean o) {
return o.upFloow-this.upFloow;
}
}
Step 2: 定義FlowMapper
package cn.itcast.mapreduce.flow_sort_demo2;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class FlowSortMapper extends Mapper<LongWritable, Text,FlowBean,Text> {
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String[] split = value.toString().split("\t");
String phoneNumber=split[0];
FlowBean flowBean = new FlowBean(Integer.parseInt(split[1]),Integer.parseInt(split[2]),Integer.parseInt(split[3]),Integer.valueOf(split[4]));
context.write(flowBean,new Text(phoneNumber));
}
}
Step 3: 定義FlowReducer
package cn.itcast.mapreduce.flow_sort_demo2;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class FlowSortReducer extends Reducer<FlowBean, Text,Text,FlowBean> {
@Override
protected void reduce(FlowBean key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
for (Text value : values) {
context.write(value,key);
}
}
}
Step 4: 程式main函式入口
package cn.itcast.mapreduce.flow_sort_demo2;
import cn.itcast.mapreduce.floow_count_demo1.FloowBean;
import cn.itcast.mapreduce.floow_count_demo1.FloowCountMapper;
import cn.itcast.mapreduce.floow_count_demo1.FloowCountReducer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class JobMain extends Configured implements Tool {
//該方法用於指定一個job任務
@Override
public int run(String[] strings) throws Exception {
//1.建立一個job任務物件
Job job = Job.getInstance(super.getConf(),"mapreduce_flowsort");
//2.配置job任務物件(八個步驟)
//打包jar路徑主類
job.setJarByClass(JobMain.class);
//第一步:指定檔案的讀取方式和讀取路徑
job.setInputFormatClass(TextInputFormat.class);
//TextInputFormat.addInputPath(job,new Path("hdfs://hadoop101:8020/wordcount"));
//本地測試元資料
TextInputFormat.addInputPath(job,new Path("file:///E:\\out\\flowcount_out"));
//第二步:指定map階段的處理方式和資料型別
job.setMapperClass(FlowSortMapper.class);
//設定Map階段K2的型別
job.setMapOutputKeyClass(FlowBean.class);
//設定Map階段V2的型別
job.setMapOutputValueClass(Text.class);
//第三<分割槽>,四(排序)
//第五步(規約)
// 第六步預設(分組)
//第七步:指定reduce階段的處理方式和資料型別
job.setReducerClass(FlowSortReducer.class);
//設定K3的型別
job.setOutputKeyClass(Text.class);
//設定V3的型別
job.setOutputValueClass(FlowBean.class);
//第八步:設定輸出型別
job.setOutputFormatClass(TextOutputFormat.class);
//本地測試輸出
TextOutputFormat.setOutputPath(job,new Path("file:///E:\\out\\flowsort_out"));
//等待任務結束
boolean bl=job.waitForCompletion(true);
return bl?0:1;
}
public static void main(String[] args) throws Exception {
Configuration configuration = new Configuration();
//1.啟動job任務
int run = ToolRunner.run(configuration, new JobMain(), args);
System.exit(run);
}
}
輸出:
案列三
在需求一的基礎上,繼續完善,將不同的手機號分到不同的資料檔案的當中去,需要自定義
分割槽來實現,這裡我們自定義來模擬分割槽,將以下數字開頭的手機號進行分開
- 135 開頭資料到一個分割槽檔案
- 136 開頭資料到一個分割槽檔案
- 137 開頭資料到一個分割槽檔案
- 其他分割槽
自定義分割槽
package cn.itcast.mapreduce.flow_count_sort_partition;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;
public class FlowCountPartition extends Partitioner<Text,FloowBean> {
@Override
public int getPartition(Text text, FloowBean floowBean, int i) {
String s = text.toString();
if(s.startsWith("135"))
return 0;
else if(s.startsWith("136"))
return 1;
else if(s.startsWith("137"))
return 2;
else
return 3;
}
}
程式main函式入口,設定排序的Partition和reducetask個數
package cn.itcast.mapreduce.flow_count_sort_partition;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class JobMain extends Configured implements Tool {
//該方法用於指定一個job任務
@Override
public int run(String[] strings) throws Exception {
//1.建立一個job任務物件
Job job = Job.getInstance(super.getConf(),"mapreduce_flow_partition");
//2.配置job任務物件(八個步驟)
//打包jar路徑主類
job.setJarByClass(JobMain.class);
//第一步:指定檔案的讀取方式和讀取路徑
job.setInputFormatClass(TextInputFormat.class);
//TextInputFormat.addInputPath(job,new Path("hdfs://hadoop101:8020/wordcount"));
//本地測試元資料
TextInputFormat.addInputPath(job,new Path("file:///E:\\input\\flowpartition_input"));
//第二步:指定map階段的處理方式和資料型別
job.setMapperClass(FloowCountMapper.class);
//設定Map階段K2的型別
job.setMapOutputKeyClass(Text.class);
//設定Map階段V2的型別
job.setMapOutputValueClass(FloowBean.class);
//第三<分割槽>,四(排序)
job.setPartitionerClass(FlowCountPartition.class);
//第五步(規約)
// 第六步預設(分組)
//第七步:指定reduce階段的處理方式和資料型別
job.setReducerClass(FloowCountReducer.class);
//設定K3的型別
job.setOutputKeyClass(Text.class);
//設定V3的型別
job.setOutputValueClass(FloowBean.class);
//設定reduce個數
job.setNumReduceTasks(4);
//第八步:設定輸出型別
job.setOutputFormatClass(TextOutputFormat.class);
//本地測試輸出
TextOutputFormat.setOutputPath(job,new Path("file:///E:\\out\\flowpartition_out"));
//等待任務結束
boolean bl=job.waitForCompletion(true);
return bl?0:1;
}
public static void main(String[] args) throws Exception {
Configuration configuration = new Configuration();
//1.啟動job任務
int run = ToolRunner.run(configuration, new JobMain(), args);
System.exit(run);
}
}
輸出:
135分割槽
136分割槽
137分割槽
其他分割槽