hadoop 壓縮和解壓
Hadoop對於壓縮檔案的支援
如果我們壓縮的檔案有相應壓縮格式的副檔名(比如lzo,gz,bzip2等),hadoop就會根據副檔名去選擇解碼器解壓。
hadoop對每個壓縮格式的支援,詳細見下表:
如果壓縮的檔案沒有副檔名,則需 要在執行mapreduce任務的時候指定輸入格式.
- hadoop jar /usr/home/hadoop/hadoop-0.20.2/contrib/streaming/hadoop-streaming-0.20.2-CD H3B4.jar
- -file /usr/home/hadoop/hello/mapper.py -mapper /usr/home/hadoop/hello/mapper.py
- -file /usr/home/hadoop/hello/reducer.py -reducer /usr/home/hadoop/hello/reducer.py
- -input lzotest -output result4
- -jobconf mapred.reduce.tasks=1
- *-inputformat org.apache.hadoop.mapred.LzoTextInputFormat*
hadoop下各種壓縮演算法的壓縮比,壓縮時間,解壓時間見下表:
壓縮演算法 | 原始檔案大小 | 壓縮後的檔案大小 | 壓縮速度 | 解壓縮速度 |
gzip | 8.3GB | 1.8GB | 17.5MB/s | 58MB/s |
bzip2 | 8.3GB | 1.1GB | 2.4MB/s | 9.5MB/s |
LZO-bset | 8.3GB | 2GB | 4MB/s | 60.6MB/s |
LZO | 8.3GB | 2.9GB | 49.3MB/S | 74.6MB/s |
hadoop各種壓縮演算法的優缺點簡述
在考慮如何壓縮那些將由MapReduce處理的資料時,考慮壓縮格式是否支援分割是很重要的。考慮儲存在HDFS中的未壓縮的檔案,其大小為1GB,HDFS的塊大小為64MB,所以該檔案將被儲存為16塊,將此檔案用作輸入的MapReduce作業會建立1個輸人分片(split ,也稱為“分塊”。對於block,我們統一稱為“塊”。)每個分片都被作為一個獨立map任務的輸入單獨進行處理。
現在假設。該檔案是一個gzip格式的壓縮檔案,壓縮後的大小為1GB。和前面一樣,HDFS將此檔案儲存為16塊。然而,針對每一塊建立一個分塊是沒有用的,因為不可能從gzip資料流中的任意點開始讀取,map任務也不可能獨立於其他分塊只讀取一個分塊中的資料。gzip格式使用DEFLATE來儲存壓縮過的資料,DEFLATE將資料作為一系列壓縮過的塊進行儲存。問題是,每塊的開始沒有指定使用者在資料流中任意點定位到下一個塊的起始位置,而是其自身與資料流同步。因此,gzip不支援分割(塊)機制。
在這種情況下,MapReduce不分割gzip格式的檔案,因為它知道輸入是gzip壓縮格式的(通過副檔名得知),而gzip壓縮機制不支援分割機制。這樣是以犧牲本地化為代價:一個map任務將處理16個HDFS塊。大都不是map的本地資料。與此同時,因為map任務少,所以作業分割的粒度不夠細,從而導致執行時間變長。
在我們假設的例子中,如果是一個LZO格式的檔案,我們會碰到同樣的問題,因為基本壓縮格式不為reader提供方法使其與流同步。但是,bzip2格式的壓縮檔案確實提供了塊與塊之間的同步標記(一個48位的PI近似值),因此它支援分割機制。
對於檔案的收集,這些問題會稍有不同。ZIP是存檔格式,因此它可以將多個檔案合併為一個ZIP檔案。每個檔案單獨壓縮,所有文件的儲存位置儲存在ZIP檔案的尾部。這個屬性表明ZIP檔案支援檔案邊界處分割,每個分片中包括ZIP壓縮檔案中的一個或多個檔案。
在MapReduce我們應該使用哪種壓縮格式
根據應用的具體情況來決定應該使用哪種壓縮格式。就個人而言,更趨向於使用最快的速度壓縮,還是使用最優的空間壓縮?一般來說,應該嘗試不同的策略,並用具有代表性的資料集進行測試,從而找到最佳方法。對於那些大型的、沒有邊界的檔案,如日誌檔案,有以下選項。
儲存未壓縮的檔案。
使用支援分割機制的壓縮格式,如bzip2。
在應用中將檔案分割成幾個大的資料塊,然後使用任何一種支援的壓縮格式單獨壓縮每個資料塊(可不用考慮壓縮格式是否支援分割)。在這裡,需要選擇資料塊的大小使壓縮後的資料塊在大小上相當於HDFS的塊。
使用支援壓縮和分割的Sequence File(序列檔案)。
對於大型檔案,不要對整個檔案使用不支援分割的壓縮格式,因為這樣會損失本地性優勢,從而使降低MapReduce應用的效能。
hadoop支援Splittable壓縮lzo
在hadoop中使用lzo的壓縮演算法可以減小資料的大小和資料的磁碟讀寫時間,在HDFS中儲存壓縮資料,可以使叢集能儲存更多的資料,延長叢集的使用壽命。不僅如此,由於mapreduce作業通常瓶頸都在IO上,儲存壓縮資料就意味這更少的IO操作,job執行更加的高效。
但是在hadoop上使用壓縮也有兩個比較麻煩的地方:第一,有些壓縮格式不能被分塊,並行的處理,比如gzip。第二,另外的一些壓縮格式雖然支援分塊處理,但是解壓的過程非常的緩慢,使job的瓶頸轉移到了cpu上,例如bzip2。
如果能夠擁有一種壓縮演算法,即能夠被分塊,並行的處理,速度也非常的快,那就非常的理想。這種方式就是lzo。
lzo的壓縮檔案是由許多的小的blocks組成(約256K),使的hadoop的job可以根據block的劃分來split job。不僅如此,lzo在設計時就考慮到了效率問題,它的解壓速度是gzip的兩倍,這就讓它能夠節省很多的磁碟讀寫,它的壓縮比的不如gzip,大約壓縮出來的檔案比gzip壓縮的大一半,但是這樣仍然比沒有經過壓縮的檔案要節省20%-50%的儲存空間,這樣就可以在效率上大大的提高job執行的速度。
如何在MapReduce中使用壓縮
1.輸入的檔案的壓縮
如果輸入的檔案是壓縮過的,那麼在被MapReduce讀取時,它們會被自動解壓,根據副檔名來決定應該使用哪一個壓縮解碼器。
2.MapReduce作業的輸出的壓縮
如果要壓縮MapReduce作業的輸出,請在作業配置檔案中將mapred.output.compress屬性設定為true。將mapred.output.compression.codec屬性設定為自己打算使用的壓縮編碼/解碼器的類名。
如果為輸出使用了一系列檔案,可以設定mapred.output.compression.type屬性來控制壓縮型別,預設為RECORD,它壓縮單獨的記錄。將它改為BLOCK,則可以壓縮一組記錄。由於它有更好的壓縮比,所以推薦使用。
3.map作業輸出結果的壓縮
即使MapReduce應用使用非壓縮的資料來讀取和寫入,我們也可以受益於壓縮map階段的中間輸出。因為map作業的輸出會被寫入磁碟並通過網路傳輸到reducer節點,所以如果使用LZO之類的快速壓縮,能得到更好的效能,因為傳輸的資料量大大減少了。以下程式碼顯示了啟用rnap輸出壓縮和設定壓縮格式的配置屬性。
- conf.setCompressMapOutput(true);
- conf.setMapOutputCompressorClass(GzipCodec.class);
本地壓縮庫
考慮到效能,最好使用一個本地庫(native library)來壓縮和解壓。例如,在一個測試中,使用本地gzip壓縮庫減少了解壓時間50%,壓縮時間大約減少了10%(與內建的Java實現相比較)。表4-4展示了Java和本地提供的每個壓縮格式的實現。井不是所有的格式都有本地實現(例如bzip2壓縮),而另一些則僅有本地實現(例如LZO)。
壓縮格式 | Java實現 | 本地實現 |
DEFLATE | 是 | 是 |
gzip | 是 | 是 |
bzip2 | 是 | 否 |
LZO | 否 | 是 |
Hadoop帶有預置的32位和64位Linux的本地壓縮庫,位於庫/本地目錄。對於其他平臺,需要自己編譯庫,具體請參見Hadoop的維基百科http://wiki.apache.org/hadoop/NativeHadoop。
本地庫通過Java系統屬性java.library.path來使用。Hadoop的指令碼在bin目錄中已經設定好這個屬性,但如果不使用該指令碼,則需要在應用中設定屬性。
預設情況下,Hadoop會在它執行的平臺上查詢本地庫,如果發現就自動載入。這意味著不必更改任何配置設定就可以使用本地庫。在某些情況下,可能希望禁用本地庫,比如在除錯壓縮相關問題的時候。為此,將屬性hadoop.native.lib設定為false,即可確保內建的Java等同內建實現被使用(如果它們可用的話)。
檔案的壓縮有兩大好處:1、可以減少儲存檔案所需要的磁碟空間;2、可以加速資料在網路和磁碟上的傳輸。尤其是在處理大資料時,這兩大好處是相當重要的。
下面是一個使用gzip工具壓縮檔案的例子。將檔案/user/hadoop/aa.txt進行壓縮,壓縮後為/user/hadoop/text.gz
1 package com.hdfs; 2 3 import java.io.IOException; 4 import java.io.InputStream; 5 import java.io.OutputStream; 6 import java.net.URI; 7 8 import org.apache.hadoop.conf.Configuration; 9 import org.apache.hadoop.fs.FSDataInputStream; 10 import org.apache.hadoop.fs.FSDataOutputStream; 11 import org.apache.hadoop.fs.FileSystem; 12 import org.apache.hadoop.fs.Path; 13 import org.apache.hadoop.io.IOUtils; 14 import org.apache.hadoop.io.compress.CompressionCodec; 15 import org.apache.hadoop.io.compress.CompressionCodecFactory; 16 import org.apache.hadoop.io.compress.CompressionInputStream; 17 import org.apache.hadoop.io.compress.CompressionOutputStream; 18 import org.apache.hadoop.util.ReflectionUtils; 19 20 public class CodecTest { 21 //壓縮檔案 22 public static void compress(String codecClassName) throws Exception{ 23 Class<?> codecClass = Class.forName(codecClassName); 24 Configuration conf = new Configuration(); 25 FileSystem fs = FileSystem.get(conf); 26 CompressionCodec codec = (CompressionCodec)ReflectionUtils.newInstance(codecClass, conf); 27 //指定壓縮檔案路徑 28 FSDataOutputStream outputStream = fs.create(new Path("/user/hadoop/text.gz")); 29 //指定要被壓縮的檔案路徑 30 FSDataInputStream in = fs.open(new Path("/user/hadoop/aa.txt")); 31 //建立壓縮輸出流 32 CompressionOutputStream out = codec.createOutputStream(outputStream); 33 IOUtils.copyBytes(in, out, conf); 34 IOUtils.closeStream(in); 35 IOUtils.closeStream(out); 36 } 37 38 //解壓縮 39 public static void uncompress(String fileName) throws Exception{ 40 Class<?> codecClass = Class.forName("org.apache.hadoop.io.compress.GzipCodec"); 41 Configuration conf = new Configuration(); 42 FileSystem fs = FileSystem.get(conf); 43 CompressionCodec codec = (CompressionCodec)ReflectionUtils.newInstance(codecClass, conf); 44 FSDataInputStream inputStream = fs.open(new Path("/user/hadoop/text.gz")); 45 //把text檔案裡到資料解壓,然後輸出到控制檯 46 InputStream in = codec.createInputStream(inputStream); 47 IOUtils.copyBytes(in, System.out, conf); 48 IOUtils.closeStream(in); 49 } 50 51 //使用副檔名來推斷二來的codec來對檔案進行解壓縮 52 public static void uncompress1(String uri) throws IOException{ 53 Configuration conf = new Configuration(); 54 FileSystem fs = FileSystem.get(URI.create(uri), conf); 55 56 Path inputPath = new Path(uri); 57 CompressionCodecFactory factory = new CompressionCodecFactory(conf); 58 CompressionCodec codec = factory.getCodec(inputPath); 59 if(codec == null){ 60 System.out.println("no codec found for " + uri); 61 System.exit(1); 62 } 63 String outputUri = CompressionCodecFactory.removeSuffix(uri, codec.getDefaultExtension()); 64 InputStream in = null; 65 OutputStream out = null; 66 try { 67 in = codec.createInputStream(fs.open(inputPath)); 68 out = fs.create(new Path(outputUri)); 69 IOUtils.copyBytes(in, out, conf); 70 } finally{ 71 IOUtils.closeStream(out); 72 IOUtils.closeStream(in); 73 } 74 } 75 76 public static void main(String[] args) throws Exception { 77 //compress("org.apache.hadoop.io.compress.GzipCodec"); 78 //uncompress("text"); 79 uncompress1("hdfs://master:9000/user/hadoop/text.gz"); 80 } 81 82 }
首先執行77行進行壓縮,壓縮後執行第78行進行解壓縮,這裡解壓到標準輸出,所以執行78行會再控制檯看到檔案/user/hadoop/aa.txt的內容。如果執行79行的話會將檔案解壓到/user/hadoop/text,他是根據/user/hadoop/text.gz的副檔名判斷使用哪個解壓工具進行解壓的。解壓後的路徑就是去掉副檔名。
進行檔案壓縮後,在執行命令./hadoop fs -ls /user/hadoop/檢視檔案資訊,如下:
1 [[email protected] bin]$ ./hadoop fs -ls /user/hadoop/ 2 Found 7 items 3 -rw-r--r-- 3 hadoop supergroup 76805248 2013-06-17 23:55 /user/hadoop/aa.mp4 4 -rw-r--r-- 3 hadoop supergroup 520 2013-06-17 22:29 /user/hadoop/aa.txt 5 drwxr-xr-x - hadoop supergroup 0 2013-06-16 17:19 /user/hadoop/input 6 drwxr-xr-x - hadoop supergroup 0 2013-06-16 19:32 /user/hadoop/output 7 drwxr-xr-x - hadoop supergroup 0 2013-06-18 17:08 /user/hadoop/test 8 drwxr-xr-x - hadoop supergroup 0 2013-06-18 19:45 /user/hadoop/test1 9 -rw-r--r-- 3 hadoop supergroup 46 2013-06-19 20:09 /user/hadoop/text.gz
第4行為壓縮之前的檔案,大小為520個位元組。第9行為壓縮後的檔案,大小為46個位元組。由此可以看出上面講的壓縮的兩大好處了。