1. 程式人生 > 程式設計 >Structured Streaming通過schema_of_json方法動態解析Kafka的JSON資料的Schema

Structured Streaming通過schema_of_json方法動態解析Kafka的JSON資料的Schema

Structured Streaming中如何解析Kafka傳入的JSON資料的Schema

在實際生產中訊息中的欄位可能會發生變化,比如多加一個欄位什麼的,但是Spark程式又不能停下來,所以考慮在程式中不是自定義好Schema,而是通過Kafka輸入訊息中json串來infer Schema。當然,也可以通過廣播變數來更新配置檔案,定期更新Schema,這也是一種寫法

在之前Spark Streaming中解析kafka的json格式資料時,採用的也是Schema infer來推斷,如下

dStream.map(_.value).foreachRDD(rdd=>{
  ...
  val
spark = SparkSession.builder().config(rdd.sparkContext.getConf).getOrCreate() val df = spark.read.json(spark.createDataSet(rdd)) ... }) 複製程式碼

這樣通過解析json字串,可以直接把json串的key作為DataFrame的Columns列名

但是Structured Streaming中是直接生成DataFrame的,這樣做就不行。翻了下api發現了一個從json字串推斷Schema的方法——schema_of_json

/**
 * Parses a JSON string and infers its schema in DDL format.
 *
 * @param json a JSON string.
 *
 * @group collection_funcs
 * @since 2.4.0
 */
def schema_of_json(json: String): Column = schema_of_json(lit(json)) /** * Parses a JSON string and infers its schema in DDL format. * * @param json a string literal containing a JSON string. * * @group collection_funcs * @since 2.4.0 */ def schema_of_json(json: Column): Column = withExpr(new SchemaOfJson
(json.expr)) 複製程式碼

絕對是第一手資料,我Google了一下這個方法的使用,除了Stack Overflow上有兩個帖子討論,但這個帖子給出的方法還是會報錯

How to query JSON data column using Spark DataFrames?

Implicit schema discovery on a JSON-formatted Spark DataFrame column?

這裡新建了個df測試下

圖片

嘗試使用第二個方法df.select(schema_of_json($"col"))時報錯

scala> df.select(schema_of_json($"col"))
org.apache.spark.sql.AnalysisException: cannot resolve 'schemaofjson(`col`)' due to data type mismatch: The input json should be a string literal and not null; however,got `col`.;;
複製程式碼

看報錯資訊是需要給一個字串引數,所以做如下嘗試

圖片
也就是取出第一行Row的值,用來作推斷,發現是可以的

最後修改如下

scala> df.select(schema_of_json(df.take(1)(0).get(0).toString).alias("schema")).select(regexp_extract($"schema","struct<(.*?)>",1) as "schema").show(false)

+--------------------------------------------------------------------+
|schema                                                              |
+--------------------------------------------------------------------+
|cardno:string,cardtype:string,flag:string,times:string,userid:string|
|cardno:string,userid:string|
+--------------------------------------------------------------------+
複製程式碼

最終寫法來建立一個Schema

scala> val str = df.select(schema_of_json(df.take(1)(0).get(0).toString).alias("schema")).select(regexp_extract($"schema",1)).take(1)(0).getAs[String](0)
str: String = cardno:string,userid:string

scala> val columns = str.split(",").map(x=>x.split(":")).map(x=>x(0))
columns: Array[String] = Array(cardno,cardtype,flag,times,userid)

scala> var schema = (new StructType)
schema: org.apache.spark.sql.types.StructType = StructType()

scala> columns.map(x=>{schema = schema.add(x,StringType,true)})
res154: Array[Unit] = Array((),(),())

scala> schema
res159: org.apache.spark.sql.types.StructType = StructType(StructField(cardno,true),StructField(cardtype,StructField(flag,StructField(times,StructField(userid,true))

scala> schema.simpleString
res160: String = struct<cardno:string,userid:string>
複製程式碼

就可以用推斷出的Schema來解析json了

df.select(from_json($"col",schema) as "json_value").select("json_value.*")
複製程式碼

但這樣寫還是適合簡單的Schema結構,要是存在巢狀的結構,像Stack Overflow上有個問題,如何定義如下的Schema

struct<abc:struct<name:string>,pqr:struct<address:string>> 
複製程式碼

參考:How to create schema (StructType) with one or more StructTypes?

import org.apache.spark.sql.types._
val name = new StructType().add($"name".string)
scala> println(name.simpleString)
struct<name:string>

val address = new StructType().add($"address".string)
scala> println(address.simpleString)
struct<address:string>

val schema = new StructType().add("abc",name).add("pqr",address)
scala> println(schema.simpleString)
struct<abc:struct<name:string>,pqr:struct<address:string>>

scala> schema.simpleString == "struct<abc:struct<name:string>,pqr:struct<address:string>>"
res4: Boolean = true

scala> schema.printTreeString
root
 |-- abc: struct (nullable = true)
 |    |-- name: string (nullable = true)
 |-- pqr: struct (nullable = true)
 |    |-- address: string (nullable = true)
複製程式碼

這樣的話要寫個專門來解析的方法


其次厚臉皮推薦一個總結歸納的專案吧,面向Linux學習、大資料、機器學習、資料分析、演演算法等,歡迎前往推薦更多資源: Coding-Now

學習記錄的一些筆記,以及所看得一些電子書eBooks、視訊資源和平常收納的一些自己認為比較好的部落格、網站、工具 github.com/josonle/Big…

在這裡插入圖片描述
在這裡插入圖片描述