1. 程式人生 > 其它 >基於 Hive 的檔案格式:RCFile 簡介及其應用

基於 Hive 的檔案格式:RCFile 簡介及其應用

Hadoop 作為MR 的開源實現,一直以動態執行解析檔案格式並獲得比MPP資料庫快上幾倍的裝載速度為優勢。不過,MPP資料庫社群也一直批評Hadoop由於檔案格式並非為特定目的而建,因此序列化和反序列化的成本過高。

1、hadoop 檔案格式簡介

目前 hadoop 中流行的檔案格式有如下幾種:

(1)SequenceFile

SequenceFile是Hadoop API 提供的一種二進位制檔案,它將資料以<key,value>的形式序列化到檔案中。這種二進位制檔案內部使用Hadoop 的標準的Writable 介面實現序列化和反序列化。它與Hadoop API中的MapFile 是互相相容的。Hive 中的SequenceFile 繼承自Hadoop API 的SequenceFile,不過它的key為空,使用value 存放實際的值, 這樣是為了避免MR 在執行map 階段的排序過程。如果你用Java API 編寫SequenceFile,並讓Hive 讀取的話,請確保使用value欄位存放資料,否則你需要自定義讀取這種SequenceFile 的InputFormat class 和OutputFormat class。

(2)RCFile

RCFile是Hive推出的一種專門面向列的資料格式。 它遵循“先按列劃分,再垂直劃分”的設計理念。當查詢過程中,針對它並不關心的列時,它會在IO上跳過這些列。需要說明的是,RCFile在map階段從遠端拷貝仍然是拷貝整個資料塊,並且拷貝到本地目錄後RCFile並不是真正直接跳過不需要的列,並跳到需要讀取的列, 而是通過掃描每一個row group的頭部定義來實現的,但是在整個HDFS Block 級別的頭部並沒有定義每個列從哪個row group起始到哪個row group結束。所以在讀取所有列的情況下,RCFile的效能反而沒有SequenceFile高。

HDFS塊內行儲存的例子

HDFS塊內列儲存的例子

HDFS塊內RCFile方式儲存的例子

(3)Avro

Avro是一種用於支援資料密集型的二進位制檔案格式。它的檔案格式更為緊湊,若要讀取大量資料時,Avro能夠提供更好的序列化和反序列化效能。並且Avro資料檔案天生是帶Schema定義的,所以它不需要開發者在API 級別實現自己的Writable物件。最近多個Hadoop 子專案都支援Avro 資料格式,如Pig 、Hive、Flume、Sqoop和Hcatalog。

(4)文字格式

除上面提到的3種二進位制格式之外,文字格式的資料也是Hadoop中經常碰到的。如TextFile 、XML和JSON。 文字格式除了會佔用更多磁碟資源外,對它的解析開銷一般會比二進位制格式高几十倍以上,尤其是XML 和JSON,它們的解析開銷比Textfile 還要大,因此強烈不建議在生產系統中使用這些格式進行儲存。 如果需要輸出這些格式,請在客戶端做相應的轉換操作。 文字格式經常會用於日誌收集,資料庫匯入,Hive預設配置也是使用文字格式,而且常常容易忘了壓縮,所以請確保使用了正確的格式。另外文字格式的一個缺點是它不具備型別和模式,比如銷售金額、利潤這類數值資料或者日期時間型別的資料,如果使用文字格式儲存,由於它們本身的字串型別的長短不一,或者含有負數,導致MR沒有辦法排序,所以往往需要將它們預處理成含有模式的二進位制格式,這又導致了不必要的預處理步驟的開銷和儲存資源的浪費。

(5)外部格式

Hadoop實際上支援任意檔案格式,只要能夠實現對應的RecordWriter和RecordReader即可。其中資料庫格式也是會經常儲存在Hadoop中,比如Hbase,Mysql,Cassandra,MongoDB。 這些格式一般是為了避免大量的資料移動和快速裝載的需求而用的。他們的序列化和反序列化都是由這些資料庫格式的客戶端完成,並且檔案的儲存位置和資料佈局(Data Layout)不由Hadoop控制,他們的檔案切分也不是按HDFS的塊大小(blocksize)進行切割。

2、為什麼需要 RCFile

Facebook曾在2010 ICDE(IEEE International Conference on Data Engineering)會議上介紹了資料倉庫Hive。Hive儲存海量資料在Hadoop系統中,提供了一套類資料庫的資料儲存和處理機制。它採用類SQL語言對資料進行自動化管理和處理,經過語句解析和轉換,最終生成基於Hadoop的MapReduce任務,通過執行這些任務完成資料處理。下圖顯示了Hive資料倉庫的系統結構。 

Facebook在資料倉庫上遇到的儲存可擴充套件性的挑戰是獨一無二的。他們在基於Hive的資料倉庫中儲存了超過300PB的資料,並且以每日新增600TB的速度增長。去年這個資料倉庫所儲存的資料量增長了3倍。考慮到這個增長趨勢,儲存效率問題是facebook資料倉庫基礎設施方面目前乃至將來一段時間內最需要關注的。facebook工程師發表的RCFile: A Fast and Spaceefficient Data Placement Structure in MapReducebased Warehouse Systems一文,介紹了一種高效的資料儲存結構——RCFile(Record Columnar File),並將其應用於Facebook的資料倉庫Hive中。與傳統資料庫的資料儲存結構相比,RCFile更有效地滿足了基於MapReduce的資料倉庫的四個關鍵需求,即Fast data loading、Fast query processing、Highly efficient storage space utilization和Strong adaptivity to highly dynamic workload patterns。RCFile 廣泛應用於Facebook公司的資料分析系統Hive中。首先,RCFile具備相當於行儲存的資料載入速度和負載適應能力;其次,RCFile的讀優化可以在掃描表格時避免不必要的列讀取,測試顯示在多數情況下,它比其他結構擁有更好的效能;再次,RCFile使用列維度的壓縮,因此能夠有效提升儲存空間利用率。 為了提高儲存空間利用率,Facebook各產品線應用產生的資料從2010年起均採用RCFile結構儲存,按行儲存(SequenceFile/TextFile)結構儲存的資料集也轉存為RCFile格式。此外,Yahoo公司也在Pig資料分析系統中集成了RCFile,RCFile正在用於另一個基於Hadoop的資料管理系統Howl(http://wiki.apache.org/pig/Howl)。而且,根據Hive開發社群的交流,RCFile也成功整合加入其他基於MapReduce的資料分析平臺。有理由相信,作為資料儲存標準的RCFile,將繼續在MapReduce環境下的大規模資料分析中扮演重要角色。

3、RCFile 簡介

facebook 的資料倉庫中資料被載入到表裡面時首先使用的儲存格式是Facebook自己開發的Record-Columnar File Format(RCFile)。RCFile是一種“允許按行查詢,提供了列儲存的壓縮效率”的混合列儲存格式。它的核心思想是首先把Hive表水平切分成多個行組(row groups),然後組內按照列垂直切分,這樣列與列的資料在磁碟上就是連續的儲存塊了。 當一個行組內的所有列寫到磁碟時,RCFile就會以列為單位對資料使用類似zlib/lzo的演算法進行壓縮。當讀取列資料的時候使用惰性解壓策略( lazy decompression),也就是說使用者的某個查詢如果只是涉及到一個表中的部分列的時候,RCFile會跳過不需要的列的解壓縮和反序列化的過程。通過在facebook的資料倉庫中選取有代表性的例子實驗,RCFile能夠提供5倍的壓縮比。

4、超越RCFile,下一步採用什麼方法

隨著資料倉庫中儲存的資料量持續增長,FB組內的工程師開始研究提高壓縮效率的技術和方法。研究的焦點集中在列級別的編碼方法,例如行程長度編碼(run-length encoding)、詞典編碼(dictionary encoding)、參考幀編碼(frame of reference encoding)、能夠在通用壓縮過程之前更好的在列級別降低邏輯冗餘的數值編碼方法。FB也嘗試過新的列型別(例如JSON是在Facebook內部廣泛使用的格式,把JSON格式的資料按照結構化的方式儲存既可以滿足高效查詢的需求,同時也降低了JSON元資料儲存的冗餘)。FB的實驗表明列級別的編碼如果使用得當的話能夠顯著提高RCFile的壓縮比。 與此同時,Hortonworks也在嘗試類似的思路去改進Hive的儲存格式。Hortonworks的工程團隊設計和實現了ORCFile(包括儲存格式和讀寫介面),這幫助Facebook的資料倉庫設計和實現新的儲存格式提供了一個很好的開始。

關於 ORCFile 的介紹請見這裡:http://yanbohappy.sinaapp.com/?p=478

關於效能評測,筆者這裡暫時沒有條件,貼一張某次 hive 技術峰會演講嘉賓的截圖:

5、如何生成 RCFile 檔案

上面說了這麼多,想必你已經知道 RCFile 主要用於提升 hive 的查詢效率,那如何生成這種格式的檔案呢?

(1)hive 中直接通過textfile表進行insert轉換

例如:

insert overwrite table http_RCTable partition(dt='2013-09-30') select p_id,tm,idate,phone from tmp_testp where dt='2013-09-30';

(2)通過 mapreduce 生成

目前為止,mapreduce 並沒有提供內建 API 對 RCFile 進行支援,倒是 pig、hive、hcatalog 等 hadoop生態圈裡的其他專案進行了支援,究其原因是因為 RCFile 相比 textfile 等其它檔案格式,對於 mapreduce 的應用場景來說沒有顯著的優勢。

為了避免重複造輪子,下面的生成 RCFile 的 mapreduce 程式碼呼叫了 hive 和 hcatalog 的相關類,注意你在測試下面的程式碼時,你的 hadoop、hive、hcatalog 版本要一致,否則。。。你懂的。。。

比如我用的 hive-0.10.0+198-1.cdh4.4.0,那麼就應該下載對應的版本:http://archive.cloudera.com/cdh4/cdh/4/

PS:下面的程式碼已經測試通過,木有問題。

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.serde2.columnar.BytesRefArrayWritable;
import org.apache.hadoop.hive.serde2.columnar.BytesRefWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hcatalog.rcfile.RCFileMapReduceInputFormat;
import org.apache.hcatalog.rcfile.RCFileMapReduceOutputFormat;



public class TextToRCFile extends Configured implements Tool{

	
	
	public static class Map 
    	extends Mapper<Object, Text, NullWritable, BytesRefArrayWritable>{
		
		private byte[] fieldData;
		private int numCols;
		private BytesRefArrayWritable bytes;
		
		@Override
		protected void setup(Context context) throws IOException, InterruptedException {
			numCols = context.getConfiguration().getInt("hive.io.rcfile.column.number.conf", 0);
			bytes = new BytesRefArrayWritable(numCols);
		}
		
		public void map(Object key, Text line, Context context
                ) throws IOException, InterruptedException {
			bytes.clear();
			String[] cols = line.toString().split("\|");
			System.out.println("SIZE : "+cols.length);
			for (int i=0; i<numCols; i++){
	        	fieldData = cols[i].getBytes("UTF-8");
	        	BytesRefWritable cu = null;
	            cu = new BytesRefWritable(fieldData, 0, fieldData.length);
	            bytes.set(i, cu);
	        }
			context.write(NullWritable.get(), bytes);
		}
	}
	
	@Override
	public int run(String[] args) throws Exception {
		Configuration conf = new Configuration();
		String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
		if(otherArgs.length < 2){
	    	System.out.println("Usage: " +
	    			"hadoop jar RCFileLoader.jar <main class> " +
	    			"-tableName <tableName> -numCols <numberOfColumns> -input <input path> " +
	    			"-output <output path> -rowGroupSize <rowGroupSize> -ioBufferSize <ioBufferSize>");
	    	System.out.println("For test");
	    	System.out.println("$HADOOP jar RCFileLoader.jar edu.osu.cse.rsam.rcfile.mapreduce.LoadTable " +
	    			"-tableName test1 -numCols 10 -input RCFileLoaderTest/test1 " +
	    			"-output RCFileLoaderTest/RCFile_test1");
	    	System.out.println("$HADOOP jar RCFileLoader.jar edu.osu.cse.rsam.rcfile.mapreduce.LoadTable " +
	    			"-tableName test2 -numCols 5 -input RCFileLoaderTest/test2 " +
	    			"-output RCFileLoaderTest/RCFile_test2");
	    	return 2;
	    }
		
		/* For test
		   
		 */
		
	    
		String tableName = "";
		int numCols = 0;
		String inputPath = "";
		String outputPath = "";
		int rowGroupSize = 16 *1024*1024;
		int ioBufferSize = 128*1024;
	    for (int i=0; i<otherArgs.length - 1; i++){
	    	if("-tableName".equals(otherArgs[i])){
	    		tableName = otherArgs[i+1];
	    	}else if ("-numCols".equals(otherArgs[i])){
	    		numCols = Integer.parseInt(otherArgs[i+1]);
	    	}else if ("-input".equals(otherArgs[i])){
	    		inputPath = otherArgs[i+1];
	    	}else if("-output".equals(otherArgs[i])){
	    		outputPath = otherArgs[i+1];
	    	}else if("-rowGroupSize".equals(otherArgs[i])){
	    		rowGroupSize = Integer.parseInt(otherArgs[i+1]);
	    	}else if("-ioBufferSize".equals(otherArgs[i])){
	    		ioBufferSize = Integer.parseInt(otherArgs[i+1]);
	    	}
	    	
	    }
	    
	    conf.setInt("hive.io.rcfile.record.buffer.size", rowGroupSize);
	    conf.setInt("io.file.buffer.size", ioBufferSize);
	    
	    Job job = new Job(conf, "RCFile loader: loading table " + tableName + " with " + numCols + " columns");
	    
	    job.setJarByClass(TextToRCFile.class);
	    job.setMapperClass(Map.class);
	    job.setMapOutputKeyClass(NullWritable.class);
	    job.setMapOutputValueClass(BytesRefArrayWritable.class);
//	    job.setNumReduceTasks(0);
	    
	    FileInputFormat.addInputPath(job, new Path(inputPath));
	    
	    job.setOutputFormatClass(RCFileMapReduceOutputFormat.class);
	    RCFileMapReduceOutputFormat.setColumnNumber(job.getConfiguration(), numCols);
	    RCFileMapReduceOutputFormat.setOutputPath(job, new Path(outputPath));
	    RCFileMapReduceOutputFormat.setCompressOutput(job, false);
	    
	    
	    System.out.println("Loading table " + tableName + " from " + inputPath + " to RCFile located at " + outputPath);
	    System.out.println("number of columns:" + job.getConfiguration().get("hive.io.rcfile.column.number.conf"));
	    System.out.println("RCFile row group size:" + job.getConfiguration().get("hive.io.rcfile.record.buffer.size"));
	    System.out.println("io bufer size:" + job.getConfiguration().get("io.file.buffer.size"));
	    
	    return (job.waitForCompletion(true) ? 0 : 1);
	}
	
	public static void main(String[] args) throws Exception {
	    int res = ToolRunner.run(new Configuration(), new TextToRCFile(), args);
	    System.exit(res);
	}

}

6、Refer:

(1)淺析Hadoop檔案格式 http://www.infoq.com/cn/articles/hadoop-file-format

(2)Facebook資料倉庫揭祕:RCFile高效儲存結構  http://www.csdn.net/article/2011-04-29/296900

(3)Facebook的資料倉庫是如何擴充套件到300PB的  http://yanbohappy.sinaapp.com/?p=478

(4)Hive架構  http://www.jdon.com/bigdata/hive.html

(5)Hive:ORC File Format儲存格式詳解  http://www.iteblog.com/archives/1014

(6)普通文字壓縮成RcFile的通用類  https://github.com/ysmart-xx/ysmart/blob/master/javatest/TextToRCFile.java

http://hugh-wangp.iteye.com/blog/1405804 基於HIVE檔案格式的map reduce程式碼編寫 http://smallboby.iteye.com/blog/1596776  普通文字壓縮成RcFile的通用類 http://smallboby.iteye.com/blog/1592531  RcFile儲存和讀取操作 https://github.com/kevinweil/elephant-bird/blob/master/rcfile/src/main/java/com/twitter/elephantbird/mapreduce/output/RCFileOutputFormat.java

http://blog.csdn.net/liuzhoulong/article/details/7909863