1. 程式人生 > 實用技巧 >StructuredStreaming解析JSON

Structured Streaming 解析 JSON

Kafka Producer（發送到 topic dynamic-schema 的訊息）:

{"createTime":"1532598069","event":{"info":{"AAA":"three","BBB":"four","CCC":"haha"}}}

Kafka Consumer:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming._
import org.apache.spark.sql.types._


/**
 * Structured Streaming consumer that reads JSON messages from the Kafka topic
 * `dynamic-schema` and prints the fields nested under `event` to the console.
 *
 * Sample message (as sent by the producer):
 * {{{
 * {"createTime":"1532598069","event":{"info":{"AAA":"three","BBB":"four","CCC":"haha"}}}
 * }}}
 *
 * `event` is declared as a `MapType(StringType, <struct>)` instead of a nested struct,
 * so the inner key (here `info`) does not have to be known ahead of time. Each map
 * entry is exploded into its own row, and the struct value is flattened into the
 * columns AAA/BBB/CCC/DDD. Declared fields missing from a message (e.g. `DDD`) are `null`.
 *
 * Uses an explicit `main` rather than `extends App`: Spark's documentation warns that
 * subclasses of `scala.App` may not work correctly (fields initialised via delayedInit
 * can be seen uninitialised inside closures shipped to executors).
 */
object DynamicSchema {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder
      .appName("DynamicSchema")
      .master("local[*]")
      .getOrCreate()

    // Schema of every value stored in the `event` map; all fields nullable.
    val infoSchema = new StructType()
      .add("AAA", StringType, nullable = true)
      .add("BBB", StringType, nullable = true)
      .add("CCC", StringType, nullable = true)
      .add("DDD", StringType, nullable = true)

    val schema = new StructType()
      .add("createTime", StringType)
      .add("event", MapType(StringType, infoSchema))

    val parsed = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("subscribe", "dynamic-schema")
      .option("startingOffsets", "earliest")
      .load()
      // The Kafka source exposes `value` as binary; cast to string before parsing JSON.
      .select(from_json(col("value").cast("string"), schema).alias("parsed_value"))

    import spark.implicits._

    // explode() on a map column yields `key` and `value` columns; keep only the
    // flattened struct fields from `value`.
    val event = parsed
      .select(explode($"parsed_value.event"))
      .select("value.*")

    val query = event.writeStream
      .format("console")
      .outputMode(OutputMode.Append())
      .start()

    query.awaitTermination()
  }
}

Structured Streaming 輸出結果:

+-----+----+----+----+
|  AAA| BBB| CCC| DDD|
+-----+----+----+----+
|three|four|haha|null|
+-----+----+----+----+

連結:https://www.dazhuanlan.com/2019/11/20/5dd51853cd505/