Hadoop基於檔案的資料結構及例項
基於檔案的資料結構
兩種檔案格式:
1、SequenceFile
2、MapFile
SequenceFile
1、SequenceFile檔案是Hadoop用來儲存二進位制形式的<key,value>
對而設計的一種平面檔案(Flat File)。
2、可以把SequenceFile當做一個容器,把所有檔案打包到SequenceFile類中可以高效的對小檔案進行儲存和處理。
3、SequenceFile檔案並不按照其儲存的key進行排序儲存,SequenceFile的內部類Writer**提供了append功能**。
4、SequenceFile中的key和value可以是任意型別Writable
SequenceFile壓縮
1、SequenceFile的內部格式取決於是否啟用壓縮,如果是,要麼是記錄壓縮,要麼是塊壓縮。
2、三種類型:
A.無壓縮型別:如果沒有啟用壓縮(預設設定),那麼每個記錄就由它的記錄長度(位元組數)、鍵的長度,鍵和值組成。長度欄位為四位元組。
B.記錄壓縮型別:記錄壓縮格式與無壓縮格式基本相同,不同的是值位元組是用定義在頭部的編碼器來壓縮。注意,鍵是不壓縮的。
C.塊壓縮型別:塊壓縮一次壓縮多個記錄,因此它比記錄壓縮更緊湊,而且一般優先選擇。當記錄的位元組數達到最小大小,才會新增到塊。該最小值由io.seqfile.compress.blocksize
無壓縮格式與記錄壓縮格式
塊壓縮格式
SequenceFile檔案格式的好處:
A.支援基於記錄(Record)或塊(Block)的資料壓縮。
B.支援splittable,能夠作為MapReduce的輸入分片。
C.修改簡單:主要負責修改相應的業務邏輯,而不用考慮具體的儲存格式。
SequenceFile檔案格式的壞處:
壞處是需要一個合併檔案的過程,且合併後的檔案將不方便檢視。因為它是二進位制檔案。
讀寫SequenceFile
寫過程:
1)建立Configuration
2)獲取FileSystem
3)建立檔案輸出路徑Path
4)呼叫SequenceFile.createWriter得到SequenceFile.Writer物件
5)呼叫SequenceFile.Writer.append追加寫入檔案
6)關閉流
讀過程:
1)建立Configuration
2)獲取FileSystem
3)建立檔案輸出路徑Path
4)new一個SequenceFile.Reader進行讀取
5)得到keyClass和valueClass
6)關閉流
org.apache.hadoop.io
Class SequenceFile
There are three SequenceFile Writers based on the SequenceFile.CompressionType used to compress key/value pairs:
1、Writer : Uncompressed records.
2、RecordCompressWriter : Record-compressed files, only compress values.
3、BlockCompressWriter : Block-compressed files, both keys & values are collected in 'blocks' separately and compressed. The size of the 'block' is configurable
無壓縮方式、記錄壓縮、塊壓縮例項
package SequenceFile;
import java.io.IOException;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.compress.BZip2Codec;
import org.apache.hadoop.util.ReflectionUtils;
public class Demo01 {
final static String uri= "hdfs://liguodong:8020/liguodong";
final static String[] data = {
"apache,software","chinese,good","james,NBA","index,pass"
};
public static void main(String[] args) throws IOException {
//1
Configuration configuration = new Configuration();
//2
FileSystem fs = FileSystem.get(URI.create(uri),configuration);
//3
Path path = new Path("/tmp.seq");
write(fs,configuration,path);
read(fs,configuration,path);
}
public static void write(FileSystem fs,Configuration configuration,Path path) throws IOException{
//4
IntWritable key = new IntWritable();
Text value = new Text();
//無壓縮
/*@SuppressWarnings("deprecation")
SequenceFile.Writer writer = SequenceFile.createWriter
(fs,configuration,path,key.getClass(),value.getClass());*/
//記錄壓縮
@SuppressWarnings("deprecation")
SequenceFile.Writer writer = SequenceFile.createWriter
(fs,configuration,path,key.getClass(),
value.getClass(),CompressionType.RECORD,new BZip2Codec());
//塊壓縮
/*@SuppressWarnings("deprecation")
SequenceFile.Writer writer = SequenceFile.createWriter
(fs,configuration,path,key.getClass(),
value.getClass(),CompressionType.BLOCK,new BZip2Codec());*/
//5
for (int i = 0; i < 30; i++) {
key.set(100-i);
value.set(data[i%data.length]);
writer.append(key, value);
}
//6、關閉流
IOUtils.closeStream(writer);
}
public static void read(FileSystem fs,Configuration configuration,Path path) throws IOException {
//4
@SuppressWarnings("deprecation")
SequenceFile.Reader reader = new SequenceFile.Reader(fs, path,configuration);
//5
Writable key = (Writable) ReflectionUtils.newInstance
(reader.getKeyClass(), configuration);
Writable value = (Writable) ReflectionUtils.newInstance
(reader.getValueClass(), configuration);
while(reader.next(key,value)){
System.out.println("key = " + key);
System.out.println("value = " + value);
System.out.println("position = "+ reader.getPosition());
}
IOUtils.closeStream(reader);
}
}
執行結果:
key = 100
value = apache,software
position = 164
key = 99
value = chinese,good
position = 197
key = 98
value = james,NBA
position = 227
key = 97
value = index,pass
position = 258
key = 96
value = apache,software
position = 294
key = 95
value = chinese,good
position = 327
......
key = 72
value = apache,software
position = 1074
key = 71
value = chinese,good
position = 1107
MapFile
public class MapFile {
/** The name of the index file. */
public static final String INDEX_FILE_NAME = "index";
/** The name of the data file. */
public static final String DATA_FILE_NAME = "data";
}
MapFile是經過排序的索引的SequenceFile,可以根據key進行查詢。
與SequenceFile不同的是, MapFile的Key一定要實現WritableComparable介面 ,即Key值是可比較的,而value是Writable型別的。
可以使用MapFile.fix()方法來重建索引,把SequenceFile轉換成MapFile。
它有兩個靜態成員變數:
static final String INDEX_FILE_NAME
static final String DATA_FILE_NAME
通過觀察其目錄結構可以看到MapFile由兩部分組成,分別是data和index。
index作為檔案的資料索引,主要記錄了每個Record的key值,以及該Record在檔案中的偏移位置。
在MapFile被訪問的時候,索引檔案會被載入到記憶體,通過索引對映關係可迅速定位到指定Record所在檔案位置。
因此,相對SequenceFile而言, MapFile的檢索效率是高效的,缺點是會消耗一部分記憶體來儲存index資料。
需注意的是, MapFile並不會把所有Record都記錄到index中去,預設情況下每隔128條記錄儲存一個索引對映。當然,記錄間隔可人為修改,通過MapFIle.Writer的setIndexInterval()
方法,或修改io.map.index.interval
屬性;
讀寫MapFile
寫過程:
1)建立Configuration
2)獲取FileSystem
3)建立檔案輸出路徑Path
4)new一個MapFile.Writer物件
5)呼叫MapFile.Writer.append追加寫入檔案
6)關閉流
讀過程:
1)建立Configuration
2)獲取FileSystem
3)建立檔案輸出路徑Path
4)new一個MapFile.Reader進行讀取
5)得到keyClass和valueClass
6)關閉流
具體操作與SequenceFile相似。
命令列檢視二進位制檔案
hdfs dfs -text /liguodong/tmp.seq