1. 程式人生 > >Hadoop生成HFile直接入庫HBase心得

Hadoop生成HFile直接入庫HBase心得

轉載請標明出處:http://blackwing.iteye.com/blog/1991380 


hbase自帶了ImportTsv類,可以直接把tsv格式(官方教材顯示,是\t分割各個欄位的文字格式)生成HFile,並且使用另外一個類org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles直接把HFile移動到hbase對應的hdfs目錄。 

PS:網上看到一個XD說,直接生成HFile併入庫HBase效率不如先生成HFile,再通過LoadIncrementalHFiles移動檔案到hbase目錄高,這點沒有驗證,我的做法也是先生成,再move。 

官方教材在此: 

Java程式碼  收藏程式碼
  1. http://hbase.apache.org/book/ops_mgt.html#importtsv  

但ImportTsv功能對我來說不適合,例如檔案格式為: 
Java程式碼  收藏程式碼
  1. topsid   uid   roler_num   typ        time  
  2. 10      111111   255         0       1386553377000  

ImportTsv匯入的命令為: 
Java程式碼  收藏程式碼
  1. bin/hbase org.apache.hadoop.hbase.mapreduce.ImportTsv -Dimporttsv.columns=HBASE_ROW_KEY,kq:topsid,kq:uid,kq:roler_num,kq:type -Dimporttsv.bulk.output=hdfs://storefile-outputdir <hdfs-data-inputdir>
      


它生成的表格式為: 
Java程式碼  收藏程式碼
  1. row : 10   
  2. cf  :  kq  
  3. qualifier: topsid  
  4. value: 10  
  5. .....  

而我要求的格式是: 
Java程式碼  收藏程式碼
  1. row : 10-111111-255  
  2. cf  :  kq  
  3. qualifier: 0  
  4. value: 1  


所以還是自己寫MR處理資料方便。 
Mapper: 
Java程式碼  收藏程式碼
  1. /* 
  2.  * adminOnOff.log 檔案格式: 
  3.  * topsid   uid   roler_num   typ   time 
  4.  * */  
  5. public class HFileImportMapper2 extends
      
  6.         Mapper<LongWritable, Text, ImmutableBytesWritable, KeyValue> {  
  7.     protected SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMdd");  
  8.     protected final String CF_KQ="kq";//考勤  
  9.     protected final int ONE=1;  
  10.     @Override  
  11.     protected void map(LongWritable key, Text value,Context context)  
  12.             throws IOException, InterruptedException {  
  13.         String line = value.toString();  
  14.         System.out.println("line : "+line);  
  15.         String[] datas = line.split("\\s+");  
  16.         // row格式為:yyyyMMdd-sid-uid-role_num-timestamp-typ  
  17.         String row = sdf.format(new Date(Long.parseLong(datas[4])))  
  18.                 + "-" + datas[0] + "-" + datas[1] + "-" + datas[2]  
  19.                 + "-" + datas[4] + "-" + datas[3];  
  20.         ImmutableBytesWritable rowkey = new ImmutableBytesWritable(  
  21.                 Bytes.toBytes(row));  
  22.         KeyValue kv = new KeyValue(Bytes.toBytes(row),this.CF_KQ.getBytes(), datas[3].getBytes(),Bytes.toBytes(this.ONE));  
  23.         context.write(rowkey, kv);  
  24.         }  
  25. }  


job: 
Java程式碼  收藏程式碼
  1. public class GenHFile2 {  
  2.     public static void main(String[] args) {  
  3.         Configuration conf = new Configuration();  
  4.         conf.addResource("myConf.xml");  
  5.         String input = conf.get("input");  
  6.         String output = conf.get("output");  
  7.         String tableName = conf.get("source_table");  
  8.         System.out.println("table : "+tableName);  
  9.         HTable table;  
  10.         try {  
  11.             //執行前,刪除已存在的中間輸出目錄  
  12.             try {  
  13.                 FileSystem fs = FileSystem.get(URI.create(output), conf);  
  14.                 fs.delete(new Path(output),true);  
  15.                 fs.close();  
  16.             } catch (IOException e1) {  
  17.                 e1.printStackTrace();  
  18.             }  
  19.             table = new HTable(conf,tableName.getBytes());  
  20.             Job job = new Job(conf);  
  21.             job.setJobName("Generate HFile");  
  22.             job.setJarByClass(HFileImportMapper2.class);  
  23.             job.setInputFormatClass(TextInputFormat.class);  
  24.             job.setMapperClass(HFileImportMapper2.class);  
  25.             FileInputFormat.setInputPaths(job, input);  
  26. //job.setReducerClass(KeyValueSortReducer.class);  
  27. //job.setMapOutputKeyClass(ImmutableBytesWritable.class);  
  28. //job.setMapOutputValueClass(KeyValue.class);  
  29.             job.getConfiguration().set("mapred.mapoutput.key.class""org.apache.hadoop.hbase.io.ImmutableBytesWritable");  
  30.             job.getConfiguration().set("mapred.mapoutput.value.class""org.apache.hadoop.hbase.KeyValue");  
  31. //job.setOutputFormatClass(HFileOutputFormat.class);  
  32. FileOutputFormat.setOutputPath(job, new Path(output));  
  33.     //job.setPartitionerClass(SimpleTotalOrderPartitioner.class);  
  34. HFileOutputFormat.configureIncrementalLoad(job,table);  
  35.             try {  
  36.                 job.waitForCompletion(true);  
  37.             } catch (InterruptedException e) {  
  38.                 e.printStackTrace();  
  39.             } catch (ClassNotFoundException e) {  
  40.                 e.printStackTrace();  
  41.             }  
  42.         } catch (IOException e) {  
  43.             e.printStackTrace();  
  44.         }  
  45.     }  
  46. }  


生成的HFile檔案在hdfs的/output目錄下,已經根據cf名稱建好檔案目錄: 
Java程式碼  收藏程式碼
  1. hdfs://namenode/output/kq/601c5029fb264dc8869a635043c24560  

其中: 
Java程式碼  收藏程式碼
  1. HFileOutputFormat.configureIncrementalLoad(job,table);  

根據其原始碼知道,會自動為job設定好以下引數: 
Java程式碼  收藏程式碼
  1. public static void configureIncrementalLoad(Job job, HTable table)  
  2. throws IOException {  
  3.   Configuration conf = job.getConfiguration();  
  4.   job.setOutputKeyClass(ImmutableBytesWritable.class);  
  5.   job.setOutputValueClass(KeyValue.class);  
  6.   job.setOutputFormatClass(HFileOutputFormat.class);  
  7.   // Based on the configured map output class, set the correct reducer to properly  
  8.   // sort the incoming values.  
  9.   // TODO it would be nice to pick one or the other of these formats.  
  10.   if (KeyValue.class.equals(job.getMapOutputValueClass())) {  
  11.     job.setReducerClass(KeyValueSortReducer.class);  
  12.   } else if (Put.class.equals(job.getMapOutputValueClass())) {  
  13.     job.setReducerClass(PutSortReducer.class);  
  14.   } else if (Text.class.equals(job.getMapOutputValueClass())) {  
  15.     job.setReducerClass(TextSortReducer.class);  
  16.   } else {  
  17.     LOG.warn("Unknown map output value type:" + job.getMapOutputValueClass());  
  18.   }  
  19.   conf.setStrings("io.serializations", conf.get("io.serializations"),  
  20.       MutationSerialization.class.getName(), ResultSerialization.class.getName(),  
  21.       KeyValueSerialization.class.getName());  
  22.   // Use table's region boundaries for TOP split points.  
  23.   LOG.info("Looking up current regions for table " + Bytes.toString(table.getTableName()));  
  24.   List<ImmutableBytesWritable> startKeys = getRegionStartKeys(table);  
  25.   LOG.info("Configuring " + startKeys.size() + " reduce partitions " +  
  26.       "to match current region count");  
  27.   job.setNumReduceTasks(startKeys.size());  
  28.   configurePartitioner(job, startKeys);  
  29.   // Set compression algorithms based on column families  
  30.   configureCompression(table, conf);  
  31.   configureBloomType(table, conf);  
  32.   configureBlockSize(table, conf);  
  33.   TableMapReduceUtil.addDependencyJars(job);  
  34.   TableMapReduceUtil.initCredentials(job);  
  35.   LOG.info("Incremental table " + Bytes.toString(table.getTableName()) + " output configured.");  
  36. }  


HFileOutputFormat只支援寫單個column family,如果有多個cf,則需要寫多個job來實現了。