MapReduce之自定義分割槽器Partitioner
阿新 • • 發佈:2020-07-21
@[toc]
## 問題引出
>要求將統計結果按照條件輸出到不同檔案中(分割槽)。
比如:將統計結果按照**手機歸屬地不同省份**輸出到不同檔案中(分割槽)
## 預設Partitioner分割槽
```java
public class HashPartitioner extends Partitioner{
public int getPartition(K key,V value, int numReduceTasks){
return (key.hashCode() & Integer.MAX VALUE) & numReduceTasks;
}
}
```
- 預設分割槽是根據**key**的**hashCode**對**ReduceTasks**個數取模得到的。
- 使用者沒法控制哪個key儲存到哪個分割槽。
## 自定義Partitioner步驟
1. 自定義類繼承`Partitioner`,重寫`getPartition()`方法
```java
public class CustomPartitioner extends Partitioner{
@Override
public int getPartition(Text key,FlowBean value,int numPartitions){
//控制分割槽程式碼邏輯
……
return partition;
}
}
```
2. 在Job驅動類中,設定自定義`Partitioner`
```java
job.setPartitionerClass(CustomPartitioner.class)
```
3. 自定義Partition後,要根據自定義Partitioner的邏輯設定相應數量的`ReduceTask`
```java
job.setNumReduceTask(5);//假設需要分5個區
```
## Partition分割槽案例實操
將統計結果按照手機==歸屬地==不同省份輸出到不同檔案中(分割槽)
**輸入資料:**
![在這裡插入圖片描述](https://img-blog.csdnimg.cn/20200721172302429.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L0RURlRf,size_16,color_FFFFFF,t_70)
**期望輸出資料:**
手機號==136、137、138、139==開頭都分別放到一個獨立的4個檔案中,==其他開頭==的放到一個檔案中。所以總共分為5個檔案,也就是五個區。
>相比於之前的自定義flowbean,這次自定義分割槽,只需要多編寫一個分割槽器,以及在job驅動類中設定分割槽器,mapper和reducer類不改變
**MyPartitioner.java**
```java
/*
* KEY, VALUE: Mapper輸出的Key-value型別
*/
public class MyPartitioner extends Partitioner{
// 計算分割槽 numPartitions為總的分割槽數,reduceTask的數量
// 分割槽號必須為int型的值,且必須符合 0<= partitionNum < numPartitions
@Override
public int getPartition(Text key, FlowBean value, int numPartitions) {
String suffix = key.toString().substring(0, 3);//前開後閉,取手機號前三位數
int partitionNum=0;//分割槽編號
switch (suffix) {
case "136":
partitionNum=numPartitions-1;//由於分割槽編號不能大於分割槽總數,所以用這種方法比較好
break;
case "137":
partitionNum=numPartitions-2;
break;
case "138":
partitionNum=numPartitions-3;
break;
case "139":
partitionNum=numPartitions-4;
break;
default:
break;
}
return partitionNum;
}
}
```
**FlowBeanDriver.java**
```java
public class FlowBeanDriver {
public static void main(String[] args) throws Exception {
Path inputPath=new Path("e:/mrinput/flowbean");
Path outputPath=new Path("e:/mroutput/partitionflowbean");
//作為整個Job的配置
Configuration conf = new Configuration();
//保證輸出目錄不存在
FileSystem fs=FileSystem.get(conf);
if (fs.exists(outputPath)) {
fs.delete(outputPath, true);
}
// ①建立Job
Job job = Job.getInstance(conf);
// ②設定Job
// 設定Job執行的Mapper,Reducer型別,Mapper,Reducer輸出的key-value型別
job.setMapperClass(FlowBeanMapper.class);
job.setReducerClass(FlowBeanReducer.class);
// Job需要根據Mapper和Reducer輸出的Key-value型別準備序列化器,通過序列化器對輸出的key-value進行序列化和反序列化
// 如果Mapper和Reducer輸出的Key-value型別一致,直接設定Job最終的輸出型別
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(FlowBean.class);
// 設定輸入目錄和輸出目錄
FileInputFormat.setInputPaths(job, inputPath);
FileOutputFormat.setOutputPath(job, outputPath);
// 設定ReduceTask的數量為5
job.setNumReduceTasks(5);
// 設定使用自定義的分割槽器
job.setPartitionerClass(MyPartitioner.class);
// ③執行Job
job.waitForCompletion(true);
}
}
```
**FlowBeanMapper.java**
```java
/*
* 1. 統計手機號(String)的上行(long,int),下行(long,int),總流量(long,int)
*
* 手機號為key,Bean{上行(long,int),下行(long,int),總流量(long,int)}為value
*
*
*
*
*/
public class FlowBeanMapper extends Mapper{
private Text out_key=new Text();
private FlowBean out_value=new FlowBean();
// (0,1 13736230513 192.196.100.1 www.atguigu.com 2481 24681 200)
@Override
protected void map(LongWritable key, Text value, Mapper.Context context)
throws IOException, InterruptedException {
String[] words = value.toString().split("\t");
//封裝手機號
out_key.set(words[1]);
// 封裝上行
out_value.setUpFlow(Long.parseLong(words[words.length-3]));
// 封裝下行
out_value.setDownFlow(Long.parseLong(words[words.length-2]));
context.write(out_key, out_value);
}
}
```
**FlowBeanReducer.java**
```java
public class FlowBeanReducer extends Reducer{
private FlowBean out_value=new FlowBean();
@Override
protected void reduce(Text key, Iterable values, Reducer.Context context)
throws IOException, InterruptedException {
long sumUpFlow=0;
long sumDownFlow=0;
for (FlowBean flowBean : values) {
sumUpFlow+=flowBean.getUpFlow();
sumDownFlow+=flowBean.getDownFlow();
}
out_value.setUpFlow(sumUpFlow);
out_value.setDownFlow(sumDownFlow);
out_value.setSumFlow(sumDownFlow+sumUpFlow);
context.write(key, out_value);
}
}
```
**FlowBean.java**
```java
public class FlowBean implements Writable{
private long upFlow;
private long downFlow;
private long sumFlow;
public FlowBean() {
}
public long getUpFlow() {
return upFlow;
}
public void setUpFlow(long upFlow) {
this.upFlow = upFlow;
}
public long getDownFlow() {
return downFlow;
}
public void setDownFlow(long downFlow) {
this.downFlow = downFlow;
}
public long getSumFlow() {
return sumFlow;
}
public void setSumFlow(long sumFlow) {
this.sumFlow = sumFlow;
}
// 序列化 在寫出屬性時,如果為引用資料型別,屬性不能為null
@Override
public void write(DataOutput out) throws IOException {
out.writeLong(upFlow);
out.writeLong(downFlow);
out.writeLong(sumFlow);
}
//反序列化 序列化和反序列化的順序要一致
@Override
public void readFields(DataInput in) throws IOException {
upFlow=in.readLong();
downFlow=in.readLong();
sumFlow=in.readLong();
}
@Override
public String toString() {
return upFlow + "\t" + downFlow + "\t" + sumFlow;
}
}
```
**輸出結果:**
總共五個檔案
![在這裡插入圖片描述](https://img-blog.csdnimg.cn/20200721173120509.png)
一號區:
![在這裡插入圖片描述](https://img-blog.csdnimg.cn/20200721173159784.png)
二號區:
![在這裡插入圖片描述](https://img-blog.csdnimg.cn/20200721173216243.png)
三號區:
![在這裡插入圖片描述](https://img-blog.csdnimg.cn/20200721173300267.png)
四號區:
![在這裡插入圖片描述](https://img-blog.csdnimg.cn/20200721173233989.png)
其他號碼為第五號區:
![在這裡插入圖片描述](https://img-blog.csdnimg.cn/2020072117332855.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L0RURlRf,size_16,color_FFFFFF,t_70)
## 分割槽總結
- 如果`ReduceTask的數量 > getPartition的結果數`,則會多產生幾個空的輸出檔案part-r-000xx
- 如果`Reduceask的數量 < getPartition的結果數`,則有一部分分割槽資料無處安放,會Exception
- 如果`ReduceTask的數量 = 1`,則不管MapTask端輸出多少個分割槽檔案,最終結果都交給這一個ReduceTask,最終也就只會產生一個結果檔案partr-00000
以剛才的案例分析:
例如:**假設自定義分割槽數為5**,則
- job.setlNlurmReduce Task(1);會正常執行,只不過會產生一個輸出檔案
- job.setlNlunReduce Task(2),會報錯
- job.setNumReduceTasks(6);大於5,程式會正常執行,會產生