1. 程式人生 > >MapReduce二次排序原理和實現

MapReduce二次排序原理和實現

/**

* 自己定義的key類應該實現WritableComparable介面 */

public	class IntPair implements WritableComparable<IntPair>{ int first;//第一個成員變數 int second;//第二個成員變數

public void set(int left, int right){ first = left;

second = right;

}

public int getFirst(){ return first;
}

public int getSecond(){ return second;
}

@Override

//反序列化,從流中的二進位制轉換成IntPair public void readFields(DataInput in) throws IOException{

first = in.readInt(); second = in.readInt();
}

@Override

//序列化,將IntPair轉化成使用流傳送的二進位制 public void write(DataOutput out) throws IOException{

out.writeInt(first);

out.writeInt(second);

}

@Override //key的比較

public int compareTo(IntPair o)

{

// TODO Auto‐generated method stub if (first != o.first){

return first < o.first ? -1 : 1; }else if (second != o.second){

return second < o.second ? -1 : 1; }else{

return 0;

}

}

@Override

public int hashCode(){

return first * 157 + second;

}

@Override

public boolean equals(Object right){ if (right == null)

return false; if (this == right) return true;

if (right instanceof IntPair){ IntPair r = (IntPair) right;

return r.first == first && r.second == second; }else{

return false;

}

}

}

第二步:自定義分割槽函式類FirstPartitioner,根據 IntPair 中的first實現分割槽。


/**

* 分割槽函式類。根據first確定Partition。 */

public static class FirstPartitioner extends Partitioner<IntPair, IntWritable>{ @Override

public int getPartition(IntPair key, IntWritable value,int numPartitions){ return Math.abs(key.getFirst() * 127) % numPartitions;
}

}

第三步:自定義 SortComparator 實現 IntPair 類中的first和second排序。本課程中沒

有使用這種方法,而是使用 IntPair 中的compareTo()方法實現的。

第四步:自定義 GroupingComparator 類,實現分割槽內的資料分組。

/** *繼承WritableComparator */

public static class GroupingComparator extends WritableComparator{ protected GroupingComparator(){
super(IntPair.class, true);

}

@Override

//Compare two WritableComparables.

public int compare(WritableComparable w1, WritableComparable w2){ IntPair ip1 = (IntPair) w1;
IntPair ip2 = (IntPair) w2; int l = ip1.getFirst(); int r = ip2.getFirst();

return l == r ? 0 : (l < r ? -1 : 1);

}

}

第五步:編寫 MapReduce 主程式實現二次排序。

	
import	java.io.DataInput;
import	java.io.DataOutput;
import	java.io.IOException;
import	java.util.StringTokenizer;
import	org.apache.hadoop.conf.Configuration;
import	org.apache.hadoop.fs.Path;
import	org.apache.hadoop.io.IntWritable;
import	org.apache.hadoop.io.LongWritable;
import	org.apache.hadoop.io.Text;
import	org.apache.hadoop.io.WritableComparable;
import	org.apache.hadoop.io.WritableComparator;
import	org.apache.hadoop.mapreduce.Job;
import	org.apache.hadoop.mapreduce.Mapper;
import	org.apache.hadoop.mapreduce.Partitioner;
import	org.apache.hadoop.mapreduce.Reducer;
import	org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import	org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import	org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import	org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
public	class SecondarySort{
//	自定義map
public static class Map extends Mapper<LongWritable, Text, IntPair, IntWritable>{
	private final IntPair intkey = new IntPair();
	private final IntWritable intvalue = new IntWritable();
	public void map(LongWritable key, Text value, Context context) throws IOException, Interrupted
	String line = value.toString();
	StringTokenizer tokenizer = new StringTokenizer(line);
	int left = 0;
	int right = 0;
	if (tokenizer.hasMoreTokens()){
	left = Integer.parseInt(tokenizer.nextToken());
	if (tokenizer.hasMoreTokens())
	right = Integer.parseInt(tokenizer.nextToken());
	intkey.set(left, right);
	intvalue.set(right);
	context.write(intkey, intvalue);
	}
	}
}	
//	自定義reduce

public static class Reduce extends Reducer< IntPair, IntWritable, Text, IntWritable>{ private final Text left = new Text();

public void reduce(IntPair key, Iterable< IntWritable> values,Context context) throws IOExcepti left.set(Integer.toString(key.getFirst()));
for (IntWritable val : values){ context.write(left, val);
}

}

}

/**

* @param args */
public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundExce // TODO Auto‐generated method stub

Configuration conf = new Configuration();

Job job = new Job(conf, "secondarysort"); job.setJarByClass(SecondarySort.class);

FileInputFormat.setInputPaths(job, new Path(args[0]));//輸入路徑

FileOutputFormat.setOutputPath(job, new Path(args[1]));//輸出路徑

job.setMapperClass(Map.class);// Mapper job.setReducerClass(Reduce.class);// Reducer

job.setPartitionerClass(FirstPartitioner.class);// 分割槽函式//job.setSortComparatorClass(KeyComparator.Class);//本課程並沒有自定義SortComparator,而是使用In job.setGroupingComparatorClass(GroupingComparator.class);// 分組函式




job.setMapOutputKeyClass(IntPair.class);

job.setMapOutputValueClass(IntWritable.class);

job.setOutputKeyClass(Text.class);

job.setOutputValueClass(IntWritable.class);

job.setInputFormatClass(TextInputFormat.class);

job.setOutputFormatClass(TextOutputFormat.class);

System.exit(job.waitForCompletion(true) ? 0 : 1);

}

}

MapReduce 的二次排序的原理和實現OK!


相關推薦

MapReduce排序原理實現

/** * 自己定義的key類應該實現WritableComparable介面 */ public class IntPair implements WritableComparable<IntPair>{ int first;//第一個成員變數 int second;//第二個成員變數 p

Hadoop MapReduce排序演算法與實現之演算法解析

MapReduce二次排序的原理     1.在Mapper階段,會通過inputFormat的getSplits來把資料集分割成split public abstract class Input

MapReduce排序

必須 .lib rec settime string == 技術分享 字段排序 protect 一、背景   按照年份升序排序,同時每一年中溫度降序排序   data文件為1949年-1955年每天的溫度數據。   要求:1、計算1949-1955年,每年溫度最高的時間  

關於MapReduce排序的一點解答

網上 hash 使用 table 為什麽 exti 而且 分區 ret 上一篇博客說明了怎麽自定義Key,而且用了二次排序的例子來做測試,但沒有詳細的說明二次排序,這一篇說詳細的說明二次排序,為了說明曾經一個思想的誤區,特地做了一個3個字段的二次排序來說明。後面稱其為“三次

一起學Hadoop——排序演算法的實現

二次排序,從字面上可以理解為在對key排序的基礎上對key所對應的值value排序,也叫輔助排序。一般情況下,MapReduce框架只對key排序,而不對key所對應的值排序,因此value的排序經常是不固定的。但是我們經常會遇到同時對key和value排序的需求,例如Hadoop權威指南中的求一年的高高氣溫

hadoop平臺使用python編寫mapreduce排序小程式

接上一個博文的環境 使用的是官網的專利使用資料,這裡只截取了一部分 3858241,956203 3858241,1324234 3858241,3398406 3858241,3557384 38

詳細講解MapReduce排序過程

我在15年處理大資料的時候還都是使用MapReduce, 隨著時間的推移, 計算工具的發展, 記憶體越來越便宜, 計算方式也有了極大的改變. 到現在再做大資料開發的好多同學都是直接使用spark, hive等工具, 很少有再寫MapReduce的了. 這裡整理一下MapReduce中經常用到的二次排序的方

Python Hadoop Mapreduce 實現Hadoop Streaming分組排序

需求:公司給到一份全國各門店銷售資料,要求:1.按門店市場分類,將同一市場的門店放到一起;2.將各家門店按銷售額從大到小,再按利潤從大到小排列 一 需求一:按市場對門店進行分組 分組(partition) Hadoop streaming框架預設情況下會以’/t

hadoop排序原理實現

預設情況下,Map輸出的結果會對Key進行預設的排序,但是有時候需要對Key排序的同時還需要對Value進行排序,這時候就要用到二次排序了。下面我們來說說二次排序 1、二次排序原理   我們把二次排序分為以下幾個階段   Map起始階段     在Map階段,使用jo

hadoop 排序一個java實現

需要二次排序的原因:mapreduce架構自動對對映器生成的鍵進行排序,即歸約器啟動之前,所有鍵是有序的,但是值是隨機的,二次排序指的是對值進行排序。歸約器輸入形如:,即一個key對應多個值,這些值是無序的,排序後得到有序的值,如下: 其中,S按照升序或者降序排列

結合案例講解MapReduce重要知識點 ------- 使用自定義MapReduce資料型別實現排序

自定義資料型別SSData import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import org.apache.hadoop.io.WritableCompa

Hadoop鏈式MapReduce、多維排序、倒排索引、自連線演算法、排序、Join效能優化、處理員工資訊Join實戰、URL流量分析、TopN及其排序、求平均值最大最小值、資料清洗ETL、分析氣

Hadoop Mapreduce 演算法彙總  第52課:Hadoop鏈式MapReduce程式設計實戰...1 第51課:Hadoop MapReduce多維排序解析與實戰...2 第50課:HadoopMapReduce倒排索引解析與實戰...3 第49課:Hado

HadoopSpark分別實現排序

將下列資料中每個分割槽中的第一列順序排列,第二列倒序排列。 Text  1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 2021 5051

分別使用HadoopSpark實現排序

零、序(注意本部分與標題無太大關係,可直接翻到第一部分)   既然沒用為啥會有序?原因不想再開一篇文章,來抒發點什麼感想或者計劃了,就在這裡寫點好了:   前些日子買了幾本書,打算學習和研究大資料方面的知識,一直因為實習、考試、畢業設計等問題搞得沒有時間,現在進入了寒

MapReduce程序之排序與多排序

大數據 Hadoop MapReduce Java [toc] MapReduce程序之二次排序與多次排序 需求 有下面的數據: cookieId time url 2 12:12:34 2_hao123 3 09:10:34 3_baidu 1 15:0

mapreduce排序

大數據 hadoop 二次排序 mapreduce 一: 理解二次排序的功能, 使用自己理解的方式表達(包括自定義數據類型,分區,分組,排序) 二: 編寫實現二次排序功能, 提供源碼文件。 三:理解mapreduce join 的幾種 方式,編碼實現reduce join,提供源代碼,說出

大數據技術之輔助排序排序案例(GroupingComparator)

group http pac ppr instance div lec tex boolean 大數據技術之輔助排序和二次排序案例(GroupingComparator) 1)需求 有如下訂單數據 訂單id 商品id 成交金額

MapReduce排序

  這裡介紹二次排序的思路整理,並附上具體程式碼     首先要明確二次排序的基本概念:在我們所之前所熟悉的排序稱為一次排序,即只對key進行排序       所以二次排序的概念在原來的基礎上便不難理解,即對key進行排序的同時對

43.top10熱門品類之使用Scala實現排序

本文為《Spark大型電商專案實戰》 系列文章之一,主要介紹使用Scala實現二次排序。 程式碼實現 在Scala IDE中的包com.erik.sparkproject中建立SortKey.sca

七、Sketchup用ruby進行開發--利用Transformation實現Move工具(平移、旋轉縮放)

 在Sketchup中,move工具使用的非常廣泛,,可以移動、拉伸和複製幾何體,也可以用來旋轉元件。舉一個簡單地例子。 我們要做一個建築物的尖頂,如下圖所示,就是使用move工具實現的。                   接下來我們就要學習如何使用ruby實現這樣的功能