1. 程式人生 > >Spark讀取壓縮檔案

Spark讀取壓縮檔案

前言

本文講如何用spark讀取gz型別的壓縮檔案,以及如何解決我遇到的各種問題。

1、檔案壓縮

下面這一部分摘自Spark快速大資料分析:
  在大資料工作中,我們經常需要對資料進行壓縮以節省儲存空間和網路傳輸開銷。對於大多數Hadoop輸出格式來說,我們可以指定一種壓縮編解碼器來壓縮資料。
  選擇一個輸出壓縮編解碼器可能會對這些資料以後的使用者產生巨大影響。對於像Spark 這樣的分散式系統,我們通常會嘗試從多個不同機器上一起讀入資料。要實現這種情況,每個工作節點都必須能夠找到一條新記錄的開端。有些壓縮格式會使這變得不可能,而必須要單個節點來讀入所有資料,這就很容易產生效能瓶頸。可以很容易地從多個節點上並行讀取的格式被稱為“可分割”的格式。下表列出了可用的壓縮選項。

格式 可分割 平均壓縮速度 文字檔案壓縮效率 Hadoop壓縮編解碼器 純Java實現 原生 備註
gzip org.apache.hadoop.io.compress.GzipCodec
lzo 是(取決於所使用的庫) 非常快 中等 com.hadoop.compression.lzo.LzoCodec 需要在每個節點上安裝LZO
bzip2 非常高 org.apache.hadoop.io.compress.Bzip2Codec 為可分割版本使用純Java
zlib 中等 org.apache.hadoop.io.compress.DefaultCodec Hadoop 的預設壓縮編解碼器
Snappy 非常快 org.apache.hadoop.io.compress.SnappyCodec Snappy 有純Java的移植版,但是在Spark/Hadoop中不能用

  儘管Spark 的textFile() 方法可以處理壓縮過的輸入,但即使輸入資料被以可分割讀取的方式壓縮,Spark 也不會開啟splittable。因此,如果你要讀取單個壓縮過的輸入,最好不要考慮使用Spark 的封裝,而是使用newAPIHadoopFile 或者hadoopFile,並指定正確的壓縮編解碼器。

關於上面一段話的個人測試:選取一個大檔案txt,大小為1.5G,寫spark程式讀取hdfs上的該檔案然後寫入hive,經測試在多個分割槽的情況下,txt執行時間最短,因為在多個機器並行執行,而gz檔案是不可分割的,即使指定分割槽數目,但依然是一個分割槽,一個task,即在一個機器上執行,bzip2格式的檔案雖然是可分割的,即可以按照指定的分割槽分為不同的task在多個機器上執行,但是執行時間長,比gz時間還長,經過四次改變bzip2的分割槽,發現最快的時間和gz時間是一樣的,如果指定一個分割槽的話,比gz要慢很多,我想這樣就可以更好的理解:”儘管Spark 的textFile() 方法可以處理壓縮過的輸入,但即使輸入資料被以可分割讀取的方式壓縮,Spark 也不會開啟splittable”這句話了。


後續測試:根據叢集的cpu合理分配executor的個數的情況下,txt的時間縮短到1分鐘,bzip2縮短到1.3分鐘,而對gz重新分割槽(reparation)縮短到2分鐘,可以看到在合理分配資源的情況下,bzip2比gz快不少,但依然趕不上txt,當然這也的結果可能受檔案大小和叢集資源的限制,所以根據自己的實際需求測試再決定用哪個即可。

2、程式碼

程式碼很簡單,用textFile()即可,假設,我的資料名為data.txt.gz,我把它放在hdfs上的/tmp/dkl路徑下那麼程式碼為:

val path = "hdfs://ambari.master.com:8020/tmp/dkl/data.txt.gz"
val data = sc.textFile(path)

注:把資料放在hdfs的命令為

hadoop fs -put data.tar.gz /tml/dkl

3、一些小問題

3.1 資料

首先造幾個資料吧,先建立一個txt,名字為data.txt,內容如下

1            張三            上海        2018-05-25
2            張三            上海        2018-05-25
3            張三            上海        2018-05-25
4            張三            上海        2018-05-25
5            張三            上海        2018-05-25

3.2 如何壓縮

那麼如如何打包為gz格式的壓縮檔案呢,分兩種
一、 在windows上打包,如果不想在Linux伺服器上用命令打包,那麼可以直接用windows上的軟體打包(win上常見的zip,rar格式,spark是不支援的),我用7-zip軟體壓縮,大家可百度7-zip或直接在https://www.7-zip.org/下載安裝,壓縮格式選gzip即可。
二、 在Linux上壓縮,可通過下面的命令
1、保留原檔案

gzip –c data.txt > data.txt.gz

2、不保留原檔案,預設生成的檔名為原檔名.gz,即data.txt.gz

gzip data.txt

壓縮完了之後,跑一下程式測試一下

data.take(3).foreach(println)
1            張三            上海        2018-05-25
2            張三            上海        2018-05-25
3            張三            上海        2018-05-25

根據結果看沒問題。
三、 說明
在Linux上用tar命令壓縮,spark雖然可以讀,但是第一行會有檔案資訊

tar -zcvf data.tar.gz data.txt

3.3 檔案編碼問題

別人給我的原檔案是.rar,那我需要將其解壓之後得到txt,然後按照上述方式壓縮為.gz,然後上傳到hdfs,進行程式碼測試,列印前幾條發現亂碼,查了一下發現原檔案是gbk編碼的,且sc.textFile()不能指定編碼,只能讀取utf8格式,其他格式就會亂碼。

注意:因為實際情況下解壓後的txt檔案很大,windows是直接打不開的,所以不能通過開啟檔案修改編碼的方法去解決。

3.3.1 構建測試gbk格式的檔案

1、windows上可以用記事本開啟,另存為,編碼選擇ANSI即可

2、Linux可以通過下面的命令修改

iconv -f utf8 -t gbk data.txt > data_gbk.txt

測試一下輸出,發現確實亂碼了(直接測試txt即可)

1            ����            �Ϻ�        2018-05-25
2            ����            �Ϻ�        2018-05-25
3            ����            �Ϻ�        2018-05-25

3.3.2 程式碼解決

通過如下程式碼測試即可
定義方法

import org.apache.spark.rdd.RDD
import org.apache.spark.SparkContext
import org.apache.hadoop.io.LongWritable
import org.apache.hadoop.mapred.TextInputFormat
import org.apache.hadoop.io.Text
def transfer(sc: SparkContext, path: String): RDD[String] = {
  sc.hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text], 1)
    .map(p => new String(p._2.getBytes, 0, p._2.getLength, "GBK"))
}

測試方法

transfer(sc, path3).take(3).foreach(println)

3.3.3 Linux命令

可直接通過Linux命令轉換txt的編碼格式,再壓縮,這樣程式碼就不用修改
其實在3.2.1中已經涉及到了
1、通過Linux自帶的命令iconv
iconv不能覆蓋原來的檔案,只能生成新的檔案之後,再通過mv命令去覆蓋

iconv -f gbk -t utf8 data_gbk.txt > data_new.txt

2、通過enca
enca可以直接覆蓋原來的檔案,這樣如果不想改變來的檔名,就少一步mv操作了,enca不是子系統自帶的,需要自己下載安裝,可在http://dl.cihar.com/enca/下載最新版本。

#下載&解壓
wget http://dl.cihar.com/enca/enca-1.19.tar.gz
tar -zxvf enca-1.19.tar.gz
cd enca-1.19
#編譯安裝
./configure
make
make install

安裝好了之後通過下面的命令轉換即可

enca -L zh_CN -x UTF-8 data_gbk.txt 

轉換編碼格式之後,在通過程式測試即可。

3.4 rdd換df

由於檔案過大,不能直接開啟看也沒用垃圾資料,造成格式問題,如果有垃圾資料,在rdd轉df的過程中會產生異常,這裡記錄一下我碰見的問題。

1、首先可以先打印出前幾行資料檢視一下該檔案的大體格式

if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true])....

原因是因為檔案裡有一行資料為垃圾資料,這行資料的列數和列名的個數不一樣導致的,可以在程式碼中過濾掉這樣資料即可。

.filter(_.length == colName.length)