Spark建立DataFrame的三種方法
阿新 • • 發佈:2018-12-24
跟關係資料庫的表(Table)一樣,DataFrame是Spark中對帶模式(schema)行列資料的抽象。DateFrame廣泛應用於使用SQL處理大資料的各種場景。建立DataFrame有很多種方法,比如從本地List建立、從RDD建立或者從源資料建立,下面簡要介紹建立DataFrame的三種方法。
方法一,Spark中使用toDF
函式建立DataFrame
通過匯入(importing)Spark sql implicits, 就可以將本地序列(seq), 陣列或者RDD轉為DataFrame。只要這些資料的內容能指定資料型別即可。
本地seq + toDF建立DataFrame示例:
import sqlContext.implicits._
val df = Seq(
(1, "First Value", java.sql.Date.valueOf("2010-01-01")),
(2, "Second Value", java.sql.Date.valueOf("2010-02-01"))
).toDF("int_column", "string_column", "date_column")
注意:如果直接用toDF()而不指定列名字,那麼預設列名為"_1", "_2", ...
通過case class + toDF建立DataFrame的示例
// sc is an existing SparkContext. val sqlContext = new org.apache.spark.sql.SQLContext(sc) // this is used to implicitly convert an RDD to a DataFrame. import sqlContext.implicits._ // Define the schema using a case class. // Note: Case classes in Scala 2.10 can support only up to 22 fields. To work around this limit, // you can use custom classes that implement the Product interface. case class Person(name: String, age: Int) // Create an RDD of Person objects and register it as a table. val people = sc.textFile("examples/src/main/resources/people.txt").map(_.split(",")).map(p => Person(p(0), p(1).trim.toInt)).toDF() people.registerTempTable("people") // 使用 sqlContext 執行 sql 語句. val teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19") // 注:sql()函式的執行結果也是DataFrame,支援各種常用的RDD操作. // The columns of a row in the result can be accessed by ordinal. teenagers.map(t => "Name: " + t(0)).collect().foreach(println)
方法二,Spark中使用createDataFrame
函式建立DataFrame
在SqlContext
中使用createDataFrame也可以建立DataFrame。跟toDF
一樣,這裡建立DataFrame的資料形態也可以是本地陣列或者RDD。
通過row+schema建立示例
import org.apache.spark.sql.types._ val schema = StructType(List( StructField("integer_column", IntegerType, nullable = false), StructField("string_column", StringType, nullable = true), StructField("date_column", DateType, nullable = true) )) val rdd = sc.parallelize(Seq( Row(1, "First Value", java.sql.Date.valueOf("2010-01-01")), Row(2, "Second Value", java.sql.Date.valueOf("2010-02-01")) )) val df = sqlContext.createDataFrame(rdd, schema)
方法三,通過檔案直接建立DataFrame
使用parquet檔案建立
val df = sqlContext.read.parquet("hdfs:/path/to/file")
使用json檔案建立
val df = spark.read.json("examples/src/main/resources/people.json")
// Displays the content of the DataFrame to stdout
df.show()
// +----+-------+
// | age| name|
// +----+-------+
// |null|Michael|
// | 30| Andy|
// | 19| Justin|
// +----+-------+
使用csv檔案,spark2.0+之後的版本可用
//首先初始化一個SparkSession物件
val spark = org.apache.spark.sql.SparkSession.builder
.master("local")
.appName("Spark CSV Reader")
.getOrCreate;
//然後使用SparkSessions物件載入CSV成為DataFrame
val df = spark.read
.format("com.databricks.spark.csv")
.option("header", "true") //reading the headers
.option("mode", "DROPMALFORMED")
.load("csv/file/path"); //.csv("csv/file/path") //spark 2.0 api
df.show()
補充:spark資料集的演變: