Error when using bulkload to generate data directly in HFile format
阿新 • Published 2021-03-31
When integrating HBase with MapReduce and using the bulkload approach to generate data directly in HFile format, the job fails with an error.
Code that produced the error
BulkLoadMap.java

package com.kami.demo10;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.IOException;

/**
 * @version v 1.0
 * @Author kami
 * @Date 2019/12/19
 */
public class BulkLoadMap extends Configured implements Tool {

    // Bulk load only needs a map phase: it converts the HDFS text data into HFiles.
    public static class BulkLoadData extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String[] sps = value.toString().split(" ");
            String rowkey = sps[0];
            String name = sps[1];
            String age = sps[2];
            Put put = new Put(rowkey.getBytes());
            put.addColumn("f1".getBytes(), "name".getBytes(), name.getBytes());
            put.addColumn("f1".getBytes(), "age".getBytes(), age.getBytes());
            context.write(new ImmutableBytesWritable(rowkey.getBytes()), put);
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("hbase.zookeeper.quorum", "node01:2181,node02:2181,node03:2183");
        Connection connection = ConnectionFactory.createConnection(conf);
        Table table = connection.getTable(TableName.valueOf("myuser2"));
        Job job = Job.getInstance(conf, "rua");
        job.setJarByClass(BulkLoadMap.class);
        job.setMapperClass(BulkLoadData.class);
        job.setOutputFormatClass(HFileOutputFormat2.class);
        HFileOutputFormat2.configureIncrementalLoad(job, table, connection.getRegionLocator(TableName.valueOf("myuser2")));
        FileOutputFormat.setOutputPath(job, new Path("hdfs://node01:8020/evaHfile"));
        // Input type: plain text
        job.setInputFormatClass(TextInputFormat.class);
        TextInputFormat.addInputPath(job, new Path("hdfs://node01:8020/rua.txt"));
        // Map output types -- set here, AFTER configureIncrementalLoad (this ordering is the problem)
        job.setMapOutputKeyClass(ImmutableBytesWritable.class);
        job.setMapOutputValueClass(Put.class);
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        int run = ToolRunner.run(new Configuration(), new BulkLoadMap(), args);
        System.out.println(run);
    }
}
Error symptoms and cause
19/12/19 19:48:47 WARN mapred.LocalJobRunner: job_local244116549_0001
java.lang.ClassCastException: org.apache.hadoop.hbase.client.Put cannot be cast to org.apache.hadoop.hbase.KeyValue
        at org.apache.hadoop.hbase.mapreduce.KeyValueSortReducer.reduce(KeyValueSortReducer.java:43)
        at org.apache.hadoop.hbase.mapreduce.KeyValueSortReducer.reduce(KeyValueSortReducer.java:36)
        at org.apache.hadoop.mapreduce.Reducer.run(Reducer.java:164)
        at org.apache.hadoop.mapred.ReduceTask.runNewReducer(ReduceTask.java:610)
        at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:444)
        at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:449)
The job fails with java.lang.ClassCastException: org.apache.hadoop.hbase.client.Put cannot be cast to org.apache.hadoop.hbase.KeyValue.
On inspection, the calls that set the map output types had been placed after HFileOutputFormat2.configureIncrementalLoad. In the driver, the job's input path, output path, and output data types should all be configured before that call; if the output types are not set first, the job only emits a warning and produces no result data.
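The ordering matters because configureIncrementalLoad inspects the job's current map output value class to decide which sort reducer to install. The sketch below paraphrases that selection logic for illustration; it is a simplification, not the actual HBase source, and reflects HBase 1.x, where KeyValueSortReducer still exists:

import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.mapreduce.KeyValueSortReducer;
import org.apache.hadoop.hbase.mapreduce.PutSortReducer;
import org.apache.hadoop.hbase.mapreduce.TextSortReducer;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;

// Simplified illustration of how configureIncrementalLoad chooses the sort reducer.
// If job.setMapOutputValueClass(Put.class) has not been called yet, Hadoop falls back
// to the job's output value class, which configureIncrementalLoad itself sets to
// KeyValue -- so KeyValueSortReducer is installed, and it later fails with the
// ClassCastException above when the mapper actually emits Put objects.
public class SortReducerSelectionSketch {
    static void chooseSortReducer(Job job) {
        Class<?> mapOutputValue = job.getMapOutputValueClass();
        if (KeyValue.class.equals(mapOutputValue)) {
            job.setReducerClass(KeyValueSortReducer.class);
        } else if (Put.class.equals(mapOutputValue)) {
            job.setReducerClass(PutSortReducer.class);
        } else if (Text.class.equals(mapOutputValue)) {
            job.setReducerClass(TextSortReducer.class);
        }
    }
}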
Solution
Moving the map output type configuration before the HFileOutputFormat2 setup code solves the problem:

@Override
public int run(String[] args) throws Exception {
    Configuration conf = new Configuration();
    conf.set("hbase.zookeeper.quorum", "node01:2181,node02:2181,node03:2183");
    Connection connection = ConnectionFactory.createConnection(conf);
    Table table = connection.getTable(TableName.valueOf("myuser2"));
    Job job = Job.getInstance(conf, "rua");
    job.setJarByClass(BulkLoadMap.class);
    job.setMapperClass(BulkLoadData.class);
    // Map output types -- now set BEFORE configureIncrementalLoad
    job.setMapOutputKeyClass(ImmutableBytesWritable.class);
    job.setMapOutputValueClass(Put.class);
    job.setOutputFormatClass(HFileOutputFormat2.class);
    HFileOutputFormat2.configureIncrementalLoad(job, table, connection.getRegionLocator(TableName.valueOf("myuser2")));
    FileOutputFormat.setOutputPath(job, new Path("hdfs://node01:8020/evaHfile"));
    // Input type: plain text
    job.setInputFormatClass(TextInputFormat.class);
    TextInputFormat.addInputPath(job, new Path("hdfs://node01:8020/rua.txt"));
    return job.waitForCompletion(true) ? 0 : 1;
}
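For completeness: this job only generates the HFiles under /evaHfile; they still have to be loaded into the table afterwards. Below is a minimal sketch of that step using LoadIncrementalHFiles, assuming the same HBase 1.x API, cluster addresses, table name, and output path as the code above:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;

// Sketch: load the generated HFiles into the myuser2 table (HBase 1.x API).
public class LoadHFilesSketch {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "node01:2181,node02:2181,node03:2183");
        try (Connection connection = ConnectionFactory.createConnection(conf);
             Admin admin = connection.getAdmin();
             Table table = connection.getTable(TableName.valueOf("myuser2"));
             RegionLocator regionLocator = connection.getRegionLocator(TableName.valueOf("myuser2"))) {
            LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
            // Moves the HFiles produced by the MapReduce job into the table's regions.
            loader.doBulkLoad(new Path("hdfs://node01:8020/evaHfile"), admin, table, regionLocator);
        }
    }
}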