1. 程式人生 > >MapReduce 中的 Partitioner

MapReduce 中的 Partitioner

package flow.partitioner;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.lib.HashPartitioner;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import flow.pojo.Flow;

public class FlowPartitionerMR {
	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();
		
		conf.set("fs.defaultFS", "hdfs://hadoop01:9000");
		System.setProperty("HADOOP_USER_NAME", "hadoop");
		
		Job job = Job.getInstance(conf);
		
		job.setJarByClass(FlowPartitionerMR.class);
		
		job.setMapperClass(FlowPartitionerMRMapper.class);
		job.setMapOutputKeyClass(Flow.class);
		job.setMapOutputValueClass(NullWritable.class);
		
		job.setReducerClass(FlowPartitionerMRReducer.class);
		job.setOutputKeyClass(Flow.class);
		job.setOutputValueClass(NullWritable.class);
		
		/**
		 * 在MapReduce程式設計模型中,Partitioner的預設實現是HashPartitioner.如果
		 * 不能滿足我們的需求,按照HashPartitioner的實現方式自定義一個Partitioner元件即可
		 */
		job.setPartitionerClass(ProvincePartitioner.class);
	
//		job.setPartitionerClass(HashPartitioner.class); 
		
		
		job.setNumReduceTasks(9);
		
		Path inputPath = new Path(args[0]);
		Path outputPath = new Path(args[1]);
		FileInputFormat.setInputPaths(job, inputPath);
		FileSystem fs = FileSystem.get(conf);
		if(fs.exists(outputPath)){
			fs.delete(outputPath,true);
		}
		FileOutputFormat.setOutputPath(job,outputPath);
		
		boolean isDone = job.waitForCompletion(true);
		System.exit(isDone ? 0 : 1);
		
	}
	
	/**
	 * Mapper階段的業務邏輯
	 *
	 */
	private static class FlowPartitionerMRMapper extends Mapper<LongWritable, Text, Flow, NullWritable>{
		Flow flow = new Flow();
		/**
		 * 13480253104	2494800	2494800	4989600
		 * 
		 */

		@Override
		protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Flow, NullWritable>.Context context)
				throws IOException, InterruptedException {
			String[] split = value.toString().split("\t");
			String phone = split[0];
			long upFlow = Long.parseLong(split[1]);
			long downFlow = Long.parseLong(split[2]);
			long sumFlow = Long.parseLong(split[3]);
			
			/**
			 * map方法每呼叫一次,那麼就給flow物件中的對應屬性重新設定值,達到減少JVM垃圾回收的目的
			 */
			flow.setPhone(phone);
			flow.setUpFlow(upFlow);
			flow.setDownFlow(downFlow);
			flow.setSumFlow(sumFlow);
		
			/**
			 * 當前這個做法,是讓整個MapTask公用一個Flow物件。為什麼可以這麼用?
			 * 
			 * 序列化的工作機制
			 * 每次通過context.write(key,value)其實是把key和value當中的屬性值已經序列化了到其他的比如流或者記憶體或者磁碟檔案
			 * 
			 */
			context.write(flow, NullWritable.get());
			
		}
		
	}
	
	/**
	 * Reducer階段的業務邏輯
	 */
	private static class FlowPartitionerMRReducer extends Reducer<Flow, NullWritable, Flow, NullWritable>{

		@Override
		protected void reduce(Flow key, Iterable<NullWritable> values, Context context) 
				throws IOException, InterruptedException {

			/**
			 * 如果ReducerTask什麼邏輯都不用做,僅僅只是作原樣輸出:
			 * 
			 * 兩鍾實現方式:
			 * 
			 * 1、在當前的reduce方法中,直接遍歷原樣輸出
			 */
			for(NullWritable value : values){
				context.write(key, value);
			}
			
			/**
			 * 2、直接不用定義Reducer,直接使用預設實現
			 */

		}
		
		
		
	}
	
	

}

按照省分割槽:

package flow.partitioner;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Partitioner;

import flow.pojo.Flow;
/**
 * Partitioner的key-value的型別就是 Mapper元件的輸出的key-value的型別
 * 
 * Combiner的輸入key-value的型別也是Mapper元件的輸出key-value的型別
 * 
 * 假如Combiner的執行在Partitioner之前。
 * Partitioner的輸入的key-vlaue的型別就應該是Combiner的輸出key-value型別
 * 
 * Partitoner的執行時在Combiner之前。
 *
 */

public class ProvincePartitioner  extends Partitioner<Flow, NullWritable> {

	/**
	 * 作用:就是用來決定輸入的引數key-value到底應該進入到哪個ReducerTask
	 * 
	 * 當前自定義的Partitioner的邏輯:
	 * 	按照手機歸屬地的不同,把所有使用者的流量彙總資訊輸出到不同的結果檔案中
	 * 	如果要實現這個業務,必須依賴一張使用者的手機號和歸屬地的字典表
	 * 
	 * 	這種類似的字典表: 儲存在MySQL當中
	 * 	
	 * 	咱們做模擬實現
	 * 	假設: 134開頭的所有使用者都是北京的
	 * 		 135開頭的所有使用者都是上海的
	 * 		 ...
	 *
	 */
	@Override
	public int getPartition(Flow key, NullWritable value, int numPartitions) {
		
		return getProvinceNumber(key);
	}
	
	private static int getProvinceNumber(Flow key){
		String phone = key.getPhone();
		String phonePrefix = phone.substring(0, 3);
		
		if(phonePrefix.equals("134")){
			return 0;
		}else if (phonePrefix.equals("135")){
			return 1;
		}else if(phonePrefix.equals("136")){
			return 2;
		}else if(phonePrefix.equals("137")){
			return 3;
		}else if(phonePrefix.equals("138")){
			return 4;
		}else{
			return 5;
		}
	}
	
	

}

相關推薦

MapReduce的分割槽方法Partitioner

在進行MapReduce計算時,有時候需要把最終的輸出資料分到不同的檔案中,比如按照省份劃分的話,需要把同一個省份的資料放到一個檔案中,按照性別劃分的話,需要把同一個性別的資料放到一個檔案中.我們知道最終的輸出資料是來自Reducer任務的,那麼如果要得到多個檔案,意味著有同樣數的Reduc

【圖文詳細 】MapReduce Partitioner

需求:根據歸屬地輸出流量統計資料結果到不同檔案,以便於在查詢統計結果時可以定位到 省級範圍進行    思路:MapReduce 中會將 map 輸出的 kv 對,按照相同 key 分組,然後分發給不同的 reducetask 預設的分發規則為:根據 key 的 hashcod

MapReduceMapReduce的分割槽方法Partitioner

在進行MapReduce計算時,有時候需要把最終的輸出資料分到不同的檔案中,比如按照省份劃分的話,需要把同一省份的資料放到一個檔案中;按照性別劃分的話,需要把同一性別的資料放到一個檔案中。我們知道最終的輸出資料是來自於Reducer任務。那麼,如果要得到多個檔案,意味著有同樣數量的Reducer任務在執行。R

MapReduce Partitioner

package flow.partitioner; import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSyste

Hadoop Mapreduce Partitioner

alsa max one 輸入 階段 負載均衡。 均衡 total cal Partitioner的作用的對Mapper產生的中間結果進行分片,以便將同一分組的數據交給同一個Reduce處理,Partitioner直接影響Reduce階段的負載均衡。 MapReduce提供

mapreducereduce的叠代器只能調用一次!

new resultset row reducer style prot category nds 重復 親測,只能調用一次,如果想想在一次reduce重復使用叠代器中的數據,得先取出來放在list中然後在從list中取出來!!多次讀取reduce函數中叠代器的數據

MapReducecombine、partition、shuffle的作用是什麽

rgs 輸出 microsoft ted pop .com int ack 結果 http://www.aboutyun.com/thread-8927-1-1.html Mapreduce在hadoop中是一個比較難以的概念。以下須要用心

mapreducemap數的測試

1.5 nbsp 啟動 小時 修改 cor core mar 並行 默認的map數是有邏輯的split的數量決定的,根據源碼切片大小的計算公式:Math.max(minSize, Math.min(maxSize, blockSize)); 其中: minsize:默認值:

Hadoop學習之路(二十三)MapReduce的shuffle詳解

就是 多個 流程 http cer 分開 分享圖片 數據分區 bsp 概述 1、MapReduce 中,mapper 階段處理的數據如何傳遞給 reducer 階段,是 MapReduce 框架中 最關鍵的一個流程,這個流程就叫 Shuffle 2、Shuffle: 數

MapReduceshuffle過程

mapr 提前 bin run prope 內存 pat appdata 設置 shuffle是MapReduce的核心,map和reduce的中間過程。 Map負責過濾分發,reduce歸並整理,從map輸出到reduce輸入就是shuffle過程。 實現的功能 分區 決

mapreduce加入combiner

combine mage rim opened alt 不用 一次 apr configure combiner相當於是一個本地的reduce,它的存在是為了減少網絡的負擔,在本地先進行一次計算再叫計算結果提交給reduce進行二次處理。 現在的流程為: 對於comb

mapreducemap和reduce個數

case when 生成 task 輸入 slots align reducer 進行 很多 一、 控制hive任務中的map數: 1. 通常情況下,作業會通過input的目錄產生一個或者多個map任務。 主要的決定因素有: input的文件總個數,input的

[MapReduce_5] MapReduce 的 Combiner 元件應用

0. 說明   Combiner 介紹 &&  在 MapReduce 中的應用     1. 介紹   Combiner:   Map 端的 Reduce,有自己的使用場景   在相同 Key 過多的情況下,在 Map 端進行的預

【待完成】[MapReduce_7] MapReduce 的排序

  0. 說明         1. 介紹      sort 是根據 Key 進行排序     【部分排序】   在每個分割槽中,分別進行排序     【全排序】   在所有的分割槽

MapReduce如何控制reducer的數量

1,在預設情況下,一個mapreduce的job只有一個reducer;在大型叢集中,需要使用許多reducer,中間資料都會放到一個reducer中處理,如果reducer數量不夠,會成為計算瓶頸。 2,reducer的最優個數與叢集中可用的reducer的任務槽數相關,一般設定比總槽數稍微少

MapReduce如何控制mapper的數量

很多文件中描述,Mapper的數量在預設情況下不可直接控制干預,因為Mapper的數量由輸入的大小和個數決定。在預設情況下,最終input佔據了多少block,就應該啟動多少個Mapper。如果輸入的檔案數量巨大,但是每個檔案的size都小於HDFS的blockSize,那麼會造成啟動的Mapper

MapReduce的排序和計數器

一:條件準備 準備sort.txt文字 a 1 a 9 b 3 a 7 b 8 b 10 a 5 a 9 排序後輸出的文字: a 1 a 5 a 7 a 9 a 9 b 3 b 8 b 10 二:排序介面WritableCo

MapReduce 如何處理HBase的資料?如何讀取HBase資料給Map?如何將結果儲存到HBase

   MapReduce 中如何處理HBase中的資料?如何讀取HBase資料給Map?如何將結果儲存到HBase中? Mapper類:包括一個內部類(Context)和四個方法(setup,map,cleanup,run);     &n

MapReduce自定義分割槽

package tq; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.mapreduce.Partitioner; public class MyPartition extends Partitioner<

MapReduce自定義比較

package tq; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.io.WritableComparator; public class MySortComparator extends W