Exception when using bulkload to write data directly in HFile format

When integrating HBase with MapReduce and using bulkload to generate HFiles directly, the job fails with an exception.

Code that triggers the error

BulkLoadMap
package com.kami.demo10;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.IOException;

/**
 * @version v 1.0
 * @Author kami
 * @Date 2019/12/19
 */
public class BulkLoadMap extends Configured implements Tool {

    // bulkLoad needs only a Mapper; no custom Reducer is written here
    // converts the HDFS data into HFiles
    public static class BulkLoadData extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // each input line: "rowkey name age" (space-separated)
            String[] sps = value.toString().split(" ");
            String rowkey = sps[0];
            String name = sps[1];
            String age = sps[2];
            Put put = new Put(rowkey.getBytes());
            put.addColumn("f1".getBytes(), "name".getBytes(), name.getBytes());
            put.addColumn("f1".getBytes(), "age".getBytes(), age.getBytes());
            context.write(new ImmutableBytesWritable(rowkey.getBytes()), put);
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("hbase.zookeeper.quorum", "node01:2181,node02:2181,node03:2183");

        Connection connection = ConnectionFactory.createConnection(conf);
        Table table = connection.getTable(TableName.valueOf("myuser2"));

        Job job = Job.getInstance(conf, "rua");
        job.setJarByClass(BulkLoadMap.class);
        job.setMapperClass(BulkLoadData.class);

        job.setOutputFormatClass(HFileOutputFormat2.class);
        HFileOutputFormat2.configureIncrementalLoad(job, table, connection.getRegionLocator(TableName.valueOf("myuser2")));
        FileOutputFormat.setOutputPath(job, new Path("hdfs://node01:8020/evaHfile"));
        // input type: text
        job.setInputFormatClass(TextInputFormat.class);
        TextInputFormat.addInputPath(job, new Path("hdfs://node01:8020/rua.txt"));

        // map output types (set after configureIncrementalLoad here, which is the bug)
        job.setMapOutputKeyClass(ImmutableBytesWritable.class);
        job.setMapOutputValueClass(Put.class);
        
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        int run = ToolRunner.run(new Configuration(), new BulkLoadMap(), args);
        System.out.println(run);
    }
}
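
For reference, the Mapper assumes each line of /rua.txt holds three space-separated fields: rowkey, name, age. A hypothetical sample input (field values invented for illustration):

0001 zhangsan 20
0002 lisi 30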

Error symptom and cause

19/12/19 19:48:47 WARN mapred.LocalJobRunner: job_local244116549_0001
java.lang.ClassCastException: org.apache.hadoop.hbase.client.Put cannot be cast to org.apache.hadoop.hbase.KeyValue
at org.apache.hadoop.hbase.mapreduce.KeyValueSortReducer.reduce(KeyValueSortReducer.java:43)
at org.apache.hadoop.hbase.mapreduce.KeyValueSortReducer.reduce(KeyValueSortReducer.java:36)
at org.apache.hadoop.mapreduce.Reducer.run(Reducer.java:164)
at org.apache.hadoop.mapred.ReduceTask.runNewReducer(ReduceTask.java:610)
at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:444)
at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:449)

The failure is java.lang.ClassCastException: org.apache.hadoop.hbase.client.Put cannot be cast to org.apache.hadoop.hbase.KeyValue

On inspection, the calls that set the map output types had been placed after HFileOutputFormat2.configureIncrementalLoad. In the driver, the job's input path, output path, and map output types must all be configured before that call; if the output types are not set first, the job emits a warning and produces no result data.
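
Why did KeyValueSortReducer run at all? configureIncrementalLoad itself sets the job's output value class to KeyValue, and when no map-specific output value class has been set, Job.getMapOutputValueClass() falls back to the job's output value class. The reducer selection inside configureIncrementalLoad therefore saw KeyValue rather than Put and installed KeyValueSortReducer, which is exactly the frame in the stack trace above. A simplified sketch of that selection logic, modeled on the HBase 1.x source (illustrative, not verbatim):

// Inside HFileOutputFormat2.configureIncrementalLoad (simplified sketch):
job.setOutputValueClass(KeyValue.class);            // set unconditionally by configureIncrementalLoad
if (KeyValue.class.equals(job.getMapOutputValueClass())) {
    job.setReducerClass(KeyValueSortReducer.class); // branch taken here: Put values fail the cast
} else if (Put.class.equals(job.getMapOutputValueClass())) {
    job.setReducerClass(PutSortReducer.class);      // branch we want: sorts Puts into KeyValues
} else if (Text.class.equals(job.getMapOutputValueClass())) {
    job.setReducerClass(TextSortReducer.class);
}

Calling job.setMapOutputValueClass(Put.class) before configureIncrementalLoad makes the Put branch match, so PutSortReducer is installed and the cast succeeds.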

Solution

Move the map output type calls before the HFileOutputFormat2 configuration; this fixes the problem:
    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("hbase.zookeeper.quorum", "node01:2181,node02:2181,node03:2181");

        Connection connection = ConnectionFactory.createConnection(conf);
        Table table = connection.getTable(TableName.valueOf("myuser2"));

        Job job = Job.getInstance(conf, "rua");
        job.setJarByClass(BulkLoadMap.class);
        job.setMapperClass(BulkLoadData.class);

        // map output types: set before configureIncrementalLoad
        job.setMapOutputKeyClass(ImmutableBytesWritable.class);
        job.setMapOutputValueClass(Put.class);

        job.setOutputFormatClass(HFileOutputFormat2.class);
        HFileOutputFormat2.configureIncrementalLoad(job, table, connection.getRegionLocator(TableName.valueOf("myuser2")));

        FileOutputFormat.setOutputPath(job, new Path("hdfs://node01:8020/evaHfile"));

        // input type: text
        job.setInputFormatClass(TextInputFormat.class);
        TextInputFormat.addInputPath(job, new Path("hdfs://node01:8020/rua.txt"));

        return job.waitForCompletion(true) ? 0 : 1;
    }
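
Note that generating the HFiles is only half of a bulk load: after the job succeeds, the files under /evaHfile still have to be moved into the myuser2 table. A minimal sketch using LoadIncrementalHFiles (reusing conf, connection, and table from run() above; verify the exact doBulkLoad signature against your HBase version):

import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;

// Hand the generated HFiles over to the regions of the myuser2 table.
Admin admin = connection.getAdmin();
LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
loader.doBulkLoad(new Path("hdfs://node01:8020/evaHfile"),
        admin, table, connection.getRegionLocator(TableName.valueOf("myuser2")));
admin.close();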