MapReduce二次排序原理和實現
/** * 自己定義的key類應該實現WritableComparable介面 */ public class IntPair implements WritableComparable<IntPair>{ int first;//第一個成員變數 int second;//第二個成員變數 public void set(int left, int right){ first = left; second = right; } public int getFirst(){ return first; } public int getSecond(){ return second; } @Override //反序列化,從流中的二進位制轉換成IntPair public void readFields(DataInput in) throws IOException{ first = in.readInt(); second = in.readInt(); } @Override //序列化,將IntPair轉化成使用流傳送的二進位制 public void write(DataOutput out) throws IOException{ out.writeInt(first); out.writeInt(second); } @Override //key的比較 public int compareTo(IntPair o) { // TODO Auto‐generated method stub if (first != o.first){ return first < o.first ? -1 : 1; }else if (second != o.second){ return second < o.second ? -1 : 1; }else{ return 0; } } @Override public int hashCode(){ return first * 157 + second; } @Override public boolean equals(Object right){ if (right == null) return false; if (this == right) return true; if (right instanceof IntPair){ IntPair r = (IntPair) right; return r.first == first && r.second == second; }else{ return false; } } } 第二步:自定義分割槽函式類FirstPartitioner,根據 IntPair 中的first實現分割槽。 /** * 分割槽函式類。根據first確定Partition。 */ public static class FirstPartitioner extends Partitioner<IntPair, IntWritable>{ @Override public int getPartition(IntPair key, IntWritable value,int numPartitions){ return Math.abs(key.getFirst() * 127) % numPartitions; } } 第三步:自定義 SortComparator 實現 IntPair 類中的first和second排序。本課程中沒 有使用這種方法,而是使用 IntPair 中的compareTo()方法實現的。 第四步:自定義 GroupingComparator 類,實現分割槽內的資料分組。 /** *繼承WritableComparator */ public static class GroupingComparator extends WritableComparator{ protected GroupingComparator(){ super(IntPair.class, true); } @Override //Compare two WritableComparables. public int compare(WritableComparable w1, WritableComparable w2){ IntPair ip1 = (IntPair) w1; IntPair ip2 = (IntPair) w2; int l = ip1.getFirst(); int r = ip2.getFirst(); return l == r ? 0 : (l < r ? -1 : 1); } } 第五步:編寫 MapReduce 主程式實現二次排序。 import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import java.util.StringTokenizer; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.io.WritableComparator; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Partitioner; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; public class SecondarySort{ // 自定義map public static class Map extends Mapper<LongWritable, Text, IntPair, IntWritable>{ private final IntPair intkey = new IntPair(); private final IntWritable intvalue = new IntWritable(); public void map(LongWritable key, Text value, Context context) throws IOException, Interrupted String line = value.toString(); StringTokenizer tokenizer = new StringTokenizer(line); int left = 0; int right = 0; if (tokenizer.hasMoreTokens()){ left = Integer.parseInt(tokenizer.nextToken()); if (tokenizer.hasMoreTokens()) right = Integer.parseInt(tokenizer.nextToken()); intkey.set(left, right); intvalue.set(right); context.write(intkey, intvalue); } } } // 自定義reduce public static class Reduce extends Reducer< IntPair, IntWritable, Text, IntWritable>{ private final Text left = new Text(); public void reduce(IntPair key, Iterable< IntWritable> values,Context context) throws IOExcepti left.set(Integer.toString(key.getFirst())); for (IntWritable val : values){ context.write(left, val); } } } /** * @param args */ public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundExce // TODO Auto‐generated method stub Configuration conf = new Configuration(); Job job = new Job(conf, "secondarysort"); job.setJarByClass(SecondarySort.class); FileInputFormat.setInputPaths(job, new Path(args[0]));//輸入路徑 FileOutputFormat.setOutputPath(job, new Path(args[1]));//輸出路徑 job.setMapperClass(Map.class);// Mapper job.setReducerClass(Reduce.class);// Reducer job.setPartitionerClass(FirstPartitioner.class);// 分割槽函式//job.setSortComparatorClass(KeyComparator.Class);//本課程並沒有自定義SortComparator,而是使用In job.setGroupingComparatorClass(GroupingComparator.class);// 分組函式 job.setMapOutputKeyClass(IntPair.class); job.setMapOutputValueClass(IntWritable.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); job.setInputFormatClass(TextInputFormat.class); job.setOutputFormatClass(TextOutputFormat.class); System.exit(job.waitForCompletion(true) ? 0 : 1); } }
MapReduce 的二次排序的原理和實現OK!
相關推薦
MapReduce二次排序原理和實現
/** * 自己定義的key類應該實現WritableComparable介面 */ public class IntPair implements WritableComparable<IntPair>{ int first;//第一個成員變數 int second;//第二個成員變數 p
Hadoop MapReduce二次排序演算法與實現之演算法解析
MapReduce二次排序的原理 1.在Mapper階段,會通過inputFormat的getSplits來把資料集分割成split public abstract class Input
MapReduce二次排序
必須 .lib rec settime string == 技術分享 字段排序 protect 一、背景 按照年份升序排序,同時每一年中溫度降序排序 data文件為1949年-1955年每天的溫度數據。 要求:1、計算1949-1955年,每年溫度最高的時間
關於MapReduce二次排序的一點解答
網上 hash 使用 table 為什麽 exti 而且 分區 ret 上一篇博客說明了怎麽自定義Key,而且用了二次排序的例子來做測試,但沒有詳細的說明二次排序,這一篇說詳細的說明二次排序,為了說明曾經一個思想的誤區,特地做了一個3個字段的二次排序來說明。後面稱其為“三次
一起學Hadoop——二次排序演算法的實現
二次排序,從字面上可以理解為在對key排序的基礎上對key所對應的值value排序,也叫輔助排序。一般情況下,MapReduce框架只對key排序,而不對key所對應的值排序,因此value的排序經常是不固定的。但是我們經常會遇到同時對key和value排序的需求,例如Hadoop權威指南中的求一年的高高氣溫
hadoop平臺使用python編寫mapreduce二次排序小程式
接上一個博文的環境 使用的是官網的專利使用資料,這裡只截取了一部分 3858241,956203 3858241,1324234 3858241,3398406 3858241,3557384 38
詳細講解MapReduce二次排序過程
我在15年處理大資料的時候還都是使用MapReduce, 隨著時間的推移, 計算工具的發展, 記憶體越來越便宜, 計算方式也有了極大的改變. 到現在再做大資料開發的好多同學都是直接使用spark, hive等工具, 很少有再寫MapReduce的了. 這裡整理一下MapReduce中經常用到的二次排序的方
Python Hadoop Mapreduce 實現Hadoop Streaming分組和二次排序
需求:公司給到一份全國各門店銷售資料,要求:1.按門店市場分類,將同一市場的門店放到一起;2.將各家門店按銷售額從大到小,再按利潤從大到小排列 一 需求一:按市場對門店進行分組 分組(partition) Hadoop streaming框架預設情況下會以’/t
hadoop二次排序的原理和實現
預設情況下,Map輸出的結果會對Key進行預設的排序,但是有時候需要對Key排序的同時還需要對Value進行排序,這時候就要用到二次排序了。下面我們來說說二次排序 1、二次排序原理 我們把二次排序分為以下幾個階段 Map起始階段 在Map階段,使用jo
hadoop 二次排序和一個java實現
需要二次排序的原因:mapreduce架構自動對對映器生成的鍵進行排序,即歸約器啟動之前,所有鍵是有序的,但是值是隨機的,二次排序指的是對值進行排序。歸約器輸入形如:,即一個key對應多個值,這些值是無序的,排序後得到有序的值,如下: 其中,S按照升序或者降序排列
結合案例講解MapReduce重要知識點 ------- 使用自定義MapReduce資料型別實現二次排序
自定義資料型別SSData import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import org.apache.hadoop.io.WritableCompa
Hadoop鏈式MapReduce、多維排序、倒排索引、自連線演算法、二次排序、Join效能優化、處理員工資訊Join實戰、URL流量分析、TopN及其排序、求平均值和最大最小值、資料清洗ETL、分析氣
Hadoop Mapreduce 演算法彙總 第52課:Hadoop鏈式MapReduce程式設計實戰...1 第51課:Hadoop MapReduce多維排序解析與實戰...2 第50課:HadoopMapReduce倒排索引解析與實戰...3 第49課:Hado
Hadoop和Spark分別實現二次排序
將下列資料中每個分割槽中的第一列順序排列,第二列倒序排列。 Text 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 2021 5051
分別使用Hadoop和Spark實現二次排序
零、序(注意本部分與標題無太大關係,可直接翻到第一部分) 既然沒用為啥會有序?原因不想再開一篇文章,來抒發點什麼感想或者計劃了,就在這裡寫點好了: 前些日子買了幾本書,打算學習和研究大資料方面的知識,一直因為實習、考試、畢業設計等問題搞得沒有時間,現在進入了寒
MapReduce程序之二次排序與多次排序
大數據 Hadoop MapReduce Java [toc] MapReduce程序之二次排序與多次排序 需求 有下面的數據: cookieId time url 2 12:12:34 2_hao123 3 09:10:34 3_baidu 1 15:0
mapreduce 的二次排序
大數據 hadoop 二次排序 mapreduce 一: 理解二次排序的功能, 使用自己理解的方式表達(包括自定義數據類型,分區,分組,排序) 二: 編寫實現二次排序功能, 提供源碼文件。 三:理解mapreduce join 的幾種 方式,編碼實現reduce join,提供源代碼,說出
大數據技術之輔助排序和二次排序案例(GroupingComparator)
group http pac ppr instance div lec tex boolean 大數據技術之輔助排序和二次排序案例(GroupingComparator) 1)需求 有如下訂單數據 訂單id 商品id 成交金額
MapReduce的二次排序
這裡介紹二次排序的思路整理,並附上具體程式碼 首先要明確二次排序的基本概念:在我們所之前所熟悉的排序稱為一次排序,即只對key進行排序 所以二次排序的概念在原來的基礎上便不難理解,即對key進行排序的同時對
43.top10熱門品類之使用Scala實現二次排序
本文為《Spark大型電商專案實戰》 系列文章之一,主要介紹使用Scala實現二次排序。 程式碼實現 在Scala IDE中的包com.erik.sparkproject中建立SortKey.sca
七、Sketchup用ruby進行二次開發--利用Transformation實現Move工具(平移、旋轉和縮放)
在Sketchup中,move工具使用的非常廣泛,,可以移動、拉伸和複製幾何體,也可以用來旋轉元件。舉一個簡單地例子。 我們要做一個建築物的尖頂,如下圖所示,就是使用move工具實現的。 接下來我們就要學習如何使用ruby實現這樣的功能