MapReduce全流程_分割槽_排序
阿新 • • 發佈:2020-08-19
1、MapReduce完成的工作流程:
2、分割槽操作(Partition分割槽)
Partition分割槽案例實操
1.需求
將統計結果按照手機歸屬地不同省份輸出到不同檔案中(分割槽)。期望輸出資料:手機號136、137、138、139開頭的分別放到4個獨立的檔案中,其他開頭的放到另一個檔案中。
程式碼如下:
package partiton;

import flow.FlowBean;
import flow.FlowMapper;
import flow.FlowReducer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/**
 * Driver that wires the flow-statistics job with a custom partitioner so that
 * map output is split into five files by phone-number prefix (136/137/138/139/other).
 *
 * <p>Usage: {@code partitonDriver <input path> <output path>} — the output path
 * must not already exist.
 */
public class partitonDriver {
    public static void main(String[] args)
            throws IOException, ClassNotFoundException, InterruptedException {
        // 1. Obtain a job instance with default configuration.
        Job job = Job.getInstance(new Configuration());

        // 2. Set the classpath by the driver class so the cluster can locate the jar.
        job.setJarByClass(partitonDriver.class);

        // 3. Mapper/Reducer, plus the custom partitioner.
        job.setMapperClass(FlowMapper.class);
        job.setReducerClass(FlowReducer.class);
        // Five reduce tasks: one per partition number returned by MyPartitioner (0-4).
        // Fewer tasks than partitions would make Hadoop reject partition indices >= N.
        job.setNumReduceTasks(5);
        job.setPartitionerClass(MyPartitioner.class);

        // 4. Map-output and final-output key/value types.
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FlowBean.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);

        // 5. Input/output paths come from the command line.
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // 6. Submit and block until completion; exit code reflects job success.
        boolean ok = job.waitForCompletion(true);
        System.exit(ok ? 0 : 1);
    }
}
package partiton; import flow.FlowBean; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Partitioner; public class MyPartitioner extends Partitioner<Text, FlowBean> { //返回分割槽號 public int getPartition(Text text, FlowBean flowBean, int i) { String phone=text.toString(); switch (phone.substring(0,3)){ case "136": return 0; case "137": return 1; case "138": return 2; case "139": return 3; default: return 4; } } }
成功執行之後
並存儲為了檔案。顯然已經完成了分割槽操作。
3、排序操作:
排序是MapReduce框架中最重要的操作之一。MapTask和ReduceTask均會對資料按照key進行排序。該操作屬於Hadoop的預設行為。任何應用程式中的資料均會被排序,而不管邏輯上是否需要。預設排序是按照字典順序排序,且實現該排序的方法是快速排序。
(1)部分排序
MapReduce根據輸入記錄的鍵對資料集排序。保證輸出的每個檔案內部有序。
(2)全排序
最終輸出結果只有一個檔案,且檔案內部有序。實現方式是隻設定一個Reduce Task。但該方法在
處理大型檔案時效率極低,因為一臺機器處理所有檔案,完全喪失了MapReduce所提供的並行架構。
(3)輔助排序(GroupingComparator分組)
在Reduce端對key進行分組。應用於:在接收的key為bean物件時,想讓一個或幾個欄位相同(全部欄位比較不相同)的key進入到同一個reduce方法時,可以採用分組排序。
(4)二次排序
在自定義排序過程中,如果compareTo中的判斷條件為兩個即為二次排序。
程式碼如下:
package writablecomparable;

import org.apache.hadoop.io.Writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

/**
 * Serializable traffic-statistics bean used as the map-output key so that
 * records are sorted by total traffic.
 *
 * <p>Ordering: {@link #compareTo(FlowBean)} compares by {@code sumFlow} in
 * DESCENDING order (largest total first).
 *
 * <p>NOTE(review): this class defines a natural ordering but does not override
 * {@code equals}/{@code hashCode}, so its ordering is inconsistent with equals;
 * acceptable for MapReduce key sorting, but do not use it in hash-based or
 * sorted collections that rely on consistency.
 */
public class FlowBean implements Writable, Comparable<FlowBean> {

    private long upFlow;   // upstream traffic
    private long downFlow; // downstream traffic
    private long sumFlow;  // total traffic = upFlow + downFlow

    /** No-arg constructor required by the Hadoop serialization framework. */
    public FlowBean() {
    }

    /** Sets up/down flow and derives the total in one call. */
    public void set(long upFlow, long downFlow) {
        this.downFlow = downFlow;
        this.upFlow = upFlow;
        this.sumFlow = upFlow + downFlow;
    }

    @Override
    public String toString() {
        return upFlow + "\t" + downFlow + "\t" + sumFlow;
    }

    public long getUpFlow() {
        return upFlow;
    }

    public void setUpFlow(long upFlow) {
        this.upFlow = upFlow;
    }

    public long getDownFlow() {
        return downFlow;
    }

    public void setDownFlow(long downFlow) {
        this.downFlow = downFlow;
    }

    public long getSumFlow() {
        return sumFlow;
    }

    public void setSumFlow(long sumFlow) {
        this.sumFlow = sumFlow;
    }

    /**
     * Serialization: writes the fields out for the framework.
     * Field order here MUST match {@link #readFields(DataInput)} exactly.
     */
    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeLong(upFlow);
        dataOutput.writeLong(downFlow);
        dataOutput.writeLong(sumFlow);
    }

    /**
     * Deserialization: reads the fields back in the same order they were written.
     */
    @Override
    public void readFields(DataInput dataInput) throws IOException {
        upFlow = dataInput.readLong();
        downFlow = dataInput.readLong();
        sumFlow = dataInput.readLong();
    }

    /** Descending order by total traffic (note the swapped arguments). */
    @Override
    public int compareTo(FlowBean o) {
        return Long.compare(o.sumFlow, this.sumFlow);
    }
}
package writablecomparable;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
 * Swaps the sorted (FlowBean, phone) pairs back to (phone, FlowBean) on output.
 *
 * <p>Records arrive already sorted by the FlowBean key; any phones whose beans
 * compare equal are grouped under one key, so each is written out individually.
 */
public class SortReducer extends Reducer<FlowBean, Text, Text, FlowBean> {

    @Override
    protected void reduce(FlowBean key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        // Emit one line per phone number sharing this traffic key.
        for (Text phone : values) {
            context.write(phone, key);
        }
    }
}
package writablecomparable;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/**
 * Driver for the sort job: reads the flow statistics, sorts them by total
 * traffic (descending, via FlowBean's natural ordering), and writes the
 * phone/flow pairs back out.
 */
public class SortDriver {
    public static void main(String[] args)
            throws IOException, ClassNotFoundException, InterruptedException {
        Job job = Job.getInstance(new Configuration());
        job.setJarByClass(SortDriver.class);

        job.setMapperClass(SortMapper.class);
        job.setReducerClass(SortReducer.class);

        // Map emits (FlowBean, phone) so the framework sorts by traffic;
        // the reducer swaps the pair back to (phone, FlowBean).
        job.setMapOutputKeyClass(FlowBean.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);

        FileInputFormat.setInputPaths(job, new Path("D:\\wev"));
        // Output must be a path that does not exist yet; the original pointed
        // at the input directory itself, which makes the job fail with
        // FileAlreadyExistsException before it starts.
        FileOutputFormat.setOutputPath(job, new Path("D:\\wev_out"));

        boolean ok = job.waitForCompletion(true);
        System.exit(ok ? 0 : 1);
    }
}
package writablecomparable;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * Parses tab-separated lines of "phone up down sum" and emits them with the
 * FlowBean as the key so the shuffle sorts records by traffic.
 */
public class SortMapper extends Mapper<LongWritable, Text, FlowBean, Text> {

    // Reused across map() calls to avoid per-record allocation.
    private final FlowBean outKey = new FlowBean();
    private final Text outValue = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Expected columns: [0]=phone, [1]=upFlow, [2]=downFlow, [3]=sumFlow.
        String[] fields = value.toString().split("\t");

        outValue.set(fields[0]);
        outKey.setUpFlow(Long.parseLong(fields[1]));
        outKey.setDownFlow(Long.parseLong(fields[2]));
        outKey.setSumFlow(Long.parseLong(fields[3]));

        // Key/value are intentionally swapped relative to the final output;
        // SortReducer swaps them back after the sort.
        context.write(outKey, outValue);
    }
}
執行結果顯示已經按照流量排序而完成: