spark dataframe實戰(持續更新)
spark dataframe實戰
說明:該文基於spark-2.0+
dataframe介紹
dataframe是dataset的行的集合。
Dataset是分散式資料集合。Dataset是Spark 1.6+中新增的一個新介面,它提供了RDD的很多優點。 (強型別化,使用強大的lambda函式的功能),以及Spark SQL優化執行引擎的優點。資料集可以從JVM物件構建,然後使用函式轉換(map,flatMap,filter等)進行操作。資料集API可用於Scala和Java。
Python不支援資料集API。但是由於Python的動態特性,資料集API的許多優點已經可用(即,您可以通過自然的row.columnName名稱來訪問行的欄位)。R的情況是相似的。
一個DataFrame是一個數據集組織成命名列。它在概念上等同於關係資料庫中的表或R / Python中的(dataframe)資料框,但是在實現引擎層面有更多的優化。
DataFrame可以從各種各樣的源構建,例如:結構化資料檔案,Hive中的表,外部資料庫或現有的RDD。DataFrame API可用於Scala,Java,Python和R.在Scala和Java中,DataFrame由行資料集表示。在Scala API中,DataFrame只是Dataset [Row]的類型別名。而在Java API中,使用者需要使用Dataset 來表示一個DataFrame。
在整篇文件中,我們經常將Scala / Java資料集作為DataFrames。
sparkSession
在spark-2.0以後,引入了sparkSession來對資源進行管理。包括管理spark Context等。建立一個sparkSession的程式碼如下。
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder().appName("Spark SQL basic example").config("spark.some.config.option", "some-value").getOrCreate()
// For implicit conversions like converting RDDs to DataFrames
import spark.implicits ._
建立Dataframe
通過toDs來建立dataframe
val ds = Seq(1, 2, 3).toDS()
載入檔案資料建立
dataframe可以載入各種格式的檔案。
下面的例子會通過一個數據檔案進行講解,該檔案的內容如下:
$ hadoop fs -cat /user/zxh/csvdata/csvdata
id,name,subject,score
1,n1,s1,10
2,n2,s2,20
3,n3,s3,30
3,n3,s1,20
4,n4,s2,40
5,n5,s3,50
6,n6,s1,60
7,n6,s2,40
8,n8,s3,90
8,n9,s1,30
9,n9,s1,20
9,n9,s2,70
- 載入csv檔案
注意:spark是開始建立的sparkSession實體。
val spark = SparkSession.builder().appName("Spark SQL basic example").config("spark.some.config.option", "some-value").getOrCreate()
import spark.implicits._
val df = spark.read.csv("/user/hadoop/csvdata/csvdata")
scala> val df = spark.read.option("header",true).csv("/user/hadoop/csvdata/csvdata")
df: org.apache.spark.sql.DataFrame = [id: string, name: string ... 2 more fields]
scala> df.show()
+---+----+-------+-----+
| id|name|subject|score|
+---+----+-------+-----+
| 1| n1| s1| 10|
| 2| n2| s2| 20|
| 3| n3| s3| 30|
| 3| n3| s1| 20|
| 4| n4| s2| 40|
| 5| n5| s3| 50|
| 6| n6| s1| 60|
| 7| n6| s2| 40|
| 8| n8| s3| 90|
| 8| n9| s1| 30|
| 9| n9| s1| 20|
| 9| n9| s2| 70|
+---+----+-------+-----+
- 載入json檔案
$ hadoop fs -cat /user/zxh/jsondata/jsondata
{ "name":"Yin", "address":{ "city":"Columbus", "state":"Ohio" }}
scala> val df = spark.read.json("/user/zxh/jsondata/jsondata")
df: org.apache.spark.sql.DataFrame = [address: struct<city: string, state: string>, name: string]
scala> df.show()
+---------------+----+
| address|name|
+---------------+----+
|[Columbus,Ohio]| Yin|
+---------------+----+
- 載入parquet檔案
val peopleDF = spark.read.format("json").load("examples/src/main/resources/people.json")
peopleDF.select("name", "age").write.format("parquet").save("namesAndAges.parquet")
val sqlDF = spark.sql("SELECT * FROM parquet.`examples/src/main/resources/users.parquet`")
val usersDF = spark.read.load("examples/src/main/resources/users.parquet")
usersDF.select("name", "favorite_color").write.save("namesAndFavColors.parquet")
從JDBC載入資料
- 通過JDBC讀取資料
// Note: JDBC loading and saving can be achieved via either the load/save or jdbc methods
// Loading data from a JDBC source
val jdbcDF = spark.read
.format("jdbc")
.option("url", "jdbc:postgresql:dbserver")
.option("dbtable", "schema.tablename")
.option("user", "username")
.option("password", "password")
.load()
val connectionProperties = new Properties()
connectionProperties.put("user", "username")
connectionProperties.put("password", "password")
val jdbcDF2 = spark.read
.jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties)
// Saving data to a JDBC source
jdbcDF.write
.format("jdbc")
.option("url", "jdbc:postgresql:dbserver")
.option("dbtable", "schema.tablename")
.option("user", "username")
.option("password", "password")
.save()
jdbcDF2.write
.jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties)
// Specifying create table column data types on write
jdbcDF.write
.option("createTableColumnTypes", "name CHAR(64), comments VARCHAR(1024)")
.jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties)
操作Dataframe
- 為dataframe設定新的欄位名(列名)
scala> val newNames = Seq("id1", "name1", "score1")
newNames: Seq[String] = List(id1, name1, score1)
scala> val dfRenamed = df.toDF(newNames: _*)
dfRenamed: org.apache.spark.sql.DataFrame = [id1: string, name1: string ... 1 more field]
scala> dfRenamed.show()
+---+-----+------+
|id1|name1|score1|
+---+-----+------+
| 1| n1| 10|
| 2| n2| 20|
| 3| n3| 30|
| 4| n4| 40|
| 5| n5| 50|
| 6| n6| 60|
| 7| n6| 60|
| 8| n8| 60|
| 8| n9| 60|
| 9| n9| 60|
+---+-----+------+
- 新增一個新列:通過其他列的值來新增
為了新增一列,我們可以使用withColumn函式來
scala> val df2 = df.withColumn("newscore", df("score")+50)
df2: org.apache.spark.sql.DataFrame = [id: string, name: string ... 2 more fields]
scala> df2.show()
+---+----+-----+--------+
| id|name|score|newscore|
+---+----+-----+--------+
| 1| n1| 10| 60.0|
| 2| n2| 20| 70.0|
| 3| n3| 30| 80.0|
| 4| n4| 40| 90.0|
| 5| n5| 50| 100.0|
| 6| n6| 60| 110.0|
| 7| n6| 60| 110.0|
| 8| n8| 60| 110.0|
| 8| n9| 60| 110.0|
| 9| n9| 60| 110.0|
+---+----+-----+--------+
- 條件去重
通過select可以選擇要返回的列,通過where函式可以對列進行篩選。在使用distinct函式去重。
scala> val df3 = df.select("name").where($"score">50).distinct()
df3: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [name: string]
scala> df3.show()
+----+
|name|
+----+
| n8|
| n9|
| n6|
+----+
- 根據多列的值去重
可以看到以下程式碼按name,score這兩列的值進行去重。
scala> val df3 = df.select("name", "score").where($"score">50).distinct()
df3: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [name: string, score: string]
scala> df3.show()
+----+-----+
|name|score|
+----+-----+
| n9| 60|
| n8| 60|
| n6| 60|
+----+-----+
- 如何把dataframe的整數列轉換成字串
以下程式碼把df的col1列的值轉換成字串型別:
var df2 = df.withColumn("col1", df("col1").cast("string"))