第7章 External Data Source
7-1 -課程目錄
7-2 -產生背景
使用者:
方便快速從不同的資料來源(json,parquet,rebms),經過混合處理(Json,join,parquet),再將處理結果以特定的格式(json,parquet)再寫回到指定的(HDFS,s3)上去
Spark SQL1.2==>外部資料來源API
7-3 -概述
沒有本次課
7-4 -目標
外部資料來源的目的
1)開發人員:是否需要把程式碼合併到spark中???weibo
2)使用者
spark.read.format(format)
format
build-in: json parquet jdbc | csv(2+)
packages:外部的 並不是spark內建
寫:people.write.format("parquet").save("path")
7-5 -操作Parquet檔案資料
import org.apache.spark.sql.SparkSession
/**
* Parquet檔案操作
*/
object ParquetApp {
def main(args: Array[String]) {
val spark = SparkSession.builder().appName("SparkSessionApp")
.master("local[2]").getOrCreate()
/**
* spark.read.format("parquet").load 這是標準寫法
*/
val userDF = spark.read.format("parquet").load("file:///home/hadoop/app/spark-2.1.0-bin-2.6.0-cdh5.7.0/examples/src/main/resources/users.parquet")
userDF.printSchema()
userDF.show()
userDF.select("name","favorite_color").show
userDF.select("name","favorite_color").write.format("json").save("file:///home/hadoop/tmp/jsonout")
spark.read.load("file:///home/hadoop/app/spark-2.1.0-bin-2.6.0-cdh5.7.0/examples/src/main/resources/users.parquet").show
//會報錯,因為sparksql預設處理的format就是parquet
spark.read.load("file:///home/hadoop/app/spark-2.1.0-bin-2.6.0-cdh5.7.0/examples/src/main/resources/people.json").show
spark.read.format("parquet").option("path","file:///home/hadoop/app/spark-2.1.0-bin-2.6.0-cdh5.7.0/examples/src/main/resources/users.parquet").load().show
spark.stop()
}
}
7-6 -操作Hive表資料
7-7 -操作MySQL表資料
7-8 -Hive和MySQL綜合使用
原始碼地址:
原始碼:
package com.imooc.spark
import org.apache.spark.sql.SparkSession
/**
* 使用外部資料來源綜合查詢Hive和MySQL的表資料
*/
object HiveMySQLApp {
def main(args: Array[String]) {
val spark = SparkSession.builder().appName("HiveMySQLApp")
.master("local[2]").getOrCreate()
// 載入Hive表資料
val hiveDF = spark.table("emp")
// 載入MySQL表資料
val mysqlDF = spark.read.format("jdbc").option("url", "jdbc:mysql://localhost:3306").option("dbtable", "spark.DEPT").option("user", "root").option("password", "root").option("driver", "com.mysql.jdbc.Driver").load()
// JOIN
val resultDF = hiveDF.join(mysqlDF, hiveDF.col("deptno") === mysqlDF.col("DEPTNO"))
resultDF.show
resultDF.select(hiveDF.col("empno"),hiveDF.col("ename"),
mysqlDF.col("deptno"), mysqlDF.col("dname")).show
spark.stop()
}
}