HBase: Writing to Two Tables with a Single MapReduce Job
Published: 2019-02-07
The raw input data is as follows:
fansy,22,blog.csdu.net/fansy1990
tom,25,blog.csdu.net/tom1987
kate,23,blog.csdu.net/kate1989
jake,20,blog.csdu.net/jake1992
john,35,blog.csdu.net/john1977
ben,30,blog.csdu.net/ben1982
The first column is name, the second is age, and the third is webPage. The goal is to store name and age in table 1, and name and webPage in table 2. The code is below. ImportToHB.java:
package org.fansy.multipletables;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.MultiTableOutputFormat;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * Driver for a map-only job that writes to multiple HBase tables.
 * @author fansy
 */
public class ImportToHB extends Configured implements Tool {

    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new ImportToHB(), args);
        System.exit(exitCode);
    }

    @Override
    public int run(String[] args) throws Exception {
        if (args.length != 7) {
            System.err.println("wrong args length: " + args.length);
            System.out.println("Usage: <input> <table1> <table1-fam> <table1-qua> "
                    + "<table2> <table2-fam> <table2-qua>");
            return -1;
        }
        // HBaseConfiguration.create() loads hbase-site.xml; a plain
        // Configuration would miss the ZooKeeper quorum settings.
        Configuration conf = HBaseConfiguration.create();
        // Pass the table names, column families and qualifiers to the mapper.
        conf.set("TABLE1", args[1]);
        conf.set("T1-FAM", args[2]);
        conf.set("T1-QUA", args[3]);
        conf.set("TABLE2", args[4]);
        conf.set("T2-FAM", args[5]);
        conf.set("T2-QUA", args[6]);

        Job job = new Job(conf);
        job.setJarByClass(ImportToHB.class);
        job.setMapperClass(MapperHB.class);
        job.setMapOutputKeyClass(ImmutableBytesWritable.class);
        job.setMapOutputValueClass(Writable.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        // MultiTableOutputFormat routes each (tableName, mutation) pair to its table.
        job.setOutputFormatClass(MultiTableOutputFormat.class);
        // Map-only job: the mappers write directly to HBase.
        job.setNumReduceTasks(0);
        return job.waitForCompletion(true) ? 0 : -1;
    }
}
MapperHB.java:
package org.fansy.multipletables;

import java.io.IOException;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Mapper;

public class MapperHB extends Mapper<LongWritable, Text, ImmutableBytesWritable, Writable> {

    private byte[] table1;
    private byte[] table2;
    private byte[] t1_fam;
    private byte[] t1_qua;
    private byte[] t2_fam;
    private byte[] t2_qua;

    @Override
    public void setup(Context context) {
        // Read the table/family/qualifier names set by the driver.
        table1 = Bytes.toBytes(context.getConfiguration().get("TABLE1"));
        table2 = Bytes.toBytes(context.getConfiguration().get("TABLE2"));
        t1_fam = Bytes.toBytes(context.getConfiguration().get("T1-FAM"));
        t1_qua = Bytes.toBytes(context.getConfiguration().get("T1-QUA"));
        t2_fam = Bytes.toBytes(context.getConfiguration().get("T2-FAM"));
        t2_qua = Bytes.toBytes(context.getConfiguration().get("T2-QUA"));
    }

    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String[] info = value.toString().split(",");
        if (info.length != 3) {
            return; // skip malformed lines
        }
        String name = info[0];
        String age = info[1];
        String webPage = info[2];

        // Write to the first table: row key = name,age; cell value = age.
        ImmutableBytesWritable putTable = new ImmutableBytesWritable(table1);
        Put put = new Put(Bytes.toBytes(name + "," + age));
        put.add(t1_fam, t1_qua, Bytes.toBytes(age));
        context.write(putTable, put);

        // Write to the second table: row key = name,webPage; cell value = webPage.
        putTable = new ImmutableBytesWritable(table2);
        put = new Put(Bytes.toBytes(name + "," + webPage));
        put.add(t2_fam, t2_qua, Bytes.toBytes(webPage));
        context.write(putTable, put);
    }
}
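A side note: Put.add(byte[], byte[], byte[]) is the pre-1.0 client API. If you build against HBase 1.x or later, the equivalent method is Put.addColumn. A sketch of the two writes in map() using that API, assuming the same fields and local variables as above:

// Drop-in replacement for the two writes above when compiling
// against HBase 1.x+, where Put.add was superseded by Put.addColumn.
Put put1 = new Put(Bytes.toBytes(name + "," + age));
put1.addColumn(t1_fam, t1_qua, Bytes.toBytes(age));
context.write(new ImmutableBytesWritable(table1), put1);

Put put2 = new Put(Bytes.toBytes(name + "," + webPage));
put2.addColumn(t2_fam, t2_qua, Bytes.toBytes(webPage));
context.write(new ImmutableBytesWritable(table2), put2);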
The code above uses just one Mapper and writes to two HBase tables at the same time. The key points are to set the Mapper's output key and value types, which in the code above are ImmutableBytesWritable and Writable, and to set the output format when declaring the job: job.setOutputFormatClass(MultiTableOutputFormat.class); The output format is not limited to Put values, as the sketch below shows.
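MultiTableOutputFormat routes each record by its ImmutableBytesWritable key and accepts Delete values as well as Put, so the same job layout can also remove rows. A minimal sketch; the class name and the <tableName>,<rowKey> input format are hypothetical:

package org.fansy.multipletables;

import java.io.IOException;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Mapper;

// Hypothetical companion mapper: emits a Delete instead of a Put,
// routed to whichever table the input line names.
public class DeleteFromHB extends Mapper<LongWritable, Text, ImmutableBytesWritable, Writable> {

    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Each input line is "<tableName>,<rowKey>" (assumed format).
        String[] parts = value.toString().split(",", 2);
        if (parts.length != 2) {
            return;
        }
        ImmutableBytesWritable table = new ImmutableBytesWritable(Bytes.toBytes(parts[0]));
        context.write(table, new Delete(Bytes.toBytes(parts[1])));
    }
}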
How do you run the program above?
(1) Create the two tables in HBase:
create 'table1','info'
create 'table2','info'
(2) Pass ImportToHB the following arguments:
hdfs://master:9000/user/fansy/input/info.dat table1 info age table2 info webPage
(3) Run it directly from Eclipse (a command-line alternative is sketched below).
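If you prefer the command line to Eclipse, something like the following should work once the job is packaged as a jar; the jar name and the local file path are illustrative:

# Upload the sample data to HDFS.
hadoop fs -put info.dat hdfs://master:9000/user/fansy/input/info.dat
# Put the HBase client jars on the job classpath.
export HADOOP_CLASSPATH=$(hbase classpath)
# Submit the job with the seven arguments listed above.
hadoop jar multipletables.jar org.fansy.multipletables.ImportToHB \
    hdfs://master:9000/user/fansy/input/info.dat table1 info age table2 info webPage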
After the job runs, you can inspect the output data in HBase.
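One quick check is a scan from the HBase shell. Given the row-key scheme in MapperHB, the output should look roughly like this (illustrative, timestamps elided):

hbase> scan 'table1'
ROW                              COLUMN+CELL
 ben,30                          column=info:age, value=30
 fansy,22                        column=info:age, value=22
 ...

hbase> scan 'table2'
ROW                              COLUMN+CELL
 ben,blog.csdu.net/ben1982       column=info:webPage, value=blog.csdu.net/ben1982
 fansy,blog.csdu.net/fansy1990   column=info:webPage, value=blog.csdu.net/fansy1990
 ...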