-spark基礎操作
dataframe
spark dataframe派生於RDD類,但是提供了非常強大的資料操作功能。主要對類SQL的支援。
DataFrame是一種以RDD為基礎的分散式資料集,類似於傳統資料塊中的表,它與RDD最主要的區別在於:DataFrame有schema元資料,即DataFrame所表示的資料集的每一列都有名稱和資料型別。正是因為有了這些schema元資料,Sparl SQL的查詢優化器就可以進行鍼對性的優化。
spark dataframe 的幾個關鍵點:
-
分散式的資料集
-
類似關係型資料庫中的table,或者 excel 裡的一張 sheet,或者 python/R 裡的 dataframe
-
擁有豐富的操作函式,類似於 rdd 中的運算元
-
一個 dataframe 可以被註冊成一張資料表,然後用 sql 語言在上面操作
-
豐富的建立方式
-
已有的RDD
-
結構化資料檔案
-
JSON資料集
-
Hive表
-
外部資料庫
-
RDD和 DataFrame的比較
# 前者沒有schema資訊;後者有schema資訊
# RDD無法得知所存的資料元素的具體內部結構,Spark Core只能在stage層面進行簡單的優化;後者因為有schema資訊,Sparl SQL的查詢優化器就可以進行鍼對性的優化
# RDD通過函式式呼叫API,雖然簡潔明瞭,但是需要建立新的物件,不容易重用舊的物件,給GC帶來挑戰;DataFrame是儘可能的重用物件
在實際工作中會遇到這樣的情況,主要是會進行兩個資料集的篩選、合併,重新入庫。
首先載入資料集,然後在提取資料集的前幾行過程中,才找到limit的函式。
而合併就用到union函式,重新入庫,就是registerTemple註冊成表,再進行寫入到HIVE中。
不得不讚嘆dataframe的強大。
DataFrame建立方式
跟關係資料庫的表(Table)一樣,DataFrame是Spark中對帶模式(schema)行列資料的抽象。DateFrame廣泛應用於使用SQL處理大資料的各種場景。建立DataFrame有很多種方法,比如從本地List建立、從RDD建立或者從源資料建立,下面簡要介紹建立DataFrame的三種方法。
方法一,Spark中使用toDF
函式建立DataFrame
通過匯入(importing)Spark sql implicits, 就可以將本地序列(seq), 陣列或者RDD轉為DataFrame。只要這些資料的內容能指定資料型別即可。
本地seq + toDF建立DataFrame示例:
import sqlContext.implicits._
val df = Seq(
(1, "First Value", java.sql.Date.valueOf("2010-01-01")),
(2, "Second Value", java.sql.Date.valueOf("2010-02-01"))
).toDF("int_column", "string_column", "date_column")
sqlContext.implicits._
val df = Seq(
(1, "First Value", java.sql.Date.valueOf("2010-01-01")),
(2, "Second Value", java.sql.Date.valueOf("2010-02-01"))
).toDF("int_column", "string_column", "date_column")
注意:如果直接用toDF()而不指定列名字,那麼預設列名為"1", "2", ...
通過case class + toDF建立DataFrame的示例
// sc is an existing SparkContext.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
// this is used to implicitly convert an RDD to a DataFrame.
import sqlContext.implicits._
// Define the schema using a case class.
// Note: Case classes in Scala 2.10 can support only up to 22 fields. To work around this limit,
// you can use custom classes that implement the Product interface.
case class Person(name: String, age: Int)
// Create an RDD of Person objects and register it as a table.
val people = sc.textFile("examples/src/main/resources/people.txt").map(_.split(",")).map(p => Person(p(0), p(1).trim.toInt)).toDF()
people.registerTempTable("people")
// 使用 sqlContext 執行 sql 語句.
val teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")
// 注:sql()函式的執行結果也是DataFrame,支援各種常用的RDD操作.
// The columns of a row in the result can be accessed by ordinal.
teenagers.map(t => "Name: " + t(0)).collect().foreach(println)
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
// this is used to implicitly convert an RDD to a DataFrame.
import sqlContext.implicits._
// Define the schema using a case class.
// Note: Case classes in Scala 2.10 can support only up to 22 fields. To work around this limit,
// you can use custom classes that implement the Product interface.
case class Person(name: String, age: Int)
// Create an RDD of Person objects and register it as a table.
val people = sc.textFile("examples/src/main/resources/people.txt").map(_.split(",")).map(p => Person(p(0), p(1).trim.toInt)).toDF()
people.registerTempTable("people")
// 使用 sqlContext 執行 sql 語句.
val teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")
// 注:sql()函式的執行結果也是DataFrame,支援各種常用的RDD操作.
// The columns of a row in the result can be accessed by ordinal.
teenagers.map(t => "Name: " + t(0)).collect().foreach(println)
方法二,Spark中使用createDataFrame
函式建立DataFrame
在SqlContext
中使用createDataFrame也可以建立DataFrame。跟toDF
一樣,這裡建立DataFrame的資料形態也可以是本地陣列或者RDD。
通過row+schema建立示例
import org.apache.spark.sql.types._
val schema = StructType(List(
StructField("integer_column", IntegerType, nullable = false),
StructField("string_column", StringType, nullable = true),
StructField("date_column", DateType, nullable = true)
))
val rdd = sc.parallelize(Seq(
Row(1, "First Value", java.sql.Date.valueOf("2010-01-01")),
Row(2, "Second Value", java.sql.Date.valueOf("2010-02-01"))
))
val df = sqlContext.createDataFrame(rdd, schema)
org.apache.spark.sql.types._
val schema = StructType(List(
StructField("integer_column", IntegerType, nullable = false),
StructField("string_column", StringType, nullable = true),
StructField("date_column", DateType, nullable = true)
))
val rdd = sc.parallelize(Seq(
Row(1, "First Value", java.sql.Date.valueOf("2010-01-01")),
Row(2, "Second Value", java.sql.Date.valueOf("2010-02-01"))
))
val df = sqlContext.createDataFrame(rdd, schema)
方法三,通過檔案直接建立DataFrame
使用parquet檔案建立
val df = sqlContext.read.parquet("hdfs:/path/to/file")
val df = sqlContext.read.parquet("hdfs:/path/to/file")
使用json檔案建立
val df = spark.read.json("examples/src/main/resources/people.json")
// Displays the content of the DataFrame to stdout
df.show()
// +----+-------+
// | age| name|
// +----+-------+
// |null|Michael|
// | 30| Andy|
// | 19| Justin|
// +----+-------+
df = spark.read.json("examples/src/main/resources/people.json")
// Displays the content of the DataFrame to stdout
df.show()
// +----+-------+
// | age| name|
// +----+-------+
// |null|Michael|
// | 30| Andy|
// | 19| Justin|
// +----+-------+
/**
* DataFrame API基本操作
*/
object DataFrameApp {
def main(args: Array[String]): Unit = {
val spark = SparkSession
.builder()
.appName("DataFrameApp")
.master("local[2]")
.getOrCreate()
// 將json檔案載入成一個dataframe
val peopleDF = spark.read
.format("json")
.load("people.json")
// 輸出dataframe對應的schema資訊
peopleDF.printSchema()
// 輸出資料集的前20條記錄
peopleDF.show()
//查詢某列所有的資料: select name from table
peopleDF.select("name").show()
// 查詢某幾列所有的資料,並對列進行計算: select name, age+10 as age2 from table
peopleDF
.select(peopleDF.col("name"), (peopleDF.col("age") + 10).as("age2"))
.show()
//根據某一列的值進行過濾: select * from table where age>19
peopleDF.filter(peopleDF.col("age") > 19).show()
//根據某一列進行分組,然後再進行聚合操作: select age,count(1) from table group by age
peopleDF.groupBy("age").count().show()
spark.stop()
}
}
/**
* RDD to DataFrame以及DataFrame操作
*/
object DataFrameCase {
case class Student(id: Int, name: String, phone: String, email: String)
def main(args: Array[String]): Unit = {
val spark = SparkSession
.builder()
.appName("DataFrameRDDApp")
.master("local[2]")
.getOrCreate()
// RDD ==> DataFrame
val rdd =
spark.sparkContext.textFile("student.data")
//注意:需要匯入隱式轉換
import spark.implicits._
val studentDF = rdd
.map(_.split("""|"""))
.map(line => Student(line(0).toInt, line(1), line(2), line(3)))
.toDF()
//show預設只顯示前20條
studentDF.show
studentDF.show(30)
studentDF.show(30, false)
studentDF.take(10)
studentDF.first()
studentDF.head(3)
studentDF.select("email").show(30, false)
studentDF.filter("name=''").show
studentDF.filter("name='' OR name='NULL'").show
//name以M開頭的人
studentDF.filter("SUBSTR(name,0,1)='M'").show
studentDF.sort(studentDF("name")).show
studentDF.sort(studentDF("name").desc).show
studentDF.sort("name", "id").show
studentDF.sort(studentDF("name").asc, studentDF("id").desc).show
studentDF.select(studentDF("name").as("student_name")).show
val studentDF2 = rdd
.map(_.split("\\|"))
.map(line => Student(line(0).toInt, line(1), line(2), line(3)))
.toDF()
studentDF
.join(studentDF2, studentDF.col("id") === studentDF2.col("id"))
.show
spark.stop()
}
}
package spark_basic
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{
StringType,
IntegerType,
StructField,
StructType
}
/**
* DataFrame和RDD的互操作
*/
object DataFrameRDDApp {
case class Info(id: Int, name: String, age: Int)
def main(args: Array[String]): Unit = {
val spark = SparkSession
.builder()
.appName("DataFrameRDDApp")
.master("local[2]")
.getOrCreate()
inferReflection(spark)
program(spark)
spark.stop()
}
def inferReflection(spark: SparkSession): Unit = {
// RDD ==> DataFrame
val rdd = spark.sparkContext.textFile("file:///Users/rocky/data/infos.txt")
//注意:需要匯入隱式轉換
import spark.implicits._
val infoDF = rdd
.map(_.split(","))
.map(line => Info(line(0).toInt, line(1), line(2).toInt))
.toDF()
infoDF.show()
infoDF.filter(infoDF.col("age") > 30).show
infoDF.createOrReplaceTempView("infos")
spark.sql("select * from infos where age > 30").show()
}
def program(spark: SparkSession): Unit = {
// RDD ==> DataFrame
val rdd = spark.sparkContext.textFile("file:///Users/rocky/data/infos.txt")
val infoRDD = rdd
.map(_.split(","))
.map(line => Row(line(0).toInt, line(1), line(2).toInt))
val structType = StructType(
Array(StructField("id", IntegerType, true),
StructField("name", StringType, true),
StructField("age", IntegerType, true)))
val infoDF = spark.createDataFrame(infoRDD, structType)
infoDF.printSchema()
infoDF.show()
//通過df的api進行操作
infoDF.filter(infoDF.col("age") > 30).show
//通過sql的方式進行操作
infoDF.createOrReplaceTempView("infos")
spark.sql("select * from infos where age > 30").show()
}
}
package spark_basic
import org.apache.spark.sql.SparkSession
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext
/**
* HiveContext的使用
* 使用時需要通過--jars 把mysql的驅動傳遞到classpathÒ
*/
object HiveContextApp {
def main(args: Array[String]): Unit = {
val warehouseLocation = "spark-warehouse"
val sparkConf =
new SparkConf().set("spark.sql.warehouse.dir", warehouseLocation)
val spark =
SparkSession.builder.enableHiveSupport.config(sparkConf).getOrCreate()
//2)相關的處理:
spark.table("emp").show
//3)關閉資源
spark.stop()
}
}
package spark_basic
import org.apache.spark.sql.SparkSession
/**
* 使用外部資料來源綜合查詢Hive和MySQL的表資料
*/
object HiveMySQLApp {
def main(args: Array[String]) {
val spark = SparkSession
.builder()
.appName("HiveMySQLApp")
.master("local[2]")
.getOrCreate()
// 載入Hive表資料
val hiveDF = spark.table("emp")
// 載入MySQL表資料
val mysqlDF = spark.read
.format("jdbc")
.option("url", "jdbc:mysql://localhost:3306")
.option("dbtable", "spark.DEPT")
.option("user", "root")
.option("password", "root")
.option("driver", "com.mysql.jdbc.Driver")
.load()
// JOIN
val resultDF =
hiveDF.join(mysqlDF, hiveDF.col("deptno") === mysqlDF.col("DEPTNO"))
resultDF.show
resultDF
.select(hiveDF.col("empno"),
hiveDF.col("ename"),
mysqlDF.col("deptno"),
mysqlDF.col("dname"))
.show
spark.stop()
}
}
package spark_basic
import org.apache.spark.sql.SparkSession
/**
* Parquet檔案操作
*/
object ParquetApp {
def main(args: Array[String]) {
val spark = SparkSession
.builder()
.appName("SparkSessionApp")
.master("local[2]")
.getOrCreate()
/**
* spark.read.format("parquet").load 這是標準寫法
*/
val userDF = spark.read
.format("parquet")
.load("users.parquet")
userDF.printSchema()
userDF.show()
userDF.select("name", "favorite_color").show
userDF
.select("name", "favorite_color")
.write
.format("json")
.save("file:///home/hadoop/tmp/jsonout")
spark.read
.load("users.parquet")
.show
//會報錯,因為sparksql預設處理的format就是parquet
spark.read
.load("people.json")
.show
spark.read
.format("parquet")
.option("path", "users.parquet")
.load()
.show
spark.stop()
}
}
package spark_basic
import org.apache.spark.sql.SparkSession
/**
* Schema Infer
*
* "spark.sql.sources.partitionColumnTypeInference.enabled" 預設是 true
*/
object SchemaInferApp {
def main(args: Array[String]) {
val spark = SparkSession
.builder()
.appName("SchemaInferApp")
.master("local[2]")
.config("spark.sql.sources.partitionColumnTypeInference.enabled", false)
.getOrCreate()
val df = spark.read
.format("json")
.load("file:///Users/rocky/data/json_schema_infer.json")
df.printSchema()
df.show()
spark.stop()
}
}
package spark_basic
import org.apache.spark.sql.SparkSession
/**
* SparkSession的使用
*
* SparkSession是spark2.0以後預設的的統一客戶端程式入口。
*
* sparkSession是HiveContext和sqlContext的統一入口
* sparkContext可以通過spark.sparkContext獲得
*/
object SparkSessionApp {
def main(args: Array[String]) {
val spark = SparkSession
.builder()
.enableHiveSupport()
.appName("SparkSessionApp")
.master("local[2]")
.getOrCreate()
val sc = spark.sparkContext
val people = spark.read.json("file:///Users/rocky/data/people.json")
people.show()
spark.stop()
}
}
package spark_basic
import java.sql.DriverManager
/**
* 通過JDBC的方式訪問
*/
object SparkSQLThriftServerApp {
def main(args: Array[String]) {
Class.forName("org.apache.hive.jdbc.HiveDriver")
val conn = DriverManager.getConnection("jdbc:hive2://hadoop001:14000","hadoop","")
val pstmt = conn.prepareStatement("select empno, ename, sal from emp")
val rs = pstmt.executeQuery()
while (rs.next()) {
println("empno:" + rs.getInt("empno") +
" , ename:" + rs.getString("ename") +
" , sal:" + rs.getDouble("sal"))
}
rs.close()
pstmt.close()
conn.close()
}
}
package spark_basic
import org.apache.spark.sql.SparkSession
import org.apache.spark.SparkConf
/**
* SQLContext的使用:
* 注意:IDEA是在本地,而測試資料是在伺服器上 ,能不能在本地進行開發測試的?
*/
object SQLContextApp {
def main(args: Array[String]): Unit = {
val path = args(0)
//1)建立相應的Context
val sparkConf = new SparkConf()
//在測試或者生產中,AppName和Master我們是通過指令碼進行指定
//sparkConf.setAppName("SQLContextApp").setMaster("local[2]")
val spark = SparkSession.builder().config(sparkConf).getOrCreate()
val sc = spark.sparkContext
//2)相關的處理: json
val people = spark.sqlContext.read.format("json").load(path)
people.printSchema()
people.show()
//3)關閉資源
sc.stop()
}
}