Hadoop日記Day18---MapReduce排序分組
MapReduce的排序分組任務與要求
我們知道排序分組是MapReduce中Mapper端的第四步,其中分組排序都是基於Key的,我們可以通過下面這幾個例子來體現出來。其中的資料和任務如下圖1.1,1.2所示。
#首先按照第一列升序排列,當第一列相同時,第二列升序排列 3 3 3 2 3 1 2 2 2 1 1 1 ------------------- #結果 1 1 2 1 2 2 3 1 3 2 3 3
圖 1.1 排序
#當第一列相同時,求出第二列的最小值 3 3 3 2 3 1 2 2 2 1 1 1 ------------------- #結果3 1 2 1 1 1
圖 1.2 分組
一、 排序演算法
1.1 MapReduce預設排序演算法
使用MapReduce預設排序演算法程式碼如下1.1所示,在程式碼中我將第一列作為鍵,第二列作為值。
1 package sort; 2 3 import java.io.IOException; 4 import java.net.URI; 5 6 import org.apache.hadoop.conf.Configuration; 7 import org.apache.hadoop.fs.FileStatus;View Code8 import org.apache.hadoop.fs.FileSystem; 9 import org.apache.hadoop.fs.Path; 10 import org.apache.hadoop.io.LongWritable; 11 import org.apache.hadoop.io.Text; 12 import org.apache.hadoop.mapreduce.Job; 13 import org.apache.hadoop.mapreduce.Mapper; 14 import org.apache.hadoop.mapreduce.Reducer;15 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 16 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; 17 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 18 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; 19 import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner; 20 21 public class SortApp { 22 private static final String INPUT_PATH = "hdfs://hadoop:9000/newinput"; 23 private static final String OUT_PATH = "hdfs://hadoop:9000/newoutput"; 24 public static void main(String[] args) throws Exception { 25 Configuration conf=new Configuration(); 26 final FileSystem fileSystem = FileSystem.get(new URI(INPUT_PATH), conf); 27 final Path outpath = new Path(OUT_PATH); 28 if(fileSystem.exists(outpath)){ 29 fileSystem.delete(outpath,true); 30 } 31 32 final Job job = new Job(conf,SortApp.class.getSimpleName()); 33 34 //1.1 指定輸入檔案路徑 35 FileInputFormat.setInputPaths(job, INPUT_PATH); 36 job.setInputFormatClass(TextInputFormat.class);//指定哪個類用來格式化輸入檔案 37 38 //1.2指定自定義的Mapper類 39 job.setMapperClass(MyMapper.class); 40 job.setMapOutputKeyClass(LongWritable.class);//指定輸出<k2,v2>的型別 41 job.setMapOutputValueClass(LongWritable.class); 42 43 //1.3 指定分割槽類 44 job.setPartitionerClass(HashPartitioner.class); 45 job.setNumReduceTasks(1); 46 47 //1.4 TODO 排序、分割槽 48 49 //1.5 TODO (可選)合併 50 51 //2.2 指定自定義的reduce類 52 job.setReducerClass(MyReducer.class); 53 job.setOutputKeyClass(LongWritable.class);//指定輸出<k3,v3>的型別 54 job.setOutputValueClass(LongWritable.class); 55 56 //2.3 指定輸出到哪裡 57 FileOutputFormat.setOutputPath(job, outpath); 58 job.setOutputFormatClass(TextOutputFormat.class);//設定輸出檔案的格式化類 59 job.waitForCompletion(true);//把程式碼提交給JobTracker執行 60 } 61 static class MyMapper extends Mapper<LongWritable, Text,LongWritable,LongWritable>{ 62 63 @Override 64 protected void map( 65 LongWritable key, 66 Text value, 67 Mapper<LongWritable, Text, LongWritable, LongWritable>.Context context) 68 throws IOException, InterruptedException { 69 final String[] splited = value.toString().split("\t"); 70 final long k2 = Long.parseLong(splited[0]); 71 final long v2 = Long.parseLong(splited[1]); 72 context.write(new LongWritable(k2),new LongWritable(v2)); 73 } 74 } 75 static class MyReducer extends Reducer<LongWritable,LongWritable,LongWritable,LongWritable>{ 76 77 @Override 78 protected void reduce( 79 LongWritable k2, 80 Iterable<LongWritable> v2s, 81 Reducer<LongWritable, LongWritable, LongWritable, LongWritable>.Context context) 82 throws IOException, InterruptedException { 83 for(LongWritable v2:v2s){ 84 context.write(k2, v2); 85 } 86 } 87 } 88 }
程式碼 1.1
執行結果如下圖1.3所示
1 1 2 2 2 1 3 3 3 2 3 1
圖 1.3
從上面圖中執行結果可以看出,MapReduce預設排序演算法只對Key進行了排序,並沒有對value進行排序,沒有達到我們的要求,所以要實現我們的要求,還要我們自定義一個排序演算法
1.2 自定義排序演算法
從上面圖中執行結果可以知道,MapReduce預設排序演算法只對Key進行了排序,並沒有對value進行排序,沒有達到我們的要求,所以要實現我們的要求,還要我們自定義一個排序演算法。在map和reduce階段進行排序時,比較的是k2。v2是不參與排序比較的。如果要想讓v2也進行排序,需要把k2和v2組裝成新的類作為k 2 ,才能參與比較。所以在這裡我們新建一個新的型別NewK2型別來封裝原來的k2和v2。程式碼如1.2所示。
1 package sort; 2 3 import java.io.DataInput; 4 import java.io.DataOutput; 5 import java.io.IOException; 6 import java.net.URI; 7 8 import org.apache.hadoop.conf.Configuration; 9 import org.apache.hadoop.fs.FileSystem; 10 import org.apache.hadoop.fs.Path; 11 import org.apache.hadoop.io.LongWritable; 12 import org.apache.hadoop.io.Text; 13 import org.apache.hadoop.io.WritableComparable; 14 import org.apache.hadoop.mapreduce.Job; 15 import org.apache.hadoop.mapreduce.Mapper; 16 import org.apache.hadoop.mapreduce.Reducer; 17 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 18 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; 19 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 20 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; 21 import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner; 22 23 public class SortApp { 24 static final String INPUT_PATH = "hdfs://hadoop:9000/newinput"; 25 static final String OUT_PATH = "hdfs://hadoop:9000/newoutput"; 26 public static void main(String[] args) throws Exception{ 27 final Configuration configuration = new Configuration(); 28 final FileSystem fileSystem = FileSystem.get(new URI(INPUT_PATH), configuration); 29 if(fileSystem.exists(new Path(OUT_PATH))){ 30 fileSystem.delete(new Path(OUT_PATH), true); 31 } 32 final Job job = new Job(configuration, SortApp.class.getSimpleName()); 33 //1.1 指定輸入檔案路徑 34 FileInputFormat.setInputPaths(job, INPUT_PATH); 35 job.setInputFormatClass(TextInputFormat.class);//指定哪個類用來格式化輸入檔案 36 37 //1.2指定自定義的Mapper類 38 job.setMapperClass(MyMapper.class); 39 job.setMapOutputKeyClass(NewK2.class);//指定輸出<k2,v2>的型別 40 job.setMapOutputValueClass(LongWritable.class); 41 42 //1.3 指定分割槽類 43 job.setPartitionerClass(HashPartitioner.class); 44 job.setNumReduceTasks(1); 45 46 //2.2 指定自定義的reduce類 47 job.setReducerClass(MyReducer.class); 48 job.setOutputKeyClass(LongWritable.class);//指定輸出<k3,v3>的型別 49 job.setOutputValueClass(LongWritable.class); 50 51 //2.3 指定輸出到哪裡 52 FileOutputFormat.setOutputPath(job, new Path(OUT_PATH)); 53 job.setOutputFormatClass(TextOutputFormat.class);//設定輸出檔案的格式化類 54 job.waitForCompletion(true);//把程式碼提交給JobTracker執行 55 } 56 57 58 static class MyMapper extends Mapper<LongWritable, Text, NewK2, LongWritable>{ 59 protected void map(LongWritable key, Text value, org.apache.hadoop.mapreduce.Mapper<LongWritable,Text,NewK2,LongWritable>.Context context) throws java.io.IOException ,InterruptedException { 60 final String[] splited = value.toString().split("\t"); 61 final NewK2 k2 = new NewK2(Long.parseLong(splited[0]), Long.parseLong(splited[1])); 62 final LongWritable v2 = new LongWritable(Long.parseLong(splited[1])); 63 context.write(k2, v2); 64 }; 65 } 66 67 static class MyReducer extends Reducer<NewK2, LongWritable, LongWritable, LongWritable>{ 68 protected void reduce(NewK2 k2, java.lang.Iterable<LongWritable> v2s, org.apache.hadoop.mapreduce.Reducer<NewK2,LongWritable,LongWritable,LongWritable>.Context context) throws java.io.IOException ,InterruptedException { 69 context.write(new LongWritable(k2.first), new LongWritable(k2.second)); 70 }; 71 } 72 73 /** 74 * 問:為什麼實現該類? 75 * 答:因為原來的v2不能參與排序,把原來的k2和v2封裝到一個類中,作為新的k2 76 * 77 */ 78 static class NewK2 implements WritableComparable<NewK2>{ 79 Long first; 80 Long second; 81 82 public NewK2(){} 83 84 public NewK2(long first, long second){ 85 this.first = first; 86 this.second = second; 87 } 88 89 90 @Override 91 public void readFields(DataInput in) throws IOException { 92 this.first = in.readLong(); 93 this.second = in.readLong(); 94 } 95 96 @Override 97 public void write(DataOutput out) throws IOException { 98 out.writeLong(first); 99 out.writeLong(second); 100 } 101 102 /** 103 * 當k2進行排序時,會呼叫該方法. 104 * 當第一列不同時,升序;當第一列相同時,第二列升序 105 */ 106 @Override 107 public int compareTo(NewK2 o) { 108 final long minus = this.first - o.first; 109 if(minus !=0){ 110 return (int)minus; 111 } 112 return (int)(this.second - o.second); 113 } 114 115 @Override 116 public int hashCode() { 117 return this.first.hashCode()+this.second.hashCode(); 118 } 119 120 @Override 121 public boolean equals(Object obj) { 122 if(!(obj instanceof NewK2)){ 123 return false; 124 } 125 NewK2 oK2 = (NewK2)obj; 126 return (this.first==oK2.first)&&(this.second==oK2.second); 127 } 128 } 129 130 }View Code
程式碼 1.2
從上面的程式碼中我們可以發現,我們的新型別NewK2實現了WritableComparable介面,其中該介面中有一個compareTo()方法,當對關鍵字進行比較會呼叫該方法,而我們就在該方法中實現了我們想要做的事。
執行結果如下圖1.4所示。
1 1 2 1 2 2 3 1 3 2 3 3
圖 1.4
二、分組演算法
2.1 MapReduce預設分組
分組是在MapReduce中Mapper端的第四步,分組也是基於Key進行的,將相同key的value放到一個集合中去。還以上面排序程式碼為例,業務邏輯如下圖2.1所示。在程式碼中以NewK2為關鍵字,每個鍵都不相同,所以會將資料分為六組,這樣就不能實現我們的業務要求,但利用自定義型別NewK2,可以自定義排序演算法的同時我們也可以自定義分組演算法。
#當第一列相同時,求出第二列的最小值
3 3
3 2
3 1
2 2
2 1
1 1
-------------------
#結果
3 1
2 1
1 1
圖 2.1
2.2 自定義分組比較器
由於業務要求分組是按照第一列分組,但是NewK2的比較規則決定了不能按照第一列分,只能自定義分組比較器,程式碼如下2.1所示。
1 package group; 2 3 import java.io.DataInput; 4 import java.io.DataOutput; 5 import java.io.IOException; 6 import java.net.URI; 7 8 import org.apache.hadoop.conf.Configuration; 9 import org.apache.hadoop.fs.FileSystem; 10 import org.apache.hadoop.fs.Path; 11 import org.apache.hadoop.io.LongWritable; 12 import org.apache.hadoop.io.RawComparator; 13 import org.apache.hadoop.io.Text; 14 import org.apache.hadoop.io.WritableComparable; 15 import org.apache.hadoop.io.WritableComparator; 16 import org.apache.hadoop.mapreduce.Job; 17 import org.apache.hadoop.mapreduce.Mapper; 18 import org.apache.hadoop.mapreduce.Reducer; 19 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 20 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; 21 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 22 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; 23 import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner; 24 25 public class GroupApp { 26 static final String INPUT_PATH = "hdfs://hadoop:9000/newinput"; 27 static final String OUT_PATH = "hdfs://hadoop:9000/newoutput"; 28 public static void main(String[] args) throws Exception{ 29 final Configuration configuration = new Configuration(); 30 31 final FileSystem fileSystem = FileSystem.get(new URI(INPUT_PATH), configuration); 32 if(fileSystem.exists(new Path(OUT_PATH))){ 33 fileSystem.delete(new Path(OUT_PATH), true); 34 } 35 final Job job = new Job(configuration, GroupApp.class.getSimpleName()); 36 37 //1.1 指定輸入檔案路徑 38 FileInputFormat.setInputPaths(job, INPUT_PATH); 39 job.setInputFormatClass(TextInputFormat.class);//指定哪個類用來格式化輸入檔案 40 41 //1.2指定自定義的Mapper類 42 job.setMapperClass(MyMapper.class); 43 job.setMapOutputKeyClass(NewK2.class);//指定輸出<k2,v2>的型別 44 job.setMapOutputValueClass(LongWritable.class); 45 46 //1.3 指定分割槽類 47 job.setPartitionerClass(HashPartitioner.class); 48 job.setNumReduceTasks(1); 49 50 //1.4 TODO 排序、分割槽 51 job.setGroupingComparatorClass(MyGroupingComparator.class); 52 //1.5 TODO (可選)合併 53 54 //2.2 指定自定義的reduce類 55 job.setReducerClass(MyReducer.class); 56 job.setOutputKeyClass(LongWritable.class);//指定輸出<k3,v3>的型別 57 job.setOutputValueClass(LongWritable.class); 58 59 //2.3 指定輸出到哪裡 60 FileOutputFormat.setOutputPath(job, new Path(OUT_PATH)); 61 job.setOutputFormatClass(TextOutputFormat.class);//設定輸出檔案的格式化類 62 job.waitForCompletion(true);//把程式碼提交給JobTracker執行 63 } 64 65 66 static class MyMapper extends Mapper<LongWritable, Text, NewK2, LongWritable>{ 67 protected void map(LongWritable key, Text value, org.apache.hadoop.mapreduce.Mapper<LongWritable,Text,NewK2,LongWritable>.Context context) throws java.io.IOException ,InterruptedException { 68 final String[] splited = value.toString().split("\t"); 69 final NewK2 k2 = new NewK2(Long.parseLong(splited[0]), Long.parseLong(splited[1])); 70 final LongWritable v2 = new LongWritable(Long.parseLong(splited[1])); 71 context.write(k2, v2); 72 }; 73 } 74 75 static class MyReducer extends Reducer<NewK2, LongWritable, LongWritable, LongWritable>{ 76 protected void reduce(NewK2 k2, java.lang.Iterable<LongWritable> v2s, org.apache.hadoop.mapreduce.Reducer<NewK2,LongWritable,LongWritable,LongWritable>.Context context) throws java.io.IOException ,InterruptedException { 77 long min = Long.MAX_VALUE; 78 for (LongWritable v2 : v2s) { 79 if(v2.get()<min){ 80 min = v2.get(); 81 } 82 } 83 84 context.write(new LongWritable(k2.first), new LongWritable(min)); 85 }; 86 } 87 88 /** 89 * 問:為什麼實現該類? 90 * 答:因為原來的v2不能參與排序,把原來的k2和v2封裝到一個類中,作為新的k2 91 * 92 */ 93 static class NewK2 implements WritableComparable<NewK2>{ 94 Long first; 95 Long second; 96 97 public NewK2(){} 98 99 public NewK2(long first, long second){ 100 this.first = first; 101 this.second = second; 102 } 103 104 105 @Override 106 public void readFields(DataInput in) throws IOException { 107 this.first = in.readLong(); 108 this.second = in.readLong(); 109 } 110 111 @Override 112 public void write(DataOutput out) throws IOException { 113 out.writeLong(first); 114 out.writeLong(second); 115 } 116 117 /** 118 * 當k2進行排序時,會呼叫該方法. 119 * 當第一列不同時,升序;當第一列相同時,第二列升序 120 */ 121 @Override 122 public int compareTo(NewK2 o) { 123 final long minus = this.first - o.first; 124 if(minus !=0){ 125 return (int)minus; 126 } 127 return (int)(this.second - o.second); 128 } 129 130 @Override 131 public int hashCode() { 132 return this.first.hashCode()+this.second.hashCode(); 133 } 134 135 @Override 136 public boolean equals(Object obj) { 137 if(!(obj instanceof NewK2)){ 138 return false; 139 } 140 NewK2 oK2 = (NewK2)obj; 141 return (this.first==oK2.first)&&(this.second==oK2.second); 142 } 143 } 144 145 static class MyGroupingComparator implements RawComparator<NewK2>{ 146 147 @Override 148 public int compare(NewK2 o1, NewK2 o2) { 149 return (int)(o1.first - o2.first); 150 } 151 152 @Override 153 public int compare(byte[] arg0, int arg1, int arg2, byte[] arg3, 154 int arg4, int arg5) { 155 return WritableComparator.compareBytes(arg0, arg1, 8, arg3, arg4, 8); 156 } 157 158 } 159 }View Code
程式碼2.1
從上面的程式碼中我們可以知道,我們自定義了一個分組比較器MyGroupingComparator,該類實現了RawComparator介面,RawComparator又繼承了Comparator介面,這兩個介面的程式碼如下:
public interface RawComparator<T> extends Comparator<T> { public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2); }
public interface Comparator<T> { int compare(T o1, T o2); boolean equals(Object obj); }
在類MyGroupingComparator中分別對著兩個介面中的方法進行了實現,RawComparator中的compare()方法是基於位元組的比較,Comparator中的compare()方法是基於物件的比較。在該方法一共有六個引數,如下:
* @param arg0 表示第一個參與比較的位元組陣列
* @param arg1 表示第一個參與比較的位元組陣列的起始位置
* @param arg2 表示第一個參與比較的位元組陣列的偏移量
*
* @param arg3 表示第二個參與比較的位元組陣列
* @param arg4 表示第二個參與比較的位元組陣列的起始位置
* @param arg5 表示第二個參與比較的位元組陣列的偏移量
在於NewK2中儲存著兩個long型別,每個long型別為8位元組,.compareBytes()方法的引數如下:.compareBytes(arg0, arg1, 8, arg3, arg4, 8);因為比較的是第一列,所以讀取的偏移量為8位元組。由於我們要求出每一分組的最小值,所以還重寫Reduce方法,求出每一分租的最小值。最後的執行結果如下圖2.1所示
1 1 2 1 3 1
圖 2.1
相關推薦
Hadoop日記Day18---MapReduce排序分組
MapReduce的排序分組任務與要求
我們知道排序分組是MapReduce中Mapper端的第四步,其中分組排序都是基於Key的,我們可以通過下面這幾個例子來體現出來。其中的資料和任務如下圖1.1,1.2所示。
#首先按照第一列升序排列,當第一列相同時,第二列升序排列
3 3
3
大數據學習之九——Combiner,Partitioner,shuffle和MapReduce排序分組
pareto 聚合 文件 ner 數據傳輸 定義排序 str ack 獲取數據 1.Combiner
Combiner是MapReduce的一種優化手段。每一個map都可能會產生大量的本地輸出,Combiner的作用就是對map端的輸出先做一次合並,以減少map和reduc
hadoop-之二次排序&分組&分割槽
package p5.gyg.two.sort;import java.io.DataInput;import java.io.DataOutput;import java.io.IOException;import org.apache.commons.lang.StringUtils;import org
大資料-Hadoop生態(18)-MapReduce框架原理-WritableComparable排序和GroupingComparator分組
1.排序概述
2.排序分類
3.WritableComparable案例
這個檔案,是大資料-Hadoop生態(12)-Hadoop序列化和原始碼追蹤的輸出檔案,可以看到,檔案根據key,也就是手機號進行了字典排序
13470253144 180 180
hadoop用java API實現mapreduce排序
mapreduce排序依靠的是key鍵,所以要在輸出的key對應的類實現compareTo()方法
#key對應的類
package org.hadoop.sort;
import org.apache.hadoop.io.Writable;
import org.apache
hadoop平臺使用python編寫mapreduce排序小程式
編寫環境
hadoop-2.6.5
python-2.7.5
xshell連線
金山雲平臺,一臺master,3臺selvet
資料型別
g 445
a 1117
b 222
c 333
d 444
e 123
f 345
h 456
Hadoop鏈式MapReduce、多維排序、倒排索引、自連線演算法、二次排序、Join效能優化、處理員工資訊Join實戰、URL流量分析、TopN及其排序、求平均值和最大最小值、資料清洗ETL、分析氣
Hadoop Mapreduce
演算法彙總
第52課:Hadoop鏈式MapReduce程式設計實戰...1
第51課:Hadoop MapReduce多維排序解析與實戰...2
第50課:HadoopMapReduce倒排索引解析與實戰...3
第49課:Hado
hadoop系列三:mapreduce的使用
count 明顯 blank api park size 當前 java mapreduce 轉載請在頁首明顯處註明作者與出處
一:說明
此為大數據系列的一些博文,有空的話會陸續更新,包含大數據的一些內容,如hadoop,spark,storm,機器學習等。
hadoop入門筆記MapReduce Shuffle簡介(五)
單位 海量數據 並行處理 詳細 但是 信息 不能 utf 適合 1. MapReduce 定義
Hadoop 中的 MapReduce是一個使用簡單的軟件框架,基於它寫出來的應用程序能夠運行在由上千個商用機器組成的大型集群上,並以一種可靠容錯式並行處理TB級別的數據集
hadoop入門筆記MapReduce簡介(三)
today 信息 編程模型 cut 大型 狀態 參數 dfs 好處 . MapReduce基本編程模型和框架
1.1 MapReduce抽象模型
大數據計算的核心思想是:分而治之。如下圖1所示。把大量的數據劃分開來,分配給各個子任務來完成。再將結果合並到一起輸出。 註:如果
[轉]hadoop運行mapreduce作業無法連接0.0.0.0/0.0.0.0:10020
temp ide 屬性 增加 mes 時間 kerberos 動態更新 lap 14/04/04 17:15:12 INFO mapreduce.Job: map 0% reduce 0%
14/04/04 17:19:42 INFO mapreduce.Job: ma
[Hadoop]淺談MapReduce原理及執行流程
技術分享 情況下 size 原來 per node 有一個 根據 執行流程 MapReduce
MapReduce原理非常重要,hive與spark都是基於MR原理
MapReduce采用多進程,方便對每個任務資源控制和調配,但是進程消耗更多的啟動時間,因此MR時效
大資料篇:hadoop測試WordCount mapreduce出錯問題
[[email protected] ~]# hadoop jar /usr/local/hadoop-2.8.4/share/hadoop/mapreduce/hadoop-mapreduce-examples-2.8.4.jar wordcount /data/wordcount /o
hadoop自定義實現排序流量統計
https://blog.csdn.net/wzcwmhp/article/details/53285581
首先map會按照key的預設字典排序規則對其輸出進行排序,如果我們想實現流量輸出排序,可以將其flowbean設定為key,然後通過compareable介面自定義排序規則對fl
Linux系統下安裝Hadoop並測試MapReduce
首先我們需要在網上下載Linux系統,我在這次安裝過程中使用的CentOS-7系統
CentOS-7映象檔案下載地址: https://www.centos.org/download/
1. 選擇好自己需要的版本
2. 安裝CentOS系統,並安裝
MapReduce實現分組求TopN
本文以訂單案例為例,演示如何進行分組,以及求取每組的前兩條資料。
一:案例需求
有如下訂單資料
訂單id
商品id
成交金額
Order_0000001
Pdt_01
222.
win7 系統eclipse環境下測試 執行hadoop 的 wordcount mapreduce。
上篇介紹了在linux下測試執行 hadoop 的wordcount 例子後,就想著怎麼在eclipse 下編寫mapreduce函式,連結hadoop叢集計算呢。 linux下測試執行 hadoop 的wordcount 參考:https://mp.csdn.net/mdeditor/
Hadoop面試準備——MapReduce
1、MR執行流程
作業的提交
1)啟動客戶端Client,執行Job;
2)客戶端向資源管理器(ResourceManager)提交任務,請求一個新的ID號;
3)客戶端將Job所需的資源傳送給HDFS;
4)客戶端向RM提交作業;
作業的初始化
5)RM將作業請求傳送給
hadoop streaming欄位排序介紹
我們在使用hadoop streaming的時候預設streaming的map和reduce的separator不指定的話,map和reduce會根據它們預設的分隔符來進行排序
map:預設的分隔符是\t
reduce:預設的分隔符是" "
得到的結果都是按第一個分隔符排序去重後的結果
大資料-Hadoop生態(13)-MapReduce框架原理--Job提交原始碼和切片原始碼解析
1.MapReduce的資料流
1) Input -> Mapper階段
輸入源是一個檔案,經過InputFormat之後,到了Mapper就成了K,V對,以上一章的流量案例來說,經過InputFormat之後,變成了手機號為key,這一行資料為value的K,V對,所以這裡我們可以自定義Inp
Hadoop日記Day18---MapReduce排序分組
MapReduce的排序分組任務與要求 我們知道排序分組是MapReduce中Mapper端的第四步,其中分組排序都是基於Key的,我們可以通過下面這幾個例子來體現出來。其中的資料和任務如下圖1.1,1.2所示。 #首先按照第一列升序排列,當第一列相同時,第二列升序排列 3 3 3
大數據學習之九——Combiner,Partitioner,shuffle和MapReduce排序分組
pareto 聚合 文件 ner 數據傳輸 定義排序 str ack 獲取數據 1.Combiner Combiner是MapReduce的一種優化手段。每一個map都可能會產生大量的本地輸出,Combiner的作用就是對map端的輸出先做一次合並,以減少map和reduc
hadoop-之二次排序&分組&分割槽
package p5.gyg.two.sort;import java.io.DataInput;import java.io.DataOutput;import java.io.IOException;import org.apache.commons.lang.StringUtils;import org
大資料-Hadoop生態(18)-MapReduce框架原理-WritableComparable排序和GroupingComparator分組
1.排序概述 2.排序分類 3.WritableComparable案例 這個檔案,是大資料-Hadoop生態(12)-Hadoop序列化和原始碼追蹤的輸出檔案,可以看到,檔案根據key,也就是手機號進行了字典排序 13470253144 180 180
hadoop用java API實現mapreduce排序
mapreduce排序依靠的是key鍵,所以要在輸出的key對應的類實現compareTo()方法 #key對應的類 package org.hadoop.sort; import org.apache.hadoop.io.Writable; import org.apache
hadoop平臺使用python編寫mapreduce排序小程式
編寫環境 hadoop-2.6.5 python-2.7.5 xshell連線 金山雲平臺,一臺master,3臺selvet 資料型別 g 445 a 1117 b 222 c 333 d 444 e 123 f 345 h 456
Hadoop鏈式MapReduce、多維排序、倒排索引、自連線演算法、二次排序、Join效能優化、處理員工資訊Join實戰、URL流量分析、TopN及其排序、求平均值和最大最小值、資料清洗ETL、分析氣
Hadoop Mapreduce 演算法彙總 第52課:Hadoop鏈式MapReduce程式設計實戰...1 第51課:Hadoop MapReduce多維排序解析與實戰...2 第50課:HadoopMapReduce倒排索引解析與實戰...3 第49課:Hado
hadoop系列三:mapreduce的使用
count 明顯 blank api park size 當前 java mapreduce 轉載請在頁首明顯處註明作者與出處 一:說明 此為大數據系列的一些博文,有空的話會陸續更新,包含大數據的一些內容,如hadoop,spark,storm,機器學習等。
hadoop入門筆記MapReduce Shuffle簡介(五)
單位 海量數據 並行處理 詳細 但是 信息 不能 utf 適合 1. MapReduce 定義 Hadoop 中的 MapReduce是一個使用簡單的軟件框架,基於它寫出來的應用程序能夠運行在由上千個商用機器組成的大型集群上,並以一種可靠容錯式並行處理TB級別的數據集
hadoop入門筆記MapReduce簡介(三)
today 信息 編程模型 cut 大型 狀態 參數 dfs 好處 . MapReduce基本編程模型和框架 1.1 MapReduce抽象模型 大數據計算的核心思想是:分而治之。如下圖1所示。把大量的數據劃分開來,分配給各個子任務來完成。再將結果合並到一起輸出。 註:如果
[轉]hadoop運行mapreduce作業無法連接0.0.0.0/0.0.0.0:10020
temp ide 屬性 增加 mes 時間 kerberos 動態更新 lap 14/04/04 17:15:12 INFO mapreduce.Job: map 0% reduce 0% 14/04/04 17:19:42 INFO mapreduce.Job: ma
[Hadoop]淺談MapReduce原理及執行流程
技術分享 情況下 size 原來 per node 有一個 根據 執行流程 MapReduce MapReduce原理非常重要,hive與spark都是基於MR原理 MapReduce采用多進程,方便對每個任務資源控制和調配,但是進程消耗更多的啟動時間,因此MR時效
大資料篇:hadoop測試WordCount mapreduce出錯問題
[[email protected] ~]# hadoop jar /usr/local/hadoop-2.8.4/share/hadoop/mapreduce/hadoop-mapreduce-examples-2.8.4.jar wordcount /data/wordcount /o
hadoop自定義實現排序流量統計
https://blog.csdn.net/wzcwmhp/article/details/53285581 首先map會按照key的預設字典排序規則對其輸出進行排序,如果我們想實現流量輸出排序,可以將其flowbean設定為key,然後通過compareable介面自定義排序規則對fl
Linux系統下安裝Hadoop並測試MapReduce
首先我們需要在網上下載Linux系統,我在這次安裝過程中使用的CentOS-7系統 CentOS-7映象檔案下載地址: https://www.centos.org/download/ 1. 選擇好自己需要的版本 2. 安裝CentOS系統,並安裝
MapReduce實現分組求TopN
本文以訂單案例為例,演示如何進行分組,以及求取每組的前兩條資料。 一:案例需求 有如下訂單資料 訂單id 商品id 成交金額 Order_0000001 Pdt_01 222.
win7 系統eclipse環境下測試 執行hadoop 的 wordcount mapreduce。
上篇介紹了在linux下測試執行 hadoop 的wordcount 例子後,就想著怎麼在eclipse 下編寫mapreduce函式,連結hadoop叢集計算呢。 linux下測試執行 hadoop 的wordcount 參考:https://mp.csdn.net/mdeditor/
Hadoop面試準備——MapReduce
1、MR執行流程 作業的提交 1)啟動客戶端Client,執行Job; 2)客戶端向資源管理器(ResourceManager)提交任務,請求一個新的ID號; 3)客戶端將Job所需的資源傳送給HDFS; 4)客戶端向RM提交作業; 作業的初始化 5)RM將作業請求傳送給
hadoop streaming欄位排序介紹
我們在使用hadoop streaming的時候預設streaming的map和reduce的separator不指定的話,map和reduce會根據它們預設的分隔符來進行排序 map:預設的分隔符是\t reduce:預設的分隔符是" " 得到的結果都是按第一個分隔符排序去重後的結果
大資料-Hadoop生態(13)-MapReduce框架原理--Job提交原始碼和切片原始碼解析
1.MapReduce的資料流 1) Input -> Mapper階段 輸入源是一個檔案,經過InputFormat之後,到了Mapper就成了K,V對,以上一章的流量案例來說,經過InputFormat之後,變成了手機號為key,這一行資料為value的K,V對,所以這裡我們可以自定義Inp