Spark DataFrame列的合並與拆分
阿新 • • 發佈:2018-10-19
返回 創建 main 3.0 substr tom 獲取 clas font
版本說明:Spark-2.3.0
使用Spark SQL在對數據進行處理的過程中,可能會遇到對一列數據拆分為多列,或者把多列數據合並為一列。這裏記錄一下目前想到的對DataFrame列數據進行合並和拆分的幾種方法。
1 DataFrame列數據的合並
例如:我們有如下數據,想要將三列數據合並為一列,並以“,”分割
+----+---+-----------+ |name|age| phone| +----+---+-----------+ |Ming| 20|15552211521| |hong| 19|13287994007| | zhi| 21|15552211523| +----+---+-----------+
1.1 使用map方法重寫
使用map方法重寫就是將DataFrame使用map取值之後,然後使用toSeq方法轉成Seq格式,最後使用Seq的foldLeft方法拼接數據,並返回,如下所示:
//方法1:利用map重寫
val separator = ","
df.map(_.toSeq.foldLeft("")(_ + separator + _).substring(1)).show()
/**
* +-------------------+
* | value|
* +-------------------+
* |Ming,20,15552211521|
* |hong,19,13287994007|
* | zhi,21,15552211523|
* +-------------------+
*/
1.2 使用內置函數concat_ws
合並多列數據也可以使用SparkSQL的內置函數concat_ws()
//方法2: 使用內置函數 concat_ws
import org.apache.spark.sql.functions._
df.select(concat_ws(separator, $"name", $"age", $"phone").cast(StringType).as("value")).show()
/**
* +-------------------+
* | value|
* +-------------------+
* |Ming,20,15552211521|
* |hong,19,13287994007|
* | zhi,21,15552211523|
* +-------------------+
*/
1.3 使用自定義UDF函數
自己編寫UDF函數,實現多列合並
//方法3:使用自定義UDF函數
// 編寫udf函數
def mergeCols(row: Row): String = {
row.toSeq.foldLeft("")(_ + separator + _).substring(1)
}
val mergeColsUDF = udf(mergeCols _)
df.select(mergeColsUDF(struct($"name", $"age", $"phone")).as("value")).show()
完整代碼:
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.StringType
/**
* Created by shirukai on 2018/9/12
* DataFrame 合並列
*/
object MergeColsTest {
def main(args: Array[String]): Unit = {
val spark = SparkSession
.builder()
.appName(this.getClass.getSimpleName)
.master("local")
.getOrCreate()
//從內存創建一組DataFrame數據
import spark.implicits._
val df = Seq(("Ming", 20, 15552211521L), ("hong", 19, 13287994007L), ("zhi", 21, 15552211523L))
.toDF("name", "age", "phone")
df.show()
/**
* +----+---+-----------+
* |name|age| phone|
* +----+---+-----------+
* |Ming| 20|15552211521|
* |hong| 19|13287994007|
* | zhi| 21|15552211523|
* +----+---+-----------+
*/
//方法1:利用map重寫
val separator = ","
df.map(_.toSeq.foldLeft("")(_ + separator + _).substring(1)).show()
/**
* +-------------------+
* | value|
* +-------------------+
* |Ming,20,15552211521|
* |hong,19,13287994007|
* | zhi,21,15552211523|
* +-------------------+
*/
//方法2: 使用內置函數 concat_ws
import org.apache.spark.sql.functions._
df.select(concat_ws(separator, $"name", $"age", $"phone").cast(StringType).as("value")).show()
/**
* +-------------------+
* | value|
* +-------------------+
* |Ming,20,15552211521|
* |hong,19,13287994007|
* | zhi,21,15552211523|
* +-------------------+
*/
//方法3:使用自定義UDF函數
// 編寫udf函數
def mergeCols(row: Row): String = {
row.toSeq.foldLeft("")(_ + separator + _).substring(1)
}
val mergeColsUDF = udf(mergeCols _)
df.select(mergeColsUDF(struct($"name", $"age", $"phone")).as("value")).show()
/**
* /**
* * +-------------------+
* * | value|
* * +-------------------+
* * |Ming,20,15552211521|
* * |hong,19,13287994007|
* * | zhi,21,15552211523|
* * +-------------------+
**/
*/
}
}
2 DataFrame列數據的拆分
上面我們將DataFrame的多列數據合並為一列如下所示,有時候我們也需要將單列數據,以某種拆分規則,拆分為多列。下面提供幾種將一列拆分為多列的方法。
+-------------------+
| value|
+-------------------+
|Ming,20,15552211521|
|hong,19,13287994007|
| zhi,21,15552211523|
+-------------------+
2.1 使用內置函數split,然後遍歷添加列
該方法,先利用內置函數split將單列的數據拆分,然後遍歷使用getItem(角標)方法獲取拆分後的數據,依次使用withColumn方法添加新列,代碼如下所示:
//方法1: 使用內置函數split,然後遍歷添加列
val separator = ","
lazy val first = df.first()
val numAttrs = first.toString().split(separator).length
val attrs = Array.tabulate(numAttrs)(n => "col_" + n)
//按指定分隔符拆分value列,生成splitCols列
var newDF = df.withColumn("splitCols", split($"value", separator))
attrs.zipWithIndex.foreach(x => {
newDF = newDF.withColumn(x._1, $"splitCols".getItem(x._2))
})
newDF.show()
/**
* +-------------------+--------------------+-----+-----+-----------+
* | value| splitCols|col_0|col_1| col_2|
* +-------------------+--------------------+-----+-----+-----------+
* |Ming,20,15552211521|[Ming, 20, 155522...| Ming| 20|15552211521|
* |hong,19,13287994007|[hong, 19, 132879...| hong| 19|13287994007|
* | zhi,21,15552211523|[zhi, 21, 1555221...| zhi| 21|15552211523|
* +-------------------+--------------------+-----+-----+-----------+
2.2 使用UDF函數創建多列數據,然後合並
該方法是使用udf函數,生成多個列,然後合並到原來的數據。該方法參考了VectorDisassembler(與spark ml官網提供的VectorAssembler相反),這是一個第三方的spark ml向量拆分算法,該方法github地址:https://github.com/jamesbconner/VectorDisassembler。代碼如下所示:
//方法2:使用udf函數創建多列,然後合並
val attributes: Array[Attribute] = {
val numAttrs = first.toString().split(separator).length
//生成attributes
Array.tabulate(numAttrs)(i => NumericAttribute.defaultAttr.withName("value" + "_" + i))
}
//創建多列數據
val fieldCols = attributes.zipWithIndex.map(x => {
val assembleFunc = udf {
str: String =>
str.split(separator)(x._2)
}
assembleFunc(df("value").cast(StringType)).as(x._1.name.get, x._1.toMetadata())
})
//合並數據
df.select(col("*") +: fieldCols: _*).show()
/**
* +-------------------+-------+-------+-----------+
* | value|value_0|value_1| value_2|
* +-------------------+-------+-------+-----------+
* |Ming,20,15552211521| Ming| 20|15552211521|
* |hong,19,13287994007| hong| 19|13287994007|
* | zhi,21,15552211523| zhi| 21|15552211523|
* +-------------------+-------+-------+-----------+
*/
完整代碼:
import org.apache.spark.ml.attribute.{Attribute, NumericAttribute}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.StringType
/**
* Created by shirukai on 2018/9/12
* 拆分列
*/
object SplitColTest {
def main(args: Array[String]): Unit = {
val spark = SparkSession
.builder()
.appName(this.getClass.getSimpleName)
.master("local")
.getOrCreate()
//從內存中創建DataFrame
import spark.implicits._
val df = Seq("Ming,20,15552211521", "hong,19,13287994007", "zhi,21,15552211523")
.toDF("value")
df.show()
/**
* +-------------------+
* | value|
* +-------------------+
* |Ming,20,15552211521|
* |hong,19,13287994007|
* | zhi,21,15552211523|
* +-------------------+
*/
import org.apache.spark.sql.functions._
//方法1: 使用內置函數split,然後遍歷添加列
val separator = ","
lazy val first = df.first()
val numAttrs = first.toString().split(separator).length
val attrs = Array.tabulate(numAttrs)(n => "col_" + n)
//按指定分隔符拆分value列,生成splitCols列
var newDF = df.withColumn("splitCols", split($"value", separator))
attrs.zipWithIndex.foreach(x => {
newDF = newDF.withColumn(x._1, $"splitCols".getItem(x._2))
})
newDF.show()
/**
* +-------------------+--------------------+-----+-----+-----------+
* | value| splitCols|col_0|col_1| col_2|
* +-------------------+--------------------+-----+-----+-----------+
* |Ming,20,15552211521|[Ming, 20, 155522...| Ming| 20|15552211521|
* |hong,19,13287994007|[hong, 19, 132879...| hong| 19|13287994007|
* | zhi,21,15552211523|[zhi, 21, 1555221...| zhi| 21|15552211523|
* +-------------------+--------------------+-----+-----+-----------+
*/
//方法2:使用udf函數創建多列,然後合並
val attributes: Array[Attribute] = {
val numAttrs = first.toString().split(separator).length
//生成attributes
Array.tabulate(numAttrs)(i => NumericAttribute.defaultAttr.withName("value" + "_" + i))
}
//創建多列數據
val fieldCols = attributes.zipWithIndex.map(x => {
val assembleFunc = udf {
str: String =>
str.split(separator)(x._2)
}
assembleFunc(df("value").cast(StringType)).as(x._1.name.get, x._1.toMetadata())
})
//合並數據
df.select(col("*") +: fieldCols: _*).show()
/**
* +-------------------+-------+-------+-----------+
* | value|value_0|value_1| value_2|
* +-------------------+-------+-------+-----------+
* |Ming,20,15552211521| Ming| 20|15552211521|
* |hong,19,13287994007| hong| 19|13287994007|
* | zhi,21,15552211523| zhi| 21|15552211523|
* +-------------------+-------+-------+-----------+
*/
}
}
Spark DataFrame列的合並與拆分