
Spark: The Definitive Guide, Chapter 6 Notes


Where to Look for APIs

A DataFrame is essentially a Dataset of type Row, so keep an eye on https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.Dataset for API updates.
DataFrameStatFunctions and DataFrameNaFunctions provide additional methods for specific kinds of problems.

  • DataFrameStatFunctions: statistics-related functions
    https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.DataFrameStatFunctions
  • DataFrameNaFunctions: functions for working with null values
    https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.DataFrameNaFunctions
  • Column-related functions
    Covered in Chapter 5
    https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.Column
  • org.apache.spark.sql.functions contains functions for a wide range of data types; the whole package is often imported at once
    https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.functions$
- - -

Converting to Spark Types

Use the lit function to convert a literal value from the host language into the corresponding Spark type.
In SQL no conversion is needed, because literals are already Spark types.
import org.apache.spark.sql.functions.lit
df.select(lit(5), lit("five"), lit(5.0))

Working with Booleans

In Spark, to filter by equality on a column you should use === (equal) or =!= (not equal), or the equivalent equalTo and not functions; Scala's plain == and != compare the Column objects themselves rather than building a filter expression, so they cannot be used here, as in the short example below.
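A quick illustration of those equality operators (a sketch that assumes the chapter's retail df is loaded):

import org.apache.spark.sql.functions.{col, not}
df.where(col("StockCode") === "DOT").show(2)             // equality expression on a Column
df.where(col("StockCode") =!= "DOT").show(2)             // inequality
df.where(not(col("StockCode").equalTo("DOT"))).show(2)   // method/function equivalents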
When chaining and filters, simply apply them one after another: Spark flattens sequential filters and evaluates them at the same time, and the chained form is easier to read. or conditions, by contrast, must be placed inside a single expression.

val priceFilter = col("UnitPrice") > 600
val descFilter = col("Description").contains("POSTAGE")
df.where(col("StockCode").isin("DOT")).where(priceFilter.or(descFilter)).show()

You can also filter a DataFrame by specifying a Boolean column:

val DOTFilter = col("StockCode") === "DOT"
val priceFilter = col("UnitPrice") > 600
val descFilter = col("Description").contains("POSTAGE")
df.withColumn("isExpensive", DOTFilter.and(priceFilter.or(descFilter)))
    .where("isExpensive")
    .select("UnitPrice", "isExpensive")
    .show()

In practice, it is often easier to express such filters as SQL statements than to chain DataFrame methods, and there is no performance penalty for doing so.
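For instance, the previous filter can be written as a single SQL statement (a sketch that assumes df has been registered as the temp view dfTable, the name used elsewhere in these notes):

df.createOrReplaceTempView("dfTable")
spark.sql("""
  SELECT UnitPrice, (StockCode = 'DOT' AND
         (UnitPrice > 600 OR instr(Description, 'POSTAGE') >= 1)) AS isExpensive
  FROM dfTable
  WHERE (StockCode = 'DOT' AND
         (UnitPrice > 600 OR instr(Description, 'POSTAGE') >= 1))
""").show(5)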

import org.apache.spark.sql.functions.{not, expr}
df.withColumn("isExpensive", not(col("UnitPrice").leq(250))).where("isExpensive").show(5)
df.withColumn("isExpensive", expr("NOT UnitPrice < 250")).where("isExpensive").show(5)

Warning: pay attention to how null behaves in equality filters. Comparing a column to null with === or =!= yields null, which filters every row out; use eqNullSafe for a null-safe comparison. The experiments below show the difference.

val nulltest = Seq((1,2, null),(1,2,"t"), (1,2,"test")).toDF("col1", "col2", "col3")
nulltest.where(col("col3") =!= null).show()
nulltest.where(col("col3") === null).show()
nulltest.where(col("col3").eqNullSafe("hello") =!= "t").show()
nulltest.where(col("col3") =!= "t").show()
+----+----+----+
|col1|col2|col3|
+----+----+----+
+----+----+----+

+----+----+----+
|col1|col2|col3|
+----+----+----+
+----+----+----+

+----+----+----+
|col1|col2|col3|
+----+----+----+
|   1|   2|null|
|   1|   2|   t|
|   1|   2|test|
+----+----+----+

+----+----+----+
|col1|col2|col3|
+----+----+----+
|   1|   2|test|
+----+----+----+

The SQL equivalent of a null-safe comparison is IS [NOT] DISTINCT FROM.
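In the DataFrame API the same null-safe semantics are available via eqNullSafe (or the <=> operator), which treats two nulls as equal instead of returning null. A small sketch reusing the nulltest DataFrame above:

import org.apache.spark.sql.functions.{col, lit}
nulltest.where(col("col3").eqNullSafe("t")).show()    // matches only the "t" row
nulltest.where(col("col3").eqNullSafe(null)).show()   // matches the row whose col3 is null
nulltest.where(col("col3") <=> lit(null)).show()      // operator form of the same test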
- - -

Working with Numbers

import org.apache.spark.sql.functions.{expr, pow}
val f = pow(col("Quantity") * col("UnitPrice"), 2) + 5
df.select(expr("CustomerId"), f.alias("real")).show(4)

The same expressed in SQL:

df.selectExpr("CustomerId", "(POW(Quantity * UnitPrice, 2) + 5) AS real").show(4)

Rounding, and controlling the rounding precision:

import org.apache.spark.sql.functions.{round,bround}
df.select(col("UnitPrice"),round(col("UnitPrice")), round(col("UnitPrice"), 1)).show(5)

round rounds up when the value is exactly halfway between two numbers; use bround to round down:
df.select(round(lit(2.5)), bround(lit(2.5))).show(2)
Computing the correlation coefficient of two columns:

import org.apache.spark.sql.functions.corr
df.stat.corr("Quantity", "UnitPrice")
df.select(corr("Quantity", "UnitPrice")).show()

Compute summary statistics (count, mean, standard deviation, min, and max) with describe:
df.describe().show()
Other statistics are exposed through DataFrame.stat, for example approxQuantile for approximate quantiles (see the sketch below), crosstab, and freqItems:
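For example, approxQuantile computes approximate quantiles of a numeric column within a given relative error (this mirrors the book's example on the retail df; the exact result will vary with the data):

val colName = "UnitPrice"
val quantileProbs = Array(0.5)
val relError = 0.05
df.stat.approxQuantile(colName, quantileProbs, relError)   // approximate median, e.g. Array(2.51)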

df.stat.crosstab("StockCode", "Quantity").show()
df.stat.freqItems(Seq("Quantity", "UnitPrice")).show()

Add a unique, monotonically increasing ID to each row with monotonically_increasing_id():

import org.apache.spark.sql.functions.monotonically_increasing_id
df.select(monotonically_increasing_id()).show(2)

Working with Strings

initcap() capitalizes the first letter of every word in a given string when that word is separated from another by a space.

import org.apache.spark.sql.functions.initcap
df.select(initcap(col("Description"))).show(2)

lower() converts a string to lowercase
upper() converts a string to uppercase

import org.apache.spark.sql.functions.{lower, upper}
df.select(lower(col("Description")), upper(col("Description"))).show(2)

lpad() pads the left side of a string to a given length
ltrim removes whitespace from the left
rpad() pads the right side of a string to a given length
rtrim removes whitespace from the right
trim removes whitespace from both ends

import org.apache.spark.sql.functions.{ltrim, lpad, rtrim, rpad, trim}
df.select(lpad(lit("hello"), 1, " ")).show(2)
df.select(rpad(lit("hello"), 1, " ")).show(2)

Note that if lpad or rpad takes a number less than the length of the string, it will always remove values from the right side of the string.
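A small illustration of that note (a sketch; the strings are literals, so any DataFrame with at least one row would do):

import org.apache.spark.sql.functions.{lit, lpad, rpad}
df.select(
  lpad(lit("hello"), 10, "*").alias("padded"),     // "*****hello"
  rpad(lit("hello"), 3, "*").alias("truncated")    // "hel"
).show(2)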
Regular expressions:
regexp_extract() extracts matches of a pattern
regexp_replace() replaces matches of a pattern

import org.apache.spark.sql.functions.regexp_replace
import org.apache.spark.sql.functions.regexp_extract
val regexSet = Seq("black", "yellow", "white", "red", "green", "blue")
val regexStr = regexSet.map(_.toUpperCase).mkString("|")
df.select(regexp_replace(col("Description"), regexStr, "COLOR").alias("color_clean")).show(4)
df.select(regexp_extract(col("Description"), regexStr, 0).alias("color")).show(4)

Character-level replacement
translate() is done at the character level and will replace all instances of a character with the character at the same index in the replacement string; it substitutes individual characters rather than whole strings.

import org.apache.spark.sql.functions.translate
df.select(translate(col("Description"), "LEET", "1337"), col("Description"))
  .show(2)

Checking whether a string contains a substring:
in Scala, use contains() on the Column
in Python and SQL, use instr()

from pyspark.sql.functions import col, instr
containsBlack = instr(col("Description"), "BLACK") >= 1
containsWhite = instr(col("Description"), "WHITE") >= 1
df.withColumn("hasSimpleColor", containsBlack | containsWhite)\
  .where("hasSimpleColor")\
  .select("Description").show(3, False)
-- in SQL
SELECT Description FROM dfTable
WHERE instr(Description, 'BLACK') >= 1 OR instr(Description, 'WHITE') >= 1

Scala's varargs feature: appending :_* to a sequence expands it into individual arguments of a function.

val simpleColors = Seq("black", "white", "red", "green", "blue")
val selectedColumns = simpleColors.map(color => {
  col("Description").contains(color.toUpperCase).alias(s"is_$color")
}) :+ expr("*")
df.select(selectedColumns: _*).where(col("is_white").or(col("is_red")))
  .select("Description").show(3, false)

In Python, use locate():

from pyspark.sql.functions import expr, locate
simpleColors = ["black", "white", "red", "green", "blue"]
def color_locator(column, color_string):
  return locate(color_string.upper(), column)\
          .cast("boolean")\
          .alias("is_" + color_string)
selectedColumns = [color_locator(df.Description, c) for c in simpleColors]
selectedColumns.append(expr("*"))  # has to be a Column type

df.select(*selectedColumns).where(expr("is_white OR is_red"))\
  .select("Description").show(3, False)

This approach can be used to programmatically generate columns or Boolean filters.
- - -

Working with Dates and Timestamps

In Spark, dates deal only with calendar dates, while timestamps carry both date and time information.
When inferSchema is enabled, Spark does its best to identify column types correctly, including dates and timestamps.
Working with dates and timestamps is closely related to working with strings, because we often store dates and timestamps as strings and convert them into date or timestamp types at runtime.
In Spark 2.1 and earlier, when no time zone is specified Spark parses times according to the machine's time zone; the session time zone can be set via spark.conf.sessionLocalTimeZone.
The key to understanding the transformations that you are going to need to apply is to ensure that you know exactly what type and format you have at each given step of the way.
Spark's timestamps support only second-level precision, which means that if you intend to work with milliseconds or microseconds you will need to handle them as longs.
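A hedged sketch of one workaround: keep sub-second values as epoch milliseconds in a long column and cast explicitly when a timestamp is needed (the sample value below is made up):

import org.apache.spark.sql.functions.col
val msDF = Seq(1502395200123L).toDF("epoch_ms")   // made-up epoch milliseconds
msDF.select((col("epoch_ms") / 1000).cast("timestamp").alias("ts")).show(false)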
Related functions: date_add, date_sub, datediff, months_between, to_date, current_date, current_timestamp.

import org.apache.spark.sql.functions.{current_date, current_timestamp}
val dataDF = spark.range(10).withColumn("today", current_date())
    .withColumn("now", current_timestamp())
dataDF.createOrReplaceTempView("dataTable")
import org.apache.spark.sql.functions.{date_add, date_sub}
dataDF.select(date_sub(col("today"), 5), date_add(col("today"), 5)).show(1)
import org.apache.spark.sql.functions.{datediff, months_between, to_date}
dataDF.withColumn("week_ago", date_sub(col("today"), 7))
    .select(datediff(col("week_ago"), col("today"))).show(1)
dataDF.select(
    to_date(lit("2016-01-01")).alias("start"),
    to_date(lit("2017-05-22")).alias("end")
).select(months_between(col("start"), col("end"))).show(1)
spark.range(5).withColumn("date", lit("2017-01-01")).select(to_date(col("date"))).show(1)

When Spark cannot parse a date it does not throw an error; it simply returns null. Be careful when only part of your data follows the expected date format.

dataDF.select(to_date(lit("2016-20-12")), to_date(lit("2017-12-11"))).show(1)
+---------------------+---------------------+
|to_date('2016-20-12')|to_date('2017-12-11')|
+---------------------+---------------------+
|                 null|           2017-12-11|
+---------------------+---------------------+
only showing top 1 row

The to_date() and to_timestamp() functions, with an explicit date format:

import org.apache.spark.sql.functions.{to_date, to_timestamp, lit}
val dateFormat = "yyyy-dd-MM"
val cleanDF = spark.range(1).select(
    to_date(lit("2017-12-11"), dateFormat).alias("date"),
    to_date(lit("2017-20-11"), dateFormat).alias("date2")
)
cleanDF.createOrReplaceTempView("dateTable")
cleanDF.show()
cleanDF.select(to_timestamp(col("date"))).show()

When comparing dates you can compare a column directly against a string; lit is not required. Implicit type conversions can come back to bite you, though, especially around null values, so it is best to state types explicitly.

cleanDF.filter(col("date2") > lit("2017-12-12")).show()
cleanDF.filter(col("date2") > "2017-12-12").show()

Working with Nulls in Data

It is best to always use null to represent missing or empty values; Spark can optimize working with null better than it can with placeholder values.
The functions for handling null values live mainly under DataFrame.na.
Null ordering is covered in the sorting section of Chapter 5, and the interaction between Booleans and null is covered in Working with Booleans above.
Warning: even if a DataFrame schema declares a column as non-nullable, Spark does not fully enforce it and the column may still contain some nulls. The nullable flag is simply a hint that lets Spark optimize handling of that column, and a violated hint can lead to incorrect results or hard-to-diagnose exceptions.
Principle: being explicit is always better than not being explicit.
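As a hedged sketch of what being explicit can look like, here is a schema declared with explicit nullability flags (the column names follow the retail dataset used in these notes; the flag remains only a hint, as warned above):

import org.apache.spark.sql.types.{StructType, StructField, StringType}
val strictSchema = StructType(Seq(
  StructField("StockCode", StringType, nullable = false),   // hint only: Spark may still let nulls through
  StructField("Description", StringType, nullable = true)
))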
Spark SQL also provides several null-handling functions (excerpted from the Spark SQL function reference):

coalesce
coalesce(expr1, expr2, ...) - Returns the first non-null argument if exists. Otherwise, null.

Examples:

SELECT coalesce(NULL, 1, NULL);
1

ifnull
ifnull(expr1, expr2) - Returns expr2 if expr1 is null, or expr1 otherwise.

Examples:

SELECT ifnull(NULL, array('2'));
["2"]

nullif
nullif(expr1, expr2) - Returns null if expr1 equals to expr2, or expr1 otherwise.

Examples:

SELECT nullif(2, 2);
NULL

nvl
nvl(expr1, expr2) - Returns expr2 if expr1 is null, or expr1 otherwise.

Examples:

SELECT nvl(NULL, array('2'));
["2"]

nvl2
nvl2(expr1, expr2, expr3) - Returns expr2 if expr1 is not null, or expr3 otherwise.

Examples:

SELECT nvl2(NULL, 2, 1);
1
https://spark.apache.org/docs/2.3.1/api/sql/
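In the DataFrame API, coalesce is also available as a function for selecting the first non-null value across columns per row (this mirrors the book's example on the retail df):

import org.apache.spark.sql.functions.{coalesce, col}
df.select(coalesce(col("Description"), col("CustomerId"))).show()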

Removing null values
DataFrame.na.drop() removes rows containing null values. The first optional argument takes "any" or "all": with "any", a row is dropped if any of the columns contain null; with "all", a row is dropped only if all of its values are null. The second optional argument is the set of columns to apply the check to (other drop variants are sketched after the example below).
df.na.drop("any", Seq("StockCode", "InvoiceNo")).count()
Filling null values
fill()
The book notes we could do the same for columns of type Integer by using df.na.fill(5:Integer), or for Doubles with df.na.fill(5:Double); the author found that these exact forms raise an error when run, while passing a plain 5 or 5.0 (as below) works.

df.na.fill("null")
df.na.fill(5, Seq("StockCode", "InvoiceNo"))
val fillMap = Map("StockCode" -> 5, "Description" -> "No Description")
df.na.fill(fillMap)

Replacing values:
df.na.replace("Description", Map("" -> "UNKNOWN"))
- - -

Working with Complex Types

Structs

Create a struct by wrapping multiple columns in parentheses in a query (or by using the struct function):

df.selectExpr("(InvoiceNo, Description) as complex").show(2)
df.selectExpr("struct(InvoiceNo, Description) as complex").show(2)

import org.apache.spark.sql.functions.struct
val complexDF = df.select(struct("Description", "InvoiceNo").alias("complex"))
complexDF.createOrReplaceTempView("complexTable")

Accessing struct fields:

complexDF.select("complex.Description").show(2)
complexDF.select(col("complex").getField("Description")).show(2)
complexDF.select("complex.*").show(2)

Arrays

Splitting

Use split() to split a string column into an array.

import org.apache.spark.sql.functions.split
df.select(split(col("Description"), " ").alias("array_col")).selectExpr("array_col[0]").show(2)

Array Length

import org.apache.spark.sql.functions.{split, size}
df.select(size(split(col("Description"), " ")).alias("array_size")).show(2)

array_contains

import org.apache.spark.sql.functions.array_contains
df.select(array_contains(split(col("Description"), " "), "WHITE")).show(2)

The explode function can be applied to arrays and maps, but not to structs.
The explode function takes a column that consists of arrays and creates one row (with the rest of the values duplicated) per value in the array.
Applied to an array, explode therefore produces one output row per array element.

import org.apache.spark.sql.functions.{split, explode}
df.withColumn("splitted", split(col("Description"), " "))
  .withColumn("exploded", explode(col("splitted")))
  .select("Description", "InvoiceNo", "exploded").show(2)
+--------------------+---------+--------+
|         Description|InvoiceNo|exploded|
+--------------------+---------+--------+
|WHITE HANGING HEA...|   536365|   WHITE|
|WHITE HANGING HEA...|   536365| HANGING|
+--------------------+---------+--------+
only showing top 2 rows

Map

Maps are created with the map function and accessed by key; looking up a missing key returns null (demonstrated below).

import org.apache.spark.sql.functions.map
df.select(map(col("Description"), col("InvoiceNo")).alias("complex_map"))
  .selectExpr("complex_map['WHITE METAL LANTERN']").show(2)
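Looking up a key that is not present simply returns null, as noted above (a sketch on the same map column):

df.select(map(col("Description"), col("InvoiceNo")).alias("complex_map"))
  .selectExpr("complex_map['A KEY THAT DOES NOT EXIST']").show(2)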

Applied to a map, explode produces one row per key-value pair, with separate key and value columns.

df.select(explode(map(col("Description"), col("InvoiceNo")).alias("complex_map"))).show(10)
+--------------------+------+
|                 key| value|
+--------------------+------+
|WHITE HANGING HEA...|536365|
| WHITE METAL LANTERN|536365|
|CREAM CUPID HEART...|536365|
|KNITTED UNION FLA...|536365|
|RED WOOLLY HOTTIE...|536365|
|SET 7 BABUSHKA NE...|536365|
|GLASS STAR FROSTE...|536365|
|HAND WARMER UNION...|536366|
|HAND WARMER RED P...|536366|
|ASSORTED COLOUR B...|536367|
+--------------------+------+
only showing top 10 rows

Working with JSON

You can operate on JSON strings directly, parse JSON, or extract JSON objects.

val jsonDF = spark.range(1).selectExpr("""'{"myJSONKey" : {"myJSONValue" : [1, 2, 3]}}' as jsonString""")

You can use the get_json_object to inline query a JSON object, be it a dictionary or array. You can use json_tuple if this object has only one level of nesting
The query path uses a JSONPath-style syntax, e.g. $.myJSONKey.myJSONValue[1].

import org.apache.spark.sql.functions.{get_json_object, json_tuple}
jsonDF.select(
    get_json_object(col("jsonString"), "$.myJSONKey.myJSONValue[1]") as "column",
    json_tuple(col("jsonString"), "myJSONKey")).show(2)

You can also turn a StructType into a JSON string by using the to_json function.

import org.apache.spark.sql.functions.to_json
df.selectExpr("(InvoiceNo, Description) as myStruct")
  .select(to_json(col("myStruct"))).show(3)

You can use the from_json function to parse this (the string produced by to_json) or other JSON data back in. This naturally requires you to specify a schema, and optionally you can specify a map of options, as well.

import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.types._
val parseSchema = new StructType(Array(
  new StructField("InvoiceNo",StringType,true),
  new StructField("Description",StringType,true)))
df.selectExpr("(InvoiceNo, Description) as myStruct")
  .select(to_json(col("myStruct")).alias("newJSON"))
  .select(from_json(col("newJSON"), parseSchema), col("newJSON")).show(2)

User-Defined Functions

UDFs can be written in Scala or Python and can even make use of external libraries.
UDFs can take and return one or more columns as input.
By default, these functions are registered as temporary functions to be used in that specific SparkSession or Context.
Keep the potential performance cost of UDFs in mind.
Step 1: define the function

val udfExampleDF = spark.range(5).toDF("num")
def power3(number:Double):Double = number * number * number
power3(2.0)

Thus far, our expectations for the input are high: it must be a specific type and cannot be a null value
Step 2: register the function
we need to register them with Spark so that we can use them on all of our worker machines. Spark will serialize the function on the driver and transfer it over the network to all executor processes. This happens regardless of language.
When you use the function, there are essentially two different things that occur. If the function is written in Scala or Java, you can use it within the Java Virtual Machine (JVM). This means that there will be little performance penalty aside from the fact that you can’t take advantage of code generation capabilities that Spark has for built-in functions. There can be performance issues if you create or use a lot of objects。
If the function is written in Python, something quite different happens. Spark starts a Python process on the worker, serializes all of the data to a format that Python can understand (remember, it was in the JVM earlier), executes the function row by row on that data in the Python process, and then finally returns the results of the row operations to the JVM and Spark.

import org.apache.spark.sql.functions.udf
val power3udf = udf(power3(_:Double):Double)

Warning: write UDFs in Scala or Java when possible. Starting this Python process is expensive, but the real cost is in serializing the data to Python. This is costly for two reasons: it is an expensive computation, but also, after the data enters Python, Spark cannot manage the memory of the worker. This means that you could potentially cause a worker to fail if it becomes resource constrained (because both the JVM and Python are competing for memory on the same machine). We recommend that you write your UDFs in Scala or Java; the small amount of time it should take you to write the function in Scala will always yield significant speed ups, and on top of that, you can still use the function from Python!
Step 3: use the function

udfExampleDF.select(power3udf(col("num"))).show()

To use a UDF inside a string expression, register the function as a Spark SQL function:

spark.udf.register("power3", power3(_:Double):Double)
udfExampleDF.selectExpr("power3(num)").show(2)

Because this function is registered with Spark SQL—and we’ve learned that any Spark SQL function or expression is valid to use as an expression when working with DataFrames—we can turn around and use the UDF that we wrote in Scala, in Python. However, rather than using it as a DataFrame function, we use it as a SQL expression:
We can also register our Python/Scala function to be available as a SQL function and use that in any language, as well.
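Because power3 is registered with Spark SQL, the same function can also be invoked from a plain SQL query (a sketch; the temp view name here is only for illustration):

udfExampleDF.createOrReplaceTempView("udfExampleTable")
spark.sql("SELECT power3(num) FROM udfExampleTable").show(2)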
One thing we can also do to ensure that our functions are working correctly is specify a return type. As we saw in the beginning of this section, Spark manages its own type information, which does not align exactly with Python’s types. Therefore, it’s a best practice to define the return type for your function when you define it. It is important to note that specifying the return type is not necessary, but it is a best practice.
If you specify the type that doesn’t align with the actual type returned by the function, Spark will not throw an error but will just return null to designate a failure.
When you want to optionally return a value from a UDF, you should return None in Python and an Option type in Scala
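A hedged sketch of that in Scala: wrapping the result in Option lets the UDF return null for inputs it chooses not to handle (the negative-number rule here is purely illustrative):

import org.apache.spark.sql.functions.{col, udf}
val safePower3 = udf((n: Double) => if (n >= 0) Some(n * n * n) else None)
udfExampleDF.select(safePower3(col("num"))).show()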
You can also use UDF/UDAF creation via a Hive syntax. To allow for this, first you must enable Hive support when you create your SparkSession (via SparkSession.builder().enableHiveSupport()). Then you can register UDFs in SQL. This is only supported with precompiled Scala and Java packages, so you'll need to specify them as a dependency.

-- in SQL
CREATE TEMPORARY FUNCTION myFunc AS 'com.organization.hive.udf.FunctionName'

Additionally, you can register this as a permanent function in the Hive Metastore by removing TEMPORARY.
