MR之partition自定義分割槽器
阿新 • • 發佈:2018-12-08
maptask執行的結果都會放到一個分割槽檔案中,這個分割槽檔案有自己的編號,這個編號是通過一個hash演算法來生成的,通過對context.write(k,v)中的k進行hash會產生一個值,相同的key產生的值是一樣的,所以這種辦法能將相同的key值放到一個分割槽中。分割槽中的值會發送給reducetask進行相應的處理。
mapreduce框架中有預設的分割槽器,這個分割槽器叫做HashPartitioner,程式碼程式碼如下:
這是預設分割槽的原始碼
/** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.hadoop.mapred.lib; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.mapred.Partitioner; import org.apache.hadoop.mapred.JobConf; /** * Partition keys by their {@link Object#hashCode()}. */ @InterfaceAudience.Public @InterfaceStability.Stable public class HashPartitioner<K2, V2> implements Partitioner<K2, V2> { public void configure(JobConf job) {} /** Use {@link Object#hashCode()} to partition. */ public int getPartition(K2 key, V2 value, int numReduceTasks) { return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks; } } 所以需要自己定義一個分割槽器,這個分割槽器通過判斷key的值返回相應的值。程式碼如下: public class ProvincePartition extends Partitioner<Text,ProvinceBean> { //載入資料字典 public static Map<String,Integer> provinceDict = new HashMap<String, Integer>(); static { provinceDict.put("135",0); provinceDict.put("181",1); provinceDict.put("177",2); provinceDict.put("170",3); } public int getPartition(Text key, ProvinceBean flowBean, int numPartitions) { Integer id = provinceDict.get(key.toString().substring(0,3)); System.err.println(key.toString().substring(0,3)); System.err.println(id); return id==null?4:id; } }
程式碼解釋:
1.首先需要將Text型別的值轉換成String型別,呼叫toString方法
2.切割手機號碼的前三位,通過get方法獲得key對應的value值,這個值也可以到資料庫中載入。
3.做一個判斷,判斷是否能得到值,能得值就直接返回map中的value值,得不到值就直接放在另外一個分割槽中。
注意:
如果 reduceTask 的數量> getPartition 的結果數,則會多產生幾個空的輸出檔案part-r-000xx;
如果 1<reduceTask 的數量<getPartition 的結果數,則有一部分分割槽資料無處安放,會Exception;
如果 reduceTask 的數量=1,則不管 mapTask 端輸出多少個分割槽檔案,最終結果都交給這一個 reduceTask,最終也就只會產生一個結果檔案 part-r-00000;
Driver中需要加入partition類的二進位制檔案
//設定自定義的分割槽類
job.setPartitionerClass(ProvincePartition.class);
//同時還需要設定reduce的個數,這個個數跟分割槽的個數相對應
job.setNumReduceTasks(5);