2021-2022年寒假學習進度20
今天完成spark基礎實驗五
一、實驗目的
(1)通過實驗掌握SparkSQL的基本程式設計方法;
(2)熟悉RDD到DataFrame 的轉化方法;
(3)熟悉利用SparkSQL管理來自不同資料來源的資料。
二、實驗平臺
作業系統: Ubuntu16.04 Spark 版本:2.1.0
資料庫:MySQL
三、實驗內容和要求
1.SparkSQL基本操作
將下列JSON 格式資料複製到 Linux 系統中,並儲存命名為 employee.json。
為 employee.json 建立DataFrame,並寫出 Scala 語句完成下列操作:
(1)查詢所有資料;
(2)查詢所有資料,並去除重複的資料;
(3)查詢所有資料,列印時去除 id 欄位;
(4)篩選出 age>30的記錄;
(5)將資料按 age 分組;
(6)將資料按 name 升序排列;
(7)取出前 3 行資料;
(8)查詢所有記錄的 name 列,併為其取別名為 username;
(9)查詢年齡 age的平均值;
(10)查詢年齡 age的最小值。
2.程式設計實現將 RDD轉換為 DataFrame
原始檔內容如下(包含id,name,age):
請將資料複製儲存到 Linux 系統中,命名為 employee.txt,實現從 RDD 轉換得到
DataFrame,並按“id:1,name:Ella,age:36”的格式打印出
原始碼:
import org.apache.spark.sql.types._ import org.apache.spark.sql.Row val peopleRDD = spark.sparkContext.textFile("file:///opt/software/employee.txt") val schemaString = "id name age" val fields = schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, nullable = true)) val schema = StructType(fields) val rowRDD = peopleRDD.map(_.split(",")).map(attributes => Row(attributes(0), attributes(1).trim, attributes(2).trim)) val peopleDF = spark.createDataFrame(rowRDD, schema) peopleDF.createOrReplaceTempView("people") val results = spark.sql("SELECT id,name,age FROM people") results.map(attributes => "id: " + attributes(0)+","+"name:"+attributes(1)+","+"age:"+attributes(2)).show(false) |
3.程式設計實現利用 DataFrame讀寫 MySQL的資料
(1)在 MySQL資料庫中新建資料庫 sparktest,再建立表 employee,包含如表 6-2所示的兩行資料。
表 6-2 employee 表原有資料
id |
name |
gender |
Age |
1 |
Alice |
F |
22 |
2 |
John |
M |
25 |
(2)配置 Spark通過 JDBC連線資料庫MySQL,程式設計實現利用DataFrame插入如表 6-3所示的兩行資料到MySQL中,最後打印出 age的最大值和 age的總和。
表 6-3 employee 表新增資料
id |
name |
gender |
age |
|
3 |
Mary |
F |
26 |
|
4 |
Tom |
M |
23 |
|
package spark.core.exper05 |