1. 程式人生 > >第7章 External Data Source

第7章 External Data Source

7-1 -課程目錄

 

 

7-2 -產生背景

使用者:

方便快速從不同的資料來源(json,parquet,rebms),經過混合處理(Json,join,parquet),再將處理結果以特定的格式(json,parquet)再寫回到指定的(HDFS,s3)上去

Spark SQL1.2==>外部資料來源API

7-3 -概述

沒有本次課

7-4 -目標

 

外部資料來源的目的

1)開發人員:是否需要把程式碼合併到spark中???weibo

2)使用者

spark.read.format(format)

format

build-in: json parquet jdbc | csv(2+)

packages:外部的 並不是spark內建

https://spark-packages.org/

寫:people.write.format("parquet").save("path")

 

7-5 -操作Parquet檔案資料

 

import org.apache.spark.sql.SparkSession

/**

* Parquet檔案操作

*/

object ParquetApp {

def main(args: Array[String]) {

val spark = SparkSession.builder().appName("SparkSessionApp")

.master("local[2]").getOrCreate()

/**

* spark.read.format("parquet").load 這是標準寫法

*/

val userDF = spark.read.format("parquet").load("file:///home/hadoop/app/spark-2.1.0-bin-2.6.0-cdh5.7.0/examples/src/main/resources/users.parquet")

userDF.printSchema()

userDF.show()

userDF.select("name","favorite_color").show

userDF.select("name","favorite_color").write.format("json").save("file:///home/hadoop/tmp/jsonout")

spark.read.load("file:///home/hadoop/app/spark-2.1.0-bin-2.6.0-cdh5.7.0/examples/src/main/resources/users.parquet").show

//會報錯,因為sparksql預設處理的format就是parquet

spark.read.load("file:///home/hadoop/app/spark-2.1.0-bin-2.6.0-cdh5.7.0/examples/src/main/resources/people.json").show

spark.read.format("parquet").option("path","file:///home/hadoop/app/spark-2.1.0-bin-2.6.0-cdh5.7.0/examples/src/main/resources/users.parquet").load().show

spark.stop()

}

}

7-6 -操作Hive表資料

 

7-7 -操作MySQL表資料

 

7-8 -Hive和MySQL綜合使用

原始碼地址:

https://gitee.com/sag888/big_data/blob/master/%E4%BB%A5%E6%85%95%E8%AF%BE%E7%BD%91%E6%97%A5%E5%BF%97%E5%88%86%E6%9E%90%E4%B8%BA%E4%BE%8B%20%E8%BF%9B%E5%85%A5%E5%A4%A7%E6%95%B0%E6%8D%AE%20Spark%20SQL%20%E7%9A%84%E4%B8%96%E7%95%8C/project/p1867y/ImoocSparkSQLProject/src/main/scala/com/imooc/spark/HiveMySQLApp.scala

 

原始碼:

package com.imooc.spark

import org.apache.spark.sql.SparkSession

/**

* 使用外部資料來源綜合查詢Hive和MySQL的表資料

*/

object HiveMySQLApp {

def main(args: Array[String]) {

val spark = SparkSession.builder().appName("HiveMySQLApp")

.master("local[2]").getOrCreate()

// 載入Hive表資料

val hiveDF = spark.table("emp")

// 載入MySQL表資料

val mysqlDF = spark.read.format("jdbc").option("url", "jdbc:mysql://localhost:3306").option("dbtable", "spark.DEPT").option("user", "root").option("password", "root").option("driver", "com.mysql.jdbc.Driver").load()

// JOIN

val resultDF = hiveDF.join(mysqlDF, hiveDF.col("deptno") === mysqlDF.col("DEPTNO"))

resultDF.show

resultDF.select(hiveDF.col("empno"),hiveDF.col("ename"),

mysqlDF.col("deptno"), mysqlDF.col("dname")).show

spark.stop()

}

}