SparkSQL study notes (for quick reference)
By 阿新 · Published 2018-11-12
1、SQLContextApp
package sparkSQLmook

import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}

/**
 * Spark 1.6-style SQL usage with SQLContext
 */
object SQLContextApp {

/*
  // Local mode: hard-coded input path, master set in code
  def main(args: Array[String]): Unit = {
    //val path = args(0)
    val conf = new SparkConf()
      .setAppName("SQLContextApp")
      .set("spark.sql.warehouse.dir", "file:///")
      .setMaster("local[2]")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    val df = sqlContext.read.format("json").load("C://Users//shujuelin//Desktop//spark//people.json")
    df.show()
    sc.stop()
  }
*/

  // Production mode: the input path is passed in as a program argument
  def main(args: Array[String]): Unit = {
    val path = args(0)
    val conf = new SparkConf()
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    val df = sqlContext.read.format("json").load(path)
    df.show()
    sc.stop()
  }
}
2、SparkSessionApp
package sparkSQLmook

import org.apache.spark.sql.SparkSession

object SparkSessionApp {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .master("local")
      .appName("SparkSessionApp")
      .config("spark.sql.warehouse.dir", "file:///")
      .getOrCreate()

    val df = spark.read.json("C:/Users/shujuelin/Desktop/spark/people.json")
    df.show()

    spark.stop()
  }
}
3、ParquetDemo
package sparkSQLmook

import org.apache.spark.sql.SparkSession

object ParquetDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .appName("ParquetDemo")
      .master("local[2]")
      .config("spark.sql.warehouse.dir", "file:///")
      .getOrCreate()

    /**
     * spark.read.format("parquet").load(...) is the standard way to read Parquet
     */
    //val rddDF = spark.read.format("parquet").load("C://Users//shujuelin//Desktop//spark//users.parquet")
    //rddDF.show()
    // Parquet is Spark SQL's default format (a write-back sketch follows this listing)
    //rddDF.select("name","favorite_color").write.format("json").save("C://Users//shujuelin//Desktop//spark//userss.json")

    /**
     * Generic form: no format given, so the default (Parquet) is used
     */
    val DF = spark.read.load("C://Users//shujuelin//Desktop//spark//users.parquet")
    DF.show(false)

    spark.stop()
  }
}
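Since Parquet is also the default format on the write side, the DataFrame read above can be written back out without naming a format. A minimal sketch continuing the ParquetDemo example; the output directory users_out is a placeholder, not part of the original code:

import org.apache.spark.sql.SaveMode

// Write DF back out; with no format given, Parquet is used by default.
// "users_out" is a placeholder output directory for illustration.
DF.write
  .mode(SaveMode.Overwrite) // replace the output directory if it already exists
  .save("C://Users//shujuelin//Desktop//spark//users_out")

// Explicit equivalent: DF.write.mode(SaveMode.Overwrite).parquet(samePath)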
4、JdbcBeelineSQL
package sparkSQLmook
import java.sql.DriverManager
// Access Spark SQL over JDBC. This assumes the Spark Thrift Server is already
// running (e.g. started with sbin/start-thriftserver.sh) and listening on spark1:10000.
object JdbcBeelineSQL {
  def main(args: Array[String]) {
    Class.forName("org.apache.hive.jdbc.HiveDriver")
    val conn = DriverManager.getConnection("jdbc:hive2://spark1:10000", "root", "")
    val pstmt = conn.prepareStatement("select name,age,score from students")
    val rs = pstmt.executeQuery()
    while (rs.next()) {
      println("name:" + rs.getString("name") +
        " , age:" + rs.getInt("age") +
        " , score:" + rs.getDouble("score"))
    }
    rs.close()
    pstmt.close()
    conn.close()
  }
}
5、HiveMySQLApp
package sparkSQLmook
import org.apache.spark.sql.SparkSession
/**
 * Use external data sources to query Hive and MySQL tables together
 * (an important external-data-source example: joining a Hive table with a MySQL table)
 *
 * MySQL setup:
 * create database spark;
 * use spark;
 * -- create the table
 * CREATE TABLE DEPT(
 *   DEPTNO int(2) PRIMARY KEY,
 *   DNAME VARCHAR(14),
 *   LOC VARCHAR(13));
 * -- insert data
 * INSERT INTO DEPT VALUES(10,'ACCOUNTING','NEW YORK');
 * INSERT INTO DEPT VALUES(20,'RESEARCH','DALLAS');
 * INSERT INTO DEPT VALUES(30,'SALES','CHICAGO');
 * INSERT INTO DEPT VALUES(40,'OPERATIONS','BOSTON');
 */
object HiveMySQLApp {
  def main(args: Array[String]) {
    val spark = SparkSession.builder().appName("HiveMySQLApp")
      .master("local[2]")
      .enableHiveSupport() // needed so spark.table() can read tables from the Hive metastore
      .getOrCreate()
    // Load the Hive table
    val hiveDF = spark.table("emp")

    // Load the MySQL table via the JDBC data source
    val mysqlDF = spark.read.format("jdbc")
      .option("url", "jdbc:mysql://localhost:3306")
      .option("dbtable", "spark.DEPT")
      .option("user", "root")
      .option("password", "root")
      .option("driver", "com.mysql.jdbc.Driver")
      .load()

    // Join the two DataFrames on the department number
    val resultDF = hiveDF.join(mysqlDF, hiveDF.col("deptno") === mysqlDF.col("DEPTNO"))
    resultDF.show

    resultDF.select(hiveDF.col("empno"), hiveDF.col("ename"),
      mysqlDF.col("deptno"), mysqlDF.col("dname")).show

    spark.stop()
  }
}
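For completeness, the joined result could also be written back to MySQL through the same JDBC data source. This is only a sketch using the connection settings above; the target table spark.DEPT_RESULT is a hypothetical name, not something created by the DDL at the top of this section:

import org.apache.spark.sql.SaveMode

// Write the join result into MySQL (DEPT_RESULT is a placeholder table name)
resultDF.write.format("jdbc")
  .option("url", "jdbc:mysql://localhost:3306")
  .option("dbtable", "spark.DEPT_RESULT")
  .option("user", "root")
  .option("password", "root")
  .option("driver", "com.mysql.jdbc.Driver")
  .mode(SaveMode.Append) // append rows; SaveMode.Overwrite would recreate the table
  .save()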
6、HiveAPP
package sparkSQLmook
import org.apache.spark.sql.SparkSession
/**
 * Spark SQL operations on Hive tables:
 *   1. Read:  spark.table(tableName)
 *   2. Write: df.write.saveAsTable(tableName)  (see the sketch after this listing)
 */
object HiveAPP {
  // Using the spark.sql(...) style --> the same statements can be run in spark-shell
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("HiveAPP")
      .master("local[2]")
      .config("spark.sql.warehouse.dir", "file:///")
      .getOrCreate()

    import spark.implicits._

    //val HiveDf = spark.table("t_movies")
    spark.sql("show databases").show()

    spark.stop()
  }
}
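The write path listed in the header comment (df.write.saveAsTable) is not exercised in the code above. A minimal sketch of what it could look like, assuming Hive support is configured; t_movies and t_movies_backup are placeholder table names:

// Read an existing Hive table and write it back as a new managed table.
val moviesDF = spark.table("t_movies")   // "t_movies" is a placeholder table name
moviesDF.write
  .mode("overwrite")                     // replace the target table if it already exists
  .saveAsTable("t_movies_backup")        // stored as a managed table in the Hive warehouse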
7、DatasetApp
package sparkSQLmook
import org.apache.spark.sql.SparkSession
/**
 * Dataset operations -> reading a CSV file
 */
object DatasetApp {
  def main(args: Array[String]) {
    val spark = SparkSession.builder().appName("DatasetApp")
      .master("local[2]").getOrCreate()

    // Note: the implicit conversions are required (for .as[Sales] and the typed map)
    import spark.implicits._

    val path = "file:///f:/text/sales.csv"
    // How does Spark parse a CSV file?
    //   header:      treat the first line as the column names
    //   inferSchema: automatically infer the column types
    val df = spark.read.option("header", "true").option("inferSchema", "true").csv(path)
    df.show

    val ds = df.as[Sales] // convert the DataFrame to a Dataset
    // map iterates over the rows; here only itemId is taken from each row
    ds.map(line => line.itemId).show

    // A misspelled SQL statement only fails at runtime:
    // spark.sql("seletc name from person").show
    // A misspelled Dataset/DataFrame method does not even compile:
    // df.seletc("name")
    df.select("name")
    ds.map(line => line.itemId)

    spark.stop()
  }

  case class Sales(transactionId: Int, customerId: Int, itemId: Int, amountPaid: Double)
}
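Because Sales is a case class, the Dataset also supports typed operations that are checked at compile time. A small sketch continuing the example above (spark.implicits._ already imported); the 100.0 threshold is just an illustrative value:

// Typed filter: the predicate works on Sales objects rather than column names
ds.filter(sale => sale.amountPaid > 100.0).show()

// Typed projection: map each Sales row to a (customerId, amountPaid) tuple
ds.map(sale => (sale.customerId, sale.amountPaid)).show()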
8、DataFrameRdd
package sparkSQLmook
import org.apache.spark.sql.SparkSession
/**
 * Convert an RDD to a DataFrame, approach 1: reflection on a case class
 */
object DataFrameRdd {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("DataFrameRdd")
      .master("local[2]")
      .config("spark.sql.warehouse.dir", "file:///")
      .getOrCreate()

    // RDD ==> DataFrame
    val rdd = spark.sparkContext.textFile("C:/Users/shujuelin/Desktop/spark/infos.txt")

    import spark.implicits._

    // Convert the RDD to a DataFrame via the case class
    val lineDF = rdd.map(_.split(",")).map(line => info(line(0).toInt, line(1), line(2).toInt)).toDF()
    /* val lineRDD = rdd.map(line => line.split(","))
    val lineDF = lineRDD.map(lines => info(lines(0).toInt, lines(1), lines(2).toInt)).toDF()*/
    //lineDF.show()

    // 1. DataFrame API style
    //lineDF.filter($"age" > 20).show()

    // 2. SQL style
    lineDF.createOrReplaceTempView("info")
    spark.sql("select name,age from info where age > 20").show()

    spark.stop()
  }

  case class info(id: Int, name: String, age: Int)
}
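With spark.implicits._ in scope, the same reflection approach also gives a typed Dataset directly, and a DataFrame can be turned back into an RDD when low-level processing is needed again. A short sketch continuing the DataFrameRdd example above:

// RDD of case-class objects ==> typed Dataset (same reflection mechanism as toDF)
val infoDS = rdd.map(_.split(",")).map(line => info(line(0).toInt, line(1), line(2).toInt)).toDS()
infoDS.show()

// DataFrame ==> RDD[Row], for going back to RDD-level processing
lineDF.rdd.take(5).foreach(println)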
9、DataFrameRdd2
package sparkSQLmook
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.sql.{Row, SparkSession}
/**
 * Convert an RDD to a DataFrame, approach 2: programmatically with Row and a schema.
 * Useful when the structure of the data is not known ahead of time.
 */
object DataFrameRdd2 {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .appName("DataFrameRdd2")
      .master("local[2]")
      .config("spark.sql.warehouse.dir", "file:///")
      .getOrCreate()

    val rdd = spark.sparkContext.textFile("C:/Users/shujuelin/Desktop/spark/infos.txt")

    // 1. Convert the RDD into an RDD of Row
    val rddRow = rdd.map(_.split(",")).map(line => Row(line(0).toInt, line(1), line(2).toInt))

    // 2. Build the schema (metadata) describing the rows
    val structType = StructType(Array(
      StructField("id", IntegerType, true),
      StructField("name", StringType, true),
      StructField("age", IntegerType, true)))

    // Bind the rows to the schema
    val df = spark.createDataFrame(rddRow, structType)
    //df.show()

    // SQL-style querying
    df.createOrReplaceTempView("info") // register a temporary view
    spark.sql("select * from info where age > 20").show()

    spark.stop()
  }
}
10、DataFrameOperation
package sparkSQLmook
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

/**
 * DataFrame API operations
 */
object DataFrameOperation {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .master("local")
      .appName("DataFrameOperation")
      .config("spark.sql.warehouse.dir", "file:///")
      .getOrCreate()

    // implicit conversions (needed for the $"col" syntax)
    import spark.implicits._

    //val df = spark.read.json("C:/Users/shujuelin/Desktop/spark/people.json")
    val df = spark.read.format("json").load("C:/Users/shujuelin/Desktop/spark/people.json")

    //df.show()                              // show() prints 20 rows by default
    //df.printSchema()
    //df.select("name").show()               // a typical untyped (weakly typed) select
    //df.select($"name", $"age" + 1).show()  // expression syntax: prefix columns with $, add 1 to age
    //df.filter($"age" > 19).show()
    //df.filter(df.col("age") > 19).show()   // equivalent column-object syntax

    df.select(df.col("name"), (df.col("age") + 3).as("age2")).show() // column alias
    //df.groupBy("age").count().show()       // group first, then aggregate

    spark.stop()
  }
}
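The org.apache.spark.sql.functions._ import above also provides the built-in aggregate functions. A brief sketch on the same df, assuming people.json has the usual name and age columns; the aggregates chosen here are just examples:

// Whole-DataFrame aggregates using functions from org.apache.spark.sql.functions._
df.agg(count("name").as("num_people"), avg("age").as("avg_age")).show()

// The same functions combine with groupBy
df.groupBy("age").agg(count("name").as("num_people")).show()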
11、DataFrameCase
package sparkSQLmook
import org.apache.spark.sql.SparkSession
/**
 * DataFrame examples (other API operations)
 */
object DataFrameCase {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .appName("DataFrameCase")
      .master("local[2]")
      .config("spark.sql.warehouse.dir", "file:///")
      .getOrCreate()

    val rdd = spark.sparkContext.textFile("C://Users//shujuelin//Desktop//spark//student.data")

    import spark.implicits._

    // The | delimiter must be escaped as \\| in split()
    // Convert to a DataFrame via reflection on the case class
    val infoDF = rdd.map(_.split("\\|")).map(lines => info(lines(0).toInt, lines(1), lines(2), lines(3))).toDF()
    infoDF.show(false) // show displays 20 rows by default; use infoDF.show(30, false) for more; false = don't truncate values

    //infoDF.take(10).foreach(println)
    //infoDF.first()   // first row
    //infoDF.head(3)   // first three rows
    //infoDF.select("name", "phone").show()
    //infoDF.show(20, false)

    // Select rows whose name is empty or the literal string 'NULL'
    //infoDF.filter("name = '' or name = 'NULL'").show()
    // Select names starting with 's'
    //infoDF.filter("substr(name,0,1) = 's'").show(20, false)

    // Sorting by name (ascending by default)
    //infoDF.sort($"name".desc).show() // or: infoDF.sort(infoDF.col("name").desc).show()
    //infoDF.sort(infoDF.col("name").asc, infoDF.col("id").desc).show(20, false)

    // Rename a column
    //infoDF.select(infoDF.col("name").as("student_info")).show(20, false)

    // Join (inner join by default; see the sketch after this listing)
    /* val infoDF2 = rdd.map(_.split("\\|")).map(lines => info(lines(0).toInt, lines(1), lines(2), lines(3))).toDF()
    infoDF.join(infoDF2, infoDF.col("id") === infoDF2.col("id")).show(20, false)
    */

    spark.stop()
  }

  case class info(id: Int, name: String, phone: String, email: String)
}
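Since join defaults to an inner join, other join types must be named explicitly via the third argument. A small sketch reusing the commented-out infoDF2 idea above; left_outer is just one possible join type:

// Second DataFrame built the same way as infoDF
val infoDF2 = rdd.map(_.split("\\|")).map(lines => info(lines(0).toInt, lines(1), lines(2), lines(3))).toDF()

// Explicit join type: "inner", "left_outer", "right_outer", "full_outer", "left_semi", ...
infoDF.join(infoDF2, infoDF.col("id") === infoDF2.col("id"), "left_outer").show(20, false)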