1. 程式人生 > >Spark SQL入門

Spark SQL入門

Spark SQL

將SQL語句轉為底層的Spark作業執行,支援大量資料分析演算法。

資料抽象DataFrame

載入資料來源生成結構化資料
DataFrame的建立

import org.apache.spark.sql.SparkSession
import spark.implicits._   //將底層資料來源隱式轉為DataFrame
val spark = SparkSession.builder().getOrCreate()
val df = spark.read.json("file:///usr/local/spark.examples/src/main/resources/people.json")

DataFrame常用操作

df.show()        //顯示資料表
df.printSchema()		//列印模式
df.select(df("name"),df("age")).show()	//查詢
df.filter(df("age")>20).show() //過濾
df.groupBy(df("age"))	//分組聚合
df.sort(df("age"))  //升序
df.rdd.saveAsTextFile("....") //儲存為檔案

RDD轉換為DataFrame

反射機制推斷RDD模式
txt檔案
在這裡插入圖片描述

import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.Encoder
import org.apache.spark.sql.SparkSession
import spark.implicits._   //將底層資料來源隱式轉為DataFrame
case Person(name:String,age:Long)    //定義要轉換成的DF類
val peopleDF = spark.sparkContext.textFile("file:///usr/local/spark.examples/src/main/resources/people.json").map(line => line.split(",")).map(attributes => Person(attributes(0),attributes(1).trim.toInt)).toDF
//spark是上段程式碼的SparkSession物件,載入檔案成為RDD,分割",",RDD是一個個Array,把每個Array生成物件Person,轉為DF
peopleDF.createOrReplaceTempView("people")  //註冊為臨時表
val resultDF = spark.sql(“select name,age from people where age >20”) //使用SQL語句進行查詢
resultDF.show()

結果如下:
在這裡插入圖片描述
使用程式設計方式定義RDD模式
無法提前 cass class的情況
三個步驟

  1. 製作“表頭”
  2. 製作“表的記錄”
  3. 拼裝表頭和記錄
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
//製作欄位 StructField 第一個引數數字段名,第二個是型別,第三個是是否可null
val fields = Array(StructField("name",StringType,true), StructField("age",IntegerType,true))
val schema = StructType(fields) //製作成模式
//載入資料來源成為RDD
...
//製作表中記錄 line =>Array => Row
val rowRDD = peopleRDD.map(_.split(",")).map(attributes => Row(attributes(0), attributes(1).trim.toInt))
//拼裝
val peopleDF = spark.createDataFrame(rowRDD, schema)

通過JDBC連線資料庫

 //帶引數啟動spark-shell
./bin/spark-shell  --jars /usr/local/spark/jars/mysql-connector-java-5.1.46/mysql-connector-java-5.1.46-bin.jar  --driver-class-path /usr/local/spark/jars/mysql-connector-java-5.1.40/mysql-connector-java-5.1.46-bin.jar
 //讀   
 val jdbcDF = spark.read.format("jdbc").option("url","jdbc:mysql://localhost:3306/spark").option("driver","com.mysql.jdbc.Driver").option("dbtable", "student").option("user", "root").option("password", "hadoop").load()   
 
//寫
import java.util.Properties
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
 
//下面我們設定兩條資料表示兩個學生資訊
val studentRDD = spark.sparkContext.parallelize(Array("3 Rongcheng M 26","4 Guanhua M 27")).map(_.split(" ")) 
//下面要設定模式資訊
val schema = StructType(List(StructField("id", IntegerType, true),StructField("name", StringType, true),StructField("gender", StringType, true),StructField("age", IntegerType, true)))
//下面建立Row物件,每個Row物件都是rowRDD中的一行
val rowRDD = studentRDD.map(p => Row(p(0).toInt, p(1).trim, p(2).trim, p(3).toInt))
//建立起Row物件和模式之間的對應關係,也就是把資料和模式對應起來
val studentDF = spark.createDataFrame(rowRDD, schema)
//下面建立一個prop變數用來儲存JDBC連線引數
val prop = new Properties()
prop.put("user", "root") //表示使用者名稱是root
prop.put("password", "hadoop") //表示密碼是hadoop
prop.put("driver","com.mysql.jdbc.Driver") //表示驅動程式是com.mysql.jdbc.Driver
//下面就可以連線資料庫,採用append模式,表示追加記錄到資料庫spark的student表中
studentDF.write.mode("append").jdbc("jdbc:mysql://localhost:3306/spark", "spark.student", prop)