Converting Between JSON Strings and DataFrames in Spark
This article describes converting between JSON strings and DataFrames, based on Spark 2.0+.
JSON string to DataFrame
Spark provides an API for parsing JSON strings into a DataFrame. If you do not specify the schema of the resulting DataFrame, Spark first scans the given JSON strings and then infers the schema:
- a column whose values are all null is typed as String
- integers default to Long
- floating-point numbers default to Double
import spark.implicits._  // needed for createDataset on a Seq of Strings (imported automatically in spark-shell)

val json1 = """{"a":null, "b": 23.1, "c": 1}"""
val json2 = """{"a":null, "b": "hello", "d": 1.2}"""
val ds = spark.createDataset(Seq(json1, json2))
val df = spark.read.json(ds)
df.show
df.printSchema
+----+-----+----+----+
| a| b| c| d|
+----+-----+----+----+
|null| 23.1| 1|null|
|null|hello|null| 1.2|
+----+-----+----+----+
root
|-- a: string (nullable = true)
 |-- b: string (nullable = true)
|-- c: long (nullable = true)
|-- d: double (nullable = true)
If a schema is specified, the DataFrame is generated according to it:
- columns not present in the schema are dropped
- the schema can be specified in two ways, as a StructType or as a String; the mapping between the two is shown in the type tables at the end of this article
- if a value cannot be cast to the type in the schema: a nullable column is set to null; a non-nullable column is set to the type's empty value (e.g., 0.0 for Double); if no value can be produced at all, an exception is thrown
import org.apache.spark.sql.types._

val schema = StructType(List(
  StructField("a", ByteType, true),
  StructField("b", FloatType, false),
  StructField("c", ShortType, true)
))
// or, equivalently, as a DDL string: val schema = "a byte, b float, c short"
val df = spark.read.schema(schema).json(ds)
df.show
df.printSchema
+----+----+----+
| a| b| c|
+----+----+----+
|null|23.1| 1|
|null| 0|null|
+----+----+----+
root
|-- a: byte (nullable = true)
|-- b: float (nullable = true)
|-- c: short (nullable = true)
Configuration options for JSON parsing
primitivesAsString
(default false): treat all primitive values as string type
prefersDecimal
(default false): infer floating-point values as decimal; if they do not fit in a decimal, fall back to double
allowComments
(default false): ignore Java/C++-style comments in JSON strings
allowUnquotedFieldNames
(default false): allow unquoted field names
allowSingleQuotes
(default true): allow single quotes in addition to double quotes
allowNumericLeadingZeros
(default false): allow extra leading zeros in numbers (e.g., 0012)
allowBackslashEscapingAnyCharacter
(default false): allow the backslash escaping mechanism to accept any character
allowUnquotedControlChars
(default false): allow JSON strings to contain unquoted control characters (ASCII characters with value less than 32, including tab and newline characters)
mode
(default PERMISSIVE): how corrupt records are handled during parsing
PERMISSIVE
: when a corrupt record is encountered, the other fields are set to null and the malformed string is put into the field configured by columnNameOfCorruptRecord. When you specify a schema, include a string-typed field with that name in it; if the schema does not contain such a field, corrupt records are dropped during parsing. Without a schema (inference mode), a columnNameOfCorruptRecord field is implicitly added to the output schema. See the corrupt-record example below.
DROPMALFORMED
: drop the entire corrupt record
FAILFAST
: throw an exception when a corrupt record is encountered
columnNameOfCorruptRecord
(default: the value of spark.sql.columnNameOfCorruptRecord): the name of the field that PERMISSIVE mode adds for malformed records; overrides spark.sql.columnNameOfCorruptRecord
dateFormat
(default yyyy-MM-dd): custom date format, following java.text.SimpleDateFormat; covers the date part only (no time component)
timestampFormat
(default yyyy-MM-dd'T'HH:mm:ss.SSSXXX): custom timestamp format, following java.text.SimpleDateFormat; may include the time part (down to microseconds). See the timestampFormat example below.
multiLine
(default false): parse one record per file, where the record may span multiple lines
These options are set with the option method:
val stringDF = spark.read.option("primitivesAsString", "true").json(ds)
stringDF.show
stringDF.printSchema
+----+-----+----+----+
| a| b| c| d|
+----+-----+----+----+
|null| 23.1| 1|null|
|null|hello|null| 1.2|
+----+-----+----+----+
root
|-- a: string (nullable = true)
|-- b: string (nullable = true)
|-- c: string (nullable = true)
|-- d: string (nullable = true)
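As a hedged sketch of the mode and columnNameOfCorruptRecord options (the sample records and the _corrupt_record field name below are illustrative assumptions, not taken from the article), PERMISSIVE mode keeps a malformed record's raw text in a dedicated string column when that column is part of the schema:
import spark.implicits._
import org.apache.spark.sql.types._

val mixed = spark.createDataset(Seq(
  """{"a": 1, "b": 2.5}""",
  """{"a": oops"""        // malformed record
))
val schemaWithCorrupt = StructType(List(
  StructField("a", LongType, true),
  StructField("b", DoubleType, true),
  StructField("_corrupt_record", StringType, true)  // receives the raw malformed string
))
val permissiveDF = spark.read
  .option("mode", "PERMISSIVE")
  .option("columnNameOfCorruptRecord", "_corrupt_record")
  .schema(schemaWithCorrupt)
  .json(mixed)
permissiveDF.show(false)
// the malformed row should show a = null, b = null and the raw string in _corrupt_record

// DROPMALFORMED drops the malformed row entirely; FAILFAST would throw instead
spark.read.option("mode", "DROPMALFORMED").schema("a long, b double").json(mixed).show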
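A similar minimal sketch for dateFormat and timestampFormat (the patterns and field names here are assumed for illustration); both follow java.text.SimpleDateFormat in Spark 2.x:
val tsJson = """{"d": "25/12/2016", "t": "25/12/2016 13:45:30"}"""
val tsDs = spark.createDataset(Seq(tsJson))
val tsDF = spark.read
  .option("dateFormat", "dd/MM/yyyy")
  .option("timestampFormat", "dd/MM/yyyy HH:mm:ss")
  .schema("d date, t timestamp")
  .json(tsDs)
tsDF.show(false)
tsDF.printSchema
// expected schema: d parsed as date, t parsed as timestamp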
Binary columns are automatically represented in base64 encoding.
'Man' (ASCII) base64-encoded is "TWFu":
import org.apache.spark.sql.functions.{base64, col}

val byteArr = Array('M'.toByte, 'a'.toByte, 'n'.toByte)
val binaryDs = spark.createDataset(Seq(byteArr))
val dsWithB64 = binaryDs.withColumn("b64", base64(col("value")))
dsWithB64.show(false)
dsWithB64.printSchema
+----------+----+
|value |b64 |
+----------+----+
|[4D 61 6E]|TWFu|
+----------+----+
root
|-- value: binary (nullable = true)
|-- b64: string (nullable = true)
//=================================================
dsWithB64.toJSON.show(false)
+-----------------------------+
|value |
+-----------------------------+
|{"value":"TWFu","b64":"TWFu"}|
+-----------------------------+
//=================================================
val json = """{"value":"TWFu"}"""
val jsonDs = spark.createDataset(Seq(json))
val binaryDF = spark.read.schema("value binary").json(jsonDs)
binaryDF.show
binaryDF.printSchema
+----------+
| value|
+----------+
|[4D 61 6E]|
+----------+
root
|-- value: binary (nullable = true)
Examples of specifying a schema:
The following example covers all of the primitive types supported by Spark SQL:
val json = """{"stringc":"abc", "shortc":1, "integerc":null, "longc":3, "floatc":4.5, "doublec":6.7, "decimalc":8.90, "booleanc":true, "bytec":23, "binaryc":"TWFu", "datec":"2010-01-01", "timestampc":"2012-12-12 11:22:22.123123"}"""
val ds = spark.createDataset(Seq(json))
val schema = "stringc string, shortc short, integerc int, longc long, floatc float, doublec double, decimalc decimal(10, 3), booleanc boolean, bytec byte, binaryc binary, datec date, timestampc timestamp"
val df = spark.read.schema(schema).json(ds)
df.show(false)
df.printSchema
+-------+------+--------+-----+------+-------+--------+--------+-----+----------+----------+-----------------------+
|stringc|shortc|integerc|longc|floatc|doublec|decimalc|booleanc|bytec|binaryc |datec |timestampc |
+-------+------+--------+-----+------+-------+--------+--------+-----+----------+----------+-----------------------+
|abc |1 |null |3 |4.5 |6.7 |8.900 |true |23 |[4D 61 6E]|2010-01-01|2012-12-12 11:22:22.123|
+-------+------+--------+-----+------+-------+--------+--------+-----+----------+----------+-----------------------+
root
|-- stringc: string (nullable = true)
|-- shortc: short (nullable = true)
|-- integerc: integer (nullable = true)
|-- longc: long (nullable = true)
|-- floatc: float (nullable = true)
|-- doublec: double (nullable = true)
|-- decimalc: decimal(10,3) (nullable = true)
|-- booleanc: boolean (nullable = true)
|-- bytec: byte (nullable = true)
|-- binaryc: binary (nullable = true)
|-- datec: date (nullable = true)
|-- timestampc: timestamp (nullable = true)
Complex types:
val json = """
{
"arrayc" : [ 1, 2, 3 ],
"structc" : {
"strc" : "efg",
"decimalc" : 1.1
},
"mapc" : {
"key1" : 1.2,
"key2" : 1.1
}
}
"""
val ds = spark.createDataset(Seq(json))
val schema = "arrayc array<short>, structc struct<strc:string, decimalc:decimal>, mapc map<string, float>"
val df = spark.read.schema(schema).json(ds)
df.show(false)
df.printSchema
+---------+--------+--------------------------+
|arrayc |structc |mapc |
+---------+--------+--------------------------+
|[1, 2, 3]|[efg, 1]|[key1 -> 1.2, key2 -> 1.1]|
+---------+--------+--------------------------+
root
|-- arrayc: array (nullable = true)
| |-- element: short (containsNull = true)
|-- structc: struct (nullable = true)
| |-- strc: string (nullable = true)
| |-- decimalc: decimal(10,0) (nullable = true)
|-- mapc: map (nullable = true)
| |-- key: string
| |-- value: float (valueContainsNull = true)
Spark SQL data types
Primitive types:
DataType | simpleString | typeName | sql | defaultSize | catalogString | json |
---|---|---|---|---|---|---|
StringType | string | string | STRING | 20 | string | "string" |
ShortType | smallint | short | SMALLINT | 2 | smallint | "short" |
IntegerType | int | integer | INT | 4 | int | "integer" |
LongType | bigint | long | BIGINT | 8 | bigint | "long" |
FloatType | float | float | FLOAT | 4 | float | "float" |
DoubleType | double | double | DOUBLE | 8 | double | "double" |
DecimalType(10,3) | decimal(10,3) | decimal(10,3) | DECIMAL(10,3) | 8 | decimal(10,3) | "decimal(10,3)" |
BooleanType | boolean | boolean | BOOLEAN | 1 | boolean | "boolean" |
ByteType | tinyint | byte | TINYINT | 1 | tinyint | "byte" |
BinaryType | binary | binary | BINARY | 100 | binary | "binary" |
DateType | date | date | DATE | 4 | date | "date" |
TimestampType | timestamp | timestamp | TIMESTAMP | 8 | timestamp | "timestamp" |
The three complex types:
DataType | simpleString | typeName | sql | defaultSize | catalogString | json |
---|---|---|---|---|---|---|
ArrayType(IntegerType, true) | array<int> | array | ARRAY<INT> | 4 | array<int> | {"type":"array","elementType":"integer","containsNull":true} |
MapType(StringType, LongType, true) | map<string,bigint> | map | MAP<STRING, BIGINT> | 28 | map<string,bigint> | {"type":"map","keyType":"string","valueType":"long","valueContainsNull":true} |
StructType(StructField("sf", DoubleType)::Nil) | struct<sf:double> | struct | STRUCT<`sf`: DOUBLE> | 8 | struct<sf:double> | {"type":"struct","fields":[{"name":"sf","type":"double","nullable":true,"metadata":{}}]} |
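The columns of these tables can be reproduced from the DataType objects themselves; a minimal sketch (the chosen example types are arbitrary):
import org.apache.spark.sql.types._

val exampleTypes: Seq[DataType] = Seq(
  IntegerType,
  DecimalType(10, 3),
  ArrayType(IntegerType, true),
  MapType(StringType, LongType, true),
  StructType(StructField("sf", DoubleType) :: Nil)
)
// print simpleString | typeName | sql | defaultSize | catalogString | json for each type
exampleTypes.foreach { t =>
  println(s"${t.simpleString} | ${t.typeName} | ${t.sql} | ${t.defaultSize} | ${t.catalogString} | ${t.json}")
}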