Batch importing data from HDFS into an HBase table
Import log data from HDFS into HBase.
Package the program as a jar and run it on the server with:
hadoop jar xxx.jar <package>.<class>
The HBase libraries must be on Hadoop's classpath. Add them in hadoop-env.sh:
export HADOOP_CLASSPATH=/usr/local/hbase/lib/*
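
With the classpath in place, a concrete invocation for the class below might look like this (the jar name hbase-import.jar is only a placeholder for whatever name your build produces):

hadoop jar hbase-import.jar hbase.test.HbaseImport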
package hbase.test;

import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

public class HbaseImport {

    // Read the source data from HDFS, parse each line and generate a rowkey.
    static class BatchImportMapper extends Mapper<LongWritable, Text, LongWritable, Text> {

        SimpleDateFormat format = new SimpleDateFormat("yyyyMMddHHmmss");

        @Override
        protected void map(LongWritable key, Text value,
                Mapper<LongWritable, Text, LongWritable, Text>.Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            // Split each log line into fields.
            String[] split = line.split("\t");
            // Reformat the timestamp in the first column.
            String dateStr = format.format(new Date(Long.parseLong(split[0])));
            // Concatenate the phone number and the date as the HBase rowkey.
            String rowKey = split[1] + ":" + dateStr;
            // Prepend the rowkey to the original line to form the new value.
            Text v2 = new Text();
            v2.set(rowKey + "\t" + line);
            // Emit the original key with the new value: each log line now carries
            // the generated rowkey in front, everything else is unchanged.
            context.write(key, v2);
        }
    }

    // Write the data passed from the map side into HBase.
    static class BatchImportReducer extends TableReducer<LongWritable, Text, NullWritable> {

        public static final String COLUMN_FAMILY = "cf";
        public static final String COLUMN_NAME_RAW = "raw";
        public static final String COLUMN_NAME_REPORTTIME = "reportTime";
        public static final String COLUMN_NAME_MSISDN = "msisdn";
        public static final String COLUMN_NAME_APMAC = "apmac";
        public static final String COLUMN_NAME_ACMAC = "acmac";
        public static final String COLUMN_NAME_HOST = "host";
        public static final String COLUMN_NAME_SITETYPE = "siteType";
        public static final String COLUMN_NAME_UPPACKNUM = "upPackNum";
        public static final String COLUMN_NAME_DOWNPACKNUM = "downPackNum";
        public static final String COLUMN_NAME_UPPAYLOAD = "upPayLoad";
        public static final String COLUMN_NAME_DOWNPAYLOAD = "downPayLoad";
        public static final String COLUMN_NAME_HTTPSTATUS = "httpStatus";

        @Override
        protected void reduce(LongWritable k2, Iterable<Text> v2s,
                TableReducer<LongWritable, Text, NullWritable>.Context context)
                throws IOException, InterruptedException {
            for (Text v2 : v2s) {
                String[] split = v2.toString().split("\t");
                // The first element is now the rowkey.
                String rowKey = split[0];

                Put put = new Put(rowKey.getBytes());
                put.add(COLUMN_FAMILY.getBytes(), COLUMN_NAME_RAW.getBytes(), v2.toString().getBytes());
                put.add(COLUMN_FAMILY.getBytes(), COLUMN_NAME_REPORTTIME.getBytes(), split[1].getBytes());
                put.add(COLUMN_FAMILY.getBytes(), COLUMN_NAME_MSISDN.getBytes(), split[2].getBytes());
                put.add(COLUMN_FAMILY.getBytes(), COLUMN_NAME_APMAC.getBytes(), split[3].getBytes());
                put.add(COLUMN_FAMILY.getBytes(), COLUMN_NAME_ACMAC.getBytes(), split[4].getBytes());
                put.add(COLUMN_FAMILY.getBytes(), COLUMN_NAME_HOST.getBytes(), split[5].getBytes());
                put.add(COLUMN_FAMILY.getBytes(), COLUMN_NAME_SITETYPE.getBytes(), split[6].getBytes());
                put.add(COLUMN_FAMILY.getBytes(), COLUMN_NAME_UPPACKNUM.getBytes(), split[7].getBytes());
                put.add(COLUMN_FAMILY.getBytes(), COLUMN_NAME_DOWNPACKNUM.getBytes(), split[8].getBytes());
                put.add(COLUMN_FAMILY.getBytes(), COLUMN_NAME_UPPAYLOAD.getBytes(), split[9].getBytes());
                put.add(COLUMN_FAMILY.getBytes(), COLUMN_NAME_DOWNPAYLOAD.getBytes(), split[10].getBytes());
                put.add(COLUMN_FAMILY.getBytes(), COLUMN_NAME_HTTPSTATUS.getBytes(), split[11].getBytes());
                context.write(NullWritable.get(), put);
            }
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.rootdir", "hdfs://hadoop4:9000/hbase");
        conf.set("hbase.zookeeper.quorum", "hadoop4");
        // Name of the target HBase table.
        conf.set(TableOutputFormat.OUTPUT_TABLE, "wlan_log");
        conf.set("dfs.socket.timeout", "180000");

        Job job = new Job(conf, HbaseImport.class.getSimpleName());
        // The following two lines are required when the job is packaged as a jar.
        TableMapReduceUtil.addDependencyJars(job);
        job.setJarByClass(HbaseImport.class);

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TableOutputFormat.class);

        job.setMapperClass(BatchImportMapper.class);
        job.setMapOutputKeyClass(LongWritable.class);
        job.setMapOutputValueClass(Text.class);

        job.setReducerClass(BatchImportReducer.class);

        FileInputFormat.setInputPaths(job, "hdfs://hadoop4:9000/data/wlan");
        job.waitForCompletion(true);
    }
}
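
The job writes into the table wlan_log with column family cf (see TableOutputFormat.OUTPUT_TABLE and COLUMN_FAMILY above), so that table must already exist before the job runs. Below is a minimal sketch that creates it with the same-era HBase client API; the class name CreateWlanLogTable is hypothetical, and the ZooKeeper quorum is assumed to match the one in main:

package hbase.test;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.HBaseAdmin;

// Hypothetical helper: creates the target table if it does not exist yet.
public class CreateWlanLogTable {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        // Assumed to be the same ZooKeeper quorum as in HbaseImport.main.
        conf.set("hbase.zookeeper.quorum", "hadoop4");
        HBaseAdmin admin = new HBaseAdmin(conf);
        try {
            if (!admin.tableExists("wlan_log")) {
                HTableDescriptor desc = new HTableDescriptor(TableName.valueOf("wlan_log"));
                // Column family used by BatchImportReducer.
                desc.addFamily(new HColumnDescriptor("cf"));
                admin.createTable(desc);
            }
        } finally {
            admin.close();
        }
    }
}

Equivalently, the table can be created from the HBase shell with: create 'wlan_log', 'cf'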