StructuredStreaming解析JSON
阿新 • • 發佈:2020-08-18
Kafka Producer:
{"createTime":"1532598069","event":{"info":{"AAA":"three","BBB":"four","CCC":"haha"}}}
Kafka Consumer:
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming._
import org.apache.spark.sql.types._

/**
 * Structured Streaming job that reads JSON messages from the Kafka topic
 * `dynamic-schema`, parses them with an explicit schema and prints the
 * flattened inner fields to the console.
 *
 * Expected message shape (see the producer sample):
 * {{{
 * {"createTime":"1532598069","event":{"info":{"AAA":"three","BBB":"four","CCC":"haha"}}}
 * }}}
 *
 * The entry point is an explicit `main` method rather than `extends App`:
 * Spark's documentation warns that subclasses of `scala.App` may not work
 * correctly.
 */
object DynamicSchema {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder
      .appName("DynamicSchema")
      .master("local[*]")
      .getOrCreate()

    // Fields of each value stored under `event`. All are nullable, so a field
    // missing from a message (e.g. "DDD" in the sample) is emitted as null.
    val infoSchema = new StructType()
      .add("AAA", StringType, nullable = true)
      .add("BBB", StringType, nullable = true)
      .add("CCC", StringType, nullable = true)
      .add("DDD", StringType, nullable = true)

    // `event` is declared as a map so the key under it ("info" in the sample)
    // does not have to be named in the schema.
    val schema = new StructType()
      .add("createTime", StringType)
      .add("event", MapType(StringType, infoSchema))

    // Kafka delivers `value` as binary; cast it to a string before parsing.
    val parsed = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("subscribe", "dynamic-schema")
      .option("startingOffsets", "earliest")
      .load()
      .select(from_json(col("value").cast("string"), schema).alias("parsed_value"))

    // Exploding a map column yields one row per entry with columns `key` and
    // `value`; `value.*` then expands the struct into AAA/BBB/CCC/DDD columns.
    val event = parsed
      .select(explode(col("parsed_value.event")))
      .select("value.*")

    val query = event.writeStream
      .format("console")
      .outputMode(OutputMode.Append())
      .start()

    // Block the main thread until the streaming query stops or fails.
    query.awaitTermination()
  }
}
StructuredStreaming輸出結果:
+-----+----+----+----+
|  AAA| BBB| CCC| DDD|
+-----+----+----+----+
|three|four|haha|null|
+-----+----+----+----+
連結:https://www.dazhuanlan.com/2019/11/20/5dd51853cd505/