SparkSQL:Parquet資料來源之合併元資料
阿新 • • 發佈:2018-12-15
合併元資料
如同ProtocolBuffer,Avro,Thrift一樣,Parquet也是支援元資料合併的。使用者可以在一開始就定義一個簡單的元資料,然後隨著業務需要,逐漸往元資料中新增更多的列。在這種情況下,使用者可能會建立多個Parquet檔案,有著多個不同的但是卻互相相容的元資料。Parquet資料來源支援自動推斷出這種情況,並且進行多個Parquet檔案的元資料的合併。
因為元資料合併是一種相對耗時的操作,而且在大多數情況下不是一種必要的特性,從Spark 1.5.0版本開始,預設是關閉Parquet檔案的自動合併元資料的特性的。可以通過以下兩種方式開啟Parquet資料來源的自動合併元資料的特性:
1、讀取Parquet檔案時,將資料來源的選項,mergeSchema,設定為true
2、使用SQLContext.setConf()方法,將spark.sql.parquet.mergeSchema引數設定為true
案例:合併學生的基本資訊,和成績資訊的元資料
程式碼例項
Scala版本:
package com.etc import org.apache.spark.sql.{SQLContext, SaveMode} import org.apache.spark.{SparkConf, SparkContext} /** * author: fengze * description: * Parquet資料來源之合併元資料 */ object ParquetMergeSchema { def main(args: Array[String]): Unit = { val conf = new SparkConf() .setAppName("ParquetMergeSchema") .setMaster("local") val sc = new SparkContext(conf) val sqlContext = new SQLContext(sc) import sqlContext.implicits._ //不用提前建立parquet檔案 // 建立一個DataFrame,作為學生的基本資訊,並寫入一個parquet檔案中 val studentsWithNameAge = Array(("leo", 23), ("jack", 25)).toSeq val studentsWithNameAgeDF = sc.parallelize(studentsWithNameAge, 2).toDF("name", "age") studentsWithNameAgeDF.save("F:\\Spark-SQL\\students.parquet", "parquet", SaveMode.Append) // 建立第二個DataFrame,作為學生的成績資訊,並寫入一個parquet檔案中 val studentsWithNameGrade = Array(("marry", "A"), ("tom", "B")).toSeq val studentsWithNameGradeDF = sc.parallelize(studentsWithNameGrade, 2).toDF("name", "grade") studentsWithNameGradeDF.save("F:\\Spark-SQL\\students.parquet", "parquet", SaveMode.Append) // 首先,第一個DataFrame和第二個DataFrame的元資料肯定是不一樣的吧 // 一個是包含了name和age兩個列,一個是包含了name和grade兩個列 // 所以, 這裡期望的是,讀取出來的表資料,自動合併兩個檔案的元資料,出現三個列,name、age、grade // 用mergeSchema的方式,讀取students表中的資料,進行元資料的合併 //重點:sqlContext.read.option("mergeSchema", "true") val students = sqlContext.read.option("mergeSchema", "true") .parquet("F:\\Spark-SQL\\students.parquet") //列印元資料結構資訊 students.printSchema() students.show() } }
java版本:
package com.etc; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.sql.DataFrame; import org.apache.spark.sql.SQLContext; import org.apache.spark.sql.SaveMode; import java.sql.Array; /** * @author: fengze * @description: * Parquet資料來源之合併元資料 */ public class ParquetMergeSchemaJava { public static void main(String[] args) { SparkConf conf = new SparkConf() .setAppName("ParquetMergeSchemaJava") .setMaster("local"); JavaSparkContext sc = new JavaSparkContext(conf); SQLContext sqlContext = new SQLContext(sc); //建立第一個studentInfo1基本資訊,寫入一個parquet檔案中 DataFrame studentInfo1 = sqlContext.read().format("json").load("F:\\studentInfo1.json"); studentInfo1.write().mode(SaveMode.Append).save("F:\\studentInfo.parquet"); //建立第兩個studentInfo1基本資訊,寫入一個parquet檔案中 DataFrame studentInfo2 = sqlContext.read().format("json").load("F:\\studentInfo2.json"); studentInfo2.write().mode(SaveMode.Append).save("F:\\studentInfo.parquet"); //開啟Parquet資料來源的自動合併元資料的特性 //重點:sqlContext.read.option("mergeSchema", "true") DataFrame studentInfo = sqlContext.read().option("mergeSchema", "true") .parquet("F:\\studentInfo.parquet"); //列印元資料結構資訊 studentInfo.printSchema(); studentInfo.show(); } }