1. 程式人生 > 其它 >2021-2022年寒假學習進度20

2021-2022年寒假學習進度20

今天完成spark基礎實驗五

一、實驗目的

(1)通過實驗掌握SparkSQL的基本程式設計方法;

(2)熟悉RDD到DataFrame 的轉化方法;

(3)熟悉利用SparkSQL管理來自不同資料來源的資料。

二、實驗平臺

作業系統: Ubuntu16.04 Spark 版本:2.1.0

資料庫:MySQL

三、實驗內容和要求

1.SparkSQL基本操作

將下列JSON 格式資料複製到 Linux 系統中,並儲存命名為 employee.json。

employee.json 建立DataFrame,並寫出 Scala 語句完成下列操作:

(1)查詢所有資料;

(2)查詢所有資料,並去除重複的資料;

(3)查詢所有資料,列印時去除 id 欄位;

(4)篩選出 age>30的記錄;

(5)將資料按 age 分組;

(6)將資料按 name 升序排列;

(7)取出前 3 行資料;

(8)查詢所有記錄的 name 列,併為其取別名為 username;

(9)查詢年齡 age的平均值;

(10)查詢年齡 age的最小值。

2.程式設計實現將 RDD轉換為 DataFrame

原始檔內容如下(包含id,name,age):

將資料複製儲存到 Linux 系統中,命名為 employee.txt,實現從 RDD 轉換得到

DataFrame,並按“id:1,name:Ella,age:36”的格式打印出

DataFrame 的所有資料。請寫出程式程式碼。

原始碼:

import org.apache.spark.sql.types._

import org.apache.spark.sql.Row

val peopleRDD = spark.sparkContext.textFile("file:///opt/software/employee.txt")

val schemaString = "id name age"

val fields = schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, nullable = true))

val schema = StructType(fields)

val rowRDD = peopleRDD.map(_.split(",")).map(attributes => Row(attributes(0), attributes(1).trim, attributes(2).trim))

val peopleDF = spark.createDataFrame(rowRDD, schema)

peopleDF.createOrReplaceTempView("people")

val results = spark.sql("SELECT id,name,age FROM people")

results.map(attributes => "id: " + attributes(0)+","+"name:"+attributes(1)+","+"age:"+attributes(2)).show(false)

3.程式設計實現利用 DataFrame讀寫 MySQL的資料

(1) MySQL資料庫中新建資料庫 sparktest,再建立表 employee,包含如表 6-2所示的兩行資料。

6-2 employee 表原有資料

id

name

gender

Age

1

Alice

F

22

2

John

M

25

(2)配置 Spark通過 JDBC連線資料庫MySQL,程式設計實現利用DataFrame插入如表 6-3所示的兩行資料到MySQL中,最後打印出 age的最大值和 age的總和。

6-3 employee 表新增資料

id

name

gender

age

3

Mary

F

26

4

Tom

M

23

package spark.core.exper05

/**
* @ClassName TestMysql.java
* @author 趙浩博
* @version 1.0.0
* @Description TODO
* @createTime 2022年01月19日 14:31:00
*/
import java.util.Properties
import org.apache.spark.sql.types._
import org.apache.spark.sql.{Row, SparkSession}
object TestMysql {
def main(args: Array[String]) {
val spark=SparkSession.builder().getOrCreate()
val employeeRDD = spark.sparkContext.parallelize(Array("3 Mary F 26","4 Tom M 23")).map(_.split(" "))
val schema = StructType(List(StructField("id", IntegerType, true),StructField("name", StringType, true),StructField("gender", StringType, true),StructField("age", IntegerType, true)))
val rowRDD = employeeRDD.map(p => Row(p(0).toInt,p(1).trim, p(2).trim,p(3).toInt))
val employeeDF = spark.createDataFrame(rowRDD, schema)
val prop = new Properties()
prop.put("user", "root")
prop.put("password", "1229")
prop.put("driver","com.mysql.cj.jdbc.Driver")
employeeDF.write.mode("append").jdbc("jdbc:mysql://localhost:3306/sparktest", "sparktest.employee", prop)
val jdbcDF = spark.read.format("jdbc").option("url", "jdbc:mysql://localhost:3306/sparktest").option("driver","com.mysql.jdbc.Driver").option("dbtable","employee").option("user","root").option("password", "1229").load()
jdbcDF.agg("age" -> "max", "age" -> "sum")
}
}

作者:哦心有 出處:https://www.cnblogs.com/haobox/ 本文版權歸作者和部落格園共有,歡迎轉載,但必須給出原文連結,並保留此段宣告,否則保留追究法律責任的權利。