如何在Scala中讀取Hadoop叢集上的gz壓縮檔案
阿新 • • 發佈:2019-01-08
存在Hadoop叢集上的檔案,大部分都會經過壓縮,如果是壓縮後的檔案,我們直接在應用程式中如何讀取裡面的資料?答案是肯定的,但是比普通的文字讀取要稍微複雜一點,需要使用到Hadoop的壓縮工具類支援,比如處理gz,snappy,lzo,bz壓縮的,前提是首先我們的Hadoop叢集得支援上面提到的各種壓縮檔案。
本次就給出一個讀取gz壓縮檔案的例子核心程式碼:
def readHdfsWriteKafkaByDate(fs:FileSystem,date:String,conf:Configuration,topic:String,finishTimeStamp:Long):Unit={
//訪問hdfs檔案,只讀取gz結尾的壓縮檔案,如果是.tmp結尾的不會讀取
val path=new Path("/collect_data/userlog/"+date+"/log*.gz")
//例項化壓縮工廠編碼類
val factory = new CompressionCodecFactory(conf)
//讀取通配路徑
val items=fs.globStatus(path)
var count=0
//遍歷每一個路徑檔案
items.foreach(f=>{
//列印全路徑
println(f.getPath)
//通過全路徑獲取其編碼
val codec = factory.getCodec(f.getPath())//獲取編碼
//讀取成資料流
var stream:InputStream = null;
if(codec!=null){
//如果編碼識別直接從編碼建立輸入流
stream = codec.createInputStream(fs.open(f.getPath()))
}else{
//如果不識別則直接開啟
stream = fs.open(f.getPath())
}
val writer=new StringWriter()
//將位元組流轉成字串流
IOUtils.copy(stream,writer,"UTF-8" )
//得到字串內容
val raw=writer.toString
//根據字串內容split出所有的行資料,至此解壓資料完畢
val raw_array=raw.split("\n")
//遍歷資料
raw_array.foreach(line=>{
val array = line.split("--",2) //拆分陣列
val map = JSON.parseObject(array(1)).asScala
val userId = map.get("userId").getOrElse("").asInstanceOf[String] //為空為非法資料
val time = map.get("time").getOrElse("") //為空為非法資料
if(StringUtils.isNotEmpty(userId)&&(time+"").toLong<=finishTimeStamp){//只有資料
pushToKafka(topic,userId,line)
count=count+1
}
})
})
}
壓縮和解壓模組用的工具包是apache-commons下面的類:
import org.apache.commons.io.IOUtils
import org.apache.commons.lang.StringUtils
如果想在Windows上除錯,可以直接設定HDFS的地址即可
- val conf = new Configuration()//獲取hadoop的conf
// conf.set("fs.defaultFS","hdfs://192.168.10.14:8020/")//windows上除錯用
至此資料已經解壓並讀取完畢,其實並不是很複雜,用java程式碼和上面的程式碼也差不多類似,如果直接用原生的api讀取會稍微複雜,但如果我們使用Hive,Spark框架的時候,框架內部會自動幫我們完成壓縮檔案的讀取或者寫入,對使用者透明,當然底層也是封裝了不同壓縮格式的讀取和寫入程式碼,這樣以來使用者將會方便許多。
參考文章:
有什麼問題可以掃碼關注微信公眾號:我是攻城師(woshigcs),在後臺留言諮詢。 技術債不能欠,健康債更不能欠, 求道之路,與君同行。