Hadoop SequenceFile: Big Data Storage
A SequenceFile is a flat storage file consisting of a byte stream of binary-serialized key/value pairs.
Depending on the compression type (CompressionType), there are three kinds of SequenceFile Writer:
public static enum CompressionType {
  /** Do not compress records. */
  NONE,
  /** Compress values only, not keys. */
  RECORD,
  /** Compress the key/value pairs of many records together into blocks. */
  BLOCK
}
There are three SequenceFile Writers, based on the CompressionType used to compress the key/value pairs:

1. Writer: Uncompressed records.
The Writer implementation lives inside SequenceFile; its declaration is:
public static class Writer implements java.io.Closeable, Syncable
/** Write and flush the file header. */
private void writeFileHeader() throws IOException {
  out.write(VERSION);
  Text.writeString(out, keyClass.getName());
  Text.writeString(out, valClass.getName());

  out.writeBoolean(this.isCompressed());
  out.writeBoolean(this.isBlockCompressed());

  if (this.isCompressed()) {
    Text.writeString(out, (codec.getClass()).getName());
  }
  this.metadata.write(out);
  out.write(sync);                       // write the sync bytes
  out.flush();                           // flush header
}
A SequenceFile starts with a header, and the code above is what writes it. Let's walk through the header's layout with an example alongside the code.
package demo;

import java.io.IOException;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.io.Text;

public class SequenceFileWriteDemo {

  private static final String[] DATA = {
    "One, two, buckle my shoe",
    "Three, four, shut the door",
    "Five, six, pick up sticks",
    "Seven, eight, lay them straight",
    "Nine, ten, a big fat hen"
  };

  public static void main(String[] args) throws IOException {
    String uri = args[0];
    Configuration conf = new Configuration();
    conf.set("fs.defaultFS", "hdfs://xxx.xxx.xxx.xx:9000");
    FileSystem fs = FileSystem.get(URI.create(uri), conf);
    Path path = new Path(uri);

    IntWritable key = new IntWritable();
    Text value = new Text();
    SequenceFile.Writer writer = null;
    try {
      String compressType = args[1];
      System.out.println("compressType " + compressType);
      if (compressType.equals("1")) {
        // Writer: Uncompressed records.
        System.out.println("compress none");
        writer = SequenceFile.createWriter(fs, conf, path,
            key.getClass(), value.getClass(), CompressionType.NONE);
      } else if (compressType.equals("2")) {
        // RecordCompressWriter: Record-compressed files, only compress values.
        System.out.println("compress record");
        writer = SequenceFile.createWriter(fs, conf, path,
            key.getClass(), value.getClass(), CompressionType.RECORD);
      } else if (compressType.equals("3")) {
        // BlockCompressWriter: Block-compressed files, both keys & values are
        // collected in 'blocks' separately and compressed. The size of the
        // 'block' is configurable.
        System.out.println("compress block");
        writer = SequenceFile.createWriter(fs, conf, path,
            key.getClass(), value.getClass(), CompressionType.BLOCK);
      }
      for (int i = 0; i < 100; i++) {
        key.set(100 - i);
        value.set(DATA[i % DATA.length]);
        System.out.printf("[%s]\t%s\t%s\n", writer.getLength(), key, value);
        writer.append(key, value);
      }
    } finally {
      IOUtils.closeStream(writer);
    }
  }
}
The records written to the file are as follows:
key value
100 One, two, buckle my shoe
99 Three, four, shut the door
98 Five, six, pick up sticks
97 Seven, eight, lay them straight
96 Nine, ten, a big fat hen
95 One, two, buckle my shoe
94 Three, four, shut the door
93 Five, six, pick up sticks
92 Seven, eight, lay them straight
91 Nine, ten, a big fat hen
90 One, two, buckle my shoe
89 Three, four, shut the door
88 Five, six, pick up sticks
87 Seven, eight, lay them straight
86 Nine, ten, a big fat hen
85 One, two, buckle my shoe
84 Three, four, shut the door
83 Five, six, pick up sticks
82 Seven, eight, lay them straight
81 Nine, ten, a big fat hen
80 One, two, buckle my shoe
79 Three, four, shut the door
78 Five, six, pick up sticks
77 Seven, eight, lay them straight
76 Nine, ten, a big fat hen
75 One, two, buckle my shoe
74 Three, four, shut the door
73 Five, six, pick up sticks
72 Seven, eight, lay them straight
71 Nine, ten, a big fat hen
70 One, two, buckle my shoe
69 Three, four, shut the door
68 Five, six, pick up sticks
67 Seven, eight, lay them straight
66 Nine, ten, a big fat hen
65 One, two, buckle my shoe
64 Three, four, shut the door
63 Five, six, pick up sticks
62 Seven, eight, lay them straight
61 Nine, ten, a big fat hen
60 One, two, buckle my shoe
59 Three, four, shut the door
58 Five, six, pick up sticks
57 Seven, eight, lay them straight
56 Nine, ten, a big fat hen
55 One, two, buckle my shoe
54 Three, four, shut the door
53 Five, six, pick up sticks
52 Seven, eight, lay them straight
51 Nine, ten, a big fat hen
50 One, two, buckle my shoe
49 Three, four, shut the door
48 Five, six, pick up sticks
47 Seven, eight, lay them straight
46 Nine, ten, a big fat hen
45 One, two, buckle my shoe
44 Three, four, shut the door
43 Five, six, pick up sticks
42 Seven, eight, lay them straight
41 Nine, ten, a big fat hen
40 One, two, buckle my shoe
39 Three, four, shut the door
38 Five, six, pick up sticks
37 Seven, eight, lay them straight
36 Nine, ten, a big fat hen
35 One, two, buckle my shoe
34 Three, four, shut the door
33 Five, six, pick up sticks
32 Seven, eight, lay them straight
31 Nine, ten, a big fat hen
30 One, two, buckle my shoe
29 Three, four, shut the door
28 Five, six, pick up sticks
27 Seven, eight, lay them straight
26 Nine, ten, a big fat hen
25 One, two, buckle my shoe
24 Three, four, shut the door
23 Five, six, pick up sticks
22 Seven, eight, lay them straight
21 Nine, ten, a big fat hen
20 One, two, buckle my shoe
19 Three, four, shut the door
18 Five, six, pick up sticks
17 Seven, eight, lay them straight
16 Nine, ten, a big fat hen
15 One, two, buckle my shoe
14 Three, four, shut the door
13 Five, six, pick up sticks
12 Seven, eight, lay them straight
11 Nine, ten, a big fat hen
10 One, two, buckle my shoe
9 Three, four, shut the door
8 Five, six, pick up sticks
7 Seven, eight, lay them straight
6 Nine, ten, a big fat hen
5 One, two, buckle my shoe
4 Three, four, shut the door
3 Five, six, pick up sticks
2 Seven, eight, lay them straight
1 Nine, ten, a big fat hen
Package the Java code above into a jar and run hadoop jar sfile.jar /test/numbers1.seq 1, where the trailing 1 selects no compression.
Then fetch the file we just generated with hadoop fs -get hdfs:///test/numbers1.seq /usr/test/ and open it in UE (UltraEdit, used here as a hex viewer):
0x53 0x45 0x51
This is the magic header "SEQ" of the SequenceFile format, used to recognize whether a file is a SequenceFile at all.
0x06
The version number; the current latest version is "SEQ6".
The corresponding source:
private static byte[] VERSION = new byte[] {
(byte)'S', (byte)'E', (byte)'Q', VERSION_WITH_METADATA
};
out.write(VERSION);
0x20 0x65........0x65
This part is the keyClassName (the key's class name). The first byte (0x20, i.e. 32) gives the length of the string; in this example it is org.apache.hadoop.io.IntWritable.
The corresponding source:
Text.writeString(out, keyClass.getName());
0x19 0x6F ..... 0x74
This part is the valueClassName (the value's class name). The first byte (0x19, i.e. 25) again gives the length of the string; in this example it is org.apache.hadoop.io.Text.
Text.writeString(out, valClass.getName());
0x00
Is compression enabled? 0x00 means no (a boolean, so it takes 1 byte).
0x00
Is block compression enabled? 0x00 means no (also a boolean, 1 byte).
out.writeBoolean(this.isCompressed());
out.writeBoolean(this.isBlockCompressed());
If compression were enabled, the codec class name would be written next; this example is uncompressed, so no class name appears. The source:
if (this.isCompressed()) {
Text.writeString(out, (codec.getClass()).getName());
}
0x00 0x00 0x00 0x00
This is the metadata. This example carries no metadata, so only the four zero bytes of the entry count are written.
this.metadata.write(out);
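As an aside: if we had attached metadata, a non-zero entry count followed by the key/value Text pairs would be written here instead of four zero bytes. A minimal sketch, assuming the same fs, conf, and path as in SequenceFileWriteDemo above (the metadata entry itself is made up for illustration):

SequenceFile.Metadata meta = new SequenceFile.Metadata();
meta.set(new Text("created-by"), new Text("SequenceFileWriteDemo")); // hypothetical entry
// The createWriter overload taking a Metadata argument also wants a codec
// and a Progressable; with CompressionType.NONE the codec goes unused.
SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf, path,
    IntWritable.class, Text.class, CompressionType.NONE,
    new DefaultCodec(), null, meta);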
0x76 0x61 ..... 0xAF
The sync marker, which marks the end of the header.
byte[] sync;                          // 16 random bytes
{
  try {
    MessageDigest digester = MessageDigest.getInstance("MD5");
    long time = Time.now();
    digester.update((new UID() + "@" + time).getBytes());
    sync = digester.digest();
  } catch (Exception e) {
    throw new RuntimeException(e);
  }
}
out.write(sync);
At this point the header is fully written. Its format can be summarized as follows:
SequenceFile Header
- version - 3 bytes of magic header SEQ, followed by 1 byte of actual version number (e.g. SEQ4 or SEQ6)
- keyClassName - key class
- valueClassName - value class
- compression - A boolean which specifies if compression is turned on for keys/values in this file.
- blockCompression - A boolean which specifies if block-compression is turned on for keys/values in this file.
- compression codec - CompressionCodec class which is used for compression of keys and/or values (if compression is enabled).
- metadata - SequenceFile.Metadata for this file.
- sync - A sync marker to denote end of the header.
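To double-check this layout, here is a minimal sketch that re-parses the header of a local, uncompressed copy of the file by hand (HeaderDump is my own throwaway class, not Hadoop code; Text.readString reads the length-prefixed class-name strings):

import java.io.DataInputStream;
import java.io.FileInputStream;
import org.apache.hadoop.io.Text;

public class HeaderDump {
  public static void main(String[] args) throws Exception {
    try (DataInputStream in = new DataInputStream(new FileInputStream(args[0]))) {
      byte[] magic = new byte[3];
      in.readFully(magic);                          // 'S' 'E' 'Q'
      byte version = in.readByte();                 // 0x06
      System.out.printf("magic=%s version=%d%n",
          new String(magic, "US-ASCII"), version);
      System.out.println("keyClass   = " + Text.readString(in));
      System.out.println("valueClass = " + Text.readString(in));
      System.out.println("compressed      = " + in.readBoolean());
      System.out.println("blockCompressed = " + in.readBoolean());
      // If compressed were true, the codec class name would come next.
      System.out.println("metadata entries = " + in.readInt());
      byte[] sync = new byte[16];
      in.readFully(sync);                           // the 16-byte sync marker
      System.out.println("header ends after the 16-byte sync marker");
    }
  }
}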
Next, let's look at how the records themselves are stored:
0x00 0x00 0x00 0x1D
The size of the whole record (a record holds both key and value). Here it is 29 bytes: the key (100) takes 4 bytes, the value One, two, buckle my shoe takes 24 bytes, and one more byte stores the value's length, giving 29 bytes in total.
0x00 0x00 0x00 0x04
The size of the key's content (an int, so 4 bytes).
0x00 0x00 0x00 0x64
The key's content: 100, which is 0x64 in hex.
0x18
The size of the value's content. The value here is One, two, buckle my shoe, 24 characters long, and 24 is 0x18 in hex.
0x4F 0X6E....0x65
The value's content: One, two, buckle my shoe.
The corresponding source:
public synchronized void append(Object key, Object val)
    throws IOException {
  .......
  // Write the record out
  checkAndWriteSync();                                // sync
  out.writeInt(buffer.getLength());                   // total record length
  out.writeInt(keyLength);                            // key portion length
  out.write(buffer.getData(), 0, buffer.getLength()); // data
}
When the bytes written reach a threshold, a sync marker is emitted; that is what the call to checkAndWriteSync() does. Its source:
synchronized void checkAndWriteSync() throws IOException {
  if (sync != null &&
      out.getPos() >= lastSyncPos + SYNC_INTERVAL) { // time to emit sync
    sync();
  }
}
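For reference, checkAndWriteSync() delegates to the Writer's sync() method, which (lightly abridged from the same class) writes a record-length field of -1 (SYNC_ESCAPE) followed by the 16 sync bytes, so a reader can tell a sync entry apart from an ordinary record:

public synchronized void sync() throws IOException {
  if (sync != null && lastSyncPos != out.getPos()) {
    out.writeInt(SYNC_ESCAPE);                // mark the start of the sync
    out.write(sync);                          // write sync
    lastSyncPos = out.getPos();               // update lastSyncPos
  }
}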
SYNC_INTERVAL is defined as follows:
private static final int SYNC_HASH_SIZE = 16; // number of bytes in hash
private static final int SYNC_SIZE = 4+SYNC_HASH_SIZE; // escape + hash
/** The number of bytes between sync points.*/
public static final int SYNC_INTERVAL = 100*SYNC_SIZE;
So a sync marker is written after roughly every 2000 bytes of records (100 * (4 + 16)).
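These sync points are what make a SequenceFile splittable for MapReduce: a reader dropped at an arbitrary byte offset can realign to the next sync marker and resume reading whole records. A minimal sketch (the offset 1000 is an arbitrary value chosen for illustration):

import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;

public class SyncSeekDemo {
  public static void main(String[] args) throws Exception {
    String uri = args[0];
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(URI.create(uri), conf);
    try (SequenceFile.Reader reader =
             new SequenceFile.Reader(fs, new Path(uri), conf)) {
      IntWritable key = new IntWritable();
      Text value = new Text();
      reader.sync(1000);  // align to the first sync point after byte 1000
      System.out.println("aligned at position " + reader.getPosition());
      if (reader.next(key, value)) {
        System.out.printf("first record after sync: %s\t%s%n", key, value);
      }
    }
  }
}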
[Figure: layout of an uncompressed SequenceFile]

The format can be summarized as:

Uncompressed SequenceFile Format
- Record
  - Record length
  - Key length
  - Key
  - Value
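For completeness, reading the file back is symmetric to the writer demo; the key and value classes come straight out of the header we just dissected. A minimal reader sketch:

import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.util.ReflectionUtils;

public class SequenceFileReadDemo {
  public static void main(String[] args) throws Exception {
    String uri = args[0];
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(URI.create(uri), conf);
    SequenceFile.Reader reader = null;
    try {
      reader = new SequenceFile.Reader(fs, new Path(uri), conf);
      // Key/value classes are read back from the file header.
      Writable key = (Writable)
          ReflectionUtils.newInstance(reader.getKeyClass(), conf);
      Writable value = (Writable)
          ReflectionUtils.newInstance(reader.getValueClass(), conf);
      while (reader.next(key, value)) {
        System.out.printf("%s\t%s%n", key, value);
      }
    } finally {
      IOUtils.closeStream(reader);
    }
  }
}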
Next, let's look at RecordCompressWriter.

2. RecordCompressWriter: Record-compressed files; only values are compressed.
Run hadoop jar sfile.jar /test/numbers2.seq 2, where the argument 2 selects RecordCompressWriter.
Again we open the generated file in UE.
The header is almost identical to the uncompressed case, with these small differences:
0x01
Compression is enabled.
0x2A 0x6F ....0x63
The compression codec class used, org.apache.hadoop.io.compress.DefaultCodec; the first byte (0x2A, i.e. 42) is again the length of the class-name string.
0x00 ...0x25
The size of the whole record: 37 bytes. Why more bytes than the uncompressed record? Because the value is tiny; DefaultCodec (DEFLATE) adds stream headers and a checksum, so compressing a 24-byte value actually makes it bigger (see the small demo after the source listing below).
0x00 0x00 0x00 0x04
The size of the key's content (an int, so 4 bytes).
0x00 0x00 0x00 0x64
The key's content: 100, which is 0x64 in hex. Clearly, the key is not compressed.
The relevant source:
@Override
@SuppressWarnings("unchecked")
public synchronized void append(Object key, Object val)
    throws IOException {
  if (key.getClass() != keyClass)
    throw new IOException("wrong key class: " + key.getClass().getName()
                          + " is not " + keyClass);
  if (val.getClass() != valClass)
    throw new IOException("wrong value class: " + val.getClass().getName()
                          + " is not " + valClass);

  buffer.reset();

  // Append the 'key'
  keySerializer.serialize(key);
  int keyLength = buffer.getLength();
  if (keyLength < 0)
    throw new IOException("negative length keys not allowed: " + key);

  // Compress 'value' and append it
  deflateFilter.resetState();
  compressedValSerializer.serialize(val);
  deflateOut.flush();
  deflateFilter.finish();

  // Write the record out
  checkAndWriteSync();                                // sync
  out.writeInt(buffer.getLength());                   // total record length
  out.writeInt(keyLength);                            // key portion length
  out.write(buffer.getData(), 0, buffer.getLength()); // data
}
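To see that overhead concretely, here is a small sketch using plain java.util.zip.Deflater, the same DEFLATE algorithm that DefaultCodec wraps; deflating the 24-byte value yields more than 24 bytes of output:

import java.util.zip.Deflater;

public class DeflateOverhead {
  public static void main(String[] args) {
    byte[] input = "One, two, buckle my shoe".getBytes();
    Deflater deflater = new Deflater();     // zlib-wrapped DEFLATE stream
    deflater.setInput(input);
    deflater.finish();
    byte[] out = new byte[256];
    int n = deflater.deflate(out);          // compress in one shot
    deflater.end();
    // Prints something like: raw=24 bytes, deflated=32 bytes
    System.out.printf("raw=%d bytes, deflated=%d bytes%n", input.length, n);
  }
}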
[Figure: layout of a record-compressed SequenceFile]

The format can be summarized as:

Record-Compressed SequenceFile Format
- Record
  - Record length
  - Key length
  - Key
  - Compressed Value
Next, let's look at BlockCompressWriter.

3. BlockCompressWriter: Block-compressed files; both keys and values are collected in 'blocks' separately and compressed. The size of the 'block' is configurable.
Run hadoop jar sfile.jar /test/numbers3.seq 3, where the argument 3 selects BlockCompressWriter.
Again we open the generated file in UE.
The header is almost identical to the uncompressed case, with these small differences:
0x01
Compression is enabled.
0x01
Block compression is enabled.
The append source for block compression:
public synchronized void append(Object key, Object val)
    throws IOException {
  if (key.getClass() != keyClass)
    throw new IOException("wrong key class: " + key + " is not " + keyClass);
  if (val.getClass() != valClass)
    throw new IOException("wrong value class: " + val + " is not " + valClass);

  // Save key/value into respective buffers
  int oldKeyLength = keyBuffer.getLength();
  keySerializer.serialize(key);
  int keyLength = keyBuffer.getLength() - oldKeyLength;
  if (keyLength < 0)
    throw new IOException("negative length keys not allowed: " + key);
  WritableUtils.writeVInt(keyLenBuffer, keyLength);

  int oldValLength = valBuffer.getLength();
  uncompressedValSerializer.serialize(val);
  int valLength = valBuffer.getLength() - oldValLength;
  WritableUtils.writeVInt(valLenBuffer, valLength);

  // Added another key/value pair
  ++noBufferedRecords;

  // Compress and flush?
  int currentBlockSize = keyBuffer.getLength() + valBuffer.getLength();
  if (currentBlockSize >= compressionBlockSize) {
    sync();
  }
}
Notice that every call to append adds the serialized key and value lengths to keyLenBuffer and valLenBuffer:
WritableUtils.writeVInt(keyLenBuffer, keyLength);
WritableUtils.writeVInt(valLenBuffer, valLength);
int currentBlockSize = keyBuffer.getLength() + valBuffer.getLength();
if (currentBlockSize >= compressionBlockSize) {
sync();
}
compressionBlockSize = conf.getInt("io.seqfile.compress.blocksize", 1000000);
When the buffered bytes exceed compressionBlockSize, sync() is called. Let's look at its source:
public synchronized void sync() throws IOException {
  if (noBufferedRecords > 0) {
    super.sync();

    // No. of records
    WritableUtils.writeVInt(out, noBufferedRecords);

    // Write 'keys' and lengths
    writeBuffer(keyLenBuffer);
    writeBuffer(keyBuffer);

    // Write 'values' and lengths
    writeBuffer(valLenBuffer);
    writeBuffer(valBuffer);

    // Flush the file-stream
    out.flush();

    // Reset internal states
    keyLenBuffer.reset();
    keyBuffer.reset();
    valLenBuffer.reset();
    valBuffer.reset();
    noBufferedRecords = 0;
  }
}
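Since compressionBlockSize is plain configuration, the flush threshold can be tuned; larger blocks buffer more records per compressed flush and usually compress better, at the cost of memory and seek granularity. A sketch (the 4 MB figure is an arbitrary example, not a recommendation):

import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.io.Text;

public class BlockSizeDemo {
  public static void main(String[] args) throws Exception {
    String uri = args[0];
    Configuration conf = new Configuration();
    // Buffer ~4 MB of key/value bytes per compressed block
    // instead of the default 1000000.
    conf.setInt("io.seqfile.compress.blocksize", 4 * 1024 * 1024);
    FileSystem fs = FileSystem.get(URI.create(uri), conf);
    IntWritable key = new IntWritable();
    Text value = new Text();
    SequenceFile.Writer writer = null;
    try {
      writer = SequenceFile.createWriter(fs, conf, new Path(uri),
          IntWritable.class, Text.class, CompressionType.BLOCK);
      for (int i = 0; i < 100; i++) {
        key.set(i);
        value.set("record " + i);
        writer.append(key, value);
      }
    } finally {
      IOUtils.closeStream(writer);
    }
  }
}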
[Figure: layout of a block-compressed SequenceFile]

The format can be summarized as:

Block-Compressed SequenceFile Format
- Record Block
  - Uncompressed number of records in the block
  - Compressed key-lengths block-size
  - Compressed key-lengths block
  - Compressed keys block-size
  - Compressed keys block
  - Compressed value-lengths block-size
  - Compressed value-lengths block
  - Compressed values block-size
  - Compressed values block
- A sync-marker every block.
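As a closing sanity check, here is a small sketch that compares the sizes of the three files written above; with this highly repetitive nursery-rhyme data, the block-compressed file should come out far smaller than the other two:

import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class CompareSizes {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // e.g. pass /test/numbers1.seq /test/numbers2.seq /test/numbers3.seq
    for (String uri : args) {
      FileSystem fs = FileSystem.get(URI.create(uri), conf);
      long len = fs.getFileStatus(new Path(uri)).getLen();
      System.out.printf("%s\t%d bytes%n", uri, len);
    }
  }
}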