HBase in Practice: Inserting Data with the Java API
I. Implementation approach
1. A mapper reads the comma-separated input file and passes each record on.
2. A utility class handles connecting to HBase, creating the table, and inserting the rows.
3. The reducer calls this HBase utility.
4. A main class configures and submits the job.
II. Code
1. Mapper
package com;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Parses each input line into a User and passes it on to the reducer
public class mapper extends Mapper<LongWritable, Text, Text, User> {

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String data = value.toString();
        String[] s = data.split(",");
        System.out.println(data);
        // All records share the same key, so a single reduce call collects them all
        context.write(new Text("1"), new User(s[0], s[1], s[2], s[3], s[4]));
    }
}
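Both the mapper above and the reducer below pass records around as a User bean, which this post does not show. Because it is used as a map output value it has to implement Writable, and BeanUtils.copyProperties additionally needs a public no-argument constructor plus getters and setters. The following is only a sketch reconstructed from how the class is used elsewhere in the code; the five String fields and their order (id, name, age, sex, part) are assumptions, so adjust them to your actual data.

package com;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

// Hypothetical User bean reconstructed from how it is used in the mapper,
// reducer and HbaseUtils; the original post does not include it.
public class User implements Writable {

    private String id;
    private String name;
    private String age;
    private String sex;
    private String part;

    // No-arg constructor required by Hadoop serialization and BeanUtils
    public User() {}

    public User(String id, String name, String age, String sex, String part) {
        this.id = id;
        this.name = name;
        this.age = age;
        this.sex = sex;
        this.part = part;
    }

    // Serialize all fields in a fixed order
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(id);
        out.writeUTF(name);
        out.writeUTF(age);
        out.writeUTF(sex);
        out.writeUTF(part);
    }

    // Deserialize in the same order as write()
    @Override
    public void readFields(DataInput in) throws IOException {
        id = in.readUTF();
        name = in.readUTF();
        age = in.readUTF();
        sex = in.readUTF();
        part = in.readUTF();
    }

    public String getId() { return id; }
    public void setId(String id) { this.id = id; }
    public String getName() { return name; }
    public void setName(String name) { this.name = name; }
    public String getAge() { return age; }
    public void setAge(String age) { this.age = age; }
    public String getSex() { return sex; }
    public void setSex(String sex) { this.sex = sex; }
    public String getPart() { return part; }
    public void setPart(String part) { this.part = part; }

    @Override
    public String toString() {
        return id + "," + name + "," + age + "," + sex + "," + part;
    }
}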
2. HBase utility class
package com;

import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;

public class HbaseUtils {

    public static final String c = "info";   // column family name

    // Entry point called by the reducer: creates the table if needed, then inserts the list
    public static void insertinfo(String ip, String port, String tableName, List<User> list) throws Exception {
        Connection con = getConnection(ip, port);
        Admin admin = con.getAdmin();
        boolean b = admin.tableExists(TableName.valueOf(tableName));
        if (!b) {
            createTable(admin, tableName);
        }
        Table table = con.getTable(TableName.valueOf(tableName));
        insertList(table, list);
        table.close();
        admin.close();
        con.close();
    }

    // Insert one Put per User, using the id as the row key
    private static void insertList(Table table, List<User> list) throws Exception {
        for (User user : list) {
            Put put = new Put(user.getId().getBytes());
            put.addColumn(c.getBytes(), "name".getBytes(), user.getName().getBytes());
            put.addColumn(c.getBytes(), "Age".getBytes(), user.getAge().getBytes());
            put.addColumn(c.getBytes(), "Sex".getBytes(), user.getSex().getBytes());
            put.addColumn(c.getBytes(), "Part".getBytes(), user.getPart().getBytes());
            table.put(put);
        }
    }

    // Create the table with the single column family "info"
    private static void createTable(Admin admin, String tableName) throws Exception {
        HTableDescriptor descriptor = new HTableDescriptor(TableName.valueOf(tableName));
        HColumnDescriptor descriptor2 = new HColumnDescriptor(c);
        descriptor.addFamily(descriptor2);
        admin.createTable(descriptor);
    }

    // Open a connection to HBase via ZooKeeper
    private static Connection getConnection(String ip, String port) throws Exception {
        Configuration configuration = HBaseConfiguration.create();
        configuration.set("hbase.zookeeper.quorum", ip);
        configuration.set("hbase.zookeeper.property.clientPort", port);
        Connection connection = ConnectionFactory.createConnection(configuration);
        return connection;
    }
}
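Once the job has run, it can be handy to read a row back to confirm the inserts reached HBase. The following is a minimal sketch, assuming the same ZooKeeper address, port, table name and column family that the reducer below passes to insertinfo; the class name ReadBack and the row key "1" are placeholders.

package com;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;

// Hypothetical verification helper: reads one row back from the "sw" table
public class ReadBack {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "192.168.184.131");
        conf.set("hbase.zookeeper.property.clientPort", "2181");
        try (Connection con = ConnectionFactory.createConnection(conf);
             Table table = con.getTable(TableName.valueOf("sw"))) {
            // "1" is a placeholder row key; use an id that exists in your input data
            Result result = table.get(new Get("1".getBytes()));
            byte[] name = result.getValue("info".getBytes(), "name".getBytes());
            System.out.println(name == null ? "row not found" : new String(name));
        }
    }
}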
3. Reducer
package com;

import java.io.IOException;
import java.util.ArrayList;

import org.apache.commons.beanutils.BeanUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class reducer extends Reducer<Text, User, Text, Text> {

    @Override
    protected void reduce(Text keyin, Iterable<User> value, Context context)
            throws IOException, InterruptedException {
        ArrayList<User> list = new ArrayList<User>();
        // Copy each value out of the iterator: Hadoop reuses the same User
        // instance while iterating, so the objects must be cloned before being kept
        for (User user : value) {
            User user1 = new User();
            System.out.println(user);
            try {
                BeanUtils.copyProperties(user1, user);
                list.add(user1);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        System.out.println("list+++++++++++++++" + list);
        // Call the HBase utility to insert the collected records
        try {
            HbaseUtils.insertinfo("192.168.184.131", "2181", "sw", list);
        } catch (Exception e) {
            e.printStackTrace();
        }
        context.write(new Text("status"), new Text(":success"));
    }
}
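The reducer above hard-codes the ZooKeeper address, port and table name. A common alternative is to put those values into the job Configuration in main and read them back in setup(). The sketch below shows this variant under the assumption that HbaseUtils keeps the same signature; the class name ConfiguredReducer and the insert.* property names are made up for illustration.

package com;

import java.io.IOException;
import java.util.ArrayList;

import org.apache.commons.beanutils.BeanUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

// Variant of the reducer that reads its HBase settings from the job configuration
public class ConfiguredReducer extends Reducer<Text, User, Text, Text> {

    private String quorum;
    private String port;
    private String table;

    // Runs once per task before reduce(); falls back to the original hard-coded values
    @Override
    protected void setup(Context context) {
        Configuration conf = context.getConfiguration();
        quorum = conf.get("insert.quorum", "192.168.184.131");
        port = conf.get("insert.port", "2181");
        table = conf.get("insert.table", "sw");
    }

    @Override
    protected void reduce(Text key, Iterable<User> values, Context context)
            throws IOException, InterruptedException {
        ArrayList<User> list = new ArrayList<User>();
        for (User user : values) {
            User copy = new User();
            try {
                BeanUtils.copyProperties(copy, user);
                list.add(copy);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        try {
            HbaseUtils.insertinfo(quorum, port, table, list);
        } catch (Exception e) {
            e.printStackTrace();
        }
        context.write(new Text("status"), new Text(":success"));
    }
}

main would then call conf.set("insert.quorum", ...), conf.set("insert.port", ...) and conf.set("insert.table", ...) before creating the job, and register ConfiguredReducer with setReducerClass.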
4. Main
package com;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class main {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Run the job locally against the local file system
        conf.set("mapreduce.framework.name", "local");
        conf.set("fs.defaultFS", "file:///");

        Job wordCountJob = Job.getInstance(conf);
        // Important: specify the jar that contains this job
        wordCountJob.setJarByClass(main.class);

        // Set the mapper and reducer classes for the job
        wordCountJob.setMapperClass(mapper.class);
        wordCountJob.setReducerClass(reducer.class);

        // Key/value types emitted by the map phase
        wordCountJob.setMapOutputKeyClass(Text.class);
        wordCountJob.setMapOutputValueClass(User.class);

        // Key/value types of the final output
        wordCountJob.setOutputKeyClass(Text.class);
        wordCountJob.setOutputValueClass(Text.class);

        // Input file and output directory
        FileInputFormat.setInputPaths(wordCountJob, "C:\\test\\in6\\data.txt");
        FileOutputFormat.setOutputPath(wordCountJob, new Path("C:\\test\\out6"));

        // Submit the job and wait for it to finish
        wordCountJob.waitForCompletion(true);
    }
}