MapReduce功能實現
阿新 • • 發佈:2018-12-16
MapReduce功能實現系列:
MapReduce功能實現一---Hbase和Hdfs之間資料相互轉換
MapReduce功能實現四---小綜合(從hbase中讀取資料統計並在hdfs中降序輸出Top 3)
MapReduce功能實現五---去重(Distinct)、計數(Count)
MapReduce功能實現六---最大值(Max)、求和(Sum)、平均值(Avg)
MapReduce功能實現七---小綜合(多個job序列處理計算平均值)
MapReduce功能實現八---分割槽(Partition)
MapReduce功能實現十---倒排索引(Inverted Index)
一、從Hbase表1中讀取資料再把統計結果存到表2
在Hbase中建立相應的表1:
-
create
'hello',
'cf'
-
put
'hello'
,
'1',
'cf:hui',
'hello world'
-
put
'hello',
'2',
'cf:hui',
'hello hadoop'
-
put
'hello',
'3',
'cf:hui',
'hello hive'
-
put
'hello'
,
'4',
'cf:hui',
'hello hadoop'
-
put
'hello',
'5',
'cf:hui',
'hello world'
-
put
'hello',
'6',
'cf:hui',
'hello world'
java程式碼:
-
import java.io.IOException;
-
import java.util.Iterator;
-
-
import org.apache.hadoop.conf.Configuration;
-
import org.apache.hadoop.hbase.HBaseConfiguration;
-
import org.apache.hadoop.hbase.HColumnDescriptor;
-
import org.apache.hadoop.hbase.HTableDescriptor;
-
import org.apache.hadoop.hbase.client.HBaseAdmin;
-
import org.apache.hadoop.hbase.client.Put;
-
import org.apache.hadoop.hbase.client.Result;
-
import org.apache.hadoop.hbase.client.Scan;
-
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
-
import org.apache.hadoop.hbase.mapreduce.TableMapper;
-
import org.apache.hadoop.hbase.mapreduce.TableReducer;
-
import org.apache.hadoop.hbase.util.Bytes;
-
import org.apache.hadoop.io.IntWritable;
-
import org.apache.hadoop.io.NullWritable;
-
import org.apache.hadoop.io.Text;
-
import org.apache.hadoop.mapreduce.Job;
-
-
public
class HBaseToHbase {
-
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
-
String hbaseTableName1 =
"hello";
-
String hbaseTableName2 =
"mytb2";
-
-
prepareTB2(hbaseTableName2);
-
-
Configuration conf =
new Configuration();
-
-
Job job = Job.getInstance(conf);
-
job.setJarByClass(HBaseToHbase.class);
-
job.setJobName(
"mrreadwritehbase");
-
-
Scan scan =
new Scan();
-
scan.setCaching(
500);
-
scan.setCacheBlocks(
false);
-
-
TableMapReduceUtil.initTableMapperJob(hbaseTableName1, scan, doMapper.class, Text.class, IntWritable.class, job);
-
TableMapReduceUtil.initTableReducerJob(hbaseTableName2, doReducer.class, job);
-
System.exit(job.waitForCompletion(
true) ?
1 :
0);
-
}
-
-
public
static
class doMapper extends TableMapper<Text, IntWritable>{
-
private
final
static IntWritable one =
new IntWritable(
1);
-
@Override
-
protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException {
-
String rowValue = Bytes.toString(value.list().get(
0).getValue());
-
context.write(
new Text(rowValue), one);
-
}
-
}
-
-
public
static
class doReducer extends TableReducer<Text, IntWritable, NullWritable>{
-
@Override
-
protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
-
System.out.println(key.toString());
-
int sum =
0;
-
Iterator<IntWritable> haha = values.iterator();
-
while (haha.hasNext()) {
-
sum += haha.next().get();
-
}
-
Put put =
new Put(Bytes.toBytes(key.toString()));
-
put.add(Bytes.toBytes(
"mycolumnfamily"), Bytes.toBytes(
"count"), Bytes.toBytes(String.valueOf(sum)));
-
context.write(NullWritable.get(), put);
-
}
-
}
-
-
public static void prepareTB2(String hbaseTableName) throws IOException{
-
HTableDescriptor tableDesc =
new HTableDescriptor(hbaseTableName);
-
HColumnDescriptor columnDesc =
new HColumnDescriptor(
"mycolumnfamily");
-
tableDesc.addFamily(columnDesc);
-
Configuration cfg = HBaseConfiguration.create();
-
HBaseAdmin admin =
new HBaseAdmin(cfg);
-
if (admin.tableExists(hbaseTableName)) {
-
System.out.println(
"Table exists,trying drop and create!");
-
admin.disableTable(hbaseTableName);
-
admin.deleteTable(hbaseTableName);
-
admin.createTable(tableDesc);
-
}
else {
-
System.out.println(
"create table: "+ hbaseTableName);
-
admin.createTable(tableDesc);
-
}
-
}
-
}
在Linux中執行該程式碼:
-
[[email protected] q1]$ /usr/jdk1.
7.0_25/bin/javac HBaseToHbase.java
-
[[email protected] q1]$ /usr/jdk1.
7.0_25/bin/jar cvf xx.jar HBaseToHbase*
class
-
[[email protected] q1]$ hadoop jar xx.jar HBaseToHbase
檢視mytb2表:
-
hbase(main):
009:
0> scan
'mytb2'
-
ROW COLUMN+CELL
-
hello hadoop column=mycolumnfamily:count, timestamp=
1489817182454,
value=
2
-
hello hive column=mycolumnfamily:count, timestamp=
1489817182454,
value=
1
-
hello world column=mycolumnfamily:count, timestamp=
1489817182454,
value=
3
-
3 row(s)
in
0.0260 seconds
二、從Hbase表1中讀取資料再把結果存Hdfs中
1.將表1的內容不統計輸出:
-
import java.io.IOException;
-
-
import org.apache.hadoop.conf.Configuration;
-
import org.apache.hadoop.fs.Path;
-
import org.apache.hadoop.hbase.HBaseConfiguration;
-
import org.apache.hadoop.hbase.client.Result;
-
import org.apache.hadoop.hbase.client.Scan;
-
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
-
import org.apache.hadoop.hbase.mapreduce.TableMapper;
-
import org.apache.hadoop.hbase.util.Bytes;
-
import org.apache.hadoop.io.NullWritable;
-
import org.apache.hadoop.io.Text;
-
import org.apache.hadoop.io.Writable;
-
import org.apache.hadoop.io.WritableComparable;
-
import org.apache.hadoop.mapreduce.Job;
-
import org.apache.hadoop.mapreduce.Reducer;
-
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
-
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
-
-
public
class HbaseToHdfs {
-
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
-
-
String tablename =
"hello";
-
Configuration conf = HBaseConfiguration.create();
-
conf.set(
"hbase.zookeeper.quorum",
"h71");
-
Job job =
new Job(conf,
"WordCountHbaseReader");
-
job.setJarByClass(HbaseToHdfs.class);
-
Scan scan =
new Scan();
-
TableMapReduceUtil.initTableMapperJob(tablename,scan,doMapper.class, Text.class, Text.class, job);
-
job.setReducerClass(WordCountHbaseReaderReduce.class);
-
FileOutputFormat.setOutputPath(job,
new Path(args[
0]));
-
MultipleOutputs.addNamedOutput(job,
"hdfs", TextOutputFormat.class, WritableComparable.class, Writable.class);
-
System.exit(job.waitForCompletion(
true) ?
0 :
1);
-
}
-
-
public
static
class doMapper extends TableMapper<Text, Text>{
-
@Override
-
protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException {
-
String rowValue = Bytes.toString(value.list().get(
0).getValue());
-
context.write(
new Text(rowValue),
new Text(
"one"));
-
}
-
}
-
-
public
static
class WordCountHbaseReaderReduce extends Reducer<Text,Text,Text,NullWritable>{
-
private Text result =
new Text();
-
@Override
-
protected void reduce(Text key, Iterable<Text> values,Context context) throws IOException, InterruptedException {
-
for(Text val:values){
-
result.set(val);
-
context.write(key, NullWritable.get());
-
}
-
}
-
}
-
}
在Linux中執行該程式碼:
[[email protected] q1]$ /usr/jdk1.7.0_25/bin/javac HbaseToHdfs.java
[[email protected] q1]$ /usr/jdk1.7.0_25/bin/jar cvf xx.jar HbaseToHdfs*class
[[email protected] q1]$ hadoop jar xx.jar HbaseToHdfs /output
注意:/output目錄不能存在,如果存在就刪除掉
[[email protected] q1]$ hadoop fs -ls /output
Found 2 items
-rw-r--r-- 2 hadoop supergroup 0 2017-03-18 14:28 /output/_SUCCESS
-rw-r--r-- 2 hadoop supergroup 73 2017-03-18 14:28 /output/part-r-00000
[[email protected] q1]$ hadoop fs -cat /output/part-r-00000
hello hadoop
hello hadoop
hello hive
hello world
hello world
hello world
2.將表1的內容統計輸出:
-
import java.io.IOException;
-
-
import org.apache.hadoop.conf.Configuration;
-
import org.apache.hadoop.fs.Path;
-
import org.apache.hadoop.hbase.HBaseConfiguration;
-
import org.apache.hadoop.hbase.client.Result;
-
import org.apache.hadoop.hbase.client.Scan;
-
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
-
import org.apache.hadoop.hbase.mapreduce.TableMapper;
-
import org.apache.hadoop.hbase.util.Bytes;
-
import org.apache.hadoop.io.IntWritable;
-
import org.apache.hadoop.io.Text;
-
import org.apache.hadoop.io.Writable;
-
import org.apache.hadoop.io.WritableComparable;
-
import org.apache.hadoop.mapreduce.Job;
-
import org.apache.hadoop.mapreduce.Reducer;
-
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
-
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
-
-
public
class HbaseToHdfs1 {
-
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
-
-
String tablename =
"hello";
-
Configuration conf = HBaseConfiguration.create();
-
conf.set(
"hbase.zookeeper.quorum",
"h71");
-
Job job =
new Job(conf,
"WordCountHbaseReader");
-
job.setJarByClass(HbaseToHdfs1.class);
-
Scan scan =
new Scan();
-
TableMapReduceUtil.initTableMapperJob(tablename,scan,doMapper.class, Text.class, IntWritable.class, job);
-
job.setReducerClass(WordCountHbaseReaderReduce.class);
-
FileOutputFormat.setOutputPath(job,
new Path(args[
0]));
-
MultipleOutputs.addNamedOutput(job,
"hdfs", TextOutputFormat.class, WritableComparable.class, Writable.class);
-
System.exit(job.waitForCompletion(
true) ?
0 :
1);
-
}
-
-
public
static
class doMapper extends TableMapper<Text, IntWritable>{
-
private
final
static IntWritable one =
new IntWritable(
1);
-
private Text word =
new Text();
-
@Override
-
protected void map(Immutabl