Spark開發-SparkSQL讀寫資料
阿新 • • 發佈:2021-01-08
SparkSQL資料讀寫
DataFrameReader DataFrameWriter DataFrameReader 對應的元件 SCHEMA OPTION FORMAT DataFrameReader 有兩種訪問方式, 一種是使用 load 方法載入, 使用 format 指定載入格式, 還有一種是使用封裝方法, 類似 csv, json, jdbc 等 //.第一種形式 READ + FORMAT +load讀取 spark.read .format("csv") .option("header",value=true) .option("inferSchema",value = true) .load("dataset/1231.csv") //2.第二種形式 使用具體檔案型別讀取 READ spark.read .option("header",value=true) .option("inferSchema",value = true) .csv("dataset/1231.csv") DataFrameWriter 也有兩種使用方式, 一種是使用 format 配合 save, 預設的 format 是 Parquet 還有一種是使用封裝方法, 例如 csv, json, saveAsTable 等 def parquet(path: String): Unit = { format("parquet").save(path)} def csv (path: String): Unit = { format("csv").save(path)}
DataFrameWriter
增量操作
使用spark做增量操作的時候, insertInto 和 mode(SaveMode.Append).saveAsTable() insertInto insertInto 使用基於位置 It requires that the schema of the `DataFrame` is the same as the schema of the table. Unlike `saveAsTable`, `insertInto` ignores the column names and just uses position-based resolution saveAsTable 基於列名 column_names-based 有不同的模式, 如果是 SaveMode.Overwrite the schema of the `DataFrame` does not need to be the same as that of the existing table. 如果是 `Append`, if there is an existing table, we will use the format and options OF the existing table. The column order in the schema of the `DataFrame` doesn't need to be same as that of the existing table. Unlike `insertInto`, `saveAsTable` will use the column names to find the correct column positions ####覆蓋 report_info.write.insertInto(tableName="dwd.t_dwd_report_info_d",overwrite=True) DataFrameWriter 可以將資料儲存到 Hive 表中, 所以也可以指定分割槽和分桶資訊 對儲存的 SCHEMA 檔案的覆蓋和追加
Spark的讀寫
Spark can create distributed datasets from any storage source supported by Hadoop, including your local file system, HDFS, Cassandra, HBase, Amazon S3, etc. Spark supports text files, SequenceFiles, and any other Hadoop InputFormat. 寫 saveAsTextFile saveAsSequenceFile saveAsSequenceFile
儲存問題
儲存的載體
儲存的資料格式
儲存所使用的命令或函式
存數遇到的問題
小檔案問題
spark.default.parallelism 在處理RDD時才會起作用,對SparkSql無效。
spark.sql.shuffle.partitions 則是對sparks SQL專用的設定。
原始碼
org.apache.spark.sql
Interface used to write a [[Dataset]] to external storage systems (e.g. file systems,key-value stores, etc).
Use `Dataset.write` to access this.
使用了 // Builder pattern config options 構建者模式
參考
https://github.com/apache/spark/pull/13013
Spark寫入hive表時saveAsTable和insertInto的區別 https://blog.csdn.net/huihuixia123/article/details/107658299
Spark(三)-- SparkSQL擴充套件(資料讀寫) -- 讀寫 Parquet、Json 格式檔案(二) https://blog.csdn.net/qq_18800463/article/details/101421490
Spark SQL, DataFrames and Datasets Guide http://spark.apache.org/docs/latest/sql-data-sources-load-save-functions.html
RDD Programming Guide http://spark.apache.org/docs/latest/rdd-programming-guide.html#rdd-programming-guide