1. 程式人生 > >SparkSQL:Parquet資料來源之合併元資料

SparkSQL:Parquet資料來源之合併元資料

合併元資料

如同ProtocolBuffer,Avro,Thrift一樣,Parquet也是支援元資料合併的。使用者可以在一開始就定義一個簡單的元資料,然後隨著業務需要,逐漸往元資料中新增更多的列。在這種情況下,使用者可能會建立多個Parquet檔案,有著多個不同的但是卻互相相容的元資料。Parquet資料來源支援自動推斷出這種情況,並且進行多個Parquet檔案的元資料的合併。

因為元資料合併是一種相對耗時的操作,而且在大多數情況下不是一種必要的特性,從Spark 1.5.0版本開始,預設是關閉Parquet檔案的自動合併元資料的特性的。可以通過以下兩種方式開啟Parquet資料來源的自動合併元資料的特性:

1、讀取Parquet檔案時,將資料來源的選項,mergeSchema,設定為true

2、使用SQLContext.setConf()方法,將spark.sql.parquet.mergeSchema引數設定為true

案例:合併學生的基本資訊,和成績資訊的元資料

程式碼例項

Scala版本:

package com.etc
import org.apache.spark.sql.{SQLContext, SaveMode}
import org.apache.spark.{SparkConf, SparkContext}

/**
  * author: fengze
  * description:
  * Parquet資料來源之合併元資料
  */
object ParquetMergeSchema {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("ParquetMergeSchema")
      .setMaster("local")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)

    import sqlContext.implicits._

    //不用提前建立parquet檔案
    // 建立一個DataFrame,作為學生的基本資訊,並寫入一個parquet檔案中
    val studentsWithNameAge = Array(("leo", 23), ("jack", 25)).toSeq
    val studentsWithNameAgeDF = sc.parallelize(studentsWithNameAge, 2).toDF("name", "age")
    studentsWithNameAgeDF.save("F:\\Spark-SQL\\students.parquet", "parquet", SaveMode.Append)

    // 建立第二個DataFrame,作為學生的成績資訊,並寫入一個parquet檔案中
    val studentsWithNameGrade = Array(("marry", "A"), ("tom", "B")).toSeq
    val studentsWithNameGradeDF = sc.parallelize(studentsWithNameGrade, 2).toDF("name", "grade")
    studentsWithNameGradeDF.save("F:\\Spark-SQL\\students.parquet", "parquet", SaveMode.Append)

    // 首先,第一個DataFrame和第二個DataFrame的元資料肯定是不一樣的吧
    // 一個是包含了name和age兩個列,一個是包含了name和grade兩個列
    // 所以, 這裡期望的是,讀取出來的表資料,自動合併兩個檔案的元資料,出現三個列,name、age、grade

    // 用mergeSchema的方式,讀取students表中的資料,進行元資料的合併
    //重點:sqlContext.read.option("mergeSchema", "true")
    val students = sqlContext.read.option("mergeSchema", "true")
      .parquet("F:\\Spark-SQL\\students.parquet")

    //列印元資料結構資訊
    students.printSchema()

    students.show()
  }
}

java版本:

package com.etc;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.SaveMode;

import java.sql.Array;

/**
 * @author: fengze
 * @description:
 * Parquet資料來源之合併元資料
 */
public class ParquetMergeSchemaJava {
    public static void main(String[] args) {
      SparkConf conf = new SparkConf()
              .setAppName("ParquetMergeSchemaJava")
              .setMaster("local");
      JavaSparkContext sc = new JavaSparkContext(conf);
      SQLContext sqlContext = new SQLContext(sc);

      //建立第一個studentInfo1基本資訊,寫入一個parquet檔案中
      DataFrame studentInfo1 = sqlContext.read().format("json").load("F:\\studentInfo1.json");
      studentInfo1.write().mode(SaveMode.Append).save("F:\\studentInfo.parquet");

      //建立第兩個studentInfo1基本資訊,寫入一個parquet檔案中
      DataFrame studentInfo2 = sqlContext.read().format("json").load("F:\\studentInfo2.json");
      studentInfo2.write().mode(SaveMode.Append).save("F:\\studentInfo.parquet");

      //開啟Parquet資料來源的自動合併元資料的特性
      //重點:sqlContext.read.option("mergeSchema", "true")
      DataFrame studentInfo = sqlContext.read().option("mergeSchema", "true")
              .parquet("F:\\studentInfo.parquet");

      //列印元資料結構資訊
      studentInfo.printSchema();

      studentInfo.show();
    }
}