hbase的java API使用詳解
阿新 • • 發佈:2019-02-02
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.ConnectionFactory; /** HBase當中的核心API :13個 HBaseConfiguration 配置資訊 Connection HBaseAdmin/Admin 管理員 HTable 整個表的抽象資訊 HColumnDescriptor 列的描述資訊 HTableDescriptor 列簇的描述資訊 Put Delete Get Scan KeyValue Cell Result ResultScanner */ public class HBase_First { public static void main(String[] args) throws Exception { /** * 第一步: 獲取連線 */ // 建立一個可以用來管理hbase配置資訊的config物件 Configuration config = HBaseConfiguration.create(); // 設定當前的程式去尋找的hbase在哪裡 config.set("hbase.zookeeper.quorum", "hadoop02:2181,hadoop03:2181,hadoop04:2181"); //建立和建立連線 Connection con = ConnectionFactory.createConnection(config); // 根據連接獲取到一個管理員物件 Admin admin = con.getAdmin(); /** * 第二步:通過連線進行資料庫的響應操作 */ //檢查表是否存在,存在返回true boolean tableExists = admin.tableExists(TableName.valueOf("user_info_1")); System.out.println(tableExists); /** * 第三步:關閉連線 */ admin.close(); con.close(); } }
import java.util.ArrayList; import java.util.List; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.util.Bytes; import org.junit.Before; import org.junit.Test; public class HBase_API { private static final String ZK_CONNECT_KEY = "hbase.zookeeper.quorum"; private static final String ZK_CONNECT_VALUE = "hadoop02:2181,hadoop03:2181,hadoop04:2181"; public static void main(String[] args) { } private static Connection con = null; private static Admin admin = null; private static Table table = null; @Before public void init(){ // 建立一個可以用來管理hbase配置資訊的config物件 Configuration config = HBaseConfiguration.create(); // 設定當前的程式去尋找的hbase在哪裡 config.set(ZK_CONNECT_KEY, ZK_CONNECT_VALUE); try { //建立和建立連線 con = ConnectionFactory.createConnection(config); // 根據連接獲取到一個管理員物件(做一個管理的操作如建立、刪除、查詢表用admin,針對表做具體的資料操作就通過con來獲取表的物件) admin = con.getAdmin(); //獲取專門用來進行資料處理的、代表某張表的一個 表物件table,用來操作user_info這個表 table = con.getTable(TableName.valueOf("user_info")); } catch (Exception e) { System.out.println("獲取hbase連線失敗"); } } /** * 建立表的方法 */ @Test public void createTable() throws Exception{ //建立一個表名物件tn,表名為my_stu TableName tn = TableName.valueOf("my_stu"); //建立一個表的描述物件HTableDescriptor(包含表名和列簇名),並且它的表名為tn物件的值--my_stu HTableDescriptor htd = new HTableDescriptor(tn); //建立一個列簇為"cf1"的列簇物件cf1,注意在建立表時候必須寫列簇 HColumnDescriptor cf1 = new HColumnDescriptor("cf1"); //新增一個列簇物件cf1 htd.addFamily(cf1); //建立一個表 admin.createTable(htd); //如果有tn名字的表,那麼建立成功 if(admin.tableExists(tn)){ System.out.println("建立表成功"); }else{ System.out.println("建立表失敗"); } } /** * 輸出所有的表的列簇名字 */ @Test public void listTables() throws Exception{ //HTableDescriptor的物件包含表名和列簇名,獲取資料庫中的所有表的資訊 HTableDescriptor[] listTables = admin.listTables(); for(HTableDescriptor htd: listTables){ //輸出所有的表名 System.out.print(htd.getTableName() + "\t"); //獲取所有的列簇的一個集合 HColumnDescriptor[] columnFamilies = htd.getColumnFamilies(); for(HColumnDescriptor hcd : columnFamilies){ //獲取當前列簇的名字getName()返回的是位元組型別的 String name = Bytes.toString(hcd.getName()); // String name = new String(hcd.getName()) System.out.print(name + " "); } System.out.println(); } } /** * 刪除表 */ @Test public void dropTable() throws Exception{ //表名物件 TableName tn = TableName.valueOf("my_stu"); //檢查表是否有效,也就是表是否啟用,啟用返回true,停用返回false boolean tableEnabled = admin.isTableEnabled(tn); if(tableEnabled){//先判斷表的狀態比較穩妥,否則停用狀態再停用可能出問題 //把tn錶停用 admin.disableTable(tn); } //刪除my_stu表(先停用表再刪除表) admin.deleteTable(tn); //判斷tn表在不在資料庫裡面,如果在返回true if(admin.tableExists(tn)){ System.out.println("刪除表失敗"); }else{ System.out.println("刪除表成功"); } } /** * 表示往某張表中進行資料的插入 * * 專門用來進行資料處理的、代表某張表的一個 表物件,就是 HTable 類的一個例項物件----table */ //向某張表中插入一條資料 @Test public void putData() throws Exception{ //建立一個put物件,因為要插入資料所以必須指定row key,現在僅僅是指定行健,沒有對應的keyvalue值 Put put = new Put("rk01".getBytes()); //指定列簇、列、值,現在要插入的一行資料就準備好了 put.addColumn("base_info".getBytes(), "xx".getBytes(), "yy".getBytes()); //往表裡插入資料,傳一個put物件插入一行記錄,穿多個put物件插入多行記錄 table.put(put); } //向某張表中插入多條記錄 @Test public void putDatas() throws Exception{ Put put1 = new Put("rk03".getBytes()); put1.addColumn("base_info".getBytes(), "xxx".getBytes(), "yy".getBytes()); Put put2 = new Put("rk02".getBytes()); put2.addColumn("base_info".getBytes(), "xxx".getBytes(), "yy".getBytes()); List<Put> puts = new ArrayList(); puts.add(put1); puts.add(put2); //.put()方法可以傳list的一堆Put物件,插入就用put()方法 table.put(puts); } //刪除資料 @Test public void deleteData() throws Exception{ //在增刪改查row key都需要指定的,如果不寫addColumn()那麼就是刪除rk這一行的資料 Delete delete = new Delete("rk02".getBytes()); //刪除rk02這條記錄下的base_info列簇的xxx key delete.addColumn("base_info".getBytes(), "xxx".getBytes()); //刪除就用delete()方法 table.delete(delete); } //get查詢某個表中的資料 @Test public void getData() throws Exception{ //行鍵row key,如果不下addFamily()方法就是查詢一行的所有列簇下的所有cell的資訊 Get get = new Get("baiyc_20150716_0005".getBytes()); //新增一個列簇物件(用來查詢指定列簇下的cell資訊) get.addFamily("base_info".getBytes()); //返回一個Result物件 Result result = table.get(get); //返回所有查詢到的單元格cell List<Cell> cells = result.listCells(); //迴圈拿cell for(Cell c : cells){ //直接列印是看不懂的,cells本身就是個位元組陣列 System.out.println(c.toString()); //獲取到列簇資訊 String family = Bytes.toString(c.getFamily()); //獲取到列key資訊 String qualifier = Bytes.toString(c.getQualifier()); //獲取到value值 String value = Bytes.toString(c.getValue()); //獲取時間戳 long ts = c.getTimestamp(); System.out.println(family + "\t" + qualifier + "\t" + value + "\t" + ts); } } //Scan查詢某個表中的資料 @Test public void getResultScanner() throws Exception{ //無參方法就是全部,不寫下面的限制範圍的方法就是全表掃描 Scan scan = new Scan(); //查詢某些特定的列簇 // scan.addFamily("base_info".getBytes()); //查詢base_info列簇和name列的資訊 scan.addColumn("base_info".getBytes(), "name".getBytes()); //從哪個行鍵開始掃描 scan.setStartRow("rk01".getBytes()); //從哪個行鍵結束掃描 scan.setStopRow("zhangsan_20150701_0004".getBytes()); //上面的四條限制範圍都可以單獨或同時使用,且使用越多越嚴格 ResultScanner scanner = table.getScanner(scan); //解析ResultScanner物件的資訊並且列印輸出,跟上面的很像 for (Result result : scanner) { List<Cell> cells = result.listCells(); for (int i = 0; i < cells.size(); i++) { Cell cell = cells.get(i); System.out.println(Bytes.toString(cell.getRow()) + "\t" + Bytes.toString(cell.getFamily()) + "\t" + Bytes.toString(cell.getQualifier()) + "\t" + Bytes.toString(cell.getValue()) + "\t" + cell.getTimestamp()); } } } }
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.filter.BinaryComparator; import org.apache.hadoop.hbase.filter.FamilyFilter; import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; import org.apache.hadoop.hbase.filter.Filter; import org.apache.hadoop.hbase.filter.FilterList; import org.apache.hadoop.hbase.filter.PageFilter; import org.apache.hadoop.hbase.filter.RowFilter; import com.ghgj.hbase.util.HBasePrintUtil; public class FliterTest { private static final String ZK_CONNECT_KEY = "hbase.zookeeper.quorum"; private static final String ZK_CONNECT_VALUE = "hadoop02:2181,hadoop03:2181,hadoop04:2181"; public static void main(String[] args) throws Exception { Configuration conf = HBaseConfiguration.create(); conf.set(ZK_CONNECT_KEY, ZK_CONNECT_VALUE); Connection con = ConnectionFactory.createConnection(conf); Admin admin = con.getAdmin(); HTable table = (HTable) con.getTable(TableName.valueOf("user_info")); Scan scan = new Scan(); /** * 在這兒給scan新增過濾器 * * 構造過濾器的時候,要注意這兩個引數的意義: * * 第一個引數: 比較規則 * 第二個引數: 比較器 */ Filter filter1 = new RowFilter(CompareOp.GREATER, new BinaryComparator("rk03".getBytes())); Filter filter2 = new FamilyFilter(CompareOp.EQUAL, new BinaryComparator("base_info".getBytes())); /** * 分頁過濾的構造引數就是 : 每一頁的總資料條數 * Filter pageFilter = new PageFilter(3,4); ---- id : 9-12 * 真正的分頁實現就只需要兩個引數: pageIndex pageSize * * pageIndex: 第幾頁 * * pageSzie :每頁大小 */ Filter pageFilter = new PageFilter(3); Filter filter = new FilterList(filter1, filter2, pageFilter); scan.setFilter(filter); ResultScanner scanner = table.getScanner(scan); HBasePrintUtil.printResultScanner(scanner); admin.close(); con.close(); } }
package com.ghgj.hbase.page;
import java.io.IOException;
import java.util.Iterator;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.PageFilter;
import org.apache.hadoop.hbase.util.Bytes;
import com.ghgj.hbase.util.HBasePrintUtil;
import com.ghgj.hbase.util.HBaseUtil;
public class HBase_Page {
public static void main(String[] args) throws Exception {
// ResultScanner pageData = getPageData(3, 10);
// ResultScanner pageData = getPageData(2, 2);
// ResultScanner pageData1 = getPageData(1, 3);
// ResultScanner pageData1 = getPageData("baiyc_20150716_0002", 3);
// ResultScanner pageData2 = getPageData("rk01", 3);
ResultScanner pageData2 = getPageData(3, 2000);
HBasePrintUtil.printResultScanner(pageData2);
}
/**
* @param pageIndex 第幾頁
* @param pageSize 每一頁的記錄總數
* @return
*
* 負責編寫JS程式碼的前端人員。他們是不知道。怎麼傳入startRow
*/
public static ResultScanner getPageData(int pageIndex, int pageSize) throws Exception{
if(pageSize < 3 || pageSize > 15){
pageSize = 5;
}
/**
* 當前這個程式碼的真實作用就是把:;
*
* "baiyc_20150716_0001", 3
*
* 轉換成:
*
* 2, 3
*
* 難點: 就是 pageIndex 轉成 startRow
*/
String startRow = getCurrentPageStartRow(pageIndex, pageSize);
return getPageData(startRow, pageSize);
}
/**
* 當前這個方法的作用:
*
* 就是把 前端人員 穿送過來的 pageIndex 轉換成 startRow
*
* 以方便呼叫底層最簡單的獲取一頁分頁資料的 方法: getPageData(startRow, pageSize)
*
* @param pageIndex
* @param pageSize
* @return
*/
private static String getCurrentPageStartRow(int pageIndex, int pageSize) throws Exception {
// 怎麼實現?
// 如果 傳送過來的額 pageIndex 不合法。 預設返回 第一頁資料
if(pageIndex <= 1){
/*pageIndex == -1
轉成了
startRow == null*/
return null;
}else{
// 從第二頁開始的所有資料。
String startRow = null;
for(int i = 1; i <= pageIndex - 1; i++){
// 第幾次迴圈,就是獲取第幾頁的資料
ResultScanner pageData = getPageData(startRow, pageSize);
// 獲取當前這一頁的最後rowkey
Iterator<Result> iterator = pageData.iterator();
Result result = null;
while(iterator.hasNext()){
result = iterator.next();
}
// 讓最後一個rowkey往後挪動一點位置,但是又不會等於下一頁的 startRow
String endRowStr = new String(result.getRow());
byte[] add = Bytes.add(endRowStr.getBytes(), new byte[]{ 0x00});
String nextPageStartRowStr = Bytes.toString(add);
//
startRow = nextPageStartRowStr;
}
return startRow;
}
}
/**
* 描述:
*
* 從 startRow開始 查詢 pageSize 條資料
*
* @param startRow
* @param pageSize
* @return
*/
public static ResultScanner getPageData(String startRow, int pageSize) throws IOException{
Connection con = HBaseUtil.getConnection();
// Admin admin = HBaseUtil.getAdmin();
Table table = HBaseUtil.getTable("user_info");
Scan scan = new Scan();
// 設定起始行健搞定
// 如果是第一頁資料, 所以 scan.setStartRow這句程式碼根本就沒有任何意義。。 不用設定即可
if(!StringUtils.isBlank(startRow)){
// 如果使用者不傳入 startRow, 或者傳入了一個 非法的 startRow, 還是按照規則 返回 第一頁資料
scan.setStartRow(startRow.getBytes());
}
// 設定總資料條件
Filter pageFilter = new PageFilter(pageSize);
scan.setFilter(pageFilter);
ResultScanner scanner = table.getScanner(scan);
return scanner;
}
}
package com.ghgj.hbase.mr;
import java.io.IOException;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
/**
* get 'stduent','rk01' ==== Result
*
* 需求:讀出所有的記錄(Result),然後提取出對應的 age 資訊
*
* mapper階段的
*
* 輸入: 從hbase來
*
* key : rowkey
* value : result
*
* ImmutableBytesWritable, Result
*
* 輸出: hdfs
*
* key : age
* value : 年齡值
*
* reducer階段:
*
* 輸入:
*
* key : "age"
* value : 年齡值 = 18
*
* 輸出:
*
* key: NullWritbale
* value: 平均
*/
public class ReadDataFromHBaseToHDFSMR extends Configured implements Tool {
public static void main(String[] args) throws Exception {
int run = ToolRunner.run(new ReadDataFromHBaseToHDFSMR(), args);
System.exit(run);
}
@Override
public int run(String[] arg0) throws Exception {
Configuration config = HBaseConfiguration.create();
config.set("hbase.zookeeper.quorum", "hadoop02:2181,hadoop03:2181");
config.set("fs.defaultFS", "hdfs://myha01/");
config.addResource("config/core-site.xml");
config.addResource("config/hdfs-site.xml");
System.setProperty("HADOOP_USER_NAME", "hadoop");
Job job = Job.getInstance(config, "ReadDataFromHBaseToHDFSMR");
job.setJarByClass(ReadDataFromHBaseToHDFSMR.class);
// 從此開始,就是設定當前這個MR程式的各種job細節
Scan scan = new Scan();
scan.addColumn("info".getBytes(), "age".getBytes());
TableMapReduceUtil.initTableMapperJob(
"student".getBytes(), // 指定表名
scan, // 指定掃描資料的條件
ReadDataFromHBaseToHDFSMR_Mapper.class, // 指定mapper class
Text.class, // outputKeyClass mapper階段的輸出的key的型別
IntWritable.class, // outputValueClass mapper階段的輸出的value的型別
job,
false); // job物件
job.setReducerClass(ReadDataFromHBaseToHDFSMR_Reducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(DoubleWritable.class);
/**
* 在當前的MR程式中。 輸入的資料是來自於 HBase, 按照常理來說,需要自定義一個數據讀取元件 讀 hbase
*
* 但是:TableMapReduceUtil.initTableMapperJob 這個方法已經做了。!!!!!!
*/
FileOutputFormat.setOutputPath(job, new Path("/student/avgage_output2"));
boolean isDone = job.waitForCompletion(true);
return isDone ? 0 : 1;
}
public static class ReadDataFromHBaseToHDFSMR_Mapper extends TableMapper<Text, IntWritable>{
Text outKey = new Text("age");
/**
* key = 就是rowkey
*
* value = 就是一個result物件
*/
@Override
protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException {
boolean containsColumn = value.containsColumn("info".getBytes(), "age".getBytes());
if(containsColumn){
List<Cell> cells = value.getColumnCells("info".getBytes(), "age".getBytes());
Cell cell = cells.get(0);
byte[] cloneValue = CellUtil.cloneValue(cell);
String age = Bytes.toString(cloneValue);
context.write(outKey, new IntWritable(Integer.parseInt(age)));
}
}
}
public static class ReadDataFromHBaseToHDFSMR_Reducer extends Reducer<Text, IntWritable, Text, DoubleWritable>{
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
int count = 0;
int sum = 0;
for(IntWritable iw : values){
count++;
sum += iw.get();
}
double avgAge = sum * 1D / count;
context.write(key, new DoubleWritable(avgAge));
}
}
}
package com.ghgj.hbase.mr;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.mapreduce1111.TableMapReduceUtil;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.junit.runner.Result;
/**
* 需求:讀取HDFS上的資料。插入到HBase庫中
*
* hbase.zookeeper.quorum == hadoop02:2181
*/
public class ReadHDFSDataToHBaseMR extends Configured implements Tool{
@Override
public int run(String[] arg0) throws Exception {
// Configuration conf = new Configuration();
// conf.set("fs.defaultFS", "hdfs://myha01/");
// conf.addResource("config/core-site.xml");
// conf.addResource("config/hdfs-site.xml");
// config === HBaseConfiguration
Configuration config = HBaseConfiguration.create();
config.set("hbase.zookeeper.quorum", "hadoop02:2181,hadoop03:2181");
config.set("fs.defaultFS", "hdfs://myha01/");
config.addResource("config/core-site.xml");
config.addResource("config/hdfs-site.xml");
System.setProperty("HADOOP_USER_NAME", "hadoop");
Job job = Job.getInstance(config, "ReadHDFSDataToHBaseMR");
job.setJarByClass(ReadHDFSDataToHBaseMR.class);
job.setMapperClass(HBaseMR_Mapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(NullWritable.class);
// 設定資料讀取元件
job.setInputFormatClass(TextInputFormat.class);
// 設定資料的輸出元件
// job.setOutputFormatClass(cls);
// TableMapReduceUtil.initTableReducerJob("student", HBaseMR_Reducer.class, job);
TableMapReduceUtil.initTableReducerJob("student", HBaseMR_Reducer.class, job, null, null, null, null, false);
job.setOutputKeyClass(NullWritable.class);
job.setOutputValueClass(Put.class);
// FileInputFormat.addInputPath(job, new Path("E:\\bigdata\\hbase\\student\\input"));
FileInputFormat.addInputPath(job, new Path("/student/input/"));
boolean isDone = job.waitForCompletion(true);
return isDone ? 0: 1;
}
public static void main(String[] args) throws Exception {
int run = ToolRunner.run(new ReadHDFSDataToHBaseMR(), args);
System.exit(run);
}
public static class HBaseMR_Mapper extends Mapper<LongWritable, Text, Text, NullWritable>{
/**
* 每次讀取一行資料
*
* Put : 構造一個put物件的時候,需要
* put 'stduent','95001','cf:name','liyong'
*
*
* name:huangbo
* age:18
*
* name:xuzheng
*
*/
@Override
protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, NullWritable>.Context context)
throws IOException, InterruptedException {
context.write(value, NullWritable.get());
}
}
public static class HBaseMR_Reducer extends TableReducer<Text, NullWritable, NullWritable>{
/**
* key === 95011,包小柏,男,18,MA
*
* 95001: rowkey
* 包小柏 : name
* 18 : age
* 男 : sex
* MA : department
*
* column family : cf
*/
@Override
protected void reduce(Text key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
String[] split = key.toString().split(",");
Put put = new Put(split[0].getBytes());
put.addColumn("info".getBytes(), "name".getBytes(), split[1].getBytes());
put.addColumn("info".getBytes(), "sex".getBytes(), split[2].getBytes());
put.addColumn("info".getBytes(), "age".getBytes(), split[3].getBytes());
put.addColumn("info".getBytes(), "department".getBytes(), split[4].getBytes());
context.write(NullWritable.get(), put);
}
}
}