1. 程式人生 > >spark dataframe實戰(持續更新)

spark dataframe實戰(持續更新)

spark dataframe實戰

說明:該文基於spark-2.0+

dataframe介紹

dataframe是dataset的行的集合。

Dataset是分散式資料集合。Dataset是Spark 1.6+中新增的一個新介面,它提供了RDD的很多優點。 (強型別化,使用強大的lambda函式的功能),以及Spark SQL優化執行引擎的優點。資料集可以從JVM物件構建,然後使用函式轉換(map,flatMap,filter等)進行操作。資料集API可用於Scala和Java。
Python不支援資料集API。但是由於Python的動態特性,資料集API的許多優點已經可用(即,您可以通過自然的row.columnName名稱來訪問行的欄位)。R的情況是相似的。

一個DataFrame是一個數據集組織成命名列。它在概念上等同於關係資料庫中的表或R / Python中的(dataframe)資料框,但是在實現引擎層面有更多的優化。
DataFrame可以從各種各樣的源構建,例如:結構化資料檔案,Hive中的表,外部資料庫或現有的RDD。DataFrame API可用於Scala,Java,Python和R.在Scala和Java中,DataFrame由行資料集表示。在Scala API中,DataFrame只是Dataset [Row]的類型別名。而在Java API中,使用者需要使用Dataset 來表示一個DataFrame。

在整篇文件中,我們經常將Scala / Java資料集作為DataFrames。

sparkSession

在spark-2.0以後,引入了sparkSession來對資源進行管理。包括管理spark Context等。建立一個sparkSession的程式碼如下。

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("Spark SQL basic example").config("spark.some.config.option", "some-value").getOrCreate()

// For implicit conversions like converting RDDs to DataFrames
import spark.implicits
._

建立Dataframe

通過toDs來建立dataframe

val ds = Seq(1, 2, 3).toDS()

載入檔案資料建立

dataframe可以載入各種格式的檔案。
下面的例子會通過一個數據檔案進行講解,該檔案的內容如下:

$ hadoop fs -cat /user/zxh/csvdata/csvdata
id,name,subject,score
1,n1,s1,10
2,n2,s2,20
3,n3,s3,30
3,n3,s1,20
4,n4,s2,40
5,n5,s3,50
6,n6,s1,60
7,n6,s2,40
8,n8,s3,90
8,n9,s1,30
9,n9,s1,20
9,n9,s2,70
  • 載入csv檔案
    注意:spark是開始建立的sparkSession實體。
val spark = SparkSession.builder().appName("Spark SQL basic example").config("spark.some.config.option", "some-value").getOrCreate()
import spark.implicits._
val df = spark.read.csv("/user/hadoop/csvdata/csvdata")

scala> val df = spark.read.option("header",true).csv("/user/hadoop/csvdata/csvdata")
df: org.apache.spark.sql.DataFrame = [id: string, name: string ... 2 more fields]

scala> df.show()
+---+----+-------+-----+
| id|name|subject|score|
+---+----+-------+-----+
|  1|  n1|     s1|   10|
|  2|  n2|     s2|   20|
|  3|  n3|     s3|   30|
|  3|  n3|     s1|   20|
|  4|  n4|     s2|   40|
|  5|  n5|     s3|   50|
|  6|  n6|     s1|   60|
|  7|  n6|     s2|   40|
|  8|  n8|     s3|   90|
|  8|  n9|     s1|   30|
|  9|  n9|     s1|   20|
|  9|  n9|     s2|   70|
+---+----+-------+-----+
  • 載入json檔案
$ hadoop fs -cat /user/zxh/jsondata/jsondata
{ "name":"Yin", "address":{ "city":"Columbus", "state":"Ohio" }}

scala> val df = spark.read.json("/user/zxh/jsondata/jsondata")
df: org.apache.spark.sql.DataFrame = [address: struct<city: string, state: string>, name: string]

scala> df.show()
+---------------+----+
|        address|name|
+---------------+----+
|[Columbus,Ohio]| Yin|
+---------------+----+
  • 載入parquet檔案
val peopleDF = spark.read.format("json").load("examples/src/main/resources/people.json")
peopleDF.select("name", "age").write.format("parquet").save("namesAndAges.parquet")
val sqlDF = spark.sql("SELECT * FROM parquet.`examples/src/main/resources/users.parquet`")
val usersDF = spark.read.load("examples/src/main/resources/users.parquet")
usersDF.select("name", "favorite_color").write.save("namesAndFavColors.parquet")

從JDBC載入資料

  • 通過JDBC讀取資料
// Note: JDBC loading and saving can be achieved via either the load/save or jdbc methods
// Loading data from a JDBC source
val jdbcDF = spark.read
  .format("jdbc")
  .option("url", "jdbc:postgresql:dbserver")
  .option("dbtable", "schema.tablename")
  .option("user", "username")
  .option("password", "password")
  .load()

val connectionProperties = new Properties()
connectionProperties.put("user", "username")
connectionProperties.put("password", "password")
val jdbcDF2 = spark.read
  .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties)

// Saving data to a JDBC source
jdbcDF.write
  .format("jdbc")
  .option("url", "jdbc:postgresql:dbserver")
  .option("dbtable", "schema.tablename")
  .option("user", "username")
  .option("password", "password")
  .save()

jdbcDF2.write
  .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties)

// Specifying create table column data types on write
jdbcDF.write
  .option("createTableColumnTypes", "name CHAR(64), comments VARCHAR(1024)")
  .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties)

操作Dataframe

  • 為dataframe設定新的欄位名(列名)
scala> val newNames = Seq("id1", "name1", "score1")
newNames: Seq[String] = List(id1, name1, score1)

scala> val dfRenamed = df.toDF(newNames: _*)
dfRenamed: org.apache.spark.sql.DataFrame = [id1: string, name1: string ... 1 more field]

scala> dfRenamed.show()
+---+-----+------+
|id1|name1|score1|
+---+-----+------+
|  1|   n1|    10|
|  2|   n2|    20|
|  3|   n3|    30|
|  4|   n4|    40|
|  5|   n5|    50|
|  6|   n6|    60|
|  7|   n6|    60|
|  8|   n8|    60|
|  8|   n9|    60|
|  9|   n9|    60|
+---+-----+------+
  • 新增一個新列:通過其他列的值來新增
    為了新增一列,我們可以使用withColumn函式來
scala> val df2 = df.withColumn("newscore", df("score")+50)
df2: org.apache.spark.sql.DataFrame = [id: string, name: string ... 2 more fields]

scala> df2.show()
+---+----+-----+--------+
| id|name|score|newscore|
+---+----+-----+--------+
|  1|  n1|   10|    60.0|
|  2|  n2|   20|    70.0|
|  3|  n3|   30|    80.0|
|  4|  n4|   40|    90.0|
|  5|  n5|   50|   100.0|
|  6|  n6|   60|   110.0|
|  7|  n6|   60|   110.0|
|  8|  n8|   60|   110.0|
|  8|  n9|   60|   110.0|
|  9|  n9|   60|   110.0|
+---+----+-----+--------+
  • 條件去重
    通過select可以選擇要返回的列,通過where函式可以對列進行篩選。在使用distinct函式去重。
scala> val df3 = df.select("name").where($"score">50).distinct()
df3: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [name: string]

scala> df3.show()
+----+
|name|
+----+
|  n8|
|  n9|
|  n6|
+----+
  • 根據多列的值去重
    可以看到以下程式碼按name,score這兩列的值進行去重。
scala> val df3 = df.select("name", "score").where($"score">50).distinct()
df3: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [name: string, score: string]

scala> df3.show()
+----+-----+
|name|score|
+----+-----+
|  n9|   60|
|  n8|   60|
|  n6|   60|
+----+-----+
  • 如何把dataframe的整數列轉換成字串

以下程式碼把df的col1列的值轉換成字串型別:

var df2 = df.withColumn("col1", df("col1").cast("string"))