1. 程式人生 > >Spark SQL原始碼函式解讀及UDF/UDAF例子 spark研習第六集

Spark SQL原始碼函式解讀及UDF/UDAF例子 spark研習第六集

四、 Spark SQL原始碼函式解讀

1. Spark SQL內建函式解密與實戰

SparkSQL的DataFrame引入了大量的內建函式,這些內建函式一般都有CG(CodeGeneration)功能,這樣的函式在編譯和執行時都會經過高度優化。

問題:SparkSQL操作Hive和Hive on Spark一樣嗎?

=> 不一樣。SparkSQL操作Hive只是把Hive當作資料倉庫的來源,而計算引擎就是SparkSQL本身。Hive on spark是Hive的子專案,Hive on Spark的核心是把Hive的執行引擎換成Spark。眾所周知,目前Hive的計算引擎是Mapreduce,因為效能低下等問題,所以Hive的官方就想替換這個引擎。

SparkSQL操作Hive上的資料叫Spark on Hive,而Hive on Spark依舊是以Hive為核心,只是把計算引擎由MapReduce替換為Spark。

Experimental 
A distributed collection of data organized into named columns. 
A DataFrame is equivalent to a relational table in Spark SQL. The following example creates a DataFrame by pointing Spark SQL to a Parquet data set.

  1. val people = sqlContext.read.parquet("...")// in Scala
  2. DataFrame people = sqlContext.read().parquet("...")// in Java
  3. Once created, it can be manipulated using the various domain-specific-language (DSL) functions definedin:DataFrame(thisclass),Column,and functions.
  4. Toselect a column from the data frame,
    use apply method inScalaand col inJava.
  5. val ageCol = people("age")// in Scala
  6. Column ageCol = people.col("age")// in Java
  7. Note that the Column type can also be manipulated through its various functions.
  8. // The following creates a new column that increases everybody's age by 10.
  9. people("age")+10// in Scala
  10. people.col("age").plus(10);// in Java
  11. A more concrete example inScala:
  12. // To create DataFrame using SQLContextval people = sqlContext.read.parquet("...")val department = sqlContext.read.parquet("...")
  13. people.filter("age > 30")
  14. .join(department, people("deptId")=== department("id"))
  15. .groupBy(department("name"),"gender")
  16. .agg(avg(people("salary")), max(people("age")))
  17. andinJava:
  18. // To create DataFrame using SQLContext
  19. DataFrame people = sqlContext.read().parquet("...");
  20. DataFrame department = sqlContext.read().parquet("...");
  21. people.filter("age".gt(30))
  22. .join(department, people.col("deptId").equalTo(department("id")))
  23. .groupBy(department.col("name"),"gender")
  24. .agg(avg(people.col("salary")), max(people.col("age")));

以上內容中的join,groupBy,agg都是SparkSQL的內建函式。 
SParkl1.5.x以後推出了很多內建函式,據不完全統計,有一百多個內建函式。 
下面實戰開發一個聚合操作的例子:

  1. package com.dt.spark
  2. import org.apache.spark.sql.types.{IntegerType,StringType,StructField,StructType}
  3. import org.apache.spark.{SparkConf,SparkContext}
  4. import org.apache.spark.sql.{Row,SQLContext}
  5. import org.apache.spark.sql.functions._
  6. /**
  7. * 使用Spark SQL中的內建函式對資料進行分析,Spark SQL API不同的是,DataFrame中的內建函式操作的結果是返回一個Column物件,而
  8. * DataFrame天生就是"A distributed collection of data organized into named columns.",這就為資料的複雜分析建立了堅實的基礎
  9. * 並提供了極大的方便性,例如說,我們在操作DataFrame的方法中可以隨時呼叫內建函式進行業務需要的處理,這之於我們構建附件的業務邏輯而言是可以
  10. * 極大的減少不必須的時間消耗(基於上就是實際模型的對映),讓我們聚焦在資料分析上,這對於提高工程師的生產力而言是非常有價值的
  11. * Spark 1.5.x開始提供了大量的內建函式,例如agg:
  12. * def agg(aggExpr: (String, String), aggExprs: (String, String)*): DataFrame = {
  13. * groupBy().agg(aggExpr, aggExprs : _*)
  14. *}
  15. * 還有max、mean、min、sum、avg、explode、size、sort_array、day、to_date、abs、acros、asin、atan
  16. * 總體上而言內建函式包含了五大基本型別:
  17. * 1,聚合函式,例如countDistinct、sumDistinct等;
  18. * 2,集合函式,例如sort_array、explode等
  19. * 3,日期、時間函式,例如hour、quarter、next_day
  20. * 4, 數學函式,例如asin、atan、sqrt、tan、round等;
  21. * 5,開窗函式,例如rowNumber等
  22. * 6,字串函式,concat、format_number、rexexp_extract
  23. * 7, 其它函式,isNaN、sha、randn、callUDF
  24. */
  25. objectSparkSQLAgg{
  26. def main(args:Array[String]){
  27. System.setProperty("hadoop.home.dir","G:/datarguru spark/tool/hadoop-2.6.0")
  28. val conf =newSparkConf()
  29. conf.setAppName("SparkSQLlinnerFunctions")
  30. //conf.setMaster("spark://master:7077")
  31. conf.setMaster("local")
  32. val sc =newSparkContext(conf)
  33. val sqlContext =newSQLContext(sc)//構建SQL上下文
  34. //要使用Spark SQL的內建函式,就一定要匯入SQLContext下的隱式轉換
  35. import sqlContext.implicits._
  36. //模擬電商訪問的資料,實際情況會比模擬資料複雜很多,最後生成RDD
  37. val userData =Array(
  38. "2016-3-27,001,http://spark.apache.org/,1000",
  39. "2016-3-27,001,http://Hadoop.apache.org/,1001",
  40. "2016-3-27,002,http://fink.apache.org/,1002",
  41. "2016-3-28,003,http://kafka.apache.org/,1020",
  42. "2016-3-28,004,http://spark.apache.org/,1010",
  43. "2016-3-28,002,http://hive.apache.org/,1200",
  44. "2016-3-28,001,http://parquet.apache.org/,1500",
  45. "2016-3-28,001,http://spark.apache.org/,1800"
  46. )
  47. val userDataRDD = sc.parallelize(userData)//生成分散式叢集物件
  48. //根據業務需要對資料進行預處理生成DataFrame,要想把RDD轉換成DataFrame,需要先把RDD中的元素型別變成Row型別
  49. //於此同時要提供DataFrame中的Columns的元資料資訊描述
  50. val userDataRDDRow = userDataRDD.map(row =>{val splited = row.split(",");Row(splited(0),splited(1).toInt,splited(2), splited(3).toInt)})
  51. val structType =StructType(Array(
  52. StructField("time",StringType,true),
  53. StructField("id",IntegerType,true),
  54. StructField("url",StringType,true),
  55. StructField("amount",IntegerType,true)
  56. ))
  57. val userDataDF = sqlContext.createDataFrame(userDataRDDRow, structType)
  58. //第五步:使用Spark SQL提供的內建函式對DataFrame進行操作,特別注意:內建函式生成的Column物件且自定進行CG;
  59. userDataDF.groupBy("time").agg('time, countDistinct('id))
  60. .map(row =>Row(row(1),row(2))).collect().foreach(println)
  61. userDataDF.groupBy("time").agg('time, sum('amount))
  62. .map(row =>Row(row(1),row(2))).collect().foreach(println)
  63. }
  64. }

2. Spark SQL視窗函式解密與實戰

視窗函式中最重要的是row_number。row_bumber是對分組進行排序,所謂分組排序就是說在分組的基礎上再進行排序。 
下面使用SparkSQL的方式重新編寫TopNGroup.scala程式並執行:

  1. package com.dt.spark
  2. import org.apache.spark.sql.hive.HiveContext
  3. import org.apache.spark.{SparkConf,SparkContext}
  4. objectSparkSQLWindowFunctionOps{
  5. def main(args:Array[String]){
  6. val conf =newSparkConf()
  7. conf.setMaster("spark://master:7077")
  8. conf.setAppName("SparkSQLWindowFunctionOps")
  9. val sc =newSparkContext(conf)
  10. val hiveContext =newHiveContext(sc)
  11. hiveContext.sql("DROP TABLE IF EXISTS scores")
  12. hiveContext.sql("CREATE TABLE IF NOT EXISTS scores(name STRING,score INT)"
  13. +"ROW FORMAT DELIMITED FIELDS TERMINATED ' ' LINES TERMINATED BY '\\n'")
  14. //將要處理的資料匯入到Hive表中
  15. 相關推薦

    Spark SQL原始碼函式解讀UDF/UDAF例子 spark研習

    四、 Spark SQL原始碼函式解讀 1. Spark SQL內建函式解密與實戰 SparkSQL的DataFrame引入了大量的內建函式,這些內建函式一般都有CG(CodeGeneration)功能,這樣的函式在編譯和執行時都會經過高度優化。

    71課:Spark SQL視窗函式解密與實戰

    內容:     1.SparkSQL視窗函式解析     2.SparkSQL視窗函式實戰 一、SparkSQL視窗函式解析     1.spark支援兩種方式使用視窗函式:  &nb

    Spark SQL 原始碼分析之Physical Plan 到 RDD的具體實現

      我們都知道一段sql,真正的執行是當你呼叫它的collect()方法才會執行Spark Job,最後計算得到RDD。 lazy val toRdd: RDD[Row] = executedPlan.execute()  Spark Plan基本包含4種操作型別,即Bas

    spark sql視窗函式

    視窗函式是spark sql模組從1.4之後開始支援的,主要用於解決對一組資料進行操作,同時為每條資料返回單個結果,比如計算指定訪問資料的均值、計算累進和或訪問當前行之前行資料等,這些場景使用普通函式實現是比較困難的。 視窗函式計算的一組行,被稱為Frame。每

    比較全的Spark中的函式使用程式設計模型

    1. Spark中的基本概念 在Spark中,有下面的基本概念。 Application:基於Spark的使用者程式,包含了一個driver program和叢集中多個executor Driver Program:執行Application的main()函式並建立Spar

    [Spark SQL] 原始碼解析之Parser

    前言 由上篇部落格我們知道了SparkSql整個解析流程如下: sqlText 經過 SqlParser 解析成 Unresolved LogicalPlan; analyzer 模組結合catalog進行繫結,生成 resolved LogicalPla

    Spark 系列(十一)—— Spark SQL 聚合函式 Aggregations

    一、簡單聚合 1.1 資料準備 // 需要匯入 spark sql 內建的函式包 import org.apache.spark.sql.functions._ val spark = SparkSession.builder().appName("aggregations").mast

    Spark SQL原始碼剖析(一)SQL解析框架Catalyst流程概述

    Spark SQL模組,主要就是處理跟SQL解析相關的一些內容,說得更通俗點就是怎麼把一個SQL語句解析成Dataframe或者說RDD的任務。以Spark 2.4.3為例,Spark SQL這個大模組分為三個子模組,如下圖所示 其中Catalyst可以說是Spark內部專門用來解析SQL的一個框架,在H

    Spark SQL原始碼解析(二)Antlr4解析Sql並生成樹

    Spark SQL原理解析前言: Spark SQL原始碼剖析(一)SQL解析框架Catalyst流程概述 這一次要開始真正介紹Spark解析SQL的流程,首先是從Sql Parse階段開始,簡單點說,這個階段就是使用Antlr4,將一條Sql語句解析成語法樹。 可能有童鞋沒接觸過antlr4這個內容,推薦看

    Spark SQL原始碼解析(三)Analysis階段分析

    Spark SQL原理解析前言: [Spark SQL原始碼剖析(一)SQL解析框架Catalyst流程概述](https://www.cnblogs.com/listenfwind/p/12724381.html) [Spark SQL原始碼解析(二)Antlr4解析Sql並生成樹](https://ww

    Spark SQL原始碼解析(四)Optimization和Physical Planning階段解析

    Spark SQL原理解析前言: [Spark SQL原始碼剖析(一)SQL解析框架Catalyst流程概述](https://www.cnblogs.com/listenfwind/p/12724381.html) [Spark SQL原始碼解析(二)Antlr4解析Sql並生成樹](https://

    Spark SQL原始碼解析(五)SparkPlan準備和執行階段

    Spark SQL原理解析前言: [Spark SQL原始碼剖析(一)SQL解析框架Catalyst流程概述](https://www.cnblogs.com/listenfwind/p/12724381.html) [Spark SQL原始碼解析(二)Antlr4解析Sql並生成樹](https://w

    C51(AT89C52)同濟大學出版社《微控制器原理應用》(魏鴻磊)12題答案

    原題題目: 12.設系統時鐘頻率為12MHz,利用定時器T0中斷,實現從P2.1輸出高電平寬度為10ms,低電平為20ms的矩形波。 解提思路: 1.系統時鐘頻率為12MHz,那麼一個機器週期為1us,因而在計算TH0和TL0的初值的時候要定時多少微秒直接用定時器最大值減去

    spark三種清理資料的方式:UDF,自定義函式spark.sql;Python中的zip()與*zip()函式詳解//python中的*args和**kwargs

    (1)UDF的方式清理資料 import sys reload(sys) sys.setdefaultencoding('utf8') import re import json from pyspark.sql import SparkSession

    詳解spark sql使用者自定義函式:UDFUDAF

    場景 UDAF = USER DEFINED AGGREGATION FUNCTION11 上一篇文章已經介紹了spark sql的視窗函式,並知道Spark sql提供了豐富的內建函式供猿友們使用,辣為何還要使用者自定義函式呢?實際的業務場景可能很複雜,內建函式hold

    72課:Spark SQL UDFUDAF解密與實戰

    內容:     1.SparkSQL UDF     2.SparkSQL UDAF 一、SparkSQL UDF和SparkSQL UDAF     1.解決SparkSQL內建函式不足問題,自定義內建函式,     2.UDF:User Define Functio

    Spark 2.4.0程式設計指南--Spark SQL UDFUDAF

    Spark 2.4.0程式設計指南–Spark SQL UDF和UDAF 更多資源 github: https://github.com/opensourceteams/spark-scala-maven-2.4.0 視訊 Spark 2.4.0程

    Spark SQL--UDAF函式

    需求:需要通過繼承 UserDefinedAggregateFunction 來實現自定義聚合函式。案例:計算一下員工的平均工資 弱型別聚合函式: package com.jiangnan.spark import org.apache.spark.SparkConf import or

    14.Spark SQLUDAF自定義聚合函式實戰

    UDAF自定義函式實戰 UDAF:User Defined Aggregate Function。使用者自定義聚合函式。是Spark 1.5.x引入的最新特性。 UDF,其實更多的是針對單行輸入,返

    spark-sql使用UDF函式實現ip對映省份,資料寫出到mysql引數設定。

    spark-SQL使用廣播變數以及應用資料庫的UDF自定義函式的查詢會比兩張表的連線更加的優化的程式的執行。 兩表連線是比較費效率的。 spar-sql 2.x的資料讀取,處理,新增schema資訊,常見表,SQL查詢。 將sql結果輸出到mysql的api引數設定。  還