使用MapReduce程式設計將txt檔案中的內容匯入HBase
阿新 • 發佈:2019-02-06
一、建立 Java 專案。
寫入程式碼,如下: [java] view plain copy print?- package translate1;
- import java.io.IOException;
- import org.apache.hadoop.conf.*;
- import org.apache.hadoop.fs.Path;
- import org.apache.hadoop.mapreduce.*;
- import org.apache.hadoop.io.LongWritable;
-
import org.apache.hadoop.io.Text;
- import org.apache.hadoop.mapreduce.lib.input.*;
- import org.apache.hadoop.hbase.HBaseConfiguration;
- import org.apache.hadoop.hbase.io.*;
- import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
- import org.apache.hadoop.hbase.client.Put;
- import org.apache.hadoop.hbase.util.Bytes;
-
public
- publicstatic Job createSubmittableJob(Configuration conf, String[] args)throws IOException {
- String tableName = args[0];
- Path inputDir = new Path(args[1]);
- @SuppressWarnings("deprecation")
-
Job job = new
- job.setJarByClass(HourlyImporter.class);
- FileInputFormat.setInputPaths(job, inputDir);
- job.setMapperClass(HourlyImporter.class);
- TableMapReduceUtil.initTableReducerJob(tableName, null, job);
- job.setNumReduceTasks(0);
- TableMapReduceUtil.addDependencyJars(job);
- return job;
- }
- publicstaticvoid main(String[] args)throws Exception {
- Configuration conf = HBaseConfiguration.create();
- Job job = createSubmittableJob(conf, args);
- System.exit (job.waitForCompletion(true) ? 0 : 1);
- }
- }
- class HourlyImporter extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {
- privatelong ts;
- staticbyte[] family = Bytes.toBytes("n");
- @Override
- protectedvoid setup(Context context) {
- ts = System.currentTimeMillis();
- }
- publicstatic String change(String str,int n,boolean j){
- if(str==null||str.length()>=n) return str;
- String s="";
- for(int i=str.length();i<n;i++)
- s+="0";
- if(j) return s+str;
- elsereturn str+s;
- }
- @SuppressWarnings("deprecation")
- publicvoid map(LongWritable offset, Text value, Context context)throws IOException {
- try {
- String line = value.toString();
- String stationID = line.substring(0, 4);
- String month = line.substring(5, 7);
- String day = line.substring(7, 9);
- String rowkey = stationID + month + day;
- byte[] bRowKey = Bytes.toBytes(rowkey);
- ImmutableBytesWritable rowKey = new ImmutableBytesWritable(bRowKey);
- Put p = new Put(bRowKey);
- for (int i = 1; i < 4 ; i++) {
- String columnI ="v" + change(String.valueOf(i),2,true);
- int beginIndex = i * 2 + 8;
- String valueI =line.substring(beginIndex, beginIndex + 2).trim();
- p.add(family, Bytes.toBytes(columnI),ts, Bytes.toBytes(valueI));
- }
- context.write(rowKey, p);
- }catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- }