hbase刪除某個列的資料
阿新 • • 發佈:2018-11-04
碰到這樣一個事:我們往hbase裡面導資料, 補了快一年的資料了,結果發現某個列的資料有幾個月是有問題的,不能用,所以需要將這個列的有問題的幾個月資料全部幹掉, 查了hbase的命令,發現沒有這種根據rowkey範圍直接刪除某個列的命令. 所以只能自己寫了: 可以採用客戶端程式設計的方式,也可以採用hbase on mr的方式,我這裡採用的是hbase on mr的方式。原因是如果採用客戶端程式設計的方式,需要scan所有的主鍵,然後判斷rowkey是否符合刪除的要求,如果符合則刪除,因為資料量很大,這種方式可能太慢,其次是怕把客戶端直接給弄死了。採用mr分散式的做法就不用擔心這方面的問題。
注:
1.hbase的版本是: HBase 0.98.9
2. rowkey的形式是 userid+yyyyMMdd的形式, 比如: 0000120181103, 這裡需要把20180406之前的資料的某個列( f:cl )幹掉,程式碼如下:
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil; import org.apache.hadoop.hbase.mapreduce.TableMapper; import org.apache.hadoop.hbase.mapreduce.TableReducer; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.util.GenericOptionsParser; import java.io.IOException; public class HbaseDelColMr { static class DelColMapper extends TableMapper<Text, NullWritable> { private Text dekKey = new Text(); // key: rowkey // result: 一行的資料 @Override public void map(ImmutableBytesWritable key, Result result, Context context) throws IOException, InterruptedException { //拿到 rowkey String rowkey = Bytes.toString(key.get()); // 判斷 rowkey 是否需要刪除 rowkey的型別類似這種字串 12556565620180405 String dateStr = rowkey.substring(rowkey.length() - 8, rowkey.length()); //如果在20180406之前的資料全部需要刪掉 if (Integer.parseInt(dateStr) < 20180406) { dekKey.set(rowkey); context.write(dekKey, NullWritable.get()); } } } static class DelColReducer extends TableReducer<Text, NullWritable, ImmutableBytesWritable> { @Override public void reduce(Text delKey, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException { // delKey 這就是要刪除的rowkey Delete delete = new Delete(Bytes.toBytes(delKey.toString())); //設定要刪除的列 delete.deleteColumn(Bytes.toBytes("f"), Bytes.toBytes("cl")); context.write(new ImmutableBytesWritable(Bytes.toBytes(delKey.toString())), delete); } } public static void main(String[] args) throws Exception { Configuration configuration = HBaseConfiguration.create(); configuration.set("hbase.zookeeper.quorum", "zk_1,zk_2,zk_3,zk_4,zk_5"); 
configuration.set("hbase.zookeeper.property.clientPort", "2181"); //configuration.set("hbase.local.dir", "/tmp/hbase-local-dir_test"); String[] otherArgs = new GenericOptionsParser(configuration, args).getRemainingArgs(); for (String ar:otherArgs) { System.out.println(ar+" ======================================"); } Job job = Job.getInstance(configuration); job.setJobName("HbaseDelColMr"); job.setJarByClass(HbaseDelColMr.class); Scan scan = new Scan(); scan.addColumn(Bytes.toBytes("f"), Bytes.toBytes("cl")); scan.setCaching(500); scan.setCacheBlocks(false); TableMapReduceUtil.initTableMapperJob( otherArgs[0], //輸入表 "dt_list_detail_test" scan, // scan 物件 DelColMapper.class, Text.class, //mapper輸出的key型別 NullWritable.class, //mapper輸出的value型別 job ); TableMapReduceUtil.initTableReducerJob( otherArgs[0],// 輸出表 "dt_list_detail_test" DelColReducer.class, job); job.setNumReduceTasks(10); boolean b = job.waitForCompletion(true); if (!b) { throw new IOException("任務出錯....."); } } }
還有一種效率更高、更加簡便的方式，就是去掉 reduce 階段，讓 mapper 直接輸出 Delete，如下:
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil; import org.apache.hadoop.hbase.mapreduce.TableMapper; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.util.GenericOptionsParser; import java.io.IOException; public class HbaseDelColMr2 { static class DelColMapper extends TableMapper<ImmutableBytesWritable, Delete> { @Override public void map(ImmutableBytesWritable key, Result result, Context context) throws IOException, InterruptedException { String rowkey = Bytes.toString(key.get()); //拿到 rowkey // 判斷 rowkey 是否需要刪除 rowkey的型別類似這種字串 12556565620180405 String dateStr = rowkey.substring(rowkey.length() - 8, rowkey.length()); //如果在20180406之前的資料全部需要刪掉 if (Integer.parseInt(dateStr) < 20180406) { //設定要刪除的列 Delete delete = new Delete(Bytes.toBytes(rowkey)); delete.deleteColumn(Bytes.toBytes("f"), Bytes.toBytes("cl")); context.write(key, delete); //需要測試如果沒有reduce階段,這裡是否會直接寫入到hbase, 補充:結論是可以的 } } } public static void main(String[] args) throws Exception { Configuration configuration = HBaseConfiguration.create(); configuration.set("hbase.zookeeper.quorum", "zk_1,zk_2,zk_3,zk_4,zk_5"); configuration.set("hbase.zookeeper.property.clientPort", "2181"); //configuration.set("hbase.local.dir", "/tmp/hbase-local-dir_test"); String[] otherArgs = new GenericOptionsParser(configuration, args).getRemainingArgs(); for (String ar:otherArgs) { System.out.println(ar+" ======================================"); } Job job = Job.getInstance(configuration); job.setJobName("HbaseDelColMr2"); job.setJarByClass(HbaseDelColMr2.class); Scan scan = new Scan(); scan.addColumn(Bytes.toBytes("f"), Bytes.toBytes("cl")); scan.setCaching(500); 
scan.setCacheBlocks(false); TableMapReduceUtil.initTableMapperJob( otherArgs[0], //輸入表 "dt_list_detail_test" scan, // scan 物件 DelColMapper.class, null, //沒有輸出,直接寫入hbase null, //沒有輸出,直接寫入hbase job ); TableMapReduceUtil.initTableReducerJob( otherArgs[0],// 輸出表 "dt_list_detail_test" null, job); job.setNumReduceTasks(0); boolean b = job.waitForCompletion(true); if (!b) { throw new IOException("任務出錯....."); } } }
打包呼叫:
export HADOOP_CLASSPATH=`hbase classpath`
yarn jar ./hbaseDeltest.jar xxx.HbaseDelColMr -D mapreduce.job.queuename=xxx dt_list_detail_test
這樣子就可以啦,上面兩種方式隨便選一種就ok了。。。。。。