spark: handling null values in udf functions
阿新 • Published: 2019-02-18
We have a DataFrame object df:
df.show
df.printSchema
+----+----+
| str| dou|
+----+----+
|null| 1.0|
|   a|null|
+----+----+
root
|-- str: string (nullable = true)
|-- dou: double (nullable = true)
Consider the following code:
val udf1 = udf{(e: String) =>
  e.isInstanceOf[String]
}
val udf2 = udf{(e: Double) =>
  e.isInstanceOf[Double]
}
val udf3 = udf{(e: java.lang.Double) =>
  e.isInstanceOf[Double]
}
val udf4 = udf{(e: Any) =>
  e.isInstanceOf[Double]
}
df.withColumn("r1", udf1(df("str")))
  .withColumn("r2", udf2(df("dou")))
  .withColumn("r3", udf3(df("dou")))
  .withColumn("r4", udf4(df("dou")))
  .show
+----+----+-----+----+-----+-----+
| str| dou| r1| r2| r3| r4|
+----+----+-----+----+-----+-----+
|null| 1.0|false|true| true| true|
| a|null| true|null|false|false|
+----+----+-----+----+-----+-----+
// Note: both null.isInstanceOf[String] and null.isInstanceOf[Double] return false
From this we can see:
null is passed into udf functions whose parameter type is String, java.lang.Double, or Any;
it is never passed into a udf whose parameter type is the primitive Double — Spark skips the call and the result column is null directly.
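If all we need is a null check or a default value, a built-in Column expression sidesteps the primitive-Double pitfall entirely, since isNull and when/otherwise work on any column type. A minimal sketch against the df above (the new column names are just for illustration):

```scala
import org.apache.spark.sql.functions.{when, col, lit}

// Built-in expressions handle null without a udf; isNull works even on the
// double-typed column "dou", where a primitive-Double udf would be skipped.
val checked = df
  .withColumn("str_is_null", col("str").isNull)
  .withColumn("dou_or_default", when(col("dou").isNull, lit(0.0)).otherwise(col("dou")))
```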
If we do want to handle null values inside a udf function, there are a few options:
// Method 1:
// Return an Option-wrapped type: Some(a) becomes the value a; None becomes null
val str_udf = udf((str: String) => str match {
  case null => Some(0)
  case s => Some(s.length)
})
val dou_udf = udf((dou: Any) => dou match {
  case _: Double => None
  case null => Some("is null")
})
val res = df.withColumn("str_r", str_udf(df("str")))
  .withColumn("dou_r", dou_udf(df("dou")))
res.show
res.printSchema
+----+----+-----+-------+
| str| dou|str_r| dou_r|
+----+----+-----+-------+
|null| 1.0| 0| null|
| a|null| 1|is null|
+----+----+-----+-------+
root
|-- str: string (nullable = true)
|-- dou: double (nullable = true)
|-- str_r: integer (nullable = true)
|-- dou_r: string (nullable = true)
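Instead of pattern matching on null directly, wrapping the argument in Option reads a bit more idiomatically. A sketch equivalent to str_udf above (this works because String is a reference type; for a primitive Double parameter the udf body is never reached):

```scala
import org.apache.spark.sql.functions.udf

// Option(str) is None when str is null, so getOrElse supplies the default 0.
val str_udf2 = udf((str: String) => Option(str).map(_.length).getOrElse(0))
```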
// Method 2:
// Wrap the columns into a Row with struct, then inspect it (this is also how the Spark source handles nulls)
import org.apache.spark.sql.Row
import org.apache.spark.sql.functions._
val f = udf{(r: Row) =>
  (r.isNullAt(0), r.isNullAt(1))
}
val res = df.withColumn("str_r", f(struct("str", "dou")))
res.show
res.printSchema
+----+----+------------+
| str| dou| str_r|
+----+----+------------+
|null| 1.0|[true,false]|
| a|null|[false,true]|
+----+----+------------+
root
|-- str: string (nullable = true)
|-- dou: double (nullable = true)
|-- str_r: struct (nullable = true)
| |-- _1: boolean (nullable = false)
| |-- _2: boolean (nullable = false)
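Beyond isNullAt, Row also exposes typed getters that return null for a null cell (when the target type is a reference type), so the udf body can supply defaults itself. A sketch combining both columns into one string (the column name "merged" and the default values are just for illustration):

```scala
import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.{udf, struct}

// getAs[java.lang.Double] returns a boxed value (or null), so Option can wrap it.
val g = udf { (r: Row) =>
  val s = Option(r.getAs[String]("str")).getOrElse("")
  val d = Option(r.getAs[java.lang.Double]("dou")).map(_.toDouble).getOrElse(0.0)
  s"$s:$d"
}
val merged = df.withColumn("merged", g(struct("str", "dou")))
```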
// Method 3: the plain approach — simply use a udf whose parameter type can accept null directly. The df-construction code below uses this method.
Finally, here is one way to create the original df object (using Method 3):
import spark.implicits._  // provides the Encoder[String] that createDataset needs
val d = spark.createDataset(Seq[String](null, "a")).toDF("str")
val df = d.withColumn("dou", udf{(str: String) =>
  (if (str == null) {
    1.0
  } else {
    null
  }): java.lang.Double
}.apply(d("str")))
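An alternative way to build the same df without a udf is to start from Option values, which Spark maps to nullable columns. A sketch, assuming spark.implicits._ is in scope:

```scala
import spark.implicits._

// None becomes SQL NULL; the element types yield nullable string and double columns.
val df2 = Seq(
  (Option.empty[String], Option(1.0)),
  (Option("a"), Option.empty[Double])
).toDF("str", "dou")
```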