1. 程式人生 > 其它 >流量統計之自定義Partitioner(第六步)

流量統計之自定義Partitioner(第六步)

1、Partitioner即reduce上進行自定義分割槽個數

先搜尋Partitioner.java。
然後在HashPartitioner.java中:

public class HashPartitioner<K, V> extends Partitioner<K, V> {
  public int getPartition(K key, V value, int numReduceTasks) {
    return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
  }
}

numReduceTasks是根據key獲取hashCode(),與Integer.MAX_VALUE做運算,得出結果與numReduceTasks取模值
其中numReduceTasks:Job中所指定的reducer的個數,reducer決定了Job輸出的檔案數量
HashPartitioner是MapReduce預設的分割槽規則
假設reducer是3
1 % 3 = 3
2 % 3 = 2
3 % 3 = 0

需求:將統計結果按照手機號的字首進行區分,並輸出到不同的輸出檔案中去。
13* ==> ..
15* ==> ..
other ==> ..

Partitioner決定maptask輸出的資料交由哪個reducetask處理
預設實現:分發的key的hash值與reducetask個數取模

2、AccessPartitioner.java

package mr.access;

/*
 * MapReduce自定義分割槽規則
 */

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

//按照HashPartitioner.java中,需要繼承父類 //在這裡的key和value是mapper的輸出 //然後按alt+shift+enter實現方法 public class AccessPartitioner extends Partitioner<Text, Access> { //以下的方法進來的key是phone手機號 //access:是統計結果 //numPartitions:reduce的數量 @Override public int getPartition(Text text, Access access, int numPartitions) {
if (text.toString().startsWith("13")) { return 0; } else if (text.toString().startsWith("15")){ return 1; } else { return 2; } } }

3、AccessLocalApp.java

新增分割槽規則和分割槽個數

        //5、設定自定義分割槽規則
        job.setPartitionerClass(AccessPartitioner.class);
        //設定reduce個位
        job.setNumReduceTasks(3);

4、access.log

新增兩行資料

031    15161021755    192.168.126.104    200    45    50
032    16161021755    192.168.126.104    200    45    50