Spark Streaming: Parsing JSON Data from Kafka
Project notes: when integrating Spark Streaming with Kafka, the data delivered by Kafka is usually sent as JSON strings. This post summarizes five ways for Spark Streaming to parse JSON data from Kafka and turn it into a DataFrame for analysis.
Requirement: take JSON records of the form shown below and turn them into a DataFrame with one column per JSON key.
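For illustration, here is the sample record used throughout this post (it also appears in the generator's docstring below) together with the corresponding single-row DataFrame, sketched here; the exact column order depends on which parsing method is used:
{"namespace": "000001", "region": "Beijing", "id": "9d58f83e-fb3b-45d8-b7e4-13d33b0dd832", "valueType": "Float", "value": "48.5", "time": "2018-11-05 15:04:47"}
+--------------------+-----+-------------------+---------+-------+---------+
|                  id|value|               time|valueType| region|namespace|
+--------------------+-----+-------------------+---------+-------+---------+
|9d58f83e-fb3b-45d...| 48.5|2018-11-05 15:04:47|    Float|Beijing|   000001|
+--------------------+-----+-------------------+---------+-------+---------+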
1 Creating a data generator with a Python script
The script randomly generates JSON records in the format shown above and sends them to Kafka. The generator code is as follows:
kafka_data_generator.py
"""
造數器:向kafka傳送json格式資料
資料格式如下所示:
{
"namespace":"000001",
"region":"Beijing",
"id":"9d58f83e-fb3b-45d8-b7e4-13d33b0dd832",
"valueType":"Float",
"value":"48.5",
"time":"2018-11-05 15:04:47"
}
"""
import uuid
import time
import random
from pykafka import KafkaClient
import json
sample_type = ['Float', 'String', 'Int']
sample_namespace = ['000000', '000001', '000002']
sample_region = ['Beijing', 'Shanghai', 'Jinan', 'Qingdao', 'Yantai', 'Hangzhou']
sample_id_info = [
{'3f7e7feb-fce6-4421-8321-3ac7c712f57a': {'valueType': 'Float', 'region': 'Shanghai', 'namespace': '000001'}},
{'42f3937e-301c-489e-976b-d18f47df626f' : {'valueType': 'Float', 'region': 'Beijing', 'namespace': '000000'}},
{'d61e5ac7-4357-4d48-a6d9-3e070927f087': {'valueType': 'Int', 'region': 'Beijing', 'namespace': '000000'}},
{'ddfca6fe-baf5-4853-8463-465ddf8234b4': {'valueType': 'String', 'region': 'Hangzhou', 'namespace': '000001'}},
{'15f7ef13-2100-464c-84d7-ce99d494f702': {'valueType': 'Int', 'region': 'Qingdao', 'namespace': '000001'}},
{'abb43869-dd0b-4f43-ab9d-e4682cb9c844': {'valueType': 'Int', 'region': 'Beijing', 'namespace': '000000'}},
{'b63c1a92-c76c-4db3-a8ac-66d67c9dc6e6': {'valueType': 'Int', 'region': 'Yantai', 'namespace': '000001'}},
{'0cf781ae-8202-4986-8df5-7ca0b21c094e': {'valueType': 'String', 'region': 'Yantai', 'namespace': '000002'}},
{'42073ecd-0f23-49d6-a8ba-a8cbee6446e3': {'valueType': 'Float', 'region': 'Beijing', 'namespace': '000000'}},
{'bd1fc887-d980-4488-8b03-2254165da582': {'valueType': 'String', 'region': 'Shanghai', 'namespace': '000000'}},
{'eec90363-48bc-44b7-90dd-f79288d34f39': {'valueType': 'String', 'region': 'Shanghai', 'namespace': '000002'}},
{'fb15d27f-d2e3-4048-85b8-64f4faa526d1': {'valueType': 'Float', 'region': 'Jinan', 'namespace': '000001'}},
{'c5a623fd-d67b-4d83-8b42-3345352b8db9': {'valueType': 'String', 'region': 'Qingdao', 'namespace': '000001'}},
{'fee3ecb2-dd1a-4421-a8bd-cf8bc6648320': {'valueType': 'Float', 'region': 'Yantai', 'namespace': '000001'}},
{'e62818ab-a42a-4342-be31-ba46e0ae7720': {'valueType': 'Float', 'region': 'Qingdao', 'namespace': '000001'}},
{'83be5bdc-737c-4616-a576-a15a2c1a1684': {'valueType': 'String', 'region': 'Hangzhou', 'namespace': '000001'}},
{'14dcd861-14eb-40f3-a556-e52013646e6d': {'valueType': 'String', 'region': 'Beijing', 'namespace': '000002'}},
{'8117826d-4842-4907-b6eb-446fead74244': {'valueType': 'String', 'region': 'Beijing', 'namespace': '000001'}},
{'fb23b254-a873-4fba-a17d-73fdccbfe768': {'valueType': 'Int', 'region': 'Yantai', 'namespace': '000000'}},
{'0685c868-2f74-4f91-a531-772796b1c8a4': {'valueType': 'String', 'region': 'Shanghai', 'namespace': '000001'}}]
def generate_id_info(amount=20):
"""
生成id 資訊,只執行一次
:return:
[{
"id":{
"type":"Int",
"region":"Hangzhou"
}
}]
"""
return [{str(uuid.uuid4()): {"valueType": random.sample(sample_type, 1)[0],
"region": random.sample(sample_region, 1)[0],
"namespace": random.sample(sample_namespace, 1)[0]
}} for i in range(amount)]
def random_value(value_type):
value = "this is string value"
if value_type == "Float":
value = random.uniform(1, 100)
if value_type == "Int":
value = random.randint(1, 100)
return value
def generate_data(id_info):
data = dict()
for _id, info in id_info.items():
data = {"id": _id,
"value": random_value(info['valueType']),
"time": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(time.time()))
}
data.update(info)
return data
def random_data():
return generate_data(random.sample(sample_id_info, 1)[0])
if __name__ == '__main__':
client = KafkaClient(hosts="localhost:9092", zookeeper_hosts="localhost:2181")
topic = client.topics[b"spark_streaming_kafka_json"]
with topic.get_sync_producer() as producer:
for i in range(1000):
_random_data = json.dumps(random_data())
producer.produce(bytes(_random_data, encoding="utf-8"))
time.sleep(1)
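Assuming pykafka is installed and Kafka (with ZooKeeper) is running on localhost, the generator can be started with something along these lines:
pip install pykafka
python kafka_data_generator.py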
Check that the Kafka topic is receiving data:
sh kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic spark_streaming_kafka_json --from-beginning
2 Processing JSON data with Spark Streaming
2.1 Method 1: parse the JSON string into a case class, build an RDD[case class], and convert it directly into a DataFrame
Idea: after Spark Streaming reads a record from Kafka, a custom handleMessage2CaseClass method first converts the JSON string into a case class of the desired shape (KafkaMessage). foreachRDD then exposes an RDD[KafkaMessage], which is turned into a DataFrame directly via spark.createDataFrame(RDD[KafkaMessage]).
Core code:
/**
 * Method 1: parse the JSON string into a case class, build an RDD[case class],
 * then convert it directly into a DataFrame.
 */
stream.map(record => handleMessage2CaseClass(record.value())).foreachRDD(rdd => {
  val spark = SparkSession.builder().config(rdd.sparkContext.getConf).getOrCreate()
  val df = spark.createDataFrame(rdd)
  df.show()
})
The handleMessage2CaseClass method:
def handleMessage2CaseClass(jsonStr: String): KafkaMessage = {
val gson = new Gson()
gson.fromJson(jsonStr, classOf[KafkaMessage])
}
Case Class:
case class KafkaMessage(time: String, namespace: String, id: String, region: String, value: String, valueType: String)
Dependency:
<!-- https://mvnrepository.com/artifact/com.google.code.gson/gson -->
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>2.8.5</version>
</dependency>
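Besides gson, the examples below also rely on the Spark Streaming Kafka 0-10 integration and on fastjson (used in methods 2 and 3); the Structured Streaming example in section 4.2 additionally needs spark-sql-kafka-0-10. The coordinates look roughly like the following, with versions chosen to match your own Spark and Scala versions (the ones shown here are only placeholders):
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
    <version>${spark.version}</version>
</dependency>
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>1.2.47</version>
</dependency>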
2.2 Method 2: parse the JSON string into a Tuple, build an RDD[Tuple], then convert it into a DataFrame
Idea: the same as method 1, except that the JSON string is converted into a Tuple instead of a case class.
Core code:
/**
 * Method 2: parse the JSON string into a Tuple, build an RDD[Tuple],
 * then convert it into a DataFrame.
 */
stream.map(record => handleMessage2Tuples(record.value())).foreachRDD(rdd => {
  val spark = SparkSession.builder().config(rdd.sparkContext.getConf).getOrCreate()
  import spark.implicits._
  val df = rdd.toDF("id", "value", "time", "valueType", "region", "namespace")
  df.show()
})
The handleMessage2Tuples method:
def handleMessage2Tuples(jsonStr: String): (String, String, String, String, String, String) = {
  import scala.collection.JavaConverters._
  // Parsing into a LinkedHashMap keeps the values in the original JSON key order
  // (id, value, time, valueType, region, namespace); the column names passed to toDF above must follow the same order.
  val list = JSON.parseObject(jsonStr, classOf[JLinkedHashMap[String, Object]]).asScala.values.map(x => String.valueOf(x)).toList
  list match {
    case List(v1, v2, v3, v4, v5, v6) => (v1, v2, v3, v4, v5, v6)
  }
}
2.3 Method 3: parse the JSON string into a Row, build an RDD[Row], then create the DataFrame with a schema
Idea: after Spark Streaming reads a record from Kafka, a custom handlerMessage2Row method first converts the JSON string into a Row. foreachRDD then exposes an RDD[Row], and the DataFrame is created with spark.createDataFrame(RDD[Row], schema).
Core code:
/**
 * Method 3: parse the JSON string into a Row, build an RDD[Row],
 * then create the DataFrame with a schema.
 */
val schema = StructType(List(
  StructField("id", StringType),
  StructField("value", StringType),
  StructField("time", StringType),
  StructField("valueType", StringType),
  StructField("region", StringType),
  StructField("namespace", StringType))
)
stream.map(record => handlerMessage2Row(record.value())).foreachRDD(rdd => {
  val spark = SparkSession.builder().config(rdd.sparkContext.getConf).getOrCreate()
  val df = spark.createDataFrame(rdd, schema)
  df.show()
})
The handlerMessage2Row method:
def handlerMessage2Row(jsonStr: String): Row = {
  import scala.collection.JavaConverters._
  // The field order in the Row follows the JSON key order, so it must match the schema above.
  val array = JSON.parseObject(jsonStr, classOf[JLinkedHashMap[String, Object]]).asScala.values.map(x => String.valueOf(x)).toArray
  Row(array: _*)
}
2.4 Method 4: convert the RDD[String] into a Dataset directly, then apply a schema with from_json
Idea: use foreachRDD to get the RDD[String] directly, create a Dataset with a single value column via spark.createDataset(RDD[String]), parse the JSON string with the built-in Spark SQL function from_json, and finally select each field to obtain the DataFrame.
Core code:
/**
 * Method 4: convert the RDD[String] into a Dataset directly, then apply a schema with from_json.
 */
val schema = StructType(List(
  StructField("namespace", StringType),
  StructField("id", StringType),
  StructField("region", StringType),
  StructField("time", StringType),
  StructField("value", StringType),
  StructField("valueType", StringType))
)
stream.map(record => record.value()).foreachRDD(rdd => {
  val spark = SparkSession.builder().config(rdd.sparkContext.getConf).getOrCreate()
  import spark.implicits._
  val ds = spark.createDataset(rdd)
  ds.select(from_json('value.cast("string"), schema) as "value").select($"value.*").show()
})
2.5 Method 5: convert the RDD[String] into a Dataset directly, then use read.json to build the DataFrame
Idea: use foreachRDD to get the RDD[String] directly, create a Dataset with spark.createDataset, then build the DataFrame with spark.read.json(Dataset[String]). This method needs the least code: no schema has to be declared and no explicit JSON conversion is required, because the schema is inferred from the data.
Core code:
/**
 * Method 5: convert the RDD[String] into a Dataset directly, then use read.json to build the DataFrame.
 */
stream.map(record => record.value()).foreachRDD(rdd => {
  val spark = SparkSession.builder().config(rdd.sparkContext.getConf).getOrCreate()
  import spark.implicits._
  val df = spark.read.json(spark.createDataset(rdd))
  df.show()
})
3 Analyzing the resulting DataFrame
With any of the methods above we now have the DataFrame we wanted; the next step is to analyze the data with Spark SQL.
3.1 Requirement 1: convert the time column from strings such as 2018-11-07 17:08:43 into the yyyyMMdd format and expose it as a new column named day.
Implementation:
import org.apache.spark.sql.functions._
import spark.implicits._
df.select(date_format($"time".cast(DateType), "yyyyMMdd").as("day"), $"*").show()
Result:
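Illustratively, applying this to the sample record from section 1 derives a day value of 20181105:
+--------+--------------------+-----+-------------------+---------+-------+---------+
|     day|                  id|value|               time|valueType| region|namespace|
+--------+--------------------+-----+-------------------+---------+-------+---------+
|20181105|9d58f83e-fb3b-45d...| 48.5|2018-11-05 15:04:47|    Float|Beijing|   000001|
+--------+--------------------+-----+-------------------+---------+-------+---------+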
3.2 Requirement 2: partition the data by the day and namespace columns and write it out to files.
Implementation:
df.select(date_format($"time".cast(DateType), "yyyyMMdd").as("day"), $"*")
  .write.mode(SaveMode.Append)
  .partitionBy("namespace", "day")
  .parquet("/Users/shirukai/Desktop/HollySys/Repository/learn-demo-spark/data/Streaming")
Result:
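partitionBy encodes the partition columns in the directory structure, so the output ends up in a layout roughly like this (file names abbreviated):
data/Streaming/
├── namespace=000000/
│   └── day=20181107/part-....snappy.parquet
├── namespace=000001/
│   └── day=20181107/part-....snappy.parquet
└── namespace=000002/
    └── day=20181107/part-....snappy.parquet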
4 Further thoughts
4.1 Thought 1: what if the JSON is an array ([])?
All of the JSON strings handled above are objects wrapped in {}. What if the data stored in Kafka is instead a JSON array string wrapped in []?
Two approaches are offered here:
4.1.1 First approach: a custom handleMessage method parses the JSON string into an Array[case class], which is flattened with flatMap; foreachRDD then exposes an RDD[case class], which is converted directly into a DataFrame.
The handleMessage method:
def handleMessage(jsonStr: String): Array[KafkaMessage] = {
val gson = new Gson()
gson.fromJson(jsonStr, classOf[Array[KafkaMessage]])
}
Core code:
/**
 * Extra: handling a JSON array string, approach 1: a custom handleMessage method parses the JSON
 * string into an Array[case class], which is flattened with flatMap; foreachRDD then exposes an
 * RDD[case class], which is converted directly into a DataFrame.
 */
stream.map(record => handleMessage(record.value())).flatMap(x => x).foreachRDD(rdd => {
  val spark = SparkSession.builder().config(rdd.sparkContext.getConf).getOrCreate()
  val df = spark.createDataFrame(rdd)
  df.show()
})
4.1.2 Second approach: work on the RDD[String] directly: create a Dataset, parse the JSON with the built-in Spark SQL function from_json and a schema, explode the resulting array with the built-in explode function, then select each key to obtain the final DataFrame.
Core code:
/**
 * Extra: handling a JSON array string, approach 2: work on the RDD[String] directly: create a Dataset,
 * parse the JSON with the built-in from_json function and a schema, explode the resulting array with
 * the built-in explode function, then select each key to obtain the final DataFrame.
 */
val schema = StructType(List(
  StructField("namespace", StringType),
  StructField("id", StringType),
  StructField("region", StringType),
  StructField("time", StringType),
  StructField("value", StringType),
  StructField("valueType", StringType))
)
stream.map(record => record.value()).foreachRDD(rdd => {
  val spark = SparkSession.builder().config(rdd.sparkContext.getConf).getOrCreate()
  import spark.implicits._
  val ds = spark.createDataset(rdd)
  import org.apache.spark.sql.functions._
  val df = ds.select(from_json('value, ArrayType(schema)) as "value").select(explode('value)).select($"col.*")
  df.show()
})
4.2 Thought 2: how do we handle JSON data with Structured Streaming?
Structured Streaming works on structured streams: what you get is essentially a DataFrame already, so the fourth method above can be reused to parse the JSON data.
package com.hollysys.spark.streaming.kafkajson

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{date_format, from_json, struct}
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.types._

/**
 * Created by shirukai on 2018/11/8
 * Handle JSON-formatted Kafka data with Structured Streaming.
 */
object HandleJSONDataByStructStreaming {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .appName(this.getClass.getSimpleName)
      .master("local[2]")
      .getOrCreate()
    val source = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("subscribe", "spark_streaming_kafka_json")
      .option("startingOffsets", "earliest")
      .option("failOnDataLoss", "false")
      .load()
    import spark.implicits._
    val schema = StructType(List(
      StructField("id", StringType),
      StructField("value", StringType),
      StructField("time", StringType),
      StructField("valueType", StringType),
      StructField("region", StringType),
      StructField("namespace", StringType))
    )
    val data = source.select(from_json('value.cast("string"), schema) as "value").select($"value.*")
      .select(date_format($"time".cast(DateType), "yyyyMMdd").as("day"), $"*")
    val query = data
      .writeStream
      .format("parquet")
      .outputMode("Append")
      .option("checkpointLocation", "/Users/shirukai/Desktop/HollySys/Repository/learn-demo-spark/data/checkpoint")
      .option("path", "/Users/shirukai/Desktop/HollySys/Repository/learn-demo-spark/data/structstreaming")
      .trigger(Trigger.ProcessingTime(3000)).partitionBy("namespace", "day")
      .start()
    query.awaitTermination()
  }
}
Result:
Complete code:
package com.hollysys.spark.streaming.kafkajson
import com.alibaba.fastjson.JSON
import com.google.gson.Gson
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.sql.{Row, SaveMode, SparkSession}
import org.apache.spark.sql.types._
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.{Seconds, StreamingContext}
import java.util.{LinkedHashMap => JLinkedHashMap}
/**
* Created by shirukai on 2018/11/7
* Spark Streaming: parse JSON-formatted Kafka data and convert it into a DataFrame.
*/
object JSONDataHandler {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local[2]").setAppName("JSONDataHandler")
val ssc = new StreamingContext(conf, Seconds(2))
val kafkaParams = Map[String, Object](
"bootstrap.servers" -> "localhost:9092",
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"group.id" -> "spark_streaming",
"auto.offset.reset" -> "earliest",
"enable.auto.commit" -> (false: java.lang.Boolean)
)
val topics = Array("spark_streaming_kafka_json")
val stream = KafkaUtils.createDirectStream[String, String](
ssc,
PreferConsistent,
Subscribe[String, String](topics, kafkaParams)
)
/**
* Method 1: parse the JSON string into a case class, build an RDD[case class], then convert it directly into a DataFrame.
*/
stream.map(record => handleMessage2CaseClass(record.value())).foreachRDD(rdd => {
val spark = SparkSession.builder().config(rdd.sparkContext.getConf).getOrCreate()
val df = spark.createDataFrame(rdd)
import org.apache.spark.sql.functions._
import spark.implicits._
df.select(date_format($"time".cast(DateType), "yyyyMMdd").as("day"), $"*")
.write.mode(SaveMode.Append)
.partitionBy("namespace", "day")
.parquet("/Users/shirukai/Desktop/HollySys/Repository/learn-demo-spark/data/Streaming")
})
/**
* Method 2: parse the JSON string into a Tuple, build an RDD[Tuple], then convert it into a DataFrame.
*/
// stream.map(record => handleMessage2Tuples(record.value())).foreachRDD(rdd => {
// val spark = SparkSession.builder().config(rdd.sparkContext.getConf).getOrCreate()
// import spark.implicits._
// val df = rdd.toDF("id", "value", "time", "valueType", "region", "namespace")
// df.show()
// })
/**
* Method 3: parse the JSON string into a Row, build an RDD[Row], then create the DataFrame with a schema.
*/
// val schema = StructType(List(
// StructField("id", StringType),
// StructField("value", StringType),
// StructField("time", StringType),
// StructField("valueType", StringType),
// StructField("region", StringType),
// StructField("namespace", StringType))
// )
// stream.map(record => handlerMessage2Row(record.value())).foreachRDD(rdd => {
// val spark = SparkSession.builder().config(rdd.sparkContext.getConf).getOrCreate()
// val df = spark.createDataFrame(rdd, schema)
// df.show()
// })
/**
* Method 4: convert the RDD[String] into a Dataset directly, then apply a schema with from_json.
*/
// val schema = StructType(List(
// StructField("namespace", StringType),
// StructField("id", StringType),
// StructField("region", StringType),
// StructField("time", StringType),
// StructField("value", StringType),
// StructField("valueType", StringType))
// )
// stream.map(record => record.value()).foreachRDD(rdd => {
// val spark = SparkSession.builder().config(rdd.sparkContext.getConf).getOrCreate()
// import spark.implicits._
// val ds = spark.createDataset(rdd)
// ds.select(from_json('value.cast("string"), schema) as "value").select($"value.*").show()
// })
/**
* Method 5: convert the RDD[String] into a Dataset directly, then use read.json to build the DataFrame.
*/
// stream.map(record => record.value()).foreachRDD(rdd => {
// val spark = SparkSession.builder().config(rdd.sparkContext.getConf).getOrCreate()
// import spark.implicits._
// val df = spark.read.json(spark.createDataset(rdd))
// df.show()
// })
/**
* Extra: handling a JSON array string, approach 1: a custom handleMessage method parses the JSON string into an Array[case class],
* which is flattened with flatMap; foreachRDD then exposes an RDD[case class], which is converted directly into a DataFrame.
*/
// stream.map(record => handleMessage(record.value())).flatMap(x => x).foreachRDD(rdd => {
// val spark = SparkSession.builder().config(rdd.sparkContext.getConf).getOrCreate()
// val df = spark.createDataFrame(rdd)
// df.show()
// })
/**
* Extra: handling a JSON array string, approach 2: work on the RDD[String] directly: create a Dataset,
* parse the JSON with the built-in Spark SQL function from_json and a schema,
* then explode the resulting array with the built-in explode function and select each key to obtain the final DataFrame.
*/
// val schema = StructType(List(
// StructField("namespace", StringType),
// StructField("id", StringType),
// StructField("region", StringType),
// StructField("time", StringType),
// StructField("value", StringType),
// StructField("valueType", StringType))
// )
// stream.map(record => record.value()).foreachRDD(rdd => {
// val spark = SparkSession.builder().config(rdd.sparkContext.getConf).getOrCreate()
// import spark.implicits._
// val ds = spark.createDataset(rdd)
// import org.apache.spark.sql.functions._
// val df = ds.select(from_json('value, ArrayType(schema)) as "value").select(explode('value)).select($"col.*")
// df.show()
// })
ssc.start()
ssc.awaitTermination()
}
def handleMessage(jsonStr: String): Array[KafkaMessage] = {
val gson = new Gson()
gson.fromJson(jsonStr, classOf[Array[KafkaMessage]])
}
def handleMessage2CaseClass(jsonStr: String): KafkaMessage = {
val gson = new Gson()
gson.fromJson(jsonStr, classOf[KafkaMessage])
}
def handleMessage2Tuples(jsonStr: String): (String, String, String, String, String, String) = {
import scala.collection.JavaConverters._
val list = JSON.parseObject(jsonStr, classOf[JLinkedHashMap[String, Object]]).asScala.values.map(x => String.valueOf(x)).toList
list match {
case List(v1, v2, v3, v4, v5, v6) => (v1, v2, v3, v4, v5, v6)
}
}
def handlerMessage2Row(jsonStr: String): Row = {
import scala.collection.JavaConverters._
val array = JSON.parseObject(jsonStr, classOf[JLinkedHashMap[String, Object]]).asScala.values.map(x => String.valueOf(x)).toArray
Row(array: _*)
}
}
case class KafkaMessage(time: String, namespace: String, id: String, region: String, value: String, valueType: String)
Summary
So far only the five methods above come to mind; if other approaches turn up they will be added later. Comparing the five, and setting performance aside, method 5 is the most attractive in terms of code size and flexibility, because no schema has to be specified. Method 1 comes next, although the case class must be defined in advance. Also note that the first three methods all convert the JSON string into some object, but method 1 uses Google's gson while methods 2 and 3 use Alibaba's fastjson. The reason is that createDataFrame only accepts a case class here, and calling fastjson's JSON.parseObject(jsonStr, classOf[KafkaMessage]) throws an error because fastjson cannot deserialize a JSON string into a Scala case class; that is why gson is used in method 1.
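Another option worth mentioning here: a Scala-oriented JSON library such as json4s (which Spark itself already pulls in as a dependency) can also map a JSON string onto a case class. A minimal sketch, assuming json4s-jackson is available on the classpath; the helper name is just an example and could replace handleMessage2CaseClass in method 1:
import org.json4s._
import org.json4s.jackson.JsonMethods.parse

def handleMessage2CaseClassJson4s(jsonStr: String): KafkaMessage = {
  // DefaultFormats supplies the standard extraction rules for case classes
  implicit val formats: Formats = DefaultFormats
  parse(jsonStr).extract[KafkaMessage]
}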