Hadoop權威指南---I/O操作
目錄
Hadoop的I/O操作
1、資料完整性
資料在進過網路io傳輸或者磁碟io時有可能會損壞,因此一般通過計算校驗和來確定傳輸資料是否被損壞。校驗和checksum也有可能會損壞,但是因為其數量很小,出現損壞的機率很小。
1.1、HDFS的資料完整性
簡單來說,在客戶端往datanode寫資料的時候也會發生校驗和,和datanode節點接收到的資料計算得到的校驗和對比,如果不一致會報錯給客戶端,如果一致,在datanode節點每個資料塊下也會儲存一個數據塊的校驗和檔案;當客戶端讀取資料塊時也會讀取校驗和檔案,然後和自己通過資料塊計算的校驗和對比,看看資料塊是否被損壞。並且持久化DataNode節點的校驗和檔案會實時更新最後客戶端的訪問時間,好用於統計該檔案塊什麼時候開始損壞的。
1.2、 LocalFileSystem 和 ChecksumFileSystem
ChecksumFileSystem 繼承自FileSystem
Path getChecksumFile(Path file)方法可以獲取任意一個檔案的校驗和路徑
public abstract class ChecksumFileSystem extends FilterFileSystem {}
public class FilterFileSystem extends FileSystem {}
2、 壓縮 CompressionCodec
壓縮檔案的兩大好處:
- 減少儲存檔案所需要的磁碟空間;
- 加速資料在網路和磁碟上的傳輸;
2.1、 codec:Hadoop中對常用壓縮解壓縮演算法的實現
1)、通過CompressionCodec 對資料流進行壓縮和解壓縮
簡單來說就是需要對輸出流進行壓縮的時候呼叫createOutputStream來包裝一個輸出流,解壓縮的時候呼叫createInputStream來包裝一個輸入流
public interface CompressionCodec { CompressionOutputStream createOutputStream(OutputStream var1) throws IOException; CompressionOutputStream createOutputStream(OutputStream var1, Compressor var2) throws IOException; Class<? extends Compressor> getCompressorType(); Compressor createCompressor(); CompressionInputStream createInputStream(InputStream var1) throws IOException; CompressionInputStream createInputStream(InputStream var1, Decompressor var2) throws IOException; Class<? extends Decompressor> getDecompressorType(); Decompressor createDecompressor(); String getDefaultExtension(); public static class Util { public Util() { } static CompressionOutputStream createOutputStreamWithCodecPool(CompressionCodec codec, Configuration conf, OutputStream out) throws IOException { Compressor compressor = CodecPool.getCompressor(codec, conf); CompressionOutputStream stream = null; try { stream = codec.createOutputStream(out, compressor); } finally { if (stream == null) { CodecPool.returnCompressor(compressor); } else { stream.setTrackedCompressor(compressor); } } return stream; } static CompressionInputStream createInputStreamWithCodecPool(CompressionCodec codec, Configuration conf, InputStream in) throws IOException { Decompressor decompressor = CodecPool.getDecompressor(codec); CompressionInputStream stream = null; try { stream = codec.createInputStream(in, decompressor); } finally { if (stream == null) { CodecPool.returnDecompressor(decompressor); } else { stream.setTrackedDecompressor(decompressor); } } return stream; } } }
public abstract class CompressionOutputStream extends OutputStream {
protected final OutputStream out;
private Compressor trackedCompressor;
。。。
}
public abstract class CompressionInputStream extends InputStream implements Seekable {
protected final InputStream in;
protected long maxAvailableData = 0L;
private Decompressor trackedDecompressor;
。。。
}
2)、通過CompressionCodecFactory的getCodec方法推斷檔案使用的壓縮型別類CompressionCodec
其中支援通過檔案的副檔名來判斷使用的壓縮演算法類,通過輸入檔案的路徑path
3)、壓縮解壓縮的原生類庫
4)、CodecPool是一個靜態工廠類
類似於資料庫連線池、執行緒池的概念,主要是重用Compressor和Decompressor物件
2.2、 壓縮和輸入分片
支援切分的壓縮演算法bzip2
2.3、 在MapReduce中使用壓縮
3、 序列化 Writable
3.1 Writable介面
public interface Writable {
void write(DataOutput var1) throws IOException;
void readFields(DataInput var1) throws IOException;
}
其中:
- write方式是執行序列化操作的,把writeable物件自己寫入到輸出流中,然後轉化為位元組陣列;
- readFields是執行反序列化的,把位元組陣列封裝到輸入流中,然後根據輸入流來構建writeable物件;
Writable介面的序列化 :把一個Writable物件序列化為位元組陣列
Writable介面的反序列化 :把位元組陣列中的資料反序列化為一個Writable物件
WritableComparable和comparator
public interface WritableComparable<T> extends Writable, Comparable<T> {
}
public class WritableComparator implements RawComparator, Configurable {}
3.2 Writable類
1)、Writable的基本型別封裝
2)、text型別(其也實現了序列化介面Writable)
對比Text和IntWritable型別
public class Text extends BinaryComparable implements WritableComparable<BinaryComparable> {}
public class IntWritable implements WritableComparable<IntWritable> {}
text是針對UTF-8序列的Writable類,一般可以認為是java的String的Writable等價(一般作為MapReduce的Key)
3)、BytesWritable
4)、NullWritable
5)、ObjectWritable和GenericWritable
6)、Writable的集合類
3.3 實現定製的Writable集合
3.4 序列化框架
public class WritableSerialization extends Configured implements Serialization<Writable> {}
public interface Serialization<T> {
boolean accept(Class<?> var1);
Serializer<T> getSerializer(Class<T> var1);
Deserializer<T> getDeserializer(Class<T> var1);
}
4、 基於檔案的資料結構
4.1 關於SequenceFile
順序檔案:以key:value形式的文字檔案???
4.2 關於MapFile
4.3 其他檔案格式和麵向列的格式
參考:
Hadoop權威指南.大資料的儲存與分析.第4版---第5章 Hadoop的I/O操作