
HBase in Action: Inserting Data with the Java API


I. Implementation approach

  1. Use a MapReduce job to read the input and pass the records on.

  2. Write an HBase utility class that connects to HBase, creates the table, and inserts the data.

  3. Call the HBase utility from the reducer.

  4. Submit the job from the main class.

 

II. The code

 

  1. mapper

 

package com;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
// Parse each input line and pass the record on to the reducer
public class mapper extends Mapper<LongWritable, Text, Text, User>{

	@Override
	protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, User>.Context context)
			throws IOException, InterruptedException {

		// Each line holds five comma-separated fields that populate one User
		String data = value.toString();
		String[] s = data.split(",");
		System.out.println(data);
		// A constant key groups every record into a single reduce call,
		// so the reducer receives the whole data set as one batch
		context.write(new Text("1"), new User(s[0], s[1], s[2], s[3], s[4]));
	}
}
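
  The post does not show the User class, but the map output value type and the HBase utility both depend on it. Below is a minimal sketch of what such a bean could look like, assuming five String fields (id, name, age, sex, part) whose getters match those used in HbaseUtils. The field names and order are an assumption, not taken from the original post. It implements Writable so Hadoop can serialize it between map and reduce, and the no-argument constructor is required both by Hadoop and by BeanUtils.copyProperties in the reducer.

package com;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

// Minimal sketch of the User bean assumed by the mapper, reducer and HbaseUtils.
// Field names and order are an assumption; adjust them to your actual data.
public class User implements Writable {
	private String id;
	private String name;
	private String age;
	private String sex;
	private String part;

	// No-arg constructor required by Hadoop serialization and by BeanUtils
	public User() {}

	public User(String id, String name, String age, String sex, String part) {
		this.id = id;
		this.name = name;
		this.age = age;
		this.sex = sex;
		this.part = part;
	}

	// Serialize the fields in a fixed order
	public void write(DataOutput out) throws IOException {
		out.writeUTF(id);
		out.writeUTF(name);
		out.writeUTF(age);
		out.writeUTF(sex);
		out.writeUTF(part);
	}

	// Deserialize in the same order as write()
	public void readFields(DataInput in) throws IOException {
		id = in.readUTF();
		name = in.readUTF();
		age = in.readUTF();
		sex = in.readUTF();
		part = in.readUTF();
	}

	public String getId() { return id; }
	public void setId(String id) { this.id = id; }
	public String getName() { return name; }
	public void setName(String name) { this.name = name; }
	public String getAge() { return age; }
	public void setAge(String age) { this.age = age; }
	public String getSex() { return sex; }
	public void setSex(String sex) { this.sex = sex; }
	public String getPart() { return part; }
	public void setPart(String part) { this.part = part; }

	@Override
	public String toString() {
		return "User [id=" + id + ", name=" + name + ", age=" + age + ", sex=" + sex + ", part=" + part + "]";
	}
}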

  

  2. HBase utility class

 

package com;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;

public class HbaseUtils {
	// Column family used for every row
	public static final String c = "info";

	// Called from the reducer: create the table if needed, then insert the whole batch
	public static void insertinfo(String ip, String port, String tableName, List<User> list) throws Exception {
		try (Connection con = getConnection(ip, port);
				Admin admin = con.getAdmin()) {
			// Create the table first if it does not exist yet
			if (!admin.tableExists(TableName.valueOf(tableName))) {
				createTable(admin, tableName);
			}
			try (Table table = con.getTable(TableName.valueOf(tableName))) {
				insertList(table, list);
			}
		}
	}

	// Insert every User as one row, using the id as the row key
	private static void insertList(Table table, List<User> list) throws Exception {
		for (User user : list) {
			Put put = new Put(user.getId().getBytes());
			put.addColumn(c.getBytes(), "name".getBytes(), user.getName().getBytes());
			put.addColumn(c.getBytes(), "Age".getBytes(), user.getAge().getBytes());
			put.addColumn(c.getBytes(), "Sex".getBytes(), user.getSex().getBytes());
			put.addColumn(c.getBytes(), "Part".getBytes(), user.getPart().getBytes());
			table.put(put);
		}
	}

	// Create the table with a single column family
	private static void createTable(Admin admin, String tableName) throws Exception {
		HTableDescriptor descriptor = new HTableDescriptor(TableName.valueOf(tableName));
		HColumnDescriptor descriptor2 = new HColumnDescriptor(c);
		descriptor.addFamily(descriptor2);
		admin.createTable(descriptor);
	}

	// Build a connection to HBase through its ZooKeeper quorum
	private static Connection getConnection(String ip, String port) throws Exception {
		Configuration configuration = HBaseConfiguration.create();
		configuration.set("hbase.zookeeper.quorum", ip);
		configuration.set("hbase.zookeeper.property.clientPort", port);
		return ConnectionFactory.createConnection(configuration);
	}
}
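
  To sanity-check the utility outside of MapReduce, you can call it directly and read a row back with a Get. This is only a sketch and not part of the original post: the HbaseUtilsTest class, the example User values, and the ZooKeeper address, port and table name (reused from the reducer below) are all placeholders to adjust for your environment.

package com;

import java.util.Arrays;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

// Hypothetical standalone smoke test for HbaseUtils (not part of the original post)
public class HbaseUtilsTest {
	public static void main(String[] args) throws Exception {
		// Insert a single example row using the User sketch above
		HbaseUtils.insertinfo("192.168.184.131", "2181", "sw",
				Arrays.asList(new User("1001", "tom", "20", "m", "dev")));

		// Read the row back to verify the insert
		Configuration conf = HBaseConfiguration.create();
		conf.set("hbase.zookeeper.quorum", "192.168.184.131");
		conf.set("hbase.zookeeper.property.clientPort", "2181");
		try (Connection con = ConnectionFactory.createConnection(conf);
				Table table = con.getTable(TableName.valueOf("sw"))) {
			Result result = table.get(new Get(Bytes.toBytes("1001")));
			System.out.println(Bytes.toString(
					result.getValue(Bytes.toBytes("info"), Bytes.toBytes("name"))));
		}
	}
}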

  

  3. reducer

 

package com;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.commons.beanutils.BeanUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class reducer extends Reducer<Text, User, Text, Text>{
	@Override
	protected void reduce(Text keyin, Iterable<User> value, Reducer<Text, User, Text, Text>.Context context)
			throws IOException, InterruptedException {
		ArrayList<User> list = new ArrayList<User>();
		// Hadoop reuses the same User instance while iterating,
		// so copy each value into a fresh object before adding it to the list
		for (User user : value) {
			User user1 = new User();
			System.out.println(user);
			try {
				BeanUtils.copyProperties(user1, user);
				list.add(user1);
			} catch (Exception e) {
				e.printStackTrace();
			}
		}
		System.out.println("list+++++++++++++++" + list);
		// Hand the whole batch to the HBase utility (ZooKeeper host, client port, table name)
		try {
			HbaseUtils.insertinfo("192.168.184.131", "2181", "sw", list);
		} catch (Exception e) {
			e.printStackTrace();
		}
		context.write(new Text("status"), new Text(":success"));
	}
}

  

  4. main

 

package com;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class main {
	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();
		// Run the job locally against the local file system
		conf.set("mapreduce.framework.name", "local");
		conf.set("fs.defaultFS", "file:///");
		Job wordCountJob = Job.getInstance(conf);
		// Important: specify the jar that contains this job
		wordCountJob.setJarByClass(main.class);
		// Mapper class for this job
		wordCountJob.setMapperClass(mapper.class);
		// Reducer class for this job
		wordCountJob.setReducerClass(reducer.class);
		// Key/value types emitted by the map phase
		wordCountJob.setMapOutputKeyClass(Text.class);
		wordCountJob.setMapOutputValueClass(User.class);
		// Key/value types of the final output
		wordCountJob.setOutputKeyClass(Text.class);
		wordCountJob.setOutputValueClass(Text.class);
		// Input file and output directory (the output directory must not exist yet)
		FileInputFormat.setInputPaths(wordCountJob, "C:\\test\\in6\\data.txt");
		FileOutputFormat.setOutputPath(wordCountJob, new Path("C:\\test\\out6"));
		// Submit the job and wait for it to finish
		System.exit(wordCountJob.waitForCompletion(true) ? 0 : 1);
	}
}
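
  For reference, the mapper splits each line on commas and expects five fields, so a data.txt for this job could look like the lines below. These are illustrative values only; the original post does not include the input file.

1001,tom,20,m,dev
1002,jerry,22,f,test
1003,mike,25,m,ops

After the job finishes, each line becomes one row in the sw table, and the MapReduce output directory only contains the status/:success marker written by the reducer.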