處理海量小檔案——本地檔案讀成sequenceFile檔案
當處理海量小檔案時,先將小檔案進行sequenceFile操作或者類似操作處理,然後再上傳到HDFS系統進行下一步的處理。(如有其它建議,歡迎留言)
一、直接上傳本地柵格資料將導致的問題
HDFS在儲存檔案時,會將檔案break them into chunks,預設inputSplit的大小與block塊的大小一致,為128M,如果單個檔案的大小小於block塊的大小則不會切分,直接將改小檔案儲存到一個block塊中。因此如果不對柵格資料檔案做處理,將導致佔據大量的block塊;由於namenode中會儲存為元資料資訊,因此也將導致namenode節點記錄大量小檔案的位置等元資料資訊而產生壓力過大,甚至namenode節點的記憶體可能會被佔滿。
二、在本地將柵格資料處理成sequenceFile檔案
SequenceFile檔案是Hadoop用來儲存二進位制形式的key-value對而設計的一種平面檔案。通常對小檔案的處理是使用sequenceFile檔案或MapFile檔案。此次選用的處理方式是使用SequenceFile檔案。(MapFile檔案由兩部分組成,data和index,index作為檔案的索引,儲存每個Record的key值以及它的偏移量。mapFile檔案的檢索效率較sequenceFile檔案高,但是訪問mapFile檔案時需要先將索引檔案載入到記憶體)
由於sequenceFile檔案由Key和value組成,此處的key值存放的為檔案的路徑,例如:file:/home/greatmap/World/6/109/48.jpeg;value的值存放的為圖片的位元組陣列檔案。柵格資料存放在的資料夾下,通過遍歷資料夾,將所有的圖片都寫成一個sequenceFile檔案。
下面是具體實現過程:
public class SequenceFileTest { //本地linux磁碟輸出路徑 static String PATH = "/home/greatmap/out"; static SequenceFile.Writer writer = null; public static void main(String[] args) throws Exception{ //設定讀取本地磁碟檔案 Configuration conf = new Configuration(); conf.set("fs.default.name", "file:///"); conf.set("mapred.job.tracker", "local"); //linux磁碟下路徑 String path = "/home/greatmap/World/"; URI uri = new URI(path); FileSystem fileSystem = FileSystem.get(uri, conf); //例項化writer物件 writer = SequenceFile.createWriter(fileSystem, conf, new Path(PATH), Text.class, BytesWritable.class); //遞迴遍歷資料夾,並將檔案下的檔案寫入sequenceFile檔案 listFileAndWriteToSequenceFile(fileSystem,path); //關閉流 org.apache.hadoop.io.IOUtils.closeStream(writer); } /**** * 遞迴檔案;並將檔案寫成SequenceFile檔案 * @param fileSystem * @param path * @throws Exception */ public static void listFileAndWriteToSequenceFile(FileSystem fileSystem,String path) throws Exception{ final FileStatus[] listStatuses = fileSystem.listStatus(new Path(path)); for (FileStatus fileStatus : listStatuses) { if(fileStatus.isFile()){ Text fileText = new Text(fileStatus.getPath().toString()); System.out.println(fileText.toString()); //返回一個SequenceFile.Writer例項 需要資料流和path物件 將資料寫入了path物件 FSDataInputStream in = fileSystem.open(new Path(fileText.toString())); byte[] buffer = IOUtils.toByteArray(in); in.read(buffer); BytesWritable value = new BytesWritable(buffer); //寫成SequenceFile檔案 writer.append(fileText, value); } if(fileStatus.isDirectory()){ listFileAndWriteToSequenceFile(fileSystem,fileStatus.getPath().toString()); } // org.apache.hadoop.io.IOUtils.closeStream(writer); } }}