最新版Spark2.2讀取多種檔案格式資料
Spark2.0+的檔案讀取
Spark可以讀取多種格式檔案,csv,json,parque。因此對應就有很多函式與之對應。在Spark2.0以後一般使用SparkSession來操作DataFrame、Dataset來完成資料分析。這些讀取不同格式檔案的函式就是SparkSession的成員DataFrameReader的方法。該類就是將檔案系統(HDFS,LocalFileSystem(一定要在每臺機器上都有的檔案,不然會找不到檔案,因為不確定executor會在哪臺機器上執行,如果是本地檔案,執行executor機器上一定要有該檔案))中的檔案讀取到Spark中,生成DataFrame的類。下面來看看具體的檔案讀取。
1.CSV
其實該方法叫CSV不是很好,因為它不止可以讀CSV檔案,他可以讀取一類由分隔符分割資料的檔案,由於這類檔案中CSV是代表,所以該函式才叫CSV吧。
1.1標準CSV
csv資料
特徵:有空值?表示,有表頭,型別明確
"id_1","id_2","cmp_fname_c1","cmp_fname_c2","cmp_lname_c1","cmp_lname_c2","cmp_sex","cmp_bd","cmp_bm","cmp_by","cmp_plz","is_match"
41264,44629,1,?,1,?,1,1,1,1,1,TRUE
28871,41775 ,1,?,1,?,1,1,1,1,1,TRUE
99344,99345,1,?,1,?,1,1,1,1,1,TRUE
31193,66985,1,?,1,?,1,1,1,1,0,TRUE
24429,25831,1,?,1,?,1,1,1,1,1,TRUE
23571,49029,1,?,1,?,1,1,1,1,1,TRUE
6884,6885,1,?,1,?,1,1,1,1,1,TRUE
7144,9338,1,?,1,?,1,1,1,1,1,TRUE
spark程式碼:
val spark = SparkSession.builder().appName("fileRead").getOrCreate()
import spark.implicits._
val data1 = spark.read
// 推斷資料型別
.option("inferSchema", true)
// 設定空值
.option("nullValue", "?")
// 表示有表頭,若沒有則為false
.option("header", true)
// 檔案路徑
.csv("ds/block_10.csv")
// 快取
.cache()
// 列印資料格式
data1.printSchema()
// 顯示資料,false引數為不要把資料截斷
data1.show(false)
效果:
1.2TSV
TSV資料
特徵:無頭,有資料型別,\t分割
196 242 3 881250949
186 302 3 891717742
22 377 1 878887116
244 51 2 880606923
166 346 1 886397596
298 474 4 884182806
115 265 2 881171488
253 465 5 891628467
305 451 3 886324817
6 86 3 883603013
spark程式碼:
val cols = Seq("user_id", "item_id", "rating", "timestamp")
val data2 = spark.read
// 推斷資料型別
.option("inferSchema", true)
// 沒有表頭false
.option("header", false)
// 指定分隔符
.option("delimiter", "\t")
.csv("movie/u.data")
// 設定頭
.toDF(cols: _*)
.cache()
data2.printSchema()
data2.show()
// 計數
data2.count()
結果:
2.JSON檔案
JSON不像CSV那樣,他是半結構化的資料,因此他可以表示更加複雜的資料型別,但是缺點也同樣明顯,儲存同樣的資料,JSON檔案更大。
資料:有點複雜,介紹一下
軌跡ID long,使用者ID long,time timestamp,td string,trail [id int ,ts long,alt double,lon double ,alt double,d string]
主要就是大物件裡面有一個數組,數組裡面有很多小物件(數量不固定),csv是難以表示這種資料的。
但是。。。處理起來很簡單
val jsonpath = "/home/wmx/hive/warehouse/trail/sample40.json"
val data3 = spark.read.json(jsonpath).cache()
data3.printSchema()
// 因為有點多隻顯示1條,不截斷
data3.show(1,false)
結果
這裡大家可以看到,時間戳資料被解析成string了,但是spark內建的資料型別是支援Date的
因此要處理資料型別:
改為
// 按順序把型別全寫下來
val schema: StructType = StructType(Seq(
StructField("tid", IntegerType, true),
StructField("uid", IntegerType, true),
StructField("st", TimestampType, true),
StructField("td", StringType, true),
StructField("trail", ArrayType(StructType(Seq(
StructField("id", IntegerType, true),
StructField("ts", LongType, true),
StructField("lat", DoubleType, true),
StructField("alt", DoubleType, true),
StructField("lon", DoubleType, true),
StructField("d", StringType, true)))), true)));
val data4 = spark.read
.schema(schema)
.json(jsonpath)
.cache()
data4.printSchema()
data4.show(1, true)
結果:
型別完全匹配:
最後準備寫一下parquet,但是parquet本人不是很熟,只知道parquet使用的函式是load(path:String),希望對大家有所幫助。
列式儲存
列式儲存和行式儲存相比有哪些優勢呢?
可以跳過不符合條件的資料,只讀取需要的資料,降低IO資料量。
壓縮編碼可以降低磁碟儲存空間。由於同一列的資料型別是一樣的,可以使用更高效的壓縮編碼(例如Run Length Encoding和Delta Encoding)進一步節約儲存空間。
只讀取需要的列,支援向量運算,能夠獲取更好的掃描效能。