1. 程式人生 > >Hadoop閱讀筆記(二)——利用MapReduce求平均數和去重

Hadoop閱讀筆記(二)——利用MapReduce求平均數和去重

前言: 聖誕節來了,我怎麼能虛度光陰呢?!依稀記得,那一年,大家互贈賀卡,短短几行字,字字融化在心裡;那一年,大家在水果市場,尋找那些最能代表自己心意的蘋果香蕉梨,摸著冰冷的水果外皮,內心早已滾燙。這一年……我在部落格園-_-#,希望用dt的程式碼燃燒腦細胞,溫暖小心窩。

上篇 《Hadoop閱讀筆記(一)——強大的MapReduce》 主要介紹了MapReduce的在大資料集上處理的優勢以及執行機制,通過專利資料編寫Demo加深了對於MapReduce中輸入輸出資料結構的細節理解。有了理論上的指導,仍需手捧我的hadoop聖經——Hadoop實戰2,繼續走完未走過的路(有種怪怪的感覺,我一直在原地踏步)。

正文: 實踐是檢驗真理的唯一標準,不知道這是誰說的,但是作為碼農,倒也是實用受用的座右銘。除卻大牛們能夠在閱讀高深理論時new一個併發執行緒,眼睛所到之處,已然可以理清program的精髓所在,好似一臺計算機掃描了所看到的程式碼一般(有點誇張^_^)。作為普羅大眾的搬磚者來說,還是通過一些例項來加深對於理論的認識。

今天主要是通過以下兩個例子:求平均成績、去重來加深對MapReduce的理解。

1.如何用MapReduce求平均成績——WordCount的加強版

在談平均成績之前我們回顧下屬性的Hadoop HelloWorld程式——WordCount,其主要是統計資料集中各個單詞出現的次數。因為次數沒有多少之分,如果將這裡的次數換成分數就將字數統計問題轉化成了求每個個體的總成績的問題,再加上一步(總成績/學科數)運算就是這裡要討論的求平均數的問題了。在筆者看來,MapReduce是一種程式設計思維,它引導碼農們如何將一個亟待解決的問題轉換為一個MapReduce過程:map階段輸入什麼、map過程執行什麼、map階段輸出什麼、reduce階段輸入什麼、執行什麼、輸出什麼。能夠將以上幾個點弄清楚整明白,一個MapReduce程式就會躍然紙上。這裡:

Map:      指定格式的資料集(如"張三 60")——輸入資料

執行每條記錄的分割操作以key-value寫入上下文context中——執行功能

得到指定鍵值對型別的輸出(如"(new Text(張三),new IntWritable(60))")——輸出結果

Reduce:   map的輸出——輸入資料

求出單個個體的總成績後再除以該個體課程數目——執行功能

得到指定鍵值對型別的輸入——輸出結果

鑑於上面的map和reduce過程,我們可以得到如下的程式碼:

 1 public class Test1214 { 2 
 3     public static class MapperClass
extends Mapper<LongWritable, Text, Text, IntWritable> {
4 public void map(LongWritable key, Text value, Context context){ 5 String line = value.toString(); 6 System.out.println("該行資料為:" + line); 7 StringTokenizer token = new StringTokenizer(line,"\t"); 8 String nameT = token.nextToken(); 9 int score = Integer.parseInt(token.nextToken()); 10 Text name = new Text(nameT); 11 try { 12 context.write(name, new IntWritable(score)); 13 } catch (IOException e) { 14 e.printStackTrace(); 15 } catch (InterruptedException e) { 16 e.printStackTrace(); 17 } 18 } 19 } 20 21 public static class ReducerClass extends Reducer<Text, IntWritable, Text, IntWritable>{22 public void reduce(Text key, Iterable<IntWritable> value, Context context){ 23 int sum = 0; 24 int count =0; 25 for(IntWritable score : value){ 26 sum += score.get(); 27 count++; 28 System.out.println("第" + count + "個數值為:" + score.get()); 29 } 30 IntWritable avg = new IntWritable(sum/count); 31 try { 32 context.write(key, avg); 33 } catch (IOException e) { 34 e.printStackTrace(); 35 } catch (InterruptedException e) { 36 e.printStackTrace(); 37 } 38 } 39 } 40 /** 41 * @param args 42 * @throws IOException 43 * @throws ClassNotFoundException 44 * @throws InterruptedException 45 */ 46 public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException { 47 48 Configuration conf = new Configuration(); 49 String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs(); 50 if (otherArgs.length != 2) { 51 System.err.println("Usage: wordcount <in> <out>"); 52 System.exit(2); 53 } 54 Job job = new Job(conf, "Test1214"); 55 56 job.setJarByClass(Test1214.class); 57 job.setMapperClass(MapperClass.class); 58 job.setCombinerClass(ReducerClass.class); 59 job.setReducerClass(ReducerClass.class); 60 job.setOutputKeyClass(Text.class); 61 job.setOutputValueClass(IntWritable.class); 62 63 org.apache.hadoop.mapreduce.lib.input.FileInputFormat.addInputPath(job, new Path(otherArgs[0])); 64 org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.setOutputPath(job, new Path(otherArgs[1])); 65 System.exit(job.waitForCompletion(true) ? 0 : 1); 66 System.out.println("end"); 67 } 68 69 }

資料集:這裡的資料是碼農我自己手工建立的,主要是想看看mapreduce的執行過程,所以就建立了兩個檔案,當然這裡面的成績也就沒有什麼是否符合正態分佈的考慮了……

資料中設定有A-K共11個學生,共16門課程,具體資料如下:

NameScore1.txt:

A	55
B	65
C	44
D	87
E	66
F	90
G	70
H	59
I	61
J	58
K	40
A	45
B	62
C	64
D	77
E	36
F	50
G	80
H	69
I	71
J	70
K	49
A	51
B	64
C	74
D	37
E	76
F	80
G	50
H	51
I	81
J	68
K	80
A	85
B	55
C	49
D	67
E	69
F	50
G	80
H	79
I	81
J	68
K	80
A	35
B	55
C	40
D	47
E	60
F	72
G	76
H	79
I	68
J	78
K	50
A	65
B	45
C	74
D	57
E	56
F	50
G	60
H	59
I	61
J	58
K	60
A	85
B	45
C	74
D	67
E	86
F	70
G	50
H	79
I	81
J	78
K	60
A	50
B	69
C	40
D	89
E	69
F	95
G	75
H	59
I	60
J	59
K	45

NameScore2.txt:

A	65
B	75
C	64
D	67
E	86
F	70
G	90
H	79
I	81
J	78
K	60
A	65
B	82
C	84
D	97
E	66
F	70
G	80
H	89
I	91
J	90
K	69
A	71
B	84
C	94
D	67
E	96
F	80
G	70
H	71
I	81
J	98
K	80
A	85
B	75
C	69
D	87
E	89
F	80
G	70
H	99
I	81
J	88
K	60
A	65
B	75
C	60
D	67
E	80
F	92
G	76
H	79
I	68
J	78
K	70
A	85
B	85
C	74
D	87
E	76
F	60
G	60
H	79
I	81
J	78
K	80
A	85
B	65
C	74
D	67
E	86
F	70
G	70
H	79
I	81
J	78
K	60
A	70
B	69
C	60
D	89
E	69
F	95
G	75
H	59
I	60
J	79
K	65

其執行過程中控制檯列印的資訊為:

14/12/14 17:05:27 INFO jvm.JvmMetrics: Initializing JVM Metrics with processName=JobTracker, sessionId=
14/12/14 17:05:27 WARN mapred.JobClient: No job jar file set. User classes may not be found. See JobConf(Class) or JobConf#setJar(String).
14/12/14 17:05:27 INFO input.FileInputFormat: Total input paths to process : 2
14/12/14 17:05:27 INFO mapred.JobClient: Running job: job_local_0001
14/12/14 17:05:27 INFO input.FileInputFormat: Total input paths to process : 2
14/12/14 17:05:27 INFO mapred.MapTask: io.sort.mb = 100
14/12/14 17:05:28 INFO mapred.MapTask: data buffer = 79691776/99614720
14/12/14 17:05:28 INFO mapred.MapTask: record buffer = 262144/327680
該行資料為:A 55
該行資料為:B 65
該行資料為:C 44
該行資料為:D 87
該行資料為:E 66
該行資料為:F 90
該行資料為:G 70
該行資料為:H 59
該行資料為:I 61
該行資料為:J 58
該行資料為:K 40
該行資料為:A 45
該行資料為:B 62
該行資料為:C 64
該行資料為:D 77
該行資料為:E 36
該行資料為:F 50
該行資料為:G 80
該行資料為:H 69
該行資料為:I 71
該行資料為:J 70
該行資料為:K 49
該行資料為:A 51
該行資料為:B 64
該行資料為:C 74
該行資料為:D 37
該行資料為:E 76
該行資料為:F 80
該行資料為:G 50
該行資料為:H 51
該行資料為:I 81
該行資料為:J 68
該行資料為:K 80
該行資料為:A 85
該行資料為:B 55
該行資料為:C 49
該行資料為:D 67
該行資料為:E 69
該行資料為:F 50
該行資料為:G 80
該行資料為:H 79
該行資料為:I 81
該行資料為:J 68
該行資料為:K 80
該行資料為:A 35
該行資料為:B 55
該行資料為:C 40
該行資料為:D 47
該行資料為:E 60
該行資料為:F 72
該行資料為:G 76
該行資料為:H 79
該行資料為:I 68
該行資料為:J 78
該行資料為:K 50
該行資料為:A 65
該行資料為:B 45
該行資料為:C 74
該行資料為:D 57
該行資料為:E 56
該行資料為:F 50
該行資料為:G 60
該行資料為:H 59
該行資料為:I 61
該行資料為:J 58
該行資料為:K 60
該行資料為:A 85
該行資料為:B 45
該行資料為:C 74
該行資料為:D 67
該行資料為:E 86
該行資料為:F 70
該行資料為:G 50
該行資料為:H 79
該行資料為:I 81
該行資料為:J 78
該行資料為:K 60
該行資料為:A 50
該行資料為:B 69
該行資料為:C 40
該行資料為:D 89
該行資料為:E 69
該行資料為:F 95
該行資料為:G 75
該行資料為:H 59
該行資料為:I 60
該行資料為:J 59
該行資料為:K 45
14/12/14 17:05:28 INFO mapred.MapTask: Starting flush of map output1個數值為:552個數值為:453個數值為:514個數值為:855個數值為:356個數值為:657個數值為:858個數值為:501個數值為:452個數值為:643個數值為:654個數值為:455個數值為:556個數值為:697個數值為:628個數值為:551個數值為:642個數值為:493個數值為:444個數值為:745個數值為:746個數值為:407個數值為:408個數值為:741個數值為:672個數值為:673個數值為:774個數值為:375個數值為:876個數值為:577個數值為:898個數值為:471個數值為:362個數值為:663個數值為:764個數值為:865個數值為:696個數值為:697個數值為:608個數值為:561個數值為:902個數值為:953個數值為:704個數值為:505個數值為:806個數值為:507個數值為:508個數值為:721個數值為:602個數值為:763個數值為:504個數值為:505個數值為:806個數值為:707個數值為:758個數值為:801個數值為:592個數值為:693個數值為:514個數值為:795個數值為:596個數值為:797個數值為:598個數值為:791個數值為:602個數值為:613個數值為:814個數值為:815個數值為:616個數值為:717個數值為:688個數值為:811個數值為:582個數值為:593個數值為:784個數值為:685個數值為:786個數值為:687個數值為:708個數值為:581個數值為:402個數值為:503個數值為:494個數值為:605個數值為:606個數值為:457個數值為:808個數值為:80
14/12/14 17:05:28 INFO mapred.MapTask: Finished spill 0
14/12/14 17:05:28 INFO mapred.TaskRunner: Task:attempt_local_0001_m_000000_0 is done. And is in the process of commiting
14/12/14 17:05:28 INFO mapred.LocalJobRunner: 
14/12/14 17:05:28 INFO mapred.TaskRunner: Task 'attempt_local_0001_m_000000_0' done.
14/12/14 17:05:28 INFO mapred.MapTask: io.sort.mb = 100
14/12/14 17:05:28 INFO mapred.MapTask: data buffer = 79691776/99614720
14/12/14 17:05:28 INFO mapred.MapTask: record buffer = 262144/327680
該行資料為:A 65
該行資料為:B 75
該行資料為:C 64
該行資料為:D 67
該行資料為:E 86
該行資料為:F 70
該行資料為:G 90
該行資料為:H 79
該行資料為:I 81
該行資料為:J 78
該行資料為:K 60
該行資料為:A 65
該行資料為:B 82
該行資料為:C 84
該行資料為:D 97
該行資料為:E 66
該行資料為:F 70
該行資料為:G 80
該行資料為:H 89
該行資料為:I 91
該行資料為:J 90
該行資料為:K 69
該行資料為:A 71
該行資料為:B 84
該行資料為:C 94
該行資料為:D 67
該行資料為:E 96
該行資料為:F 80
該行資料為:G 70
該行資料為:H 71
該行資料為:I 81
該行資料為:J 98
該行資料為:K 80
該行資料為:A 85
該行資料為:B 75
該行資料為:C 69
該行資料為:D 87
該行資料為:E 89
該行資料為:F 80
該行資料為:G 70
該行資料為:H 99
該行資料為:I 81
該行資料為:J 88
該行資料為:K 60
該行資料為:A 65
該行資料為:B 75
該行資料為:C 60
該行資料為:D 67
該行資料為:E 80
該行資料為:F 92
該行資料為:G 76
該行資料為:H 79
該行資料為:I 68
該行資料為:J 78
該行資料為:K 70
該行資料為:A 85
該行資料為:B 85
該行資料為:C 74
該行資料為:D 87
該行資料為:E 76
該行資料為:F 60
該行資料為:G 60
該行資料為:H 79
該行資料為:I 81
該行資料為:J 78
該行資料為:K 80
該行資料為:A 85
該行資料為:B 65
該行資料為:C 74
該行資料為:D 67
該行資料為:E 86
該行資料為:F 70
該行資料為:G 70
該行資料為:H 79
該行資料為:I 81
該行資料為:J 78
該行資料為:K 60
該行資料為:A 70
該行資料為:B 69
該行資料為:C 60
該行資料為:D 89
該行資料為:E 69
該行資料為:F 95
該行資料為:G 75
該行資料為:H 59
該行資料為:I 60
該行資料為:J 79
該行資料為:K 65
14/12/14 17:05:28 INFO mapred.MapTask: Starting flush of map output1個數值為:652個數值為:653個數值為:714個數值為:855個數值為:656個數值為:857個數值為:858個數值為:701個數值為:652個數值為:843個數值為:754個數值為:855個數值為:756個數值為:697個數值為:828個數值為:751個數值為:842個數值為:693個數值為:644個數值為:745個數值為:946個數值為:607個數值為:608個數值為:741個數值為:672個數值為:873個數值為:974個數值為:675個數值為:676個數值為:877個數值為:898個數值為:671個數值為:662個數值為:863個數值為:964個數值為:865個數值為:896個數值為:697個數值為:808個數值為:761個數值為:702個數值為:953個數值為:704個數值為:705個數值為:806個數值為:607個數值為:808個數值為:921個數值為:602個數值為:763個數值為:704個數值為:705個數值為:806個數值為:907個數值為:758個數值為:701個數值為:792個數值為:893個數值為:714個數值為:995個數值為:596個數值為:797個數值為:798個數值為:791個數值為:602個數值為:813個數值為:814個數值為:815個數值為:816個數值為:917個數值為:688個數值為:811個數值為:782個數值為:793個數值為:784個數值為:885個數值為:786個數值為:987個數值為:908個數值為:781個數值為:602個數值為:703個數值為:694個數值為:605個數值為:806個數值為:657個數值為:608個數值為:80
14/12/14 17:05:28 INFO mapred.MapTask: Finished spill 0
14/12/14 17:05:28 INFO mapred.TaskRunner: Task:attempt_local_0001_m_000001_0 is done. And is in the process of commiting
14/12/14 17:05:28 INFO mapred.LocalJobRunner: 
14/12/14 17:05:28 INFO mapred.TaskRunner: Task 'attempt_local_0001_m_000001_0' done.
14/12/14 17:05:28 INFO mapred.LocalJobRunner: 
14/12/14 17:05:28 INFO mapred.Merger: Merging 2 sorted segments
14/12/14 17:05:28 INFO mapred.Merger: Down to the last merge-pass, with 2 segments left of total size: 180 bytes
14/12/14 17:05:28 INFO mapred.LocalJobRunner: 
第1個數值為:582個數值為:731個數值為:762個數值為:571個數值為:572個數值為:721個數值為:782個數值為:661個數值為:642個數值為:811個數值為:772個數值為:691個數值為:672個數值為:731個數值為:792個數值為:661個數值為:702個數值為:781個數值為:832個數值為:671個數值為:582個數值為:68
14/12/14 17:05:28 INFO mapred.TaskRunner: Task:attempt_local_0001_r_000000_0 is done. And is in the process of commiting
14/12/14 17:05:28 INFO mapred.LocalJobRunner: 
14/12/14 17:05:28 INFO mapred.TaskRunner: Task attempt_local_0001_r_000000_0 is allowed to commit now
14/12/14 17:05:28 INFO output.FileOutputCommitter: Saved output of task 'attempt_local_0001_r_000000_0' to hdfs://hadoop:9000/user/hadoop/output4
14/12/14 17:05:28 INFO mapred.LocalJobRunner: reduce > reduce
14/12/14 17:05:28 INFO mapred.TaskRunner: Task 'attempt_local_0001_r_000000_0' done.
14/12/14 17:05:28 INFO mapred.JobClient: map 100% reduce 100%
14/12/14 17:05:28 INFO mapred.JobClient: Job complete: job_local_0001
14/12/14 17:05:28 INFO mapred.JobClient: Counters: 14
14/12/14 17:05:28 INFO mapred.JobClient: FileSystemCounters
14/12/14 17:05:28 INFO mapred.JobClient: FILE_BYTES_READ=50573
14/12/14 17:05:28 INFO mapred.JobClient: HDFS_BYTES_READ=2630
14/12/14 17:05:28 INFO mapred.JobClient: FILE_BYTES_WRITTEN=103046
14/12/14 17:05:28 INFO mapred.JobClient: HDFS_BYTES_WRITTEN=55
14/12/14 17:05:28 INFO mapred.JobClient: Map-Reduce Framework
14/12/14 17:05:28 INFO mapred.JobClient: Reduce input groups=11
14/12/14 17:05:28 INFO mapred.JobClient: Combine output records=22
14/12/14 17:05:28 INFO mapred.JobClient: Map input records=176
14/12/14 17:05:28 INFO mapred.JobClient: Reduce shuffle bytes=0
14/12/14 17:05:28 INFO mapred.JobClient: Reduce output records=11
14/12/14 17:05:28 INFO mapred.JobClient: Spilled Records=44
14/12/14 17:05:28 INFO mapred.JobClient: Map output bytes=1056
14/12/14 17:05:28 INFO mapred.JobClient: Combine input records=176
14/12/14 17:05:28 INFO mapred.JobClient: Map output records=176
14/12/14 17:05:28 INFO mapred.JobClient: Reduce input records=22

最終結果為:
A 65
B 66
C 64
D 72
E 72
F 73
G 70
H 72
I 74
J 75
K 63

為了更清晰從將要執行的控制檯中看到map和reduce過程的執行都進行了那些操作,我們在其中列印了相關資訊,這裡有自己的兩點疑惑需要拿出來鬧鬧:

(1).這裡我寫的程式和書中不一樣,沒有增加StringTokenizer token = new StringTokenizer(line,"line")這行,事實上我加上去後會出現錯誤,我的理解是,因為預設格式是TextInputFormat即已經將檔案中的文字按照行標示進行分割,即輸入給map方法的已經是以一行為單位的記錄,所以這裡不需要以“\n”進行分割了。書中的做法應該是假定將整個檔案拿過來,統一處理,但事實上這裡預設的TextInputFormat已經完成了前期工作。(如果執迷不悟這樣處理的話,距離來說NameScore1.txt中第一行是“A     55”整個以“\n”分割後就是一個整體了,再以“\t”就無法分割了。)

(2).從執行過程列印的資訊,起初讓我有些疑惑,因為從資訊來看,似乎是:NameScore1.txt被分割並以每行記錄進入map過程,當執行到該檔案的最後一行記錄時,從列印資訊我們似乎看到的是緊接著就去執行reduce過程了,後面的NameScore2.txt也是如此,當兩個檔案都分別執行了map和reduce時似乎又執行了一次reduce操作。那麼事實是不是如此,如果真是這樣,這與先前所看到的理論中提到當map執行完後再執行reduce是否有衝突。

通過檢視程式碼我們發現

job.setMapperClass(MapperClass.class);

job.setCombinerClass(ReducerClass.class);

job.setReducerClass(ReducerClass.class);

是的,沒錯,在這裡我們發現了端倪,其真正執行過程是:先執行map,這就是過程資訊中列印了每條成績記錄,後面執行的reduce其實是介於map和reduce之間的combiner操作,那麼這個Combiner類又為何物,通過神奇的API我們可以發現Combine其實就是一次reduce過程,而且這裡明確寫了combiner和reduce過程都是使用ReducerClass類,從而就出現了這裡為什麼對於單個檔案都會先執行map然後在reduce,再後來是兩個map執行後,才執行的真正的reduce。

2.去重——閹割版的WordCount

相比於前面的求平均值例子需要新增一些邏輯程式碼來說,這裡的去重更像是閹割版的WordCount。

如果你還是用傳統的思維在考量一個去重的程式需要多少次的判斷,如果你還不瞭解什麼是真正的map和reduce。Hadoop中的去重問題被你整複雜了。要知道,當一個map執行完後會對執行的資料進行一個排序,比如按照字母先後順序;後面會進入combine階段,這階段主要是針對key-value中有相同的key就合併;再到reduce階段,通過迭代器遍歷前一階段合併的各個元素,得到最終的輸出結果。

對於去重來說,我們不在乎一個元素到底出現了幾次,只要知道這個元素確實出現了,並能夠再最後顯示出來就行了,通過map和combiner,我們最終得到的key-value對中的key都是不一樣的,也就是說在完成合並的同時就是我們所需要的去重操作(是不是有點繞)。最終reduce輸出的就是具有唯一性的去重的元素集合。我們還是按照理清map和reduce的思路來看待這個去重問題:

map: 資料中的一行記錄如"(安徽  jack)"——輸入資料

直接以key-value的方式寫入上下文物件context(這裡的value並不是我們關心的物件,可以為空)——執行功能

得到指定鍵值對型別的輸出如"(new Text(安徽),new Text(""))"——輸出結果

reduce: map的輸出——輸入資料

直接以key-value的方式寫入上下文物件context(同樣,value並不是我們關心的物件)——執行功能

得到指定鍵值對型別的輸入——輸出結果

鑑於以上對於map和reduce的理解,程式碼如下:

 1 package org.apache.mapreduce;
 2 
 3 import java.io.IOException;
 4 import java.util.Collection;
 5 import java.util.Iterator;
 6 import java.util.StringTokenizer;
 7 
 8 import org.apache.hadoop.conf.Configuration;
 9 import org.apache.hadoop.fs.Path;
10 import org.apache.hadoop.io.IntWritable;
11 import org.apache.hadoop.io.LongWritable;
12 import org.apache.hadoop.io.Text;
13 import org.apache.hadoop.mapred.TextInputFormat;
14 import org.apache.hadoop.mapreduce.Job;
15 import org.apache.hadoop.mapreduce.Mapper;
16 import org.apache.hadoop.mapreduce.Reducer;