流量統計之自定義Partitioner(第六步)
阿新 • • 發佈:2021-07-16
1、Partitioner即reduce上進行自定義分割槽個數
先搜尋Partitioner.java。
然後在HashPartitioner.java中:
public class HashPartitioner<K, V> extends Partitioner<K, V> { public int getPartition(K key, V value, int numReduceTasks) { return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks; } }
numReduceTasks是根據key獲取hashCode(),與Integer.MAX_VALUE做運算,得出結果與numReduceTasks取模值
其中numReduceTasks:Job中所指定的reducer的個數,reducer決定了Job輸出的檔案數量
HashPartitioner是MapReduce預設的分割槽規則
假設reducer是3
1 % 3 = 3
2 % 3 = 2
3 % 3 = 0
需求:將統計結果按照手機號的字首進行區分,並輸出到不同的輸出檔案中去。
13* ==> ..
15* ==> ..
other ==> ..
Partitioner決定maptask輸出的資料交由哪個reducetask處理
預設實現:分發的key的hash值與reducetask個數取模
2、AccessPartitioner.java
package mr.access; /* * MapReduce自定義分割槽規則 */ import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Partitioner;//按照HashPartitioner.java中,需要繼承父類 //在這裡的key和value是mapper的輸出 //然後按alt+shift+enter實現方法 public class AccessPartitioner extends Partitioner<Text, Access> { //以下的方法進來的key是phone手機號 //access:是統計結果 //numPartitions:reduce的數量 @Override public int getPartition(Text text, Access access, int numPartitions) {if (text.toString().startsWith("13")) { return 0; } else if (text.toString().startsWith("15")){ return 1; } else { return 2; } } }
3、AccessLocalApp.java
新增分割槽規則和分割槽個數
//5、設定自定義分割槽規則 job.setPartitionerClass(AccessPartitioner.class); //設定reduce個位 job.setNumReduceTasks(3);
4、access.log
新增兩行資料
031 15161021755 192.168.126.104 200 45 50 032 16161021755 192.168.126.104 200 45 50