1. 程式人生 > >如何在Scala中讀取Hadoop叢集上的gz壓縮檔案

如何在Scala中讀取Hadoop叢集上的gz壓縮檔案

存在Hadoop叢集上的檔案,大部分都會經過壓縮,如果是壓縮後的檔案,我們直接在應用程式中如何讀取裡面的資料?答案是肯定的,但是比普通的文字讀取要稍微複雜一點,需要使用到Hadoop的壓縮工具類支援,比如處理gz,snappy,lzo,bz壓縮的,前提是首先我們的Hadoop叢集得支援上面提到的各種壓縮檔案。

本次就給出一個讀取gz壓縮檔案的例子核心程式碼:

def readHdfsWriteKafkaByDate(fs:FileSystem,date:String,conf:Configuration,topic:String,finishTimeStamp:Long):Unit={
  
  //訪問hdfs檔案,只讀取gz結尾的壓縮檔案,如果是.tmp結尾的不會讀取
val path=new Path("/collect_data/userlog/"+date+"/log*.gz") //例項化壓縮工廠編碼類 val factory = new CompressionCodecFactory(conf) //讀取通配路徑 val items=fs.globStatus(path) var count=0 //遍歷每一個路徑檔案 items.foreach(f=>{ //列印全路徑 println(f.getPath) //通過全路徑獲取其編碼 val codec = factory.getCodec(f.getPath())//獲取編碼
//讀取成資料流 var stream:InputStream = null; if(codec!=null){ //如果編碼識別直接從編碼建立輸入流 stream = codec.createInputStream(fs.open(f.getPath())) }else{ //如果不識別則直接開啟 stream = fs.open(f.getPath()) } val writer=new StringWriter() //將位元組流轉成字串流 IOUtils.copy(stream,writer,"UTF-8"
) //得到字串內容 val raw=writer.toString //根據字串內容split出所有的行資料,至此解壓資料完畢 val raw_array=raw.split("\n") //遍歷資料 raw_array.foreach(line=>{ val array = line.split("--",2) //拆分陣列 val map = JSON.parseObject(array(1)).asScala val userId = map.get("userId").getOrElse("").asInstanceOf[String] //為空為非法資料 val time = map.get("time").getOrElse("") //為空為非法資料 if(StringUtils.isNotEmpty(userId)&&(time+"").toLong<=finishTimeStamp){//只有資料 pushToKafka(topic,userId,line) count=count+1 } }) }) }

壓縮和解壓模組用的工具包是apache-commons下面的類:

import org.apache.commons.io.IOUtils
import org.apache.commons.lang.StringUtils

如果想在Windows上除錯,可以直接設定HDFS的地址即可

-     val conf = new Configuration()//獲取hadoop的conf
//    conf.set("fs.defaultFS","hdfs://192.168.10.14:8020/")//windows上除錯用

至此資料已經解壓並讀取完畢,其實並不是很複雜,用java程式碼和上面的程式碼也差不多類似,如果直接用原生的api讀取會稍微複雜,但如果我們使用Hive,Spark框架的時候,框架內部會自動幫我們完成壓縮檔案的讀取或者寫入,對使用者透明,當然底層也是封裝了不同壓縮格式的讀取和寫入程式碼,這樣以來使用者將會方便許多。

參考文章:

有什麼問題可以掃碼關注微信公眾號:我是攻城師(woshigcs),在後臺留言諮詢。 技術債不能欠,健康債更不能欠, 求道之路,與君同行。

輸入圖片說明