1. 程式人生 > >MapReduce常見演算法 與自定義排序及Hadoop序列化

MapReduce常見演算法 與自定義排序及Hadoop序列化

MapReduce常見演算法
•單詞計數
•資料去重
•排序
•Top K
•選擇  以求最值為例,從100萬資料中選出一行最小值
•投影  以求處理手機上網日誌為例,從其11個欄位選出了五個欄位(列)來顯示我們的手機上網流量
•分組  相當於分割槽,以求處理手機上網日誌為例,喊手機號和非手機號分為兩組
•多表連線
•單表關聯

使用TopK演算法找出檔案中的最大數:

複製程式碼
1 package suanfa;
2
3 import java.io.IOException;
4 import java.net.URI;
5
6 import org.apache.hadoop.conf.Configuration;
7 import org.apache.hadoop.fs.FileSystem;
8 import org.apache.hadoop.fs.Path;
9 import org.apache.hadoop.io.LongWritable;
10 import org.apache.hadoop.io.NullWritable;
11 import org.apache.hadoop.io.Text;
12 import org.apache.hadoop.mapreduce.Job;
13 import org.apache.hadoop.mapreduce.Mapper;
14 import org.apache.hadoop.mapreduce.Reducer;
15 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
16 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
17
18 /**
19 * 找出檔案中的最大數
20 *
21 * @author ahu_lichang
22 *
23 */
24 public class TopKApp {
25 static final String INPUT_PATH = “hdfs://chaoren:9000/input”;
26 static final String OUT_PATH = “hdfs://chaoren:9000/out”;
27
28 public static void main(String[] args) throws Exception {
29 Configuration conf = new Configuration();
30 FileSystem fileSystem = FileSystem.get(new URI(INPUT_PATH), conf);
31 Path outPath = new Path(OUT_PATH);
32 if (fileSystem.exists(outPath)) {
33 fileSystem.delete(outPath, true);
34 }
35 Job job = new Job(conf, TopKApp.class.getSimpleName());
36 FileInputFormat.setInputPaths(job, INPUT_PATH);
37 job.setMapperClass(MyMapper.class);
38 job.setReducerClass(MyReducer.class);
39 job.setOutputKeyClass(LongWritable.class);
40 job.setOutputValueClass(NullWritable.class);
41 FileOutputFormat.setOutputPath(job, outPath);
42 job.waitForCompletion(true);
43 }
44
45 static class MyMapper extends
46 Mapper<LongWritable, Text, LongWritable, NullWritable> {
47 long max = Long.MIN_VALUE;
48
49 protected void map(LongWritable k1, Text v1, Context context)
50 throws java.io.IOException, InterruptedException {
51 long temp = Long.parseLong(v1.toString());
52 if (temp > max) {
53 max = temp;
54 }
55 }
56
57 @Override
58 protected void cleanup(
59 org.apache.hadoop.mapreduce.Mapper<LongWritable, Text, LongWritable, NullWritable>.Context context)
60 throws IOException, InterruptedException {
61 context.write(new LongWritable(max), NullWritable.get());
62 }
63 }
64
65 static class MyReducer extends
66 Reducer<LongWritable, NullWritable, LongWritable, NullWritable> {
67 long max = Long.MIN_VALUE;
68
69 protected void reduce(
70 LongWritable k2,
71 Iterable v2s,
72 org.apache.hadoop.mapreduce.Reducer<LongWritable, NullWritable, LongWritable, NullWritable>.Context context)
73 throws IOException, InterruptedException {
74 long temp = k2.get();
75 if (temp > max) {
76 max = temp;
77 }
78 }
79
80 @Override
81 protected void cleanup(
82 org.apache.hadoop.mapreduce.Reducer<LongWritable, NullWritable, LongWritable, NullWritable>.Context context)
83 throws IOException, InterruptedException {
84 context.write(new LongWritable(max), NullWritable.get());
85 }
86 }
87 }

複製程式碼
遇見一個問題:在刪除HDFS中的檔案的時候,說檔案時安全模式下,無法刪除?

這時候要想刪除該檔案,必須退出安全模式,Hadoop退出安全模式的命令是:hadoop dfsadmin -safemode leave

自定義排序

將兩列資料進行排序,第一列按照升序排列,當第一列相同時,第二列升序排列。

在map和reduce階段進行排序時,比較的是k2。v2是不參與排序比較的。如果要想讓v2也進行排序,需要把k2和v2組裝成新的類,作為k2,才能參與比較。

複製程式碼
1 package sort;
2
3 import java.io.DataInput;
4 import java.io.DataOutput;
5 import java.io.IOException;
6 import java.net.URI;
7
8 import org.apache.hadoop.conf.Configuration;
9 import org.apache.hadoop.fs.FileSystem;
10 import org.apache.hadoop.fs.Path;
11 import org.apache.hadoop.io.LongWritable;
12 import org.apache.hadoop.io.Text;
13 import org.apache.hadoop.io.WritableComparable;
14 import org.apache.hadoop.mapreduce.Job;
15 import org.apache.hadoop.mapreduce.Mapper;
16 import org.apache.hadoop.mapreduce.Reducer;
17 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
18 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
19 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
20 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
21 import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;
22
23 public class SortApp {
24 static final String INPUT_PATH = “hdfs://chaoren:9000/input”;
25 static final String OUT_PATH = “hdfs://chaoren:9000/out”;
26
27 public static void main(String[] args) throws Exception {
28 final Configuration configuration = new Configuration();
29
30 final FileSystem fileSystem = FileSystem.get(new URI(INPUT_PATH),
31 configuration);
32 if (fileSystem.exists(new Path(OUT_PATH))) {
33 fileSystem.delete(new Path(OUT_PATH), true);
34 }
35
36 final Job job = new Job(configuration, SortApp.class.getSimpleName());
37
38 // 1.1 指定輸入檔案路徑
39 FileInputFormat.setInputPaths(job, INPUT_PATH);
40 // 指定哪個類用來格式化輸入檔案
41 job.setInputFormatClass(TextInputFormat.class);
42
43 // 1.2指定自定義的Mapper類
44 job.setMapperClass(MyMapper.class);
45 // 指定輸出<k2,v2>的型別
46 job.setMapOutputKeyClass(NewK2.class);
47 job.setMapOutputValueClass(LongWritable.class);
48
49 // 1.3 指定分割槽類
50 job.setPartitionerClass(HashPartitioner.class);
51 job.setNumReduceTasks(1);
52
53 // 1.4 TODO 排序、分割槽
54
55 // 1.5 TODO (可選)合併
56
57 // 2.2 指定自定義的reduce類
58 job.setReducerClass(MyReducer.class);
59 // 指定輸出<k3,v3>的型別
60 job.setOutputKeyClass(LongWritable.class);
61 job.setOutputValueClass(LongWritable.class);
62
63 // 2.3 指定輸出到哪裡
64 FileOutputFormat.setOutputPath(job, new Path(OUT_PATH));
65 // 設定輸出檔案的格式化類
66 job.setOutputFormatClass(TextOutputFormat.class);
67
68 // 把程式碼提交給JobTracker執行
69 job.waitForCompletion(true);
70 }
71
72 static class MyMapper extends
73 Mapper<LongWritable, Text, NewK2, LongWritable> {
74 protected void map(
75 LongWritable key,
76 Text value,
77 org.apache.hadoop.mapreduce.Mapper<LongWritable, Text, NewK2, LongWritable>.Context context)
78 throws java.io.IOException, InterruptedException {
79 final String[] splited = value.toString().split("\t");
80 final NewK2 k2 = new NewK2(Long.parseLong(splited[0]),
81 Long.parseLong(splited[1]));
82 final LongWritable v2 = new LongWritable(Long.parseLong(splited[1]));
83 context.write(k2, v2);
84 };
85 }
86
87 static class MyReducer extends
88 Reducer<NewK2, LongWritable, LongWritable, LongWritable> {
89 protected void reduce(
90 NewK2 k2,
91 java.lang.Iterable v2s,
92 org.apache.hadoop.mapreduce.Reducer<NewK2, LongWritable, LongWritable, LongWritable>.Context context)
93 throws java.io.IOException, InterruptedException {
94 context.write(new LongWritable(k2.first), new LongWritable(
95 k2.second));
96 };
97 }
98
99 /**
100 * 問:為什麼實現該類? 答:因為原來的v2不能參與排序,把原來的k2和v2封裝到一個類中,作為新的k2
101 *
102 /
103 // WritableComparable:Hadoop的序列化
104 static class NewK2 implements WritableComparable {
105 Long first;
106 Long second;
107
108 public NewK2() {
109 }
110
111 public NewK2(long first, long second) {
112 this.first = first;
113 this.second = second;
114 }
115
116 public void readFields(DataInput in) throws IOException {
117 this.first = in.readLong();
118 this.second = in.readLong();
119 }
120
121 public void write(DataOutput out) throws IOException {
122 out.writeLong(first);
123 out.writeLong(second);
124 }
125
126 /

*
127 * 當k2進行排序時,會呼叫該方法. 當第一列不同時,升序;當第一列相同時,第二列升序
128 */
129 public int compareTo(NewK2 o) {
130 final long minus = this.first - o.first;
131 if (minus != 0) {
132 return (int) minus;
133 }
134 return (int) (this.second - o.second);
135 }
136
137 @Override
138 public int hashCode() {
139 return this.first.hashCode() + this.second.hashCode();
140 }
141
142 @Override
143 public boolean equals(Object obj) {
144 if (!(obj instanceof NewK2)) {
145 return false;
146 }
147 NewK2 oK2 = (NewK2) obj;
148 return (this.first == oK2.first) && (this.second == oK2.second);
149 }
150 }
151
152 }

複製程式碼
在這裡插入圖片描述

Hadoop序列化

序列化概念:

序列化:把結構化物件轉化為位元組流。

反序列化:是序列化的逆過程。即把位元組流轉回結構化物件。

Hadoop序列化的特點:

1、緊湊:高效使用儲存空間。

2、快速:讀寫資料的額外開銷小。

3、可擴充套件:可透明的讀取老格式的資料。

4、互操作:支援多語言的互動。

Hadoop的序列化格式:Writable

Hadoop序列化的作用:

序列化在分散式環境的兩大作用:程序間通訊,永久儲存。

Hadoop節點間通訊:
在這裡插入圖片描述

Writable介面

Writable介面,是根據DataInput和DataOutput實現的簡單、有效的序列化物件。

MR的任意key和value必須實現Writable介面。

MR的任意key必須實現WritableComparable介面。

自定義Writable類(上面程式碼中有)

實現Writable:

1、write是把每個物件序列化到輸出流。

2、readFields是把輸入流位元組反序列化。

實現WritableComparable:

Java值物件的比較:一般需要重寫toString(),hashCode(),equals()方法。