
Spark: handling null values in UDFs

We have a DataFrame object df:

df.show
df.printSchema
+----+----+
| str| dou|
+----+----+
|null| 1.0|
|  a |null|
+----+----+
root
|-- str: string (nullable = true)
|-- dou: double (nullable = true)

Consider the following code:

val udf1 = udf{(e: String) => e.isInstanceOf[String]}
val udf2 = udf{(e: Double) => e.isInstanceOf[Double]}
val udf3 = udf{(e: java.lang.Double) => e.isInstanceOf[Double]}
val udf4 = udf{(e: Any) => e.isInstanceOf[Double]}

df.withColumn("r1", udf1(df("str")))
  .withColumn("r2", udf2(df("dou")))
  .withColumn("r3", udf3(df("dou")))
  .withColumn("r4", udf4(df("dou")))
  .show
+----+----+-----+----+-----+-----+
| str| dou|   r1|  r2|   r3|   r4|
+----+----+-----+----+-----+-----+
|null| 1.0|false|true| true| true|
|   a|null| true|null|false|false|
+----+----+-----+----+-----+-----+
// Note: null.isInstanceOf[String] and null.isInstanceOf[Double] both return false

From this we can see:
null is passed through to UDFs whose parameter type is String, java.lang.Double, or Any;
it is never passed to a UDF whose parameter type is the primitive Double (Spark short-circuits the call and returns null directly).
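The values in the table follow from how `isInstanceOf` treats null in plain Scala; a quick sketch outside Spark (no SparkSession needed, names are illustrative):

```scala
object NullChecks extends App {
  // null is an instance of no type, so both checks below are false.
  val s: String = null
  val d: java.lang.Double = null
  println(s.isInstanceOf[String])  // false
  println(d.isInstanceOf[Double])  // false
  // A Scala `Double` parameter is a JVM primitive; there is no null it could
  // hold, which is why Spark skips calling such a UDF and emits null instead.
}
```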

If we want to handle null values inside a UDF, we can do it as follows:

// Method 1:
// Return an Option-wrapped value: Some(a) becomes the value a, None becomes null
val str_udf = udf((str: String) => str match {
  case null => Some(0)
  case s    => Some(s.length)
})
val dou_udf = udf((dou: Any) => dou match {
  case d: Double => None
  case null      => Some("is null")
})
val res = df.withColumn("str_r", str_udf(df("str")))
            .withColumn("dou_r", dou_udf(df("dou")))
res.show
res.printSchema
+----+----+-----+-------+
| str| dou|str_r|  dou_r|
+----+----+-----+-------+
|null| 1.0|    0|   null|
|   a|null|    1|is null|
+----+----+-----+-------+
root
 |-- str: string (nullable = true)
 |-- dou: double (nullable = true)
 |-- str_r: integer (nullable = true)
 |-- dou_r: string (nullable = true)

// Method 2:
// Wrap the columns into a Row with struct, then inspect nulls on the Row
// (this is also how the Spark source code handles it)
import org.apache.spark.sql.Row
import org.apache.spark.sql.functions._
val f = udf{(r: Row) => (r.isNullAt(0), r.isNullAt(1))}
val res = df.withColumn("str_r", f(struct("str", "dou")))
res.show
res.printSchema
+----+----+------------+
| str| dou|       str_r|
+----+----+------------+
|null| 1.0|[true,false]|
|   a|null|[false,true]|
+----+----+------------+
root
 |-- str: string (nullable = true)
 |-- dou: double (nullable = true)
 |-- str_r: struct (nullable = true)
 |    |-- _1: boolean (nullable = false)
 |    |-- _2: boolean (nullable = false)

// Method 3: the plain approach, i.e. simply use a UDF whose parameter type can
// receive null directly (String, java.lang.Double, Any, ...). The snippet that
// generates the original df at the end uses this method.

Finally, here is one way to generate the df we started with (using Method 3):

import spark.implicits._  // needed for the String encoder in createDataset

val d = spark.createDataset(Seq(null, "a")).toDF("str")
val df = d.withColumn("dou", udf { (str: String) =>
  (if (str == null) {
    1.0
  } else {
    null
  }): java.lang.Double
}.apply(d("str")))
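For completeness, here is a hedged alternative (not from the original post) that builds the same df without any UDF, by declaring a nullable schema explicitly and putting the nulls straight into the rows:

```scala
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{DoubleType, StringType, StructField, StructType}

// Assumes a SparkSession named `spark` is in scope.
val schema = StructType(Seq(
  StructField("str", StringType, nullable = true),
  StructField("dou", DoubleType, nullable = true)))
val rows = Seq(Row(null, 1.0), Row("a", null))
val df = spark.createDataFrame(spark.sparkContext.parallelize(rows), schema)
df.show()
```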