
SparkSQL Basic Usage (Part 1)


Reference: http://spark.apache.org/docs/latest/sql-programming-guide.html

1) Build a Scala project with Maven.

1.1) Add the following dependencies to the pom file:

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.10</artifactId>
    <version>2.2.2</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-mllib_2.10</artifactId>
    <version>2.2.2</version>
    <!--<scope>runtime</scope>-->
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.10</artifactId>
    <version>2.2.2</version>
</dependency>
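
The dependencies above are not by themselves enough for Maven to compile Scala sources. A minimal sketch of the additional pom configuration is shown below; it is not part of the original post, and the scala-library and scala-maven-plugin versions are assumptions chosen to match the _2.10 artifacts above, so adjust them to your environment.

<!-- Assumed additional pom configuration (illustrative sketch, versions are assumptions) -->
<dependency>
    <groupId>org.scala-lang</groupId>
    <artifactId>scala-library</artifactId>
    <version>2.10.6</version>
</dependency>

<build>
    <plugins>
        <!-- Compiles the Scala sources under src/main/scala -->
        <plugin>
            <groupId>net.alchim31.maven</groupId>
            <artifactId>scala-maven-plugin</artifactId>
            <version>3.2.2</version>
            <executions>
                <execution>
                    <goals>
                        <goal>compile</goal>
                        <goal>testCompile</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>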

1.2) Create a new Scala class. The code, with comments describing what each step does, is shown below:

package com.fishjar.sparksql

import org.apache.spark.sql._
import org.apache.spark.sql.expressions.{Aggregator, MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._

/**
 * Created by fishjar on 2018/8/22.
 */
object SparkSqlDemo {

  // The case class must be defined outside the function that uses it. Since this example
  // only has a single main function, the case class is moved out here.
  case class Person(name: String, age: Long)

  def main(args: Array[String]): Unit = {

    val path1 = "D:\\test\\people.json"
    val path2 = "D:\\test\\people.txt"

    // 1) Create the SparkSession
    // Point at a local Hadoop installation so the Windows winutils binaries can be found
    System.setProperty("hadoop.home.dir", "F:\\hadp-dev\\hadoop-2.7.1")
    val spark = SparkSession
      .builder()
      .appName("Spark SQL basic example")
      .master("local[2]")
      .config("spark.some.config.option", "some-value")
      .getOrCreate()

    // For implicit conversions like converting RDDs to DataFrames
    import spark.implicits._

    // 2) Create a DataFrame
    val df = spark.read.json(path1)
    // 2.1) Display the contents of the DataFrame
    df.show()
    // 2.2) Print the schema in a tree format
    df.printSchema()
    // 2.3) Select the name column
    df.select("name").show()
    // 2.4) Select the name column and the age column incremented by 1
    df.select($"name", $"age" + 1).show()
    // 2.5) Filter for rows with age greater than 21
    df.filter($"age" > 21).show()
    // 2.6) Count rows grouped by age
    df.groupBy("age").count().show()

    // 3) Register the DataFrame as a view and query it with SQL

    // 3.1) Register the DataFrame as a temporary view
    df.createOrReplaceTempView("people")
    // 3.2) Query the view
    val sqlDF = spark.sql("SELECT * FROM people")
    println("*******createOrReplaceTempView*******")
    sqlDF.show()

    // 3.3) Register the DataFrame as a global temporary view
    df.createGlobalTempView("people")
    println("*******createGlobalTempView*****")
    spark.sql("SELECT * FROM global_temp.people").show()
    spark.newSession().sql("SELECT * FROM global_temp.people").show()
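    // Note: a global temporary view is tied to the Spark application rather than to a single
    // SparkSession, which is why the new session above can still query it through the
    // reserved global_temp database.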


    // 4) Create Datasets
    println("*******4 Create Datasets*****")

    // Encoders are created for case classes
    val caseClassDS = Seq(Person("Andy", 32)).toDS()
    caseClassDS.show()

    // Encoders for most common types are automatically provided by importing spark.implicits._
    val primitiveDS = Seq(1, 2, 3).toDS()
    primitiveDS.map(_ + 1).collect() // Returns: Array(2, 3, 4)
    for (arg <- primitiveDS) println(arg)

    // DataFrames can be converted to a Dataset by providing a class. Mapping will be done by name
    val peopleDS = spark.read.json(path1).as[Person]
    println("********peopleDS*******")
    peopleDS.show()


    // 5) Inferring the schema using reflection

    println("5) Inferring the schema using reflection")
    // For implicit conversions from RDDs to DataFrames
    import spark.implicits._
    // Create an RDD of Person objects from a text file, convert it to a DataFrame
    // (this assumes each line of aaa.txt has the form "<age>,<name>")
    val peopleDF = spark.sparkContext
      .textFile("D:\\test\\aaa.txt")
      .map(_.split(","))
      .map(attributes => Person(attributes(1), attributes(0).trim.toInt))
      .toDF()
    // Register the DataFrame as a temporary view
    peopleDF.createOrReplaceTempView("people")

    // SQL statements can be run by using the sql methods provided by Spark
    val teenagersDF = spark.sql("SELECT name, age FROM people WHERE age BETWEEN 13 AND 19")
    teenagersDF.show(1)

    // The columns of a row in the result can be accessed by field index
    teenagersDF.map(teenager => "Name: " + teenager(0)).show()
    // or by field name
    teenagersDF.map(teenager => "Name: " + teenager.getAs[String]("name")).show()

    // No pre-defined encoders for Dataset[Map[K,V]], define explicitly
    implicit val mapEncoder = org.apache.spark.sql.Encoders.kryo[Map[String, Any]]
    // Primitive types and case classes can be also defined as
    // implicit val stringIntMapEncoder: Encoder[Map[String, Any]] = ExpressionEncoder()
    // row.getValuesMap[T] retrieves multiple columns at once into a Map[String, T]
    println("encoders")
    teenagersDF.map(teenager => teenager.getValuesMap[Any](List("name", "age"))).collect()

    // 6) Programmatically specifying the schema

    println("**********Programmatically Specifying the Schema")

    import org.apache.spark.sql.types._
    // Create an RDD
    val peopleRDD = spark.sparkContext.textFile("d:\\test\\people.txt")
    // The schema is encoded in a string
    val schemaString = "name age"
    // Generate the schema based on the string of schema
    val fields = schemaString.split(" ")
      .map(fieldName => StructField(fieldName, StringType, nullable = true))
    val schema = StructType(fields)
    // Convert records of the RDD (people) to Rows
    val rowRDD = peopleRDD
      .map(_.split(","))
      .map(attributes => Row(attributes(0), attributes(1).trim))
    // Apply the schema to the RDD
    val peopleDF2 = spark.createDataFrame(rowRDD, schema)
    // Creates a temporary view using the DataFrame
    peopleDF2.createOrReplaceTempView("people")
    // SQL can be run over a temporary view created using DataFrames
    val results = spark.sql("SELECT name FROM people")
    results.map(attributes => "Name: " + attributes(0)).show()
    results.show()
    peopleDF2.show()


    // 7) Untyped user-defined aggregate function
    println("*********7 Untyped user-defined aggregate function************")
    // Register the function to access it
    spark.udf.register("myAverage", MyAverage)
    val df2 = spark.read.json("D:\\test\\employees.json")
    df2.createOrReplaceTempView("employees")
    df2.show()
    val result = spark.sql("SELECT myAverage(salary) as average_salary FROM employees")
    result.show()
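    // If employees.json matches the sample file shipped with Spark (salaries 3000, 4500,
    // 3500 and 4000), this query prints a single row with average_salary = 3750.0.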

    // 8) Type-safe user-defined aggregate function
    println("**********Type-safe user-defined aggregate function**********")
    val ds = spark.read.json("D:\\test\\employees.json").as[Employee]
    ds.show()
    // Convert the function to a `TypedColumn` and give it a name
    val averageSalary = MyAverage2.toColumn.name("average_salary")
    val result2 = ds.select(averageSalary)
    result2.show()
  }

  object MyAverage extends UserDefinedAggregateFunction {
    // Data types of input arguments of this aggregate function
    def inputSchema: StructType = StructType(StructField("inputColumn", LongType) :: Nil)
    // Data types of values in the aggregation buffer
    def bufferSchema: StructType = {
      StructType(StructField("sum", LongType) :: StructField("count", LongType) :: Nil)
    }
    // The data type of the returned value
    def dataType: DataType = DoubleType
    // Whether this function always returns the same output on the identical input
    def deterministic: Boolean = true
    // Initializes the given aggregation buffer. The buffer itself is a `Row` that in addition to
    // standard methods like retrieving a value at an index (e.g., get(), getBoolean()), provides
    // the opportunity to update its values. Note that arrays and maps inside the buffer are still
    // immutable.
    def initialize(buffer: MutableAggregationBuffer): Unit = {
      buffer(0) = 0L
      buffer(1) = 0L
    }
    // Updates the given aggregation buffer `buffer` with new input data from `input`
    def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
      if (!input.isNullAt(0)) {
        buffer(0) = buffer.getLong(0) + input.getLong(0)
        buffer(1) = buffer.getLong(1) + 1
      }
    }
    // Merges two aggregation buffers and stores the updated buffer values back to `buffer1`
    def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
      buffer1(0) = buffer1.getLong(0) + buffer2.getLong(0)
      buffer1(1) = buffer1.getLong(1) + buffer2.getLong(1)
    }
    // Calculates the final result
    def evaluate(buffer: Row): Double = buffer.getLong(0).toDouble / buffer.getLong(1)
  }



  case class Employee(name: String, salary: Long)
  case class Average(var sum: Long, var count: Long)

  object MyAverage2 extends Aggregator[Employee, Average, Double] {
    // A zero value for this aggregation. Should satisfy the property that any b + zero = b
    def zero: Average = Average(0L, 0L)
    // Combine two values to produce a new value. For performance, the function may modify `buffer`
    // and return it instead of constructing a new object
    def reduce(buffer: Average, employee: Employee): Average = {
      buffer.sum += employee.salary
      buffer.count += 1
      buffer
    }
    // Merge two intermediate values
    def merge(b1: Average, b2: Average): Average = {
      b1.sum += b2.sum
      b1.count += b2.count
      b1
    }
    // Transform the output of the reduction
    def finish(reduction: Average): Double = reduction.sum.toDouble / reduction.count
    // Specifies the Encoder for the intermediate value type
    def bufferEncoder: Encoder[Average] = Encoders.product
    // Specifies the Encoder for the final output value type
    def outputEncoder: Encoder[Double] = Encoders.scalaDouble
  }
}
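
The code above expects sample data files under D:\test\. Apart from aaa.txt, which is the author's own file (assumed here to contain "<age>,<name>" lines and not reproduced), the other files correspond to the samples shipped with Spark under examples/src/main/resources; their contents are listed here so the example can be run locally.

people.json
{"name":"Michael"}
{"name":"Andy", "age":30}
{"name":"Justin", "age":19}

people.txt
Michael, 29
Andy, 30
Justin, 19

employees.json
{"name":"Michael", "salary":3000}
{"name":"Andy", "salary":4500}
{"name":"Justin", "salary":3500}
{"name":"Berta", "salary":4000}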
