Processing Large Numbers of Small Files with Hadoop Avro
Drawbacks of using HDFS to store large numbers of small files:
1. The Hadoop NameNode keeps the metadata for every file in memory. By common estimates, each file consumes roughly 600 bytes of NameNode memory, so storing a huge number of small files puts enormous pressure on the NameNode (see the back-of-envelope sketch after this list).
2. If the small files are processed with Hadoop MapReduce, the number of Mappers grows linearly with the number of small files (note: by default, FileInputFormat only splits files larger than the HDFS block size, so every small file becomes its own input split). With a very large number of small files, MapReduce ends up spending much of its time creating and tearing down map tasks.
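To put the first point in perspective, here is a rough calculation using the ~600 bytes/file figure quoted above (the true cost also depends on directory and block objects, so treat this purely as an estimate):

```java
public class NameNodeMemoryEstimate {
    public static void main(String[] args) {
        long files = 100_000_000L;  // 100 million small files
        long bytesPerFile = 600L;   // rough per-file metadata cost quoted above
        double gb = files * bytesPerFile / (1024.0 * 1024.0 * 1024.0);
        System.out.printf("~%.1f GB of NameNode heap for metadata alone%n", gb); // ~55.9 GB
    }
}
```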
To address the problems caused by large numbers of small files, we can pack many small files together into one large file. Apache Avro is a language-independent data serialization system. Conceptually, Avro has two parts: a schema and the data itself (generally binary). Schemas are usually described in JSON. Avro also defines its own set of data types, shown in the two tables below:
Avro primitive types

| Type | Description | Schema |
| --- | --- | --- |
| null | The absence of a value | "null" |
| boolean | A binary value | "boolean" |
| int | 32-bit signed integer | "int" |
| long | 64-bit signed integer | "long" |
| float | 32-bit single-precision floating point | "float" |
| double | 64-bit double-precision floating point | "double" |
| bytes | Byte array | "bytes" |
| string | Unicode string | "string" |
| Type | Description | Schema |
| --- | --- | --- |
| array | An ordered collection of objects. All objects in a particular array must have the same schema. | { "type": "array", "items": "long" } |
| map | An unordered collection of key-value pairs. Keys must be strings and values may be of any type, although within a particular map all values must have the same schema. | { "type": "map", "values": "string" } |
| record | A collection of named fields of any type. | { "type": "record", "name": "WeatherRecord", "doc": "A weather reading.", "fields": [ {"name": "year", "type": "int"}, {"name": "temperature", "type": "int"}, {"name": "stationId", "type": "string"} ] } |
| enum | A set of named values. | { "type": "enum", "name": "Cutlery", "doc": "An eating utensil.", "symbols": ["KNIFE", "FORK", "SPOON"] } |
| fixed | A fixed number of 8-bit unsigned bytes. | { "type": "fixed", "name": "Md5Hash", "size": 16 } |
| union | A union of schemas, represented by a JSON array in which each element is a schema. Data represented by a union must match one of the schemas in the union. | [ "null", "string", {"type": "map", "values": "string"} ] |

Avro complex types
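To make the tables concrete, here is a minimal sketch that parses the WeatherRecord schema shown above and fills in a generic record (the class name and field values are invented for the example):

```java
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;

public class SchemaDemo {
    public static void main(String[] args) {
        // Parse the record schema from the table above.
        Schema schema = new Schema.Parser().parse(
            "{\"type\": \"record\", \"name\": \"WeatherRecord\","
            + " \"doc\": \"A weather reading.\","
            + " \"fields\": ["
            + "   {\"name\": \"year\", \"type\": \"int\"},"
            + "   {\"name\": \"temperature\", \"type\": \"int\"},"
            + "   {\"name\": \"stationId\", \"type\": \"string\"}]}");

        // Build a record that conforms to the schema.
        GenericRecord record = new GenericData.Record(schema);
        record.put("year", 2024);
        record.put("temperature", 25);
        record.put("stationId", "011990-99999");
        System.out.println(record); // prints the record as JSON-style text
    }
}
```

The same Schema.Parser and GenericRecord pattern is what the packing program below relies on.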
Building on these types, a program can pack local small files into one large Avro file stored in HDFS, with each local file becoming a single Avro record holding the file's name and contents. The writer below does the packing; a companion reader (shown after it) unpacks the records and verifies their checksums:
```java
import java.io.File;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;

import org.apache.avro.Schema;
import org.apache.avro.file.CodecFactory;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;

public class SmallFilesWriter {
    public static final String FIELD_CONTENTS = "contents";
    public static final String FIELD_FILENAME = "filename";

    // Each small file becomes one record: {filename: string, contents: bytes}.
    public static final String SCHEMA_JSON =
        "{\"type\": \"record\", \"name\": \"SmallFilesTest\", "
        + "\"fields\": ["
        + "{\"name\":\"" + FIELD_FILENAME + "\", \"type\":\"string\"},"
        + "{\"name\":\"" + FIELD_CONTENTS + "\", \"type\":\"bytes\"}]}";
    public static final Schema SCHEMA = new Schema.Parser().parse(SCHEMA_JSON);

    public static void writeToAvro(File srcPath, OutputStream outputStream) throws IOException {
        DataFileWriter<Object> writer =
            new DataFileWriter<Object>(new GenericDatumWriter<Object>())
                .setSyncInterval(100);               // write a sync marker roughly every 100 bytes
        writer.setCodec(CodecFactory.snappyCodec()); // compress data blocks with Snappy
        writer.create(SCHEMA, outputStream);
        for (File file : FileUtils.listFiles(srcPath, null, false)) {
            byte[] content = FileUtils.readFileToByteArray(file);
            GenericRecord record = new GenericData.Record(SCHEMA);
            record.put(FIELD_FILENAME, file.getAbsolutePath());
            record.put(FIELD_CONTENTS, ByteBuffer.wrap(content));
            writer.append(record);
            // Print each file's MD5 so the round trip can be verified by the reader.
            System.out.println(file.getAbsolutePath() + ": " + DigestUtils.md5Hex(content));
        }
        IOUtils.cleanup(null, writer);
        IOUtils.cleanup(null, outputStream);
    }

    public static void main(String[] args) throws Exception {
        Configuration config = new Configuration();
        FileSystem hdfs = FileSystem.get(config);
        File sourceDir = new File(args[0]); // local directory of small files
        Path destFile = new Path(args[1]);  // destination Avro file in HDFS
        OutputStream os = hdfs.create(destFile);
        writeToAvro(sourceDir, os);
    }
}
```
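A hypothetical invocation, assuming the classes are packaged into a jar named smallfiles.jar (the jar name and paths are placeholders): `hadoop jar smallfiles.jar SmallFilesWriter /tmp/small-files /user/hadoop/smallfiles.avro`. The reader below takes the resulting HDFS path as its single argument and prints each packed file's name and MD5, which should match the writer's output line for line.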
```java
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;

import org.apache.avro.file.DataFileStream;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;

public class SmallFilesReader {
    private static final String FIELD_FILENAME = "filename";
    private static final String FIELD_CONTENTS = "contents";

    public static void readFromAvro(InputStream is) throws IOException {
        DataFileStream<Object> reader = new DataFileStream<Object>(is, new GenericDatumReader<Object>());
        for (Object o : reader) {
            GenericRecord r = (GenericRecord) o;
            // Copy out exactly the remaining bytes; ByteBuffer.array() can be larger
            // than the record's contents, which would corrupt the MD5.
            ByteBuffer buf = (ByteBuffer) r.get(FIELD_CONTENTS);
            byte[] content = new byte[buf.remaining()];
            buf.get(content);
            System.out.println(r.get(FIELD_FILENAME) + ": " + DigestUtils.md5Hex(content));
        }
        IOUtils.cleanup(null, is);
        IOUtils.cleanup(null, reader);
    }

    public static void main(String... args) throws Exception {
        Configuration config = new Configuration();
        FileSystem hdfs = FileSystem.get(config);
        Path srcFile = new Path(args[0]); // Avro file in HDFS produced by the writer
        InputStream is = hdfs.open(srcFile);
        readFromAvro(is);
    }
}
```
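The payoff of packing shows up on the MapReduce side: the job now reads one splittable Avro container instead of thousands of tiny inputs, so the number of map tasks is driven by block size rather than file count. Below is a minimal, map-only sketch of consuming the packed file, assuming the avro-mapred (hadoop2) artifact is on the classpath; SmallFilesMapReduce and FileSizeMapper are hypothetical names, and the per-record logic (here, just emitting each packed file's size) is a placeholder:

```java
import java.io.IOException;
import java.nio.ByteBuffer;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.mapred.AvroKey;
import org.apache.avro.mapreduce.AvroJob;
import org.apache.avro.mapreduce.AvroKeyInputFormat;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class SmallFilesMapReduce {

    // Placeholder per-file logic: emit each packed file's name and size in bytes.
    public static class FileSizeMapper
            extends Mapper<AvroKey<GenericRecord>, NullWritable, Text, LongWritable> {
        @Override
        protected void map(AvroKey<GenericRecord> key, NullWritable value, Context context)
                throws IOException, InterruptedException {
            GenericRecord record = key.datum();
            ByteBuffer contents = (ByteBuffer) record.get("contents");
            context.write(new Text(record.get("filename").toString()),
                          new LongWritable(contents.remaining()));
        }
    }

    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration(), "small-files-demo");
        job.setJarByClass(SmallFilesMapReduce.class);
        job.setInputFormatClass(AvroKeyInputFormat.class);
        // Tell Avro the reader schema for the packed records.
        AvroJob.setInputKeySchema(job, new Schema.Parser().parse(SmallFilesWriter.SCHEMA_JSON));
        job.setMapperClass(FileSizeMapper.class);
        job.setNumReduceTasks(0); // map-only job
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
```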