[hadoop2.7.1] I/O: prerequisite knowledge for SequenceFile, the "mud brick" of HDFS
A file can be thought of as a set of data blocks stored on DataNodes. The client fetches the blocks of a file from different DataNodes in parallel and then concatenates them to reconstruct the complete file.
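From the client's point of view this read path is hidden behind FileSystem.open; a minimal sketch (the path /tmp/demo.txt is illustrative, not from the article):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;

public class ReadFromHdfs {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);
    // Block locations are resolved against the namenode and the bytes are
    // fetched from the datanodes behind this single input stream.
    try (FSDataInputStream in = fs.open(new Path("/tmp/demo.txt"))) {
      IOUtils.copyBytes(in, System.out, 4096, false);
    }
  }
}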
Write:
Step 1: the client sends a write-file request. The namenode checks whether the file already exists; if it does, an error is returned immediately, otherwise the namenode sends the client a list of available datanodes.
Step 2: the client splits the file into blocks and stores them on different datanodes in parallel. Note that once the datanode list has been assembled, and before it is returned to the DFSClient, the namenode first sorts the datanodes in the list by their "distance" to the writing client, from nearest to farthest. If the writer is not itself a datanode, whichever datanode the sort puts first simply stays in the first position. The client then writes its data blocks in this near-to-far order. When the transfer is finished, the client notifies both the namenode and the datanodes. Hadoop has its own method for computing this distance.
Step 3: after receiving the client's notification, the namenode sends a confirmation to the datanodes.
Step 4: once a datanode has received confirmation from both the namenode and the client, it commits the write.
From this it is clear that when the namenode chooses the list of datanodes for a block, it takes care to spread the block replicas across different racks while at the same time avoiding, as far as possible, the excessive network overhead described earlier.
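Likewise, the whole write pipeline sits behind FileSystem.create; a minimal client-side sketch (path and content are illustrative):

import java.nio.charset.StandardCharsets;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class WriteToHdfs {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);
    // Splitting into blocks, sorting the datanode list by distance and running
    // the replication pipeline all happen inside this output stream.
    try (FSDataOutputStream out = fs.create(new Path("/tmp/demo.txt"), true)) {
      out.write("hello sequencefile".getBytes(StandardCharsets.UTF_8));
      out.hflush();   // push the bytes to the datanodes before closing
    }
  }
}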
Under this high-throughput parallel processing model, SequenceFile plays the vital role of "merging" small files. The SequenceFile class defines
SequenceFile.Writer
SequenceFile.Reader
and SequenceFile.Sorter, which handle writing, reading, and sorting respectively.
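The typical "merge small files" usage writes each small file as one key/value record, with the file name as key and its bytes as value. A minimal sketch, assuming an illustrative local input directory /data/small-files and output path /tmp/merged.seq:

import java.io.File;
import java.nio.file.Files;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;

public class MergeSmallFiles {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Path target = new Path("/tmp/merged.seq");            // illustrative output path
    SequenceFile.Writer writer = null;
    try {
      writer = SequenceFile.createWriter(conf,
          SequenceFile.Writer.file(target),
          SequenceFile.Writer.keyClass(Text.class),
          SequenceFile.Writer.valueClass(BytesWritable.class));
      for (File f : new File("/data/small-files").listFiles()) {   // illustrative input dir
        byte[] content = Files.readAllBytes(f.toPath());
        // file name as key, raw file bytes as value
        writer.append(new Text(f.getName()), new BytesWritable(content));
      }
    } finally {
      IOUtils.closeStream(writer);
    }
  }
}

Because many small files become records inside one large SequenceFile, the namenode only has to track a single file, and downstream jobs read one splittable input instead of thousands of tiny ones.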
There are three kinds of SequenceFile writers, one for each way of compressing the key/value pairs; SequenceFile defines a static enum to identify them:
/**
 * The compression type used to compress key/value pairs in the
 * {@link SequenceFile}.
 *
 * @see SequenceFile.Writer
 */
public static enum CompressionType {
  /** Do not compress records. */
  NONE,
  /** Compress values only, each separately. */
  RECORD,
  /** Compress sequences of records together in blocks. */
  BLOCK
}
Accordingly, three writer classes are defined: Writer, RecordCompressWriter, and BlockCompressWriter:
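How the enum value selects one of these three classes can be seen in the SequenceFile.createWriter factory; a simplified sketch of that method in Hadoop 2.x (the real code also falls back to a default compression type taken from the configuration):

public static Writer createWriter(Configuration conf, Writer.Option... opts) throws IOException {
  Writer.CompressionOption c = Options.getOption(Writer.CompressionOption.class, opts);
  CompressionType kind = (c != null) ? c.getValue() : CompressionType.NONE;
  switch (kind) {
    case RECORD: return new RecordCompressWriter(conf, opts);  // compress each value
    case BLOCK:  return new BlockCompressWriter(conf, opts);   // compress whole blocks
    default:     return new Writer(conf, opts);                // NONE: no compression
  }
}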
Writer class: records are stored uncompressed:
/** Write key/value pairs to a sequence-format file. */ public static class Writer implements java.io.Closeable, Syncable { private Configuration conf; FSDataOutputStream out; boolean ownOutputStream = true; DataOutputBuffer buffer = new DataOutputBuffer(); Class keyClass; Class valClass; private final CompressionType compress; CompressionCodec codec = null; CompressionOutputStream deflateFilter = null; DataOutputStream deflateOut = null; Metadata metadata = null; Compressor compressor = null; protected Serializer keySerializer; protected Serializer uncompressedValSerializer; protected Serializer compressedValSerializer; // Insert a globally unique 16-byte value every few entries, so that one // can seek into the middle of a file and then synchronize with record // starts and ends by scanning for this value. long lastSyncPos; // position of last sync byte[] sync; // 16 random bytes { try { MessageDigest digester = MessageDigest.getInstance("MD5"); long time = Time.now(); digester.update((new UID()+"@"+time).getBytes(Charsets.UTF_8)); sync = digester.digest(); } catch (Exception e) { throw new RuntimeException(e); } } public static interface Option {} static class FileOption extends Options.PathOption implements Option { FileOption(Path path) { super(path); } } /** * @deprecated only used for backwards-compatibility in the createWriter methods * that take FileSystem. */ @Deprecated private static class FileSystemOption implements Option { private final FileSystem value; protected FileSystemOption(FileSystem value) { this.value = value; } public FileSystem getValue() { return value; } } static class StreamOption extends Options.FSDataOutputStreamOption implements Option { StreamOption(FSDataOutputStream stream) { super(stream); } } static class BufferSizeOption extends Options.IntegerOption implements Option { BufferSizeOption(int value) { super(value); } } static class BlockSizeOption extends Options.LongOption implements Option { BlockSizeOption(long value) { super(value); } } static class ReplicationOption extends Options.IntegerOption implements Option { ReplicationOption(int value) { super(value); } } static class KeyClassOption extends Options.ClassOption implements Option { KeyClassOption(Class<?> value) { super(value); } } static class ValueClassOption extends Options.ClassOption implements Option { ValueClassOption(Class<?> value) { super(value); } } static class MetadataOption implements Option { private final Metadata value; MetadataOption(Metadata value) { this.value = value; } Metadata getValue() { return value; } } static class ProgressableOption extends Options.ProgressableOption implements Option { ProgressableOption(Progressable value) { super(value); } } private static class CompressionOption implements Option { private final CompressionType value; private final CompressionCodec codec; CompressionOption(CompressionType value) { this(value, null); } CompressionOption(CompressionType value, CompressionCodec codec) { this.value = value; this.codec = (CompressionType.NONE != value && null == codec) ? new DefaultCodec() : codec; } CompressionType getValue() { return value; } CompressionCodec getCodec() { return codec; } } public static Option file(Path value) { return new FileOption(value); } /** * @deprecated only used for backwards-compatibility in the createWriter methods * that take FileSystem. 
*/ @Deprecated private static Option filesystem(FileSystem fs) { return new SequenceFile.Writer.FileSystemOption(fs); } public static Option bufferSize(int value) { return new BufferSizeOption(value); } public static Option stream(FSDataOutputStream value) { return new StreamOption(value); } public static Option replication(short value) { return new ReplicationOption(value); } public static Option blockSize(long value) { return new BlockSizeOption(value); } public static Option progressable(Progressable value) { return new ProgressableOption(value); } public static Option keyClass(Class<?> value) { return new KeyClassOption(value); } public static Option valueClass(Class<?> value) { return new ValueClassOption(value); } public static Option metadata(Metadata value) { return new MetadataOption(value); } public static Option compression(CompressionType value) { return new CompressionOption(value); } public static Option compression(CompressionType value, CompressionCodec codec) { return new CompressionOption(value, codec); } /** * Construct a uncompressed writer from a set of options. * @param conf the configuration to use * @param options the options used when creating the writer * @throws IOException if it fails */ Writer(Configuration conf, Option... opts) throws IOException { BlockSizeOption blockSizeOption = Options.getOption(BlockSizeOption.class, opts); BufferSizeOption bufferSizeOption = Options.getOption(BufferSizeOption.class, opts); ReplicationOption replicationOption = Options.getOption(ReplicationOption.class, opts); ProgressableOption progressOption = Options.getOption(ProgressableOption.class, opts); FileOption fileOption = Options.getOption(FileOption.class, opts); FileSystemOption fsOption = Options.getOption(FileSystemOption.class, opts); StreamOption streamOption = Options.getOption(StreamOption.class, opts); KeyClassOption keyClassOption = Options.getOption(KeyClassOption.class, opts); ValueClassOption valueClassOption = Options.getOption(ValueClassOption.class, opts); MetadataOption metadataOption = Options.getOption(MetadataOption.class, opts); CompressionOption compressionTypeOption = Options.getOption(CompressionOption.class, opts); // check consistency of options if ((fileOption == null) == (streamOption == null)) { throw new IllegalArgumentException("file or stream must be specified"); } if (fileOption == null && (blockSizeOption != null || bufferSizeOption != null || replicationOption != null || progressOption != null)) { throw new IllegalArgumentException("file modifier options not " + "compatible with stream"); } FSDataOutputStream out; boolean ownStream = fileOption != null; if (ownStream) { Path p = fileOption.getValue(); FileSystem fs; if (fsOption != null) { fs = fsOption.getValue(); } else { fs = p.getFileSystem(conf); } int bufferSize = bufferSizeOption == null ? getBufferSize(conf) : bufferSizeOption.getValue(); short replication = replicationOption == null ? fs.getDefaultReplication(p) : (short) replicationOption.getValue(); long blockSize = blockSizeOption == null ? fs.getDefaultBlockSize(p) : blockSizeOption.getValue(); Progressable progress = progressOption == null ? null : progressOption.getValue(); out = fs.create(p, true, bufferSize, replication, blockSize, progress); } else { out = streamOption.getValue(); } Class<?> keyClass = keyClassOption == null ? Object.class : keyClassOption.getValue(); Class<?> valueClass = valueClassOption == null ? Object.class : valueClassOption.getValue(); Metadata metadata = metadataOption == null ? 
new Metadata() : metadataOption.getValue(); this.compress = compressionTypeOption.getValue(); final CompressionCodec codec = compressionTypeOption.getCodec(); if (codec != null && (codec instanceof GzipCodec) && !NativeCodeLoader.isNativeCodeLoaded() && !ZlibFactory.isNativeZlibLoaded(conf)) { throw new IllegalArgumentException("SequenceFile doesn't work with " + "GzipCodec without native-hadoop " + "code!"); } init(conf, out, ownStream, keyClass, valueClass, codec, metadata); } /** Create the named file. * @deprecated Use * {@link SequenceFile#createWriter(Configuration, Writer.Option...)} * instead. */ @Deprecated public Writer(FileSystem fs, Configuration conf, Path name, Class keyClass, Class valClass) throws IOException { this.compress = CompressionType.NONE; init(conf, fs.create(name), true, keyClass, valClass, null, new Metadata()); } /** Create the named file with write-progress reporter. * @deprecated Use * {@link SequenceFile#createWriter(Configuration, Writer.Option...)} * instead. */ @Deprecated public Writer(FileSystem fs, Configuration conf, Path name, Class keyClass, Class valClass, Progressable progress, Metadata metadata) throws IOException { this.compress = CompressionType.NONE; init(conf, fs.create(name, progress), true, keyClass, valClass, null, metadata); } /** Create the named file with write-progress reporter. * @deprecated Use * {@link SequenceFile#createWriter(Configuration, Writer.Option...)} * instead. */ @Deprecated public Writer(FileSystem fs, Configuration conf, Path name, Class keyClass, Class valClass, int bufferSize, short replication, long blockSize, Progressable progress, Metadata metadata) throws IOException { this.compress = CompressionType.NONE; init(conf, fs.create(name, true, bufferSize, replication, blockSize, progress), true, keyClass, valClass, null, metadata); } boolean isCompressed() { return compress != CompressionType.NONE; } boolean isBlockCompressed() { return compress == CompressionType.BLOCK; } Writer ownStream() { this.ownOutputStream = true; return this; } /** Write and flush the file header. */ private void writeFileHeader() throws IOException { out.write(VERSION); Text.writeString(out, keyClass.getName()); Text.writeString(out, valClass.getName()); out.writeBoolean(this.isCompressed()); out.writeBoolean(this.isBlockCompressed()); if (this.isCompressed()) { Text.writeString(out, (codec.getClass()).getName()); } this.metadata.write(out); out.write(sync); // write the sync bytes out.flush(); // flush header } /** Initialize. */ @SuppressWarnings("unchecked") void init(Configuration conf, FSDataOutputStream out, boolean ownStream, Class keyClass, Class valClass, CompressionCodec codec, Metadata metadata) throws IOException { this.conf = conf; this.out = out; this.ownOutputStream = ownStream; this.keyClass = keyClass; this.valClass = valClass; this.codec = codec; this.metadata = metadata; SerializationFactory serializationFactory = new SerializationFactory(conf); this.keySerializer = serializationFactory.getSerializer(keyClass); if (this.keySerializer == null) { throw new IOException( "Could not find a serializer for the Key class: '" + keyClass.getCanonicalName() + "'. 
" + "Please ensure that the configuration '" + CommonConfigurationKeys.IO_SERIALIZATIONS_KEY + "' is " + "properly configured, if you're using" + "custom serialization."); } this.keySerializer.open(buffer); this.uncompressedValSerializer = serializationFactory.getSerializer(valClass); if (this.uncompressedValSerializer == null) { throw new IOException( "Could not find a serializer for the Value class: '" + valClass.getCanonicalName() + "'. " + "Please ensure that the configuration '" + CommonConfigurationKeys.IO_SERIALIZATIONS_KEY + "' is " + "properly configured, if you're using" + "custom serialization."); } this.uncompressedValSerializer.open(buffer); if (this.codec != null) { ReflectionUtils.setConf(this.codec, this.conf); this.compressor = CodecPool.getCompressor(this.codec); this.deflateFilter = this.codec.createOutputStream(buffer, compressor); this.deflateOut = new DataOutputStream(new BufferedOutputStream(deflateFilter)); this.compressedValSerializer = serializationFactory.getSerializer(valClass); if (this.compressedValSerializer == null) { throw new IOException( "Could not find a serializer for the Value class: '" + valClass.getCanonicalName() + "'. " + "Please ensure that the configuration '" + CommonConfigurationKeys.IO_SERIALIZATIONS_KEY + "' is " + "properly configured, if you're using" + "custom serialization."); } this.compressedValSerializer.open(deflateOut); } writeFileHeader(); } /** Returns the class of keys in this file. */ public Class getKeyClass() { return keyClass; } /** Returns the class of values in this file. */ public Class getValueClass() { return valClass; } /** Returns the compression codec of data in this file. */ public CompressionCodec getCompressionCodec() { return codec; } /** create a sync point */ public void sync() throws IOException { if (sync != null && lastSyncPos != out.getPos()) { out.writeInt(SYNC_ESCAPE); // mark the start of the sync out.write(sync); // write sync lastSyncPos = out.getPos(); // update lastSyncPos } } /** * flush all currently written data to the file system * @deprecated Use {@link #hsync()} or {@link #hflush()} instead */ @Deprecated public void syncFs() throws IOException { if (out != null) { out.sync(); // flush contents to file system } } @Override public void hsync() throws IOException { if (out != null) { out.hsync(); } } @Override public void hflush() throws IOException { if (out != null) { out.hflush(); } } /** Returns the configuration of this file. */ Configuration getConf() { return conf; } /** Close the file. */ @Override public synchronized void close() throws IOException { keySerializer.close(); uncompressedValSerializer.close(); if (compressedValSerializer != null) { compressedValSerializer.close(); } CodecPool.returnCompressor(compressor); compressor = null; if (out != null) { // Close the underlying stream iff we own it... if (ownOutputStream) { out.close(); } else { out.flush(); } out = null; } } synchronized void checkAndWriteSync() throws IOException { if (sync != null && out.getPos() >= lastSyncPos+SYNC_INTERVAL) { // time to emit sync sync(); } } /** Append a key/value pair. */ public void append(Writable key, Writable val) throws IOException { append((Object) key, (Object) val); } /** Append a key/value pair. 
*/ @SuppressWarnings("unchecked") public synchronized void append(Object key, Object val) throws IOException { if (key.getClass() != keyClass) throw new IOException("wrong key class: "+key.getClass().getName() +" is not "+keyClass); if (val.getClass() != valClass) throw new IOException("wrong value class: "+val.getClass().getName() +" is not "+valClass); buffer.reset(); // Append the 'key' keySerializer.serialize(key); int keyLength = buffer.getLength(); if (keyLength < 0) throw new IOException("negative length keys not allowed: " + key); // Append the 'value' if (compress == CompressionType.RECORD) { deflateFilter.resetState(); compressedValSerializer.serialize(val); deflateOut.flush(); deflateFilter.finish(); } else { uncompressedValSerializer.serialize(val); } // Write the record out checkAndWriteSync(); // sync out.writeInt(buffer.getLength()); // total record length out.writeInt(keyLength); // key portion length out.write(buffer.getData(), 0, buffer.getLength()); // data } public synchronized void appendRaw(byte[] keyData, int keyOffset, int keyLength, ValueBytes val) throws IOException { if (keyLength < 0) throw new IOException("negative length keys not allowed: " + keyLength); int valLength = val.getSize(); checkAndWriteSync(); out.writeInt(keyLength+valLength); // total record length out.writeInt(keyLength); // key portion length out.write(keyData, keyOffset, keyLength); // key val.writeUncompressedBytes(out); // value } /** Returns the current length of the output file. * * <p>This always returns a synchronized position. In other words, * immediately after calling {@link SequenceFile.Reader#seek(long)} with a position * returned by this method, {@link SequenceFile.Reader#next(Writable)} may be called. However * the key may be earlier in the file than key last written when this * method was called (e.g., with block-compression, it may be the first key * in the block that was being written when this method was called). */ public synchronized long getLength() throws IOException { return out.getPos(); } } // class Writer /** Write key/compressed-value pairs to a sequence-format file. */ static class RecordCompressWriter extends Writer { RecordCompressWriter(Configuration conf, Option... options) throws IOException { super(conf, options); } /** Append a key/value pair. */ @Override @SuppressWarnings("unchecked") public synchronized void append(Object key, Object val) throws IOException { if (key.getClass() != keyClass) throw new IOException("wrong key class: "+key.getClass().getName() +" is not "+keyClass); if (val.getClass() != valClass) throw new IOException("wrong value class: "+val.getClass().getName() +" is not "+valClass); buffer.reset(); // Append the 'key' keySerializer.serialize(key); int keyLength = buffer.getLength(); if (keyLength < 0) throw new IOException("negative length keys not allowed: " + key); // Compress 'value' and append it deflateFilter.resetState(); compressedValSerializer.serialize(val); deflateOut.flush(); deflateFilter.finish(); // Write the record out checkAndWriteSync(); // sync out.writeInt(buffer.getLength()); // total record length out.writeInt(keyLength); // key portion length out.write(buffer.getData(), 0, buffer.getLength()); // data } /** Append a key/value pair. 
*/ @Override public synchronized void appendRaw(byte[] keyData, int keyOffset, int keyLength, ValueBytes val) throws IOException { if (keyLength < 0) throw new IOException("negative length keys not allowed: " + keyLength); int valLength = val.getSize(); checkAndWriteSync(); // sync out.writeInt(keyLength+valLength); // total record length out.writeInt(keyLength); // key portion length out.write(keyData, keyOffset, keyLength); // 'key' data val.writeCompressedBytes(out); // 'value' data } }
RecordCompressWriter (included at the end of the listing above):
Only the value of each record is compressed.
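To obtain a RecordCompressWriter in practice, pass CompressionType.RECORD through the compression option when creating the writer; a short sketch reusing the Configuration and imports from the merge example above (path and codec are illustrative):

// import org.apache.hadoop.io.compress.DefaultCodec;
SequenceFile.Writer writer = SequenceFile.createWriter(conf,
    SequenceFile.Writer.file(new Path("/tmp/record-compressed.seq")),   // illustrative path
    SequenceFile.Writer.keyClass(Text.class),
    SequenceFile.Writer.valueClass(Text.class),
    SequenceFile.Writer.compression(SequenceFile.CompressionType.RECORD, new DefaultCodec()));
writer.append(new Text("k1"), new Text("v1"));   // only the value "v1" is compressed
writer.close();

The full source of BlockCompressWriter is listed next.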
/** Write compressed key/value blocks to a sequence-format file. */
static class BlockCompressWriter extends Writer {
private int noBufferedRecords = 0;
private DataOutputBuffer keyLenBuffer = new DataOutputBuffer();
private DataOutputBuffer keyBuffer = new DataOutputBuffer();
private DataOutputBuffer valLenBuffer = new DataOutputBuffer();
private DataOutputBuffer valBuffer = new DataOutputBuffer();
private final int compressionBlockSize;
BlockCompressWriter(Configuration conf,
Option... options) throws IOException {
super(conf, options);
compressionBlockSize =
conf.getInt("io.seqfile.compress.blocksize", 1000000);
keySerializer.close();
keySerializer.open(keyBuffer);
uncompressedValSerializer.close();
uncompressedValSerializer.open(valBuffer);
}
/** Workhorse to check and write out compressed data/lengths */
private synchronized
void writeBuffer(DataOutputBuffer uncompressedDataBuffer)
throws IOException {
deflateFilter.resetState();
buffer.reset();
deflateOut.write(uncompressedDataBuffer.getData(), 0,
uncompressedDataBuffer.getLength());
deflateOut.flush();
deflateFilter.finish();
WritableUtils.writeVInt(out, buffer.getLength());
out.write(buffer.getData(), 0, buffer.getLength());
}
/** Compress and flush contents to dfs */
@Override
public synchronized void sync() throws IOException {
if (noBufferedRecords > 0) {
super.sync();
// No. of records
WritableUtils.writeVInt(out, noBufferedRecords);
// Write 'keys' and lengths
writeBuffer(keyLenBuffer);
writeBuffer(keyBuffer);
// Write 'values' and lengths
writeBuffer(valLenBuffer);
writeBuffer(valBuffer);
// Flush the file-stream
out.flush();
// Reset internal states
keyLenBuffer.reset();
keyBuffer.reset();
valLenBuffer.reset();
valBuffer.reset();
noBufferedRecords = 0;
}
}
/** Close the file. */
@Override
public synchronized void close() throws IOException {
if (out != null) {
sync();
}
super.close();
}
/** Append a key/value pair. */
@Override
@SuppressWarnings("unchecked")
public synchronized void append(Object key, Object val)
throws IOException {
if (key.getClass() != keyClass)
throw new IOException("wrong key class: "+key+" is not "+keyClass);
if (val.getClass() != valClass)
throw new IOException("wrong value class: "+val+" is not "+valClass);
// Save key/value into respective buffers
int oldKeyLength = keyBuffer.getLength();
keySerializer.serialize(key);
int keyLength = keyBuffer.getLength() - oldKeyLength;
if (keyLength < 0)
throw new IOException("negative length keys not allowed: " + key);
WritableUtils.writeVInt(keyLenBuffer, keyLength);
int oldValLength = valBuffer.getLength();
uncompressedValSerializer.serialize(val);
int valLength = valBuffer.getLength() - oldValLength;
WritableUtils.writeVInt(valLenBuffer, valLength);
// Added another key/value pair
++noBufferedRecords;
// Compress and flush?
int currentBlockSize = keyBuffer.getLength() + valBuffer.getLength();
if (currentBlockSize >= compressionBlockSize) {
sync();
}
}
/** Append a key/value pair. */
@Override
public synchronized void appendRaw(byte[] keyData, int keyOffset,
int keyLength, ValueBytes val) throws IOException {
if (keyLength < 0)
throw new IOException("negative length keys not allowed");
int valLength = val.getSize();
// Save key/value data in relevant buffers
WritableUtils.writeVInt(keyLenBuffer, keyLength);
keyBuffer.write(keyData, keyOffset, keyLength);
WritableUtils.writeVInt(valLenBuffer, valLength);
val.writeUncompressedBytes(valBuffer);
// Added another key/value pair
++noBufferedRecords;
// Compress and flush?
int currentBlockSize = keyBuffer.getLength() + valBuffer.getLength();
if (currentBlockSize >= compressionBlockSize) {
sync();
}
}
}
BlockCompressWriter:
Keys and values are buffered and compressed separately, block by block; the block size is configurable.
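Passing CompressionType.BLOCK selects this writer; the amount it buffers before compressing a block comes from io.seqfile.compress.blocksize (1000000 bytes by default, as seen in the constructor above). A short sketch, again with illustrative values and the imports from the earlier examples:

// import org.apache.hadoop.io.compress.DefaultCodec;
conf.setInt("io.seqfile.compress.blocksize", 4 * 1024 * 1024);   // flush a block every ~4 MB
SequenceFile.Writer writer = SequenceFile.createWriter(conf,
    SequenceFile.Writer.file(new Path("/tmp/block-compressed.seq")),   // illustrative path
    SequenceFile.Writer.keyClass(Text.class),
    SequenceFile.Writer.valueClass(BytesWritable.class),
    SequenceFile.Writer.compression(SequenceFile.CompressionType.BLOCK, new DefaultCodec()));
writer.append(new Text("k1"), new BytesWritable(new byte[]{1, 2, 3}));
writer.close();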
/** Get the configured buffer size */
private static int getBufferSize(Configuration conf) {
return conf.getInt("io.file.buffer.size", 4096);
}
/** Reads key/value pairs from a sequence-format file. */
public static class Reader implements java.io.Closeable {
private String filename;
private FSDataInputStream in;
private DataOutputBuffer outBuf = new DataOutputBuffer();
private byte version;
private String keyClassName;
private String valClassName;
private Class keyClass;
private Class valClass;
private CompressionCodec codec = null;
private Metadata metadata = null;
private byte[] sync = new byte[SYNC_HASH_SIZE];
private byte[] syncCheck = new byte[SYNC_HASH_SIZE];
private boolean syncSeen;
private long headerEnd;
private long end;
private int keyLength;
private int recordLength;
private boolean decompress;
private boolean blockCompressed;
private Configuration conf;
private int noBufferedRecords = 0;
private boolean lazyDecompress = true;
private boolean valuesDecompressed = true;
private int noBufferedKeys = 0;
private int noBufferedValues = 0;
private DataInputBuffer keyLenBuffer = null;
private CompressionInputStream keyLenInFilter = null;
private DataInputStream keyLenIn = null;
private Decompressor keyLenDecompressor = null;
private DataInputBuffer keyBuffer = null;
private CompressionInputStream keyInFilter = null;
private DataInputStream keyIn = null;
private Decompressor keyDecompressor = null;
private DataInputBuffer valLenBuffer = null;
private CompressionInputStream valLenInFilter = null;
private DataInputStream valLenIn = null;
private Decompressor valLenDecompressor = null;
private DataInputBuffer valBuffer = null;
private CompressionInputStream valInFilter = null;
private DataInputStream valIn = null;
private Decompressor valDecompressor = null;
private Deserializer keyDeserializer;
private Deserializer valDeserializer;
/**
* A tag interface for all of the Reader options
*/
public static interface Option {}
/**
* Create an option to specify the path name of the sequence file.
* @param value the path to read
* @return a new option
*/
public static Option file(Path value) {
return new FileOption(value);
}
/**
* Create an option to specify the stream with the sequence file.
* @param value the stream to read.
* @return a new option
*/
public static Option stream(FSDataInputStream value) {
return new InputStreamOption(value);
}
/**
* Create an option to specify the starting byte to read.
* @param value the number of bytes to skip over
* @return a new option
*/
public static Option start(long value) {
return new StartOption(value);
}
/**
* Create an option to specify the number of bytes to read.
* @param value the number of bytes to read
* @return a new option
*/
public static Option length(long value) {
return new LengthOption(value);
}
/**
* Create an option with the buffer size for reading the given pathname.
* @param value the number of bytes to buffer
* @return a new option
*/
public static Option bufferSize(int value) {
return new BufferSizeOption(value);
}
private static class FileOption extends Options.PathOption
implements Option {
private FileOption(Path value) {
super(value);
}
}
private static class InputStreamOption
extends Options.FSDataInputStreamOption
implements Option {
private InputStreamOption(FSDataInputStream value) {
super(value);
}
}
private static class StartOption extends Options.LongOption
implements Option {
private StartOption(long value) {
super(value);
}
}
private static class LengthOption extends Options.LongOption
implements Option {
private LengthOption(long value) {
super(value);
}
}
private static class BufferSizeOption extends Options.IntegerOption
implements Option {
private BufferSizeOption(int value) {
super(value);
}
}
// only used directly
private static class OnlyHeaderOption extends Options.BooleanOption
implements Option {
private OnlyHeaderOption() {
super(true);
}
}
public Reader(Configuration conf, Option... opts) throws IOException {
// Look up the options, these are null if not set
FileOption fileOpt = Options.getOption(FileOption.class, opts);
InputStreamOption streamOpt =
Options.getOption(InputStreamOption.class, opts);
StartOption startOpt = Options.getOption(StartOption.class, opts);
LengthOption lenOpt = Options.getOption(LengthOption.class, opts);
BufferSizeOption bufOpt = Options.getOption(BufferSizeOption.class,opts);
OnlyHeaderOption headerOnly =
Options.getOption(OnlyHeaderOption.class, opts);
// check for consistency
if ((fileOpt == null) == (streamOpt == null)) {
throw new
IllegalArgumentException("File or stream option must be specified");
}
if (fileOpt == null && bufOpt != null) {
throw new IllegalArgumentException("buffer size can only be set when" +
" a file is specified.");
}
// figure out the real values
Path filename = null;
FSDataInputStream file;
final long len;
if (fileOpt != null) {
filename = fileOpt.getValue();
FileSystem fs = filename.getFileSystem(conf);
int bufSize = bufOpt == null ? getBufferSize(conf): bufOpt.getValue();
len = null == lenOpt
? fs.getFileStatus(filename).getLen()
: lenOpt.getValue();
file = openFile(fs, filename, bufSize, len);
} else {
len = null == lenOpt ? Long.MAX_VALUE : lenOpt.getValue();
file = streamOpt.getValue();
}
long start = startOpt == null ? 0 : startOpt.getValue();
// really set up
initialize(filename, file, start, len, conf, headerOnly != null);
}
/**
* Construct a reader by opening a file from the given file system.
* @param fs The file system used to open the file.
* @param file The file being read.
* @param conf Configuration
* @throws IOException
* @deprecated Use Reader(Configuration, Option...) instead.
*/
@Deprecated
public Reader(FileSystem fs, Path file,
Configuration conf) throws IOException {
this(conf, file(file.makeQualified(fs)));
}
/**
* Construct a reader by the given input stream.
* @param in An input stream.
* @param buffersize unused
* @param start The starting position.
* @param length The length being read.
* @param conf Configuration
* @throws IOException
* @deprecated Use Reader(Configuration, Reader.Option...) instead.
*/
@Deprecated
public Reader(FSDataInputStream in, int buffersize,
long start, long length, Configuration conf) throws IOException {
this(conf, stream(in), start(start), length(length));
}
/** Common work of the constructors. */
private void initialize(Path filename, FSDataInputStream in,
long start, long length, Configuration conf,
boolean tempReader) throws IOException {
if (in == null) {
throw new IllegalArgumentException("in == null");
}
this.filename = filename == null ? "<unknown>" : filename.toString();
this.in = in;
this.conf = conf;
boolean succeeded = false;
try {
seek(start);
this.end = this.in.getPos() + length;
// if it wrapped around, use the max
if (end < length) {
end = Long.MAX_VALUE;
}
init(tempReader);
succeeded = true;
} finally {
if (!succeeded) {
IOUtils.cleanup(LOG, this.in);
}
}
}
/**
* Override this method to specialize the type of
* {@link FSDataInputStream} returned.
* @param fs The file system used to open the file.
* @param file The file being read.
* @param bufferSize The buffer size used to read the file.
* @param length The length being read if it is >= 0. Otherwise,
* the length is not available.
* @return The opened stream.
* @throws IOException
*/
protected FSDataInputStream openFile(FileSystem fs, Path file,
int bufferSize, long length) throws IOException {
return fs.open(file, bufferSize);
}
/**
* Initialize the {@link Reader}
* @param tmpReader <code>true</code> if we are constructing a temporary
* reader {@link SequenceFile.Sorter.cloneFileAttributes},
* and hence do not initialize every component;
* <code>false</code> otherwise.
* @throws IOException
*/
private void init(boolean tempReader) throws IOException {
byte[] versionBlock = new byte[VERSION.length];
in.readFully(versionBlock);
if ((versionBlock[0] != VERSION[0]) ||
(versionBlock[1] != VERSION[1]) ||
(versionBlock[2] != VERSION[2]))
throw new IOException(this + " not a SequenceFile");
// Set 'version'
version = versionBlock[3];
if (version > VERSION[3])
throw new VersionMismatchException(VERSION[3], version);
if (version < BLOCK_COMPRESS_VERSION) {
UTF8 className = new UTF8();
className.readFields(in);
keyClassName = className.toStringChecked(); // key class name
className.readFields(in);
valClassName = className.toStringChecked(); // val class name
} else {
keyClassName = Text.readString(in);
valClassName = Text.readString(in);
}
if (version > 2) { // if version > 2
this.decompress = in.readBoolean(); // is compressed?
} else {
decompress = false;
}
if (version >= BLOCK_COMPRESS_VERSION) { // if version >= 4
this.blockCompressed = in.readBoolean(); // is block-compressed?
} else {
blockCompressed = false;
}
// if version >= 5
// setup the compression codec
if (decompress) {
if (version >= CUSTOM_COMPRESS_VERSION) {
String codecClassname = Text.readString(in);
try {
Class<? extends CompressionCodec> codecClass
= conf.getClassByName(codecClassname).asSubclass(CompressionCodec.class);
this.codec = ReflectionUtils.newInstance(codecClass, conf);
} catch (ClassNotFoundException cnfe) {
throw new IllegalArgumentException("Unknown codec: " +
codecClassname, cnfe);
}
} else {
codec = new DefaultCodec();
((Configurable)codec).setConf(conf);
}
}
this.metadata = new Metadata();
if (version >= VERSION_WITH_METADATA) { // if version >= 6
this.metadata.readFields(in);
}
if (version > 1) { // if version > 1
in.readFully(sync); // read sync bytes
headerEnd = in.getPos(); // record end of header
}
// Initialize... *not* if this we are constructing a temporary Reader
if (!tempReader) {
valBuffer = new DataInputBuffer();
if (decompress) {
valDecompressor = CodecPool.getDecompressor(codec);
valInFilter = codec.createInputStream(valBuffer, valDecompressor);
valIn = new DataInputStream(valInFilter);
} else {
valIn = valBuffer;
}
if (blockCompressed) {
keyLenBuffer = new DataInputBuffer();
keyBuffer = new DataInputBuffer();
valLenBuffer = new DataInputBuffer();
keyLenDecompressor = CodecPool.getDecompressor(codec);
keyLenInFilter = codec.createInputStream(keyLenBuffer,
keyLenDecompressor);
keyLenIn = new DataInputStream(keyLenInFilter);
keyDecompressor = CodecPool.getDecompressor(codec);
keyInFilter = codec.createInputStream(keyBuffer, keyDecompressor);
keyIn = new DataInputStream(keyInFilter);
valLenDecompressor = CodecPool.getDecompressor(codec);
valLenInFilter = codec.createInputStream(valLenBuffer,
valLenDecompressor);
valLenIn = new DataInputStream(valLenInFilter);
}
SerializationFactory serializationFactory =
new SerializationFactory(conf);
this.keyDeserializer =
getDeserializer(serializationFactory, getKeyClass());
if (this.keyDeserializer == null) {
throw new IOException(
"Could not find a deserializer for the Key class: '"
+ getKeyClass().getCanonicalName() + "'. "
+ "Please ensure that the configuration '" +
CommonConfigurationKeys.IO_SERIALIZATIONS_KEY + "' is "
+ "properly configured, if you're using "
+ "custom serialization.");
}
if (!blockCompressed) {
this.keyDeserializer.open(valBuffer);
} else {
this.keyDeserializer.open(keyIn);
}
this.valDeserializer =
getDeserializer(serializationFactory, getValueClass());
if (this.valDeserializer == null) {
throw new IOException(
"Could not find a deserializer for the Value class: '"
+ getValueClass().getCanonicalName() + "'. "
+ "Please ensure that the configuration '" +
CommonConfigurationKeys.IO_SERIALIZATIONS_KEY + "' is "
+ "properly configured, if you're using "
+ "custom serialization.");
}
this.valDeserializer.open(valIn);
}
}
@SuppressWarnings("unchecked")
private Deserializer getDeserializer(SerializationFactory sf, Class c) {
return sf.getDeserializer(c);
}
/** Close the file. */
@Override
public synchronized void close() throws IOException {
// Return the decompressors to the pool
CodecPool.returnDecompressor(keyLenDecompressor);
CodecPool.returnDecompressor(keyDecompressor);
CodecPool.returnDecompressor(valLenDecompressor);
CodecPool.returnDecompressor(valDecompressor);
keyLenDecompressor = keyDecompressor = null;
valLenDecompressor = valDecompressor = null;
if (keyDeserializer != null) {
keyDeserializer.close();
}
if (valDeserializer != null) {
valDeserializer.close();
}
// Close the input-stream
in.close();
}
/** Returns the name of the key class. */
public String getKeyClassName() {
return keyClassName;
}
/** Returns the class of keys in this file. */
public synchronized Class<?> getKeyClass() {
if (null == keyClass) {
try {
keyClass = WritableName.getClass(getKeyClassName(), conf);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
return keyClass;
}
/** Returns the name of the value class. */
public String getValueClassName() {
return valClassName;
}
/** Returns the class of values in this file. */
public synchronized Class<?> getValueClass() {
if (null == valClass) {
try {
valClass = WritableName.getClass(getValueClassName(), conf);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
return valClass;
}
/** Returns true if values are compressed. */
public boolean isCompressed() { return decompress; }
/** Returns true if records are block-compressed. */
public boolean isBlockCompressed() { return blockCompressed; }
/** Returns the compression codec of data in this file. */
public CompressionCodec getCompressionCodec() { return codec; }
/**
* Get the compression type for this file.
* @return the compression type
*/
public CompressionType getCompressionType() {
if (decompress) {
return blockCompressed ? CompressionType.BLOCK : CompressionType.RECORD;
} else {
return CompressionType.NONE;
}
}
/** Returns the metadata object of the file */
public Metadata getMetadata() {
return this.metadata;
}
/** Returns the configuration used for this file. */
Configuration getConf() { return conf; }
/** Read a compressed buffer */
private synchronized void readBuffer(DataInputBuffer buffer,
CompressionInputStream filter) throws IOException {
// Read data into a temporary buffer
DataOutputBuffer dataBuffer = new DataOutputBuffer();
try {
int dataBufferLength = WritableUtils.readVInt(in);
dataBuffer.write(in, dataBufferLength);
// Set up 'buffer' connected to the input-stream
buffer.reset(dataBuffer.getData(), 0, dataBuffer.getLength());
} finally {
dataBuffer.close();
}
// Reset the codec
filter.resetState();
}
/** Read the next 'compressed' block */
private synchronized void readBlock() throws IOException {
// Check if we need to throw away a whole block of
// 'values' due to 'lazy decompression'
if (lazyDecompress && !valuesDecompressed) {
in.seek(WritableUtils.readVInt(in)+in.getPos());
in.seek(WritableUtils.readVInt(in)+in.getPos());
}
// Reset internal states
noBufferedKeys = 0; noBufferedValues = 0; noBufferedRecords = 0;
valuesDecompressed = false;
//Process sync
if (sync != null) {
in.readInt();
in.readFully(syncCheck); // read syncCheck
if (!Arrays.equals(sync, syncCheck)) // check it
throw new IOException("File is corrupt!");
}
syncSeen = true;
// Read number of records in this block
noBufferedRecords = WritableUtils.readVInt(in);
// Read key lengths and keys
readBuffer(keyLenBuffer, keyLenInFilter);
readBuffer(keyBuffer, keyInFilter);
noBufferedKeys = noBufferedRecords;
// Read value lengths and values
if (!lazyDecompress) {
readBuffer(valLenBuffer, valLenInFilter);
readBuffer(valBuffer, valInFilter);
noBufferedValues = noBufferedRecords;
valuesDecompressed = true;
}
}
/**
* Position valLenIn/valIn to the 'value'
* corresponding to the 'current' key
*/
private synchronized void seekToCurrentValue() throws IOException {
if (!blockCompressed) {
if (decompress) {
valInFilter.resetState();
}
valBuffer.reset();
} else {
// Check if this is the first value in the 'block' to be read
if (lazyDecompress && !valuesDecompressed) {
// Read the value lengths and values
readBuffer(valLenBuffer, valLenInFilter);
readBuffer(valBuffer, valInFilter);
noBufferedValues = noBufferedRecords;
valuesDecompressed = true;
}
// Calculate the no. of bytes to skip
// Note: 'current' key has already been read!
int skipValBytes = 0;
int currentKey = noBufferedKeys + 1;
for (int i=noBufferedValues; i > currentKey; --i) {
skipValBytes += WritableUtils.readVInt(valLenIn);
--noBufferedValues;
}
// Skip to the 'val' corresponding to 'current' key
if (skipValBytes > 0) {
if (valIn.skipBytes(skipValBytes) != skipValBytes) {
throw new IOException("Failed to seek to " + currentKey +
"(th) value!");
}
}
}
}
/**
* Get the 'value' corresponding to the last read 'key'.
* @param val : The 'value' to be read.
* @throws IOException
*/
public synchronized void getCurrentValue(Writable val)
throws IOException {
if (val instanceof Configurable) {
((Configurable) val).setConf(this.conf);
}
// Position stream to 'current' value
seekToCurrentValue();
if (!blockCompressed) {
val.readFields(valIn);
if (valIn.read() > 0) {
LOG.info("available bytes: " + valIn.available());
throw new IOException(val+" read "+(valBuffer.getPosition()-keyLength)
+ " bytes, should read " +
(valBuffer.getLength()-keyLength));
}
} else {
// Get the value
int valLength = WritableUtils.readVInt(valLenIn);
val.readFields(valIn);
// Read another compressed 'value'
--noBufferedValues;
// Sanity check
if ((valLength < 0) && LOG.isDebugEnabled()) {
LOG.debug(val + " is a zero-length value");
}
}
}
/**
* Get the 'value' corresponding to the last read 'key'.
* @param val : The 'value' to be read.
* @throws IOException
*/
public synchronized Object getCurrentValue(Object val)
throws IOException {
if (val instanceof Configurable) {
((Configurable) val).setConf(this.conf);
}
// Position stream to 'current' value
seekToCurrentValue();
if (!blockCompressed) {
val = deserializeValue(val);
if (valIn.read() > 0) {
LOG.info("available bytes: " + valIn.available());
throw new IOException(val+" read "+(valBuffer.getPosition()-keyLength)
+ " bytes, should read " +
(valBuffer.getLength()-keyLength));
}
} else {
// Get the value
int valLength = WritableUtils.readVInt(valLenIn);
val = deserializeValue(val);
// Read another compressed 'value'
--noBufferedValues;
// Sanity check
if ((valLength < 0) && LOG.isDebugEnabled()) {
LOG.debug(val + " is a zero-length value");
}
}
return val;
}
@SuppressWarnings("unchecked")
private Object deserializeValue(Object val) throws IOException {
return valDeserializer.deserialize(val);
}
/** Read the next key in the file into <code>key</code>, skipping its
* value. True if another entry exists, and false at end of file. */
public synchronized boolean next(Writable key) throws IOException {
if (key.getClass() != getKeyClass())
throw new IOException("wrong key class: "+key.getClass().getName()
+" is not "+keyClass);
if (!blockCompressed) {
outBuf.reset();
keyLength = next(outBuf);
if (keyLength < 0)
return false;
valBuffer.reset(outBuf.getData(), outBuf.getLength());
key.readFields(valBuffer);
valBuffer.mark(0);
if (valBuffer.getPosition() != keyLength)
throw new IOException(key + " read " + valBuffer.getPosition()
+ " bytes, should read " + keyLength);
} else {
//Reset syncSeen
syncSeen = false;
if (noBufferedKeys == 0) {
try {
readBlock();
} catch (EOFException eof) {
return false;
}
}
int keyLength = WritableUtils.readVInt(keyLenIn);
// Sanity check
if (keyLength < 0) {
return false;
}
//Read another compressed 'key'
key.readFields(keyIn);
--noBufferedKeys;
}
return true;
}
/** Read the next key/value pair in the file into <code>key</code> and
* <code>val</code>. Returns true if such a pair exists and false when at
* end of file */
public synchronized boolean next(Writable key, Writable val)
throws IOException {
if (val.getClass() != getValueClass())
throw new IOException("wrong value class: "+val+" is not "+valClass);
boolean more = next(key);
if (more) {
getCurrentValue(val);
}
return more;
}
/**
* Read and return the next record length, potentially skipping over
* a sync block.
* @return the length of the next record or -1 if there is no next record
* @throws IOException
*/
private synchronized int readRecordLength() throws IOException {
if (in.getPos() >= end) {
return -1;
}
int length = in.readInt();
if (version > 1 && sync != null &&
length == SYNC_ESCAPE) { // process a sync entry
in.readFully(syncCheck); // read syncCheck
if (!Arrays.equals(sync, syncCheck)) // check it
throw new IOException("File is corrupt!");
syncSeen = true;
if (in.getPos() >= end) {
return -1;
}
length = in.readInt(); // re-read length
} else {
syncSeen = false;
}
return length;
}
/** Read the next key/value pair in the file into <code>buffer</code>.
* Returns the length of the key read, or -1 if at end of file. The length
* of the value may be computed by calling buffer.getLength() before and
* after calls to this method. */
/** @deprecated Call {@link #nextRaw(DataOutputBuffer,SequenceFile.ValueBytes)}. */
@Deprecated
synchronized int next(DataOutputBuffer buffer) throws IOException {
// Unsupported for block-compressed sequence files
if (blockCompressed) {
throw new IOException("Unsupported call for block-compressed" +
" SequenceFiles - use SequenceFile.Reader.next(DataOutputStream, ValueBytes)");
}
try {
int length = readRecordLength();
if (length == -1) {
return -1;
}
int keyLength = in.readInt();
buffer.write(in, length);
return keyLength;
} catch (ChecksumException e) { // checksum failure
handleChecksumException(e);
return next(buffer);
}
}
public ValueBytes createValueBytes() {
ValueBytes val = null;
if (!decompress || blockCompressed) {
val = new UncompressedBytes();
} else {
val = new CompressedBytes(codec);
}
return val;
}
/**
* Read 'raw' records.
* @param key - The buffer into which the key is read
* @param val - The 'raw' value
* @return Returns the total record length or -1 for end of file
* @throws IOException
*/
public synchronized int nextRaw(DataOutputBuffer key, ValueBytes val)
throws IOException {
if (!blockCompressed) {
int length = readRecordLength();
if (length == -1) {
return -1;
}
int keyLength = in.readInt();
int valLength = length - keyLength;
key.write(in, keyLength);
if (decompress) {
CompressedBytes value = (CompressedBytes)val;
value.reset(in, valLength);
} else {
UncompressedBytes value = (UncompressedBytes)val;
value.reset(in, valLength);
}
return length;
} else {
//Reset syncSeen
syncSeen = false;
// Read 'key'
if (noBufferedKeys == 0) {
if (in.getPos() >= end)
return -1;
try {
readBlock();
} catch (EOFException eof) {
return -1;
}
}
int keyLength = WritableUtils.readVInt(keyLenIn);
if (keyLength < 0) {
throw new IOException("zero length key found!");
}
key.write(keyIn, keyLength);
--noBufferedKeys;
// Read raw 'value'
seekToCurrentValue();
int valLength = WritableUtils.readVInt(valLenIn);
UncompressedBytes rawValue = (UncompressedBytes)val;
rawValue.reset(valIn, valLength);
--noBufferedValues;
return (keyLength+valLength);
}
}
/**
* Read 'raw' keys.
* @param key - The buffer into which the key is read
* @return Returns the key length or -1 for end of file
* @throws IOException
*/
public synchronized int nextRawKey(DataOutputBuffer key)
throws IOException {
if (!blockCompressed) {
recordLength = readRecordLength();
if (recordLength == -1) {
return -1;
}
keyLength = in.readInt();
key.write(in, keyLength);
return keyLength;
} else {
//Reset syncSeen
syncSeen = false;
// Read 'key'
if (noBufferedKeys == 0) {
if (in.getPos() >= end)
return -1;
try {
readBlock();
} catch (EOFException eof) {
return -1;
}
}
int keyLength = WritableUtils.readVInt(keyLenIn);
if (keyLength < 0) {
throw new IOException("zero length key found!");
}
key.write(keyIn, keyLength);
--noBufferedKeys;
return keyLength;
}
}
/** Read the next key in the file, skipping its
* value. Return null at end of file. */
public synchronized Object next(Object key) throws IOException {
if (key != null && key.getClass() != getKeyClass()) {
throw new IOException("wrong key class: "+key.getClass().getName()
+" is not "+keyClass);
}
if (!blockCompressed) {
outBuf.reset();
keyLength = next(outBuf);
if (keyLength < 0)
return null;
valBuffer.reset(outBuf.getData(), outBuf.getLength());
key = deserializeKey(key);
valBuffer.mark(0);
if (valBuffer.getPosition() != keyLength)
throw new IOException(key + " read " + valBuffer.getPosition()
+ " bytes, should read " + keyLength);
} else {
//Reset syncSeen
syncSeen = false;
if (noBufferedKeys == 0) {
try {
readBlock();
} catch (EOFException eof) {
return null;
}
}
int keyLength = WritableUtils.readVInt(keyLenIn);
// Sanity check
if (keyLength < 0) {
return null;
}
//Read another compressed 'key'
key = deserializeKey(key);
--noBufferedKeys;
}
return key;
}
@SuppressWarnings("unchecked")
private Object deserializeKey(Object key) throws IOException {
return keyDeserializer.deserialize(key);
}
/**
* Read 'raw' values.
* @param val - The 'raw' value
* @return Returns the value length
* @throws IOException
*/
public synchronized int nextRawValue(ValueBytes val)
throws IOException {
// Position stream to current value
seekToCurrentValue();
if (!blockCompressed) {
int valLength = recordLength - keyLength;
if (decompress) {
CompressedBytes value = (CompressedBytes)val;
value.reset(in, valLength);
} else {
UncompressedBytes value = (UncompressedBytes)val;
value.reset(in, valLength);
}
return valLength;
} else {
int valLength = WritableUtils.readVInt(valLenIn);
UncompressedBytes rawValue = (UncompressedBytes)val;
rawValue.reset(valIn, valLength);
--noBufferedValues;
return valLength;
}
}
private void handleChecksumException(ChecksumException e)
throws IOException {
if (this.conf.getBoolean("io.skip.checksum.errors", false)) {
LOG.warn("Bad checksum at "+getPosition()+". Skipping entries.");
sync(getPosition()+this.conf.getInt("io.bytes.per.checksum", 512));
} else {
throw e;
}
}
/** disables sync. often invoked for tmp files */
synchronized void ignoreSync() {
sync = null;
}
/** Set the current byte position in the input file.
*
* <p>The position passed must be a position returned by {@link
* SequenceFile.Writer#getLength()} when writing this file. To seek to an arbitrary
* position, use {@link SequenceFile.Reader#sync(long)}.
*/
public synchronized void seek(long position) throws IOException {
in.seek(position);
if (blockCompressed) { // trigger block read
noBufferedKeys = 0;
valuesDecompressed = true;
}
}
/** Seek to the next sync mark past a given position.*/
public synchronized void sync(long position) throws IOException {
if (position+SYNC_SIZE >= end) {
seek(end);
return;
}
if (position < headerEnd) {
// seek directly to first record
in.seek(headerEnd);
// note the sync marker "seen" in the header
syncSeen = true;
return;
}
try {
seek(position+4); // skip escape
in.readFully(syncCheck);
int syncLen = sync.length;
for (int i = 0; in.getPos() < end; i++) {
int j = 0;
for (; j < syncLen; j++) {
if (sync[j] != syncCheck[(i+j)%syncLen])
break;
}
if (j == syncLen) {
in.seek(in.getPos() - SYNC_SIZE); // position before sync
return;
}
syncCheck[i%syncLen] = in.readByte();
}
} catch (ChecksumException e) { // checksum failure
handleChecksumException(e);
}
}
/** Returns true iff the previous call to next passed a sync mark.*/
public synchronized boolean syncSeen() { return syncSeen; }
/** Return the current byte position in the input file. */
public synchronized long getPosition() throws IOException {
return in.getPos();
}
/** Returns the name of the file. */
@Override
public String toString() {
return filename;
}
}
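Reading the records back only needs a Reader and next(key, value); the reader works out NONE/RECORD/BLOCK from the file header by itself. A minimal sketch (path illustrative, reusing the earlier imports):

SequenceFile.Reader reader = null;
try {
  reader = new SequenceFile.Reader(conf,
      SequenceFile.Reader.file(new Path("/tmp/merged.seq")));
  Text key = new Text();
  BytesWritable value = new BytesWritable();
  long position = reader.getPosition();
  while (reader.next(key, value)) {
    String syncMark = reader.syncSeen() ? "*" : "";   // "*" marks a record right after a sync point
    System.out.printf("[%d%s] %s -> %d bytes%n", position, syncMark, key, value.getLength());
    position = reader.getPosition();                  // start of the next record
  }
} finally {
  IOUtils.closeStream(reader);
}

syncSeen() reports whether the previous next() crossed a sync marker, which is what lets a reader start from an arbitrary offset and resynchronize at a record boundary.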
SequenceFile data format
As described above, SequenceFile supports three compression modes, so there are three corresponding on-disk data formats. All three share the same header:
SequenceFile Header
- version - 3 bytes of magic header SEQ, followed by 1 byte of actual version number (e.g. SEQ4 or SEQ6)
- keyClassName -key class
- valueClassName - value class
- compression - A boolean which specifies if compression is turned on for keys/values in this file.
- blockCompression - A boolean which specifies if block-compression is turned on for keys/values in this file.
- compression codec - CompressionCodec class which is used for compression of keys and/or values (if compression is enabled).
- metadata - SequenceFile.Metadata for this file.
- sync - A sync marker to denote end of the header.
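Most of these header fields can be inspected for an existing file through the Reader getters shown earlier; a small sketch (path illustrative):

SequenceFile.Reader r = new SequenceFile.Reader(conf,
    SequenceFile.Reader.file(new Path("/tmp/merged.seq")));
System.out.println("key class   : " + r.getKeyClassName());       // keyClassName
System.out.println("value class : " + r.getValueClassName());     // valueClassName
System.out.println("compressed  : " + r.isCompressed());          // compression flag
System.out.println("block comp. : " + r.isBlockCompressed());     // blockCompression flag
System.out.println("codec       : " + r.getCompressionCodec());   // compression codec (null if NONE)
System.out.println("type        : " + r.getCompressionType());    // NONE / RECORD / BLOCK
System.out.println("metadata    : " + r.getMetadata());           // SequenceFile.Metadata
r.close();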