1. 程式人生 > >Spark SQL入門基礎

Spark SQL入門基礎

Spark SQL簡介

從Shark說起

Shark即hive on Spark,為了實現與Hive相容,Shark在HiveQL方面重用了Hive中HiveQL的解析、邏輯執行計劃、翻譯執行計劃優化等邏輯,可以近似認為將物理執行計劃從MapReduce作業替換成了Spark作業,通過Hive的HiveQL解析,把HiveQL翻譯成Spark上的RDD操作。Shark的出現,使得SQL-on-Hadoop的效能比Hive有了10-100倍的提高。

Shark的設計導致了兩個問題:

  • 執行計劃優化完全依賴於Hive,不方便新增新的優化策略。

  • 因為Shark是執行緒級並行,而MapReduce是程序級。因此,Spark在相容Hive的實現上存線上程安全問題,導致Shark不得不使用另外一套獨立維護的打了補丁的Hive原始碼分支。

2014年6月1日Shark專案和SparkSQL專案的主持人Reynold Xin宣佈:停止對Shark的開發,團隊將所有資源放在SparkSQL專案上。至此,Shark的發展畫上了句號,也因此發展出了兩個方向:SparkSQL和Hive on Spark

  • Spark SQL作為Spark生態的一員繼續發展,而不再受限於Hive,只是相容Hive。

  • Hive on Spark是一個Hive的發展計劃,該計劃將Spark作為Hive底層引擎之一,也就是說,Hive將不再受限於一個引擎,可以採用Map-Reduce、Tez、Spark等引擎。

Spark SQL設計

mark
Spark SQL在Hive相容層面僅依賴HiveQL解析、Hive元資料,也就是說,從HQL被解析成抽象語法樹起,就全部由Spark SQL接管了。Spark SQL執行計劃生成和優化都由Catalyst(函式式關係查詢優化框架)負責。

Spark SQL增加了DataFrame(即帶有Schema資訊的RDD),使使用者可以在Spark SQL中執行SQL語句,資料既可以來自RDD,也可以是Hive、HDFS、Cassandra等外部資料來源,還可以是JSON格式的資料。Spark SQL提供DataFrame API,可以對內部和外部各種資料來源執行各種關係操作。

Spark SQL可以支援大量的資料來源和資料分析演算法。Spark SQL可以融合傳統關係資料庫的結構化資料管理能力和機器學習演算法的資料處理能力。

DataFrame

DataFrame使得Spark具備了對大規模結構化資料的處理能力,不僅比原有的RDD轉化方式更加簡單易用,而且獲得了更高的計算能力。

  • RDD是分散式的Java物件的集合,但是,物件內部結構對於RDD而言卻是不可知的。

  • DataFrame是一種以RDD為基礎的分散式資料集,提供了詳細的結構資訊。
    Spark能夠輕鬆實現從MySQL到DataFrame的轉化,並且支援SQL查詢。
    RDD分散式物件的集合。

DataFrame的建立

從Spark2.0開始,Spark使用全新的SparkSession介面替代Spark1.6中的SQLContext及HiveContext介面來實現對資料載入、轉換、處理等功能。SparkSession實現了SQLContext及HiveContext所有功能。

Spark支援從不同的資料來源載入資料,並把資料轉換成DataFrame,並且支援把DataFrame轉換成SQLContext自身中的表,然後使用SQL語句來操作資料。SparkSession亦提供了HiveQL以及其他依賴於Hive的功能的支援。

現在在”/usr/local/spark/examples/src/main/resources/”這個目錄下有兩個樣例資料people.json和people.txt。

people.json檔案的內容如下:
{"name":"Michael"}
{"name":"Andy", "age":30}
{"name":"Justin", "age":19}

people.txt檔案的內容如下:
Michael,29
Andy,30
Justin,19

編寫程式碼讀取檔案資料,建立DataFrame。

import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder().getOrCreate()
//是支援RDDs轉換為DataFrames及後續sql操作
import spark.implictis._
val df = spark.read.json("file://usr/local/spark/examples/src/main/resources/people.json")
df.show()
//列印模式資訊
df.printSchema()
df.select(df("name"), df("age")+1).show()
//分組聚合
df.groupBy("age").count().show()

從RDD到DataFrame

利用反射機制推斷RDD模式

在利用反射機制推斷RDD模式時,需要首先頂一個case class。因為,只有case class才能被Spark隱式地轉換為DataFrame。

import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.Encoder
import spark.implicits._ //支援把一個RDD隱式轉換為一個DataFrame
import spark.implicits._
case class Person(name:String, age:Long) # 定義case class
val peopleDF = spark.sparkContext.textFile("file:///usr/local/spark/examples/src/main/resources/people.txt")
    .map(_.split(",")).map(attributes => Person(attributes(0),attributes(1).trim.toInt)).toDF()
peopleDF.createOrReplaceTempView("people") #必須註冊為臨時表才能供下面的查詢使用
//最終生成一個DataFrame
val personsRDD = spark.sql("select name, age from people where age>20")
//DataFrame中的麼個元素都是一行記錄,包含name和age兩個欄位,分別用t(0),t(1)來獲取值
personsRDD.map(t=>"Name:"+t(0)+","+"Age:"+t(1)).show()

使用程式設計方式定義RDD模式

當無法提前定義case clas時,就需要採用程式設計方式定義RDD模式。

import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
val peopleRDD = spark.sparkContext.textFile("file:///usr/local/spark/examples/src/main/resources/people.txt")
//定義一個模式字串
val schemaString="name age"
//根據模式字串生成模式
val fields = schemaString.split(" ").map(fieldName=>StructField(fieldName, StringType, nullable=true))
val schema = StructType(fields) //模式中包含name和age兩個欄位
val rowRDD = peopleRDD.map(_.split(",")).map(attributes=>Row(attributes(0), attributes(1).trim))
val peopleDF = spark.createDataFrame(rowRDD, schema)
peopleDF.createOrReplaceTempView("people")
val results = spark.sql("select name,age from people")
results.map(attributes=>"name"+attributes(0)+"."+"age:"+attributes(1)).show()

把DataFrame儲存成檔案

  • 第一個方法
val peopleDF = spark.read.format("json").load("file:///usr/local/spark/examples/src/main/resources/people.json")
peopleDF.select("name","age").write.format("csv")
    .save("file:///usr/local/spark/examples/src/main/resources/newpeople.csv")

write.format()支援輸出json,parquet,jdbc.orc,libsvm,csv,text等格式檔案。
- 第二種方式

val peopleDF = spark.read.format("json").load("file:///usr/local/spark/examples/src/main/resources/people.json")
//轉換成rdd然後再儲存
peopleDF.rdd.saveAsTextFile("file:///usr/local/spark/mycode/newpeople.txt")

讀取和儲存parquet資料

Spark SQL可以支援Parquet、JSON、Hive等資料來源,並且可以通過JDBC連線外部資料來源。

Parquet是一種流行的列式儲存格式,可以高效地儲存具有巢狀欄位的記錄。Parquet是語言無關的,而且不與任何一種資料處理框架繫結在一起,適配多種語言和元件,能夠與Parquet配合使用的元件有:

  • 查詢引擎:Hive,Impala,Pig,Presto等
  • 計算框架:MapReduce,Spark,Cascading等
  • 資料模型:Avro,Thrift,Protocol Buffers, POJOs

從parquet檔案中載入資料生成DataFrame

import spark.implicits._
val parquetFileDF = spark.read.parquet("file:///usr/local/spark/examples/src/main/resources/users.parquet")
parquetFileDF.createOrReplaceTempView("parquetFile")
val namesDF = spark.sql("select * from parquetFile")
namesDF.foreach(attributes=>println("Name:"+attributes(0)+"favorite color:"+attributes(1)))

將DataFrame儲存成parquet檔案

import spark.implicits._
val parquetFileDF = spark.read.json("file:///usr/local/spark/examples/src/main/resources/people.json")
peopleDF.write.parquet("file:///usr/local/spark/mycode/newpeople.parquet")

讀取和插入MySQL

準備工作:

  • 下載MySQL的JDBC驅動,比如mysql-connector-java-5.1.40.tar.gz
  • 把該驅動程式拷貝到spark的安裝目錄“/usr/local/spark/jars”下
  • 啟動一個spark-shell,啟動Spark Shell時,必須指定mysql連線驅動jar包
$ ./bin/spark-shell --jars /usr/local/spark/jars/mysql-connector-java-5.1.40/mysql-connector-java-5.1.40-bin.jar --driver-class-path /usr/local/spark/jars/mysql-connector-java-5.1.40/mysql-connector-java-5.1.40-bin.jar

在MySQL資料庫中建立了一個名稱為spark的資料庫,並建立了一個名稱為student的表。
執行以下命令連線資料庫,讀取資料,並顯示:

val jdbcDF = spark.read.format("jdbc").option("url", "jdbc:mysql://localhost:3306/spark")
    .option("driver", "com.mysql.jdbc.Driver").option("dbtable", "student")
    .option("user", "root").option("password","hadoop").load()
jdbcDF.show()

向student表中插入兩條記錄:

import java.util.Properties
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row

//下面我們設定兩條資料表示兩個學生資訊
val studentRDD = spark.sparkContext.parallelize(Array("3 Rongcheng M 26", "4 Guanhua M 27")).map(_.split(" "))

//下面要設定模式資訊
val schema = StructType(List(StructField("id", IntegerType, true), StructField("name", StringType, true), StructField("gender", StringType, true), StructField("age", IntegerType, true)))

//下面建立Row物件,每個Row物件都是rowRDD中的一行
val rowRDD = studentRDD.map(p=>Row(p(0).toInt, p(1).trim, p(2).trim, p(3).toInt))

//建立起Row物件和模式之間的對應關係,也就是把資料和模式對應起來
val studentDF = spark.createDataFrame(rowRDD, schema)

//下面建立一個prop變數用來儲存jdbc連線引數
val prop = new Properties()
prop.put("user", "root") // 表示使用者名稱是root
prop.put("password", "hadoop") //表示密碼是hadoop
prop.put("driver","com.mysql.jdbc.Driver") //表示驅動程式

//連線資料庫,採用append模式,表示追加記錄到資料表中
studentDF.wirte.mode("append").jdbc("jdbc:mysql://localhost:3306/spark", "student", prop)