1. 程式人生 > 其它 >Spark SQL應用

Spark SQL應用

技術標籤:Sparkspark大資料hdfshadoop

Spark SQL應用

實驗目的

深入理解和掌握DataFrame各種常見操作和程式設計方法;掌握使用Spark SQL程式設計解決實際問題的方法。

實驗要求

  1. 掌握基於Maven的Scala和Spark SQL程式設計環境配置;
  2. 掌握DataFrame查詢方法。

實驗內容

  1. 將實驗二中的Online Retail.csv上傳至HDFS
  2. 在Maven中配置Spark SQL程式設計環境,pom.xml中新增:
<dependency>
	<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.12</artifactId> <version>3.0.1</version> </dependency>
若自己安裝的Spark不是3.0.1,則自己搜尋適合的spark-sql版本
  1. 編寫程式碼將csv檔案讀取至DataFrame,將Schema設定如下:
    表1 Schema設定
Column Name	Type	Nullable
InvoiceNo	StringType	false
StockCode	StringType	false
Description	StringType	false
Quantity IntegerType false InvoiceDate DateType false UnitPrice DecimalType false CustomerID StringType false Country StringType false
val conf = new SparkConf().setAppName(“Spark SQL”).setMaster(“local[*])
val sc = new SparkContext(conf)
val spark = SparkSession.builder().master(“local[*]).appName(
“Spark SQL”).getOrCreate() val df = spark.read.format(“com.databricks.spark.csv”) .option(“header”,true) .option(“mode”, “DROPMALFORMED”) .load(“hdfs://主機名或ip地址:埠號/檔案路徑”) val df_null = df.na.drop() val intRegx =^\d+$”.r val timeRegx =^\d{1,2}/\d{1,2}/\d{4} \d{1,2}:\d{1,2}$”.r val doubleRegx =^\d+(\.\d+)?$”.r val timeFormat = new SimpleDateFormat(“M/d/yyyy H:m”) val rdd = df_null.rdd.map(x => (x.getString(0), x.getString(1), x.getString(2), x.getString(3), x.getString(4), x.getString(5), x.getString(6), x.getString(7))) .filter(x => !intRegx.findFirstIn(x._4).isEmpty && !timeRegx.findFirstIn(x._5).isEmpty && //以日期規則過濾InvoiceDate列 !doubleRegx.findFirstIn(x._6).isEmpty) //以浮點型規則過濾UnitPrice列 .map(x => (x._1, x._2, x._3, x._4.toInt, new java.sql.Date(timeFormat.parse(x._5).getTime), x._6.toDouble, x._7, x._8)) val schema = StructType(Array( StructField(“InvoiceNo”, StringType, false), StructField(“StockCode”, StringType, false), StructField(“Description”, StringType, false), StructField(“Quantity”, IntegerType, false), StructField(“InvoiceDate”, DateType, false), StructField(“UnitPrice”, DoubleType, false), StructField(“CustomerID”, StringType, false), StructField(“Country”, StringType, false) )) val df_final = spark.createDataFrame(rdd.map(x => Row.fromTuple(x)), schema) df_final.write.json(“hdfs://主機名或ip地址:埠號/檔案路徑”)

在這裡插入圖片描述

(1) 要求:所有欄位不能為空,可將所有包含空值行或無法做資料型別轉換的行視為無效行去掉。

val rdd = df_null.rdd.map(x => (x.getString(0), x.getString(1), x.getString(2), x.getString(3), x.getString(4), x.getString(5), x.getString(6), x.getString(7)))
.filter(x => !intRegx.findFirstIn(x._4).isEmpty &&  
 !timeRegx.findFirstIn(x._5).isEmpty && //以日期規則過濾InvoiceDate列  
 !doubleRegx.findFirstIn(x._6).isEmpty) //以浮點型規則過濾UnitPrice列  
 .map(x => (x._1, x._2, x._3, x._4.toInt, new java.sql.Date(timeFormat.parse(x._5).getTime), x._6.toDouble, x._7, x._8)) 

(2) 讀取檔案方法:使用Spark SQL讀取csv的方法,直接按列讀取為DataFrame,然後再取RDD;或直接將csv以檔案的方式讀如RDD,然後以逗號將各列split為陣列,但注意欄位中本身包含逗號以及欄位兩端有雙引號的情況,可用正則表示式識別各列內容

(3) 資料轉換方法:使用RDD做資料轉換並去除空行,然後建立Schema,將RDD按建立的Schema轉為DataFrame

val df_final = spark.createDataFrame(rdd.map(x => Row.fromTuple(x)), schema)
  1. 在程式中將轉換好的DataFrame儲存為JSON

  2. 進入spark-shell
    (1) 將第4步儲存的檔案載入到DataFrame

val df = spark.read.json("hdfs://主機名或ip地址:埠號/檔名")

(2) 執行DataFrame中的查詢(以下查詢分別用轉換操作運算元和SQL語句實現),並用show命令打印出摘要資訊
① 查詢單價小於0.2的所有商品
先建立一個檢視,用作sql語句的查詢

val table = df.createTempView("data")  

Sql

spark.sql("select Description, UnitPrice from data where UnitPrice<0.2").show()  

運算元

df.selectExpr("Description", "UnitPrice").where("UnitPrice<0.2").show() //單價小於0.2  

在這裡插入圖片描述

② 查詢訂單551845~551850的顧客
Sql

spark.sql("select CustomerID, InvoiceNo from data where InvoiceNo>=55184 and InvoiceNo<=551850").show()  

運算元

df.selectExpr("CustomerID", "InvoiceNo").where("InvoiceNo>=55184 and InvoiceNo<=551850").show() //查詢訂單551845~551850的顧客  

在這裡插入圖片描述

③ 統計本資料中包含了多少個訂單
Sql

spark.sql("select count(distinct InvoiceNo) from data").show()  

運算元

df.select(countDistinct("InvoiceNo")).show()  

在這裡插入圖片描述

④ 統計所有訂單的最大金額、訂單包含的最多產品數量、訂單包含的最多產品種類
訂單最大金額
Sql

spark.sql("select sum(Quantity*UnitPrice) as sumPrice from data group by InvoiceNo order by sum(Quantity*UnitPrice) desc").show()   

運算元

df.selectExpr("max(Quantity*UnitPrice)").show()  

在這裡插入圖片描述

最多產品數量
Sql

spark.sql("select StockCode, max(Quantity) as maxNum from data group by StockCode order by max(Quantity) desc").show()

運算元

df.groupBy("StockCode").agg(sum("Quantity") as "maxNum").orderBy(desc("maxNum")).show()

在這裡插入圖片描述

最多產品總類
Sql

spark.sql("select StockCode, count(StockCode) from data group by StockCode order by count(StockCode) desc").show()  

運算元

df.groupBy("StockCode").count().orderBy(desc("count")).show()  

在這裡插入圖片描述