MapReduce處理多個不同的輸入檔案
阿新 • • 發佈:2019-01-10
MultipleInputs類指定不同的輸入檔案路徑以及輸入檔案格式
現有兩份資料
phone
123,good number
124,common number
125,bad number
user
zhangsan,123
lisi,124
wangwu,125
現在需要把user和phone按照phone number連線起來。得到下面的結果
zhangsan,123,good number
lisi,124,common number
wangwu,125,bad number
分析思路(不同檔案之間每行資料有相同的key,map階段輸出後相同key的不同value會形成一個集合,在reduce階段對該集合裡的value進行組合進而得到想要的結果)
還是相當於兩張表的一對一join操作。join時對value設定個Bean(JavaBean實現WritableComparable介面),key為外來鍵值
join的優化,詳見http://blog.csdn.net/u010366796/article/details/44649933,設定KeyBean(外鍵和標識flag屬性),進行排序
本例中將通過value進行排序,即在value的JavaBean中通過實現compareTo()方法,完成排序,使得phone表位於首位(注意:MapReduce框架只對key排序,同一key下各value的順序並無保證;若要確保phone表位於首位,需使用上述KeyBean的二次排序,或在reduce中依flag判斷資料來源)
1.對value實現JavaBean(實現WritableComparable介面)
01.
package
test.mr.multiinputs;
02.
03.
import
java.io.DataInput;
04.
import
java.io.DataOutput;
05.
import
java.io.IOException;
06.
07.
import
org.apache.hadoop.io.WritableComparable;
08.
09.
/*
10.
* 自定義的JavaBean
11.
*/
12.
public
class
FlagString
implements
WritableComparable<FlagString> {
13.
private
String value;
14.
private
int
flag;
// 標記 0:表示phone表 1:表示user表
15.
16.
public
FlagString() {
17.
super
();
18.
// TODO Auto-generated constructor stub
19.
}
20.
21.
public
FlagString(String value,
int
flag) {
22.
super
();
23.
this
.value = value;
24.
this
.flag = flag;
25.
}
26.
27.
public
String getValue() {
28.
return
value;
29.
}
30.
31.
public
void
setValue(String value) {
32.
this
.value = value;
33.
}
34.
35.
public
int
getFlag() {
36.
return
flag;
37.
}
38.
39.
public
void
setFlag(
int
flag) {
40.
this
.flag = flag;
41.
}
42.
43.
@Override
44.
public
void
write(DataOutput out)
throws
IOException {
45.
out.writeInt(flag);
46.
out.writeUTF(value);
47.
48.
}
49.
50.
@Override
51.
public
void
readFields(DataInput in)
throws
IOException {
52.
this
.flag = in.readInt();
53.
this
.value = in.readUTF();
54.
}
55.
56.
@Override
57.
public
int
compareTo(FlagString o) {
58.
if
(
this
.flag >= o.getFlag()) {
59.
if
(
this
.flag > o.getFlag()) {
60.
return
1
;
61.
}
62.
}
else
{
63.
return
-
1
;
64.
}
65.
return
this
.value.compareTo(o.getValue());
66.
}
67.
68.
}
2.多map類,map1(實現對phone表文件操作)
01.
package
test.mr.multiinputs;
02.
03.
import
java.io.IOException;
04.
05.
import
org.apache.hadoop.io.LongWritable;
06.
import
org.apache.hadoop.io.Text;
07.
import
org.apache.hadoop.mapreduce.Mapper;
08.
09.
public
class
MultiMap1
extends
Mapper<LongWritable, Text, Text, FlagString> {
10.
private
String delimiter;
// 定義分隔符,由job端設定
11.
12.
@Override
13.
protected
void
setup(
14.
Mapper<LongWritable, Text, Text, FlagString>.Context context)
15.
throws
IOException, InterruptedException {
16.
delimiter = context.getConfiguration().get(
"delimiter"
,
","
);
17.
}
18.
19.
@Override
20.
protected
void
map(LongWritable key, Text value,
21.
Mapper<LongWritable, Text, Text, FlagString>.Context context)
22.
throws
IOException, InterruptedException {
23.
String line = value.toString().trim();
24.
if
(line.length() >
0
) {
25.
String[] str = line.split(delimiter);
26.
if
(str.length ==
2
) {
27.
context.write(
new
Text(str[
0
].trim()),
28.
new
FlagString(str[
1
].trim(),
0
));
// flag=0,表示phone表
29.
}
30.
}
31.
}
32.
}
2.map2(實現對user表文件操作)
01.
package
test.mr.multiinputs;
02.
03.
import
java.io.IOException;
04.
05.
import
org.apache.hadoop.io.LongWritable;
06.
import
org.apache.hadoop.io.Text;
07.
import
org.apache.hadoop.mapreduce.Mapper;
08.
09.
public
class
MultiMap2
extends
Mapper<LongWritable, Text, Text, FlagString> {
10.
private
String delimiter;
// 設定分隔符
11.
12.
@Override
13.
protected
void
setup(
14.
Mapper<LongWritable, Text, Text, FlagString>.Context context)
15.
throws
IOException, InterruptedException {
16.
delimiter = context.getConfiguration().get(
"delimiter"
,
","
);
17.
}
18.
19.
@Override
20.
protected
void
map(LongWritable key, Text value,
21.
Mapper<LongWritable, Text, Text, FlagString>.Context context)
22.
throws
IOException, InterruptedException {
23.
String line = value.toString().trim();
24.
if
(line.length() >
0
) {
25.
String[] str = line.split(delimiter);
26.
if
(str.length ==
2
) {
27.
context.write(
new
Text(str[
1
].trim()),
28.
new
FlagString(str[
0
].trim(),
1
));
// flag=1為user表
29.
}
30.
}
31.
}
32.
}
3.reduce類
01.
package
test.mr.multiinputs;
02.
03.
import
java.io.IOException;
04.
05.
import
org.apache.hadoop.io.NullWritable;
06.
import
org.apache.hadoop.io.Text;
07.
import
org.apache.hadoop.mapreduce.Reducer;
08.
09.
public
class
MultiRedu
extends
Reducer<Text, FlagString, NullWritable, Text> {
10.
private
String delimiter;
// 設定分隔符
11.
12.
@Override
13.
protected
void
setup(
14.
Reducer<Text, FlagString, NullWritable, Text>.Context context)
15.
throws
IOException, InterruptedException {
16.
delimiter = context.getConfiguration().get(
"delimiter"
,
","
);
17.
}
18.
19.
@Override
20.
protected
void
reduce(Text key, Iterable<FlagString> values,
21.
Reducer<Text, FlagString, NullWritable, Text>.Context context)
22.
throws
IOException, InterruptedException {
23.
// 最後輸出的格式為: uservalue,key,phonevalue
24.
String phoneValue =
""
;
25.
String userValue =
""
;
26.
int
num =
0
;
27.
for
(FlagString value : values) {
28.
// 第一個即為phone表
29.
if
(num ==
0
) {
30.
phoneValue = value.getValue();
31.
num++;
32.
}
else
{
33.
userValue = value.getValue();
34.
context.write(NullWritable.get(),
35.
new
Text(userValue + key.toString() + phoneValue));
36.
}
37.
}
38.
}
39.
}
4.job類(關鍵!!實現多檔案的輸入格式等)
001.
package
test.mr.multiinputs;
002.
003.
import
org.apache.hadoop.conf.Configuration;
004.
import
org.apache.hadoop.fs.Path;
005.
import
org.apache.hadoop.io.NullWritable;
006.
import
org.apache.hadoop.io.Text;
007.
import
org.apache.hadoop.mapreduce.Job;
008.
import
org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
009.
import
org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
010.
import
org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
011.
import
org.apache.hadoop.util.Tool;
012.
import
org.apache.hadoop.util.ToolRunner;
013.
014.
/*
015.
* MultipleInputs類指定不同的輸入檔案路徑以及輸入文化格式
016.
現有兩份資料
017.
phone
018.
123,good number
019.
124,common number
020.
123,bad number
021.
022.
user
023.
zhangsan,123
024.
lisi,124
025.
wangwu,125
026.
027.
現在需要把user和phone按照phone number連線起來。得到下面的結果
028.
zhangsan,123,good number
029.
lisi,123,common number
030.
wangwu,125,bad number
031.
*/
032.
public
class
MultiMapMain
extends
Configuration
implements
Tool {
033.
private
String input1 =
null
;
// 定義的多個輸入檔案
034.
private
String input2 =
null
;
035.
private
String output =
null
;
036.
private
String delimiter =
null
;
037.
038.
@Override
039.
public
void
setConf(Configuration conf) {
040.
041.
}
042.
043.
@Override
044.
public
Configuration getConf() {
045.
return
new
Configuration();
046.
}
047.
048.
@Override
049.
public
int
run(String[] args)
throws
Exception {
050.
setArgs(args);
051.
checkParam();
// 對引數進行檢測
052.
053.
Configuration conf =
new
Configuration();
054.
Job job =
new
Job(conf);
055.
job.setJarByClass(MultiMapMain.
class
);
056.
057.
job.setMapOutputKeyClass(Text.
class
);
058.
job.setMapOutputValueClass(FlagString.
class
);
059.
060.
job.setReducerClass(MultiRedu.
class
);
061.
job.setOutputKeyClass(NullWritable.
class
);
062.
job.setOutputValueClass(Text.
class
);
063.
064.
// MultipleInputs類新增檔案路徑
065.
MultipleInputs.addInputPath(job,
new
Path(input1),
066.
TextInputFormat.
class
, MultiMap1.
class
);
067.
MultipleInputs.addInputPath(job,
new
Path(input2),
068.
TextInputFormat.
class
, MultiMap2.
class
);
069.
070.
FileOutputFormat.setOutputPath(job,
new
Path(output));
071.
job.waitForCompletion(
true
);
072.
return
0
;
073.
}
074.
075.
private
void
checkParam() {
076.
if
(input1 ==
null
||
""
.equals(input1.trim())) {
077.
System.out.println(
"no input phone-data path"
);
078.
userMaunel();
079.
System.exit(-
1
);
080.
}
081.
if
(input2 ==
null
||
""
.equals(input2.trim())) {
082.
System.out.println(
"no input user-data path"
);
083.
userMaunel();
084.
System.exit(-
1
);
085.
}
086.
if
(output ==
null
||
""
.equals(output.trim())) {
087.
System.out.println(
"no output path"
);
088.
userMaunel();
089.
System.exit(-
1
);
090.
}
091.
if
(delimiter ==
null
||
""
.equals(delimiter.trim())) {
092.
System.out.println(
"no delimiter"
);
093.
userMaunel();
094.
System.exit(-
1
);
095.
}
096.
097.
}
098.
099.
// 使用者手冊
100.
private
void
userMaunel() {
101.
System.err.println(
"Usage:"
);
102.
System.err.println(
"-i1 input phone data path."
);
103.
System.err.println(
"-i2 input user data path."
);
104.
System.err.println(
"-o output output data path."
);
105.
System.err.println(
"-delimiter data delimiter default comma."
);
106.
}
107.
108.
// 對屬性進行賦值
109.
// 設定輸入的格式:-i1 xxx(輸入目錄) -i2 xxx(輸入目錄) -o xxx(輸出目錄) -delimiter x(分隔符)
110.
private
void
setArgs(String[] args) {
111.
for
(
int
i =
0
; i < args.length; i++) {
112.
if
(
"-i1"
.equals(args[i])) {
113.
input1 = args[++i];
// 將input1賦值為第一個檔案的輸入路徑
114.
}
else
if
(
"-i2"
.equals(args[i])) {
115.
input2 = args[++i];
116.
}
else
if
(
"-o"
.equals(args[i])) {
117.
output = args[++i];
118.
}
else
if
(
"-delimiter"
.equals(args[i])) {
119.
delimiter = args[++i];
120.
}
121.
}
122.
}
123.
124.
public
static
void
main(String[] args)
throws
Exception {
125.
Configuration conf =
new
Configuration();
126.
ToolRunner.run(conf,
new
MultiMapMain(), args);
// 呼叫run方法
127.
}
128.
}