1. 程式人生 > >hbase海量資料匯入

hbase海量資料匯入

最近有個需求要對mysql的全量資料遷移到hbase,雖然hbase的設計非常利於高效的讀取,但是它的compaction實現對海量資料寫入造成非常大的影響,資料到一定量之後,就開始抽風。 
分析hbase的實現,不管其執行的機制,其最終儲存結構為分散式檔案系統中的hfile格式。 
剛好hbase的原始碼中提供一個HFileOutputFormat類,分析其原始碼可以看到: 
Java程式碼  收藏程式碼
  1. /** 
  2.  * Copyright 2009 The Apache Software Foundation 
  3.  * 
  4.  * Licensed to the Apache Software Foundation (ASF) under one
     
  5.  * or more contributor license agreements.  See the NOTICE file 
  6.  * distributed with this work for additional information 
  7.  * regarding copyright ownership.  The ASF licenses this file 
  8.  * to you under the Apache License, Version 2.0 (the 
  9.  * "License"); you may not use this file except in compliance
     
  10.  * with the License.  You may obtain a copy of the License at 
  11.  * 
  12.  *     http://www.apache.org/licenses/LICENSE-2.0 
  13.  * 
  14.  * Unless required by applicable law or agreed to in writing, software 
  15.  * distributed under the License is distributed on an "AS IS" BASIS, 
  16.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     
  17.  * See the License for the specific language governing permissions and 
  18.  * limitations under the License. 
  19.  */  
  20. package org.apache.hadoop.hbase.mapreduce;  
  21. import java.io.IOException;  
  22. import java.util.Map;  
  23. import java.util.TreeMap;  
  24. import org.apache.hadoop.conf.Configuration;  
  25. import org.apache.hadoop.fs.FileSystem;  
  26. import org.apache.hadoop.fs.Path;  
  27. import org.apache.hadoop.hbase.HConstants;  
  28. import org.apache.hadoop.hbase.KeyValue;  
  29. import org.apache.hadoop.hbase.io.ImmutableBytesWritable;  
  30. import org.apache.hadoop.hbase.io.hfile.Compression;  
  31. import org.apache.hadoop.hbase.io.hfile.HFile;  
  32. import org.apache.hadoop.hbase.regionserver.StoreFile;  
  33. import org.apache.hadoop.hbase.util.Bytes;  
  34. import org.apache.hadoop.mapreduce.RecordWriter;  
  35. import org.apache.hadoop.mapreduce.TaskAttemptContext;  
  36. import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;  
  37. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;  
  38. import org.mortbay.log.Log;  
  39. /** 
  40.  * Writes HFiles. Passed KeyValues must arrive in order. 
  41.  * Currently, can only write files to a single column family at a 
  42.  * time.  Multiple column families requires coordinating keys cross family. 
  43.  * Writes current time as the sequence id for the file. Sets the major compacted 
  44.  * attribute on created hfiles. 
  45.  * @see KeyValueSortReducer 
  46.  */  
  47. public class HFileOutputFormat extends FileOutputFormat<ImmutableBytesWritable, KeyValue> {  
  48.   public RecordWriter<ImmutableBytesWritable, KeyValue> getRecordWriter(TaskAttemptContext context)  
  49.   throws IOException, InterruptedException {  
  50.     // Get the path of the temporary output file   
  51.     final Path outputPath = FileOutputFormat.getOutputPath(context);  
  52.     final Path outputdir = new FileOutputCommitter(outputPath, context).getWorkPath();  
  53.     Configuration conf = context.getConfiguration();  
  54.     final FileSystem fs = outputdir.getFileSystem(conf);  
  55.     // These configs. are from hbase-*.xml  
  56.     final long maxsize = conf.getLong("hbase.hregion.max.filesize"268435456);  
  57.     final int blocksize = conf.getInt("hfile.min.blocksize.size"65536);  
  58.     // Invented config.  Add to hbase-*.xml if other than default compression.  
  59.     final String compression = conf.get("hfile.compression",  
  60.       Compression.Algorithm.NONE.getName());  
  61.     return new RecordWriter<ImmutableBytesWritable, KeyValue>() {  
  62.       // Map of families to writers and how much has been output on the writer.  
  63.       private final Map<byte [], WriterLength> writers =  
  64.         new TreeMap<byte [], WriterLength>(Bytes.BYTES_COMPARATOR);  
  65.       private byte [] previousRow = HConstants.EMPTY_BYTE_ARRAY;  
  66.       private final byte [] now = Bytes.toBytes(System.currentTimeMillis());  
  67.       public void write(ImmutableBytesWritable row, KeyValue kv)  
  68.       throws IOException {  
  69.         long length = kv.getLength();  
  70.         byte [] family = kv.getFamily();  
  71.         WriterLength wl = this.writers.get(family);  
  72.         if (wl == null || ((length + wl.written) >= maxsize) &&  
  73.             Bytes.compareTo(this.previousRow, 0this.previousRow.length,  
  74.               kv.getBuffer(), kv.getRowOffset(), kv.getRowLength()) != 0) {  
  75.           // Get a new writer.  
  76.           Path basedir = new Path(outputdir, Bytes.toString(family));  
  77.           if (wl == null) {  
  78.             wl = new WriterLength();  
  79.             this.writers.put(family, wl);  
  80.             if (this.writers.size() > 1throw new IOException("One family only");  
  81.             // If wl == null, first file in family.  Ensure family dir exits.  
  82.             if (!fs.exists(basedir)) fs.mkdirs(basedir);  
  83.           }  
  84.           wl.writer = getNewWriter(wl.writer, basedir);  
  85.           Log.info("Writer=" + wl.writer.getPath() +  
  86.             ((wl.written == 0)? """, wrote=" + wl.written));  
  87.           wl.written = 0;  
  88.         }  
  89.         kv.updateLatestStamp(this.now);  
  90.         wl.writer.append(kv);  
  91.         wl.written += length;  
  92.         // Copy the row so we know when a row transition.  
  93.         this.previousRow = kv.getRow();  
  94.       }  
  95.       /* Create a new HFile.Writer. Close current if there is one. 
  96.        * @param writer 
  97.        * @param familydir 
  98.        * @return A new HFile.Writer. 
  99.        * @throws IOException 
  100.        */  
  101.       private HFile.Writer getNewWriter(final HFile.Writer writer,  
  102.           final Path familydir)  
  103.       throws IOException {  
  104.         close(writer);  
  105.         return new HFile.Writer(fs,  StoreFile.getUniqueFile(fs, familydir),  
  106.           blocksize, compression, KeyValue.KEY_COMPARATOR);  
  107.       }  
  108.       private void close(final HFile.Writer w) throws IOException {  
  109.         if (w != null) {  
  110.           StoreFile.appendMetadata(w, System.currentTimeMillis(), true);  
  111.           w.close();  
  112.         }  
  113.       }  
  114.       public void close(TaskAttemptContext c)  
  115.       throws IOException, InterruptedException {  
  116.         for (Map.Entry<byte [], WriterLength> e: this.writers.entrySet()) {  
  117.           close(e.getValue().writer);  
  118.         }  
  119.       }  
  120.     };  
  121.   }  
  122.   /* 
  123.    * Data structure to hold a Writer and amount of data written on it.  
  124.    */  
  125.   static class WriterLength {  
  126.     long written = 0;  
  127.     HFile.Writer writer = null;  
  128.   }  
  129. }  


可以看到,它的工作流程就是首先根據你的配置檔案初始化,然後寫成hfile的格式。 
這裡我做了個偷懶的demo: 
Java程式碼  收藏程式碼
  1. HFileOutputFormat hf = new HFileOutputFormat();  
  2.         HBaseConfiguration conf = new HBaseConfiguration();  
  3.         conf.addResource(new Path("/home/performance/softs/hadoop/conf/core-site.xml"));  
  4.         conf.set("mapred.output.dir""/tmp");  
  5.         conf.set("hfile.compression", Compression.Algorithm.LZO.getName());  
  6.         TaskAttemptContext context = new TaskAttemptContext(conf, new TaskAttemptID());  
  7.         RecordWriter writer = hf.getRecordWriter(context);  
  8.         KeyValue kv = new KeyValue(Bytes.toBytes("1111111111111"), Bytes.toBytes("offer:action"),  
  9.                                    System.currentTimeMillis(), Bytes.toBytes("test"));  
  10.         KeyValue kv1 = new KeyValue(Bytes.toBytes("1111111111111"), Bytes.toBytes("offer:id"),  
  11.                                     System.currentTimeMillis(), Bytes.toBytes("123"));  
  12.         KeyValue kv3 = new KeyValue(Bytes.toBytes("1111111111112"), Bytes.toBytes("offer:action"),  
  13.                                     System.currentTimeMillis(), Bytes.toBytes("test"));  
  14.         KeyValue kv4 = new KeyValue(Bytes.toBytes("1111111111112"), Bytes.toBytes("offer:id"),  
  15.                                     System.currentTimeMillis(), Bytes.toBytes("123"));  
  16.         writer.write(null, kv);  
  17.         writer.write(null, kv1);  
  18.         writer.write(null, kv3);  
  19.         writer.write(null, kv4);  
  20.         writer.close(context);  

執行然之後,會在hdfs的/tmp目錄下生成一份檔案。注意批量寫資料的時候一定要保證key的有序性
這個時候,hbase自己提供的一個基於jruby的loadtable.rb指令碼就可以發揮作用了。 
它的格式是loadtable.rb 你希望的表明 hdfs路徑: 
hbase org.jruby.Main loadtable.rb offer hdfs://user/root/importoffer/_temporary/_attempt__0000_r_000000_0/ 
執行完之後: 
執行./hbase shell 
>list 
就會顯示剛才匯入的offer表了。