1. 程式人生 > >Hadoop RCFile儲存格式詳解(原始碼分析、程式碼示例)

Hadoop RCFile儲存格式詳解(原始碼分析、程式碼示例)

RCFile RCFile全稱Record Columnar File,列式記錄檔案,是一種類似於SequenceFile的鍵值對(Key/Value Pairs)資料檔案。 關鍵詞:Record、Columnar、Key、Value。 RCFile的優勢在哪裡?適用於什麼場景?為了讓大家有一個感性的認識,我們來看一個例子。 假設我們有這樣一張9行3列的Hive資料表table,以普通的TextFile進行儲存, 現在我們需要統計這張資料表的第二列(col2)值為“row5_col2”的出現次數,我們通常會這樣寫SQL: select count(*) from table where col2 = 'row5_col2' 這條Hive SQL轉換為相應的MapReduce程式執行時,雖然我們僅僅只需要查詢該表的第2列資料即可得出結果,但因為我們使用的是TextFile儲存格式,不得不讀取整張資料表的資料參與計算。雖然我們可以使用一些壓縮機制優化儲存,減少讀取的資料量,但效果通常不顯著,而且畢竟讀取了很多無用的資料(col1、col3)。 再來看一下RCFile會如何儲存這張資料表的資料?巨集觀上大致可以分為以下三步: (1)水平劃分;
經過水平劃分之後的各個資料塊稱之為Row Split或Record。 (2)垂直劃分; 每一個Row Split或Record再按照“列”進行垂直劃分。 (3)列式儲存; RCFile以Record為單位進行儲存。 Record儲存資料時,首先儲存該Record內第一列的全部資料、然後儲存該Record內第二列的全部資料、…、依次將各列資料儲存完畢,然後繼續下一個Record的儲存。 Record實際由Key、Value兩部分組成,其中Key儲存著Record的元資料,如列數、每列資料的長度、每列資料中各個列值的長度等;Value儲存著Record各列的資料。實際上Record Key相當於Record的索引,利用它可以輕鬆的實現Record內部讀取/過濾某些列的操作。 而且RCFile將“行式”儲存變為“列式”儲存,相似的資料以更高的可能性被聚集在一起,壓縮效果更好。 要想詳細掌握一個數據檔案的儲存格式,就必須知道資料是通過怎樣的方式被寫入的,讀取僅僅是寫入的反面而已。RCFile分別針對寫入和讀取提供了相應的Writer類和Reader類,本文僅僅討論Writer類的實現。 原始碼分析
通常而言,RCFile檔案的整個寫入過程大致可以分為三步: (1)構建RCFile.Writer例項——Writer(...) (2)通過RCFile.Writer例項寫入資料——append (3)關閉RCFile.Writer例項——close 我們也按照這三步來分析相應的原始碼。 1. Writer Writer在構建函式中大體做了以下三件事情: (1)初始化一些變數值; a. RECORD_INTERVAL:表示多少“行”資料形成一個Row Split(Record)和columnsBufferSize配合使用; b. columnNumber:表示當前RCFile檔案儲存著多少“列”的資料; c. Metadata
Metadata例項僅僅儲存一個屬性“hive.io.rcfile.column.number”,值為columnNumber,該例項會被序列化到RCFile檔案頭部; d. columnsBufferSize:快取數目(行數)上限閥值,超過這個數值就會將快取的資料(行)形成一個Row Split(Record); (2)構建一些資料結構; a. columnValuePlainLength:儲存著一個Row Split(Record)內部各列原始資料的大小; b. columnBuffers:儲存著一個Row Split(Record)內部各列原始資料; c. key:儲存著一個Row Split(Record)的元資料; d. plainTotalColumnLength:儲存著一個RCFile檔案內各列原始資料的大小; e. comprTotalColumnLength:儲存著一個RCFile檔案內各列原始資料被壓縮後的大小; (3)初始化檔案輸出流,並寫入檔案頭部資訊; a. 初始化RCFile檔案輸出流(FSDataOutputStream); useNewMagic預設值為true,本文也以此預設值進行討論。 b. initializeFileHeader; i. 寫出MAGIC; ii. 寫出當前RCFile版本號(不同版本的RCFile具有不同的格式); c. writeFileHeader; i. 寫出是否使用壓縮,本文按使用壓縮討論; ii. 寫出壓縮編/解碼器(CompressionCodec)類名; iii. 序列化Metadata例項; c. finalizeFileHeader; 寫出一個“同步標誌位”,表示RCFile檔案頭部資訊到此結束。 我們可以得出RCFile Header的結構如下:
version 3 bytes of magic header “RCF”, followed by 1 byte of actual version number
compression  A boolean which specifies if compression is turned on for keys/values in this file
compression codec CompressionCodec class which is used for compression of keys and/or values
metadata Metadata for this file
sync A sync marker to denote end of the header
2. append RCFile.Writer寫入資料時要求以BytesRefArrayWritable例項的形式進行“追加”,亦即一個BytesRefArrayWritable例項表示一“行”資料。 “追加”“行”資料的過程如下: (1)從一“行”資料(即BytesRefArrayWritable例項val)中解析出各“列”資料快取到對應的ColumnBuffer(即columnBuffers[i])中;如果這“行”資料包含的“列”小於columnNumber,則缺失的列會被填充為“空值”(即BytesRefWritable.ZeroBytesRefWritable); 我們可以看出,RCFile在“追加”資料的時候還是以“行”的方式進行,“行轉列”是在內部進行轉換的。轉換之後的列資料(列數為columnNumber)被快取到各自的“Buffer”中,也就是說每一列都有自己獨立的快取區(ColumnBuffer),這是為後來的“列式儲存”作準備的。 這裡重點介紹一下這個ColumnBuffer,它的作用就是用來快取“列資料”的, 內部包含兩個例項變數,如它們的變數名稱所言,它們實際也是用來快取資料的,columnValBuffer用來快取“列值”的資料,valLenBuffer用來快取“列值”各自的長度,這兩個內部的快取區都是NonSyncDataOutputBuffer例項。 從這三部分程式碼可以看出,NonSyncDataOutputBuffer內部的快取區實際是使用記憶體中的一個位元組陣列(buf)構建的,而且繼承自DataOutputStream,方便我們使用“流”的形式操作資料。 而且valLenBuffer在快取“列值”的長度的時候,為了有效的節約儲存空間,使用了一個技巧, 也就是說,如果需要儲存的“列值”長度為“1,1,1,2”,需要儲存四個整數,而且前面三個整數的值是一樣的,那麼我們將其變為“1,~2,2”,“~2”即表示我們需要將它前面的整數“1”重複兩次。如果資料的重複度較高,這種方式會節省大量的儲存空間。 (2)一“行”資料轉換為多“列”資料,並被快取到各自對應的快取區之後,需要進行兩個判斷: 快取的“列”資料(這裡指columnBuffers中的全部列資料)大小是否超過上限閥值columnsBufferSize? 快取的“行”記錄數目是否超過上限閥值RECORD_INTERVAL? 如果上述兩者條件滿足其一,我們認為已經快取足夠多的資料,可以將快取區的這些資料形成一個Row Split或Record,進行“溢寫”。 這兩個上限閥值(columnsBufferSize、RECORD_INTERVAL)也提示我們在實際應用中需要根據實際情況對這兩個值進行調整。 “溢寫”是通過flushRecords進行的,可以說是整個RCFile寫入過程中最為“複雜”的操作。 前面提到過,RCFile Record(Row Split)實際是由Key、Value組成的,現在這些“列”資料已經被快取到columnBuffers中,那麼Key的資料在哪裡呢? 這個Key實際上就是這個Row Split(Record)的元資料,也可以理解為Row Split(Record)的索引,它是由KeyBuffer表示的, columnNumber:列數; numberRows:RCFile Record(Row Split)內部儲存著多少“行”資料,同一個RCFile檔案,不同的Record內儲存的行數可能不同; RCFile Record Value實際就是前面提到的columnBuffers中的那些列值(可能經過壓縮處理),這些columnBuffers的元資料由以下三個變量表示: eachColumnValueLen:eachColumnValueLen[i]表示columnBuffers[i]中快取的列資料(原始資料)的總大小; eachColumnUncompressedValueLen:eachColumnUncompressedValueLen[i]表示columnBuffers[i]中的快取的列資料被壓縮之後的總大小;如果沒有經過壓縮處理,該值與columnBuffers[i]相同; allCellValLenBuffer:allCellValLenBuffer[i]表示columnBuffers[i]中那些列資料各自的長度(注意前方提到的這些長度的儲存技巧); KeyBuffer被序列化之後,它的結構如下:
numberRows Number_of_rows_in_this_record(vint)
columnValueLen Column_1_ondisk_compressed_length(vint)
columnUncompressedValueLen Column_1_ondisk_uncompressed_length(vint)
Column_1_row_1_value_plain_length
Column_1_row_2_value_plain_length
...
columnValueLen Column_2_ondisk_compressed_length(vint)
columnUncompressedValueLen Column_2_ondisk_uncompressed_length(vint)
Column_2_row_1_value_plain_length
Column_2_row_2_value_plain_length
...
為什麼說這樣的元資料可以當作索引來使用呢? 注意到上面的多個columnValueLen(columnUncompressedValueLen),它儲存著Record Value內多個列(簇)各自的總長度,而每個columnValueLen(columnUncompressedValueLen)後面儲存著該列(簇)內多個列值各自的長度。如果我們僅僅需要讀取第n列的資料,我們可以根據columnValueLen(columnUncompressedValueLen)直接跳過Record Value前面(n - 1)列的資料。 KeyBuffer的資料是在“溢寫”的過程中被構建的。下面我們來詳細分析flushRecords的具體邏輯。 key是KeyBuffer的例項,相當於在元資料中記錄這個Row Split(Record)的“行數”; 這段程式碼在使用壓縮的場景下才有意義,它構建了一個快取區valueBuffer,並且使用“裝飾器”模式構建了一個壓縮輸出流,用於後期將columnBuffers中的資料寫入快取區valueBuffer,valueBuffer中的資料是壓縮過的(後續會看到這個過程)。 接下來就是逐個處理columnBuffers中的資料,簡要來說,對於某個columnBuffers[i]而言需要做兩件事情: (1)如果使用壓縮,需要將columnBuffers[i]的資料通過壓縮輸出流deflateOut寫入valueBuffer中; (2)維護相關的幾個變數值; 這段程式碼看似較長,對於某個columnBuffers[i]而言,實際做的事情可以概括為四步: (1)如果使用壓縮,將columnBuffers[i]中的全部資料寫入deflateOut(實際是valueBuffer); (2)記錄columnBuffers[i]經過壓縮之後的長度colLen;如果沒有使用使用壓縮,則該值與原始資料長度相同; (3)記錄columnBuffers[i]相關元資料:columnBuffers[i]壓縮/未壓縮資料的長度、columnBuffers[i]中各個列值的長度; (4)維護plainTotalColumnLength、comprTotalColumnLength; 程式碼至此,一個Record(Row Split)的所有元資料已構建完畢;如果啟用壓縮,columnBuffers中的資料已全部被壓縮寫入valueBuffer;接下來就是Record Key、Value的“持久化”。 (1)Write the key out i. checkAndWriteSync 這裡需要先說一下為什麼需要這個“sync”? 比如我們有一個“大”的文字檔案,需要使用hadoop MapReduce進行分析。Hadoop MapReduce在提交Job之前會將這個大的文字檔案根據“切片”大小(假設為128M)進行“切片”,每一個MapTask處理這個檔案的一個“切片”(這裡不考慮處理多個切片的情況),也就是這個檔案的一部分資料。文字檔案是按行進行儲存的,那麼MapTask從某個“切片”的起始處讀取檔案資料時,如何定位一行記錄的起始位置呢?畢竟“切片”是按照位元組大小直接切分的,很有可能正好將某行記錄“切斷”。這時就需要有這樣的一個“sync”,相當於一個標誌位的作用,讓我們可以識別一行記錄的起始位置,對於文字檔案而言,這個“sync”就是換行符。所以,MapTask從某個“切片”的起始處讀取資料時,首先會“過濾”資料,直到遇到一個換行符,然後才開始讀取資料;如果讀取某行資料結束之後,發現“檔案遊標”超過該“切片”的範圍,則讀取結束。 RCFile同樣也需要這樣的一個“sync”,對於文字檔案而言,是每行文字一個“sync”;RCFile是以Record為單位進行儲存的,但是並沒有每個Record使用一個“sync”,而是兩個“sync”之間有一個間隔限制SYNC_INTERVAL, SYNC_INTERVAL = 100 * (4 + 16) 每次開始輸出下一個Record的資料之前,都會計算當前檔案的輸出位置相對於上個“sync”的偏移量,如果超過SYNC_INTERVAL就輸出一個“sync”。 那麼這個“sync”是什麼呢? 也就是說,RCFile的“sync”就是一個長度為16位元組的隨機位元組串,這裡不討論UID的生成過程。 ii. write total record length、key portion length iii. write keyLength、keyBuffer 注意這裡的keyLength與ii中的keyLength不同:ii中的keyLength相當於記錄的是keyBuffer原始資料的長度;而iii中的keyLength相當於記錄的是keyBuffer原始資料被壓縮之後的長度,如果沒有壓縮,該值與ii中的keyLength相同。 在這塊程式碼之前,還涉及到一個對keyBuffer的壓縮過程(如果啟用壓縮),它與ColumnBuffer的壓縮過程是類似的,不再贅述。 從上面的程式碼可以看出,在Record Key(KeyBuffer)之前,還存在這樣的一個結構,相當於Record Header:
recordLen Record length in bytes
keyLength Key length in bytes
compressedKeyLen Compressed Key length in bytes
(2)Write the value out 如果啟用壓縮,直接寫出valueBuffer中的壓縮資料即可;如果未啟用壓縮,需要將columnBuffers中的資料逐個寫出。 RCFile Record Value的結構實際上就是各個“列簇”的列值,如下:
column_1_row_1_value
column_1_row_2_value
...
column_2_row_1_value
column_2_row_1_value
...
程式碼至此,我們就完成了一個Row Split(Record)的輸出。 最後就是清空相關記錄,為下一個Row Split(Record)的快取輸出作準備, 3. close RCFile檔案的“關閉”操作大致可分為兩步: (1)如果快取區中仍有資料,呼叫flushRecords將資料“溢寫”出去; (2)關閉檔案輸出流。 程式碼示例 1. Write (1)構建Writer例項; 注意,一定要在Hadoop Configuration中通過屬性hive.io.rcfile.column.number.conf設定RCFile的“列數”。 (2)構建多行資料; 每行資料使用一個BytesRefArrayWritable例項表示。 (3)Writer append; (4)Writer close; 2. Read 讀取時需要注意,RCFileRecordReader的建構函式要求指定一個“切片”,如果我們需要讀取整個檔案的資料,就需要將整個檔案打造成為一個“切片”(如上);RCFileRecordReader例項構建好之後,就可以通過next()不斷迭代key、value,其中key為行數,value為行記錄。 程式碼輸出 如果我們僅僅需要讀取第1列和第3列的資料,應該怎麼做呢? 通過這樣的設定,我們可以得到如下的輸出結果: 可以注意到,雖然讀取的還是3列資料,但第2列的資料已經被返回“空值”。