1. 程式人生 > >Spark2.x 快速入門教程 2

Spark2.x 快速入門教程 2

Spark SQL之 Dataframe/Dataset

一、實驗介紹

1.1 實驗內容

從 Spark 2.0 始支援了SQL 2003 準語法。當我們使用某種程式語言開發的 Spark 作業來執行 SQL 時,返回的結果是 Dataframe/Dataset 型別的。本節課我們將通過 Spark Sql 的 shell 命令列工具進行 Dataframe/Dataset 操作,以加深對 Spark 2.0 Sql 的理解,並在此基礎上學以致用。

1.2 先學課程

1.3 實驗知識點

  • Dataframe/Dataset 介紹
  • SparkSession
  • Dataframe/Dataset 案例操作

1.4 實驗環境

  • Hadoop 2.6.1
  • spark-2.1.0-bin-hadoop2.6.tgz
  • Xfce 終端

1.5 適合人群

本課程屬於中等難度級別,適合具有大資料基礎的使用者,如果對 Spark Sql 瞭解能夠更好的上手本課程。

二、實驗步驟

2.1 準備工作

雙擊開啟桌面上的 Xfce 終端,用 sudo 命令切換到 hadoop 使用者,hadoop 使用者密碼為 hadoop,用 cd 命令進入 /opt目錄。

$ su hadoop
$ cd /opt/

此處輸入圖片的描述

在 /opt 目錄下格式化 hadoop。

$ hadoop-2.6.1/bin/hdfs namenode -format

此處輸入圖片的描述

在 /opt

 目錄下啟動 hadoop 程序。

$ start-all.sh

此處輸入圖片的描述

用 jps 檢視 hadoop 程序是否啟動。

此處輸入圖片的描述

2.2 SparkSession 介紹

Spark SQL 統一入口就是 SparkSession,可以通過 SparkSession.builder()

來建立一個 SparkSession,有了 SparkSession 之後,就可以通過已有的 RDD,Hive 表,或者其他資料來源來建立 Dataframe。由於本節課基於 Spark SQL 的 shell 命令列操作,所以在此僅此一提,在以後使用 Eclispe 等開發工具時再使用。

2.3 Dataframe

我們可以理解為 Dataframe 就是按列組織的 Dataset,在底層做了大量的優化。Dataframe 可以通過很多方式來構造:比如結構化的資料檔案,Hive表,資料庫,已有的 RDD,Scala,Java,Python,R 等語言都支援 Dataframe。Dataframe 提供了一種 domain-specific language 來進行結構化資料的操作,這種操作也被稱之為 untyped 操作,與之相反的是基於強型別的 typed 操作。

準備資料 person.json,並上傳至 hdfs,person.json 資料如下:

{"name":"Michael"}
{"name":"Andy", "age":30}
{"name":"Justin", "age":19}

mkdir命令在 hdfs 檔案系統上建立 testdata 資料夾,並上傳person.json

hadoop fs -mkdir /testdata
sudo  vi person.json
hadoop fs -put person.json /testdata

執行命令

hadoop fs -cat /testdata/person.json

檢視內容

此處輸入圖片的描述

在 /opt 目錄下啟動 Spark。

$ spark-2.1.0-bin-hadoop2.6/sbin/start-all.sh

此處輸入圖片的描述

用 jps 檢視 spark 程序是否啟動

此處輸入圖片的描述

在 spark 的 bin 目錄下啟動 spark-shell

$ cd spark-2.1.0-bin-hadoop2.6/bin
# a4cd888f4ca9 是主機名,要與您 hostnane 顯示的名字一致
$ ./spark-shell --master spark://a4cd888f4ca9:7077

此處輸入圖片的描述

讀取 json 檔案,構造一個 untyped 弱型別的 dataframe。

$ val df = spark.read.json("hdfs://localhost:9000/testdata/person.json")

此處輸入圖片的描述

對資料進行操作。

$ df.show() //列印資料
$ df.printSchema()  // 列印元資料

此處輸入圖片的描述

$ df.select($"name", $"age" + 1).show()  // 使用表示式,scala的語法,要用$符號作為字首
$ df.select("name").show()  // select操作,典型的弱型別,untyped操作

此處輸入圖片的描述

$ df.createOrReplaceTempView("person")  // 基於dataframe建立臨時檢視
$ spark.sql("SELECT * FROM person").show() // 用SparkSession的sql()函式就可以執行sql語句,預設是針對建立的臨時檢視

此處輸入圖片的描述

2.4 Dataset

Dataset 的序列化機制基於一種特殊的 Encoder,來將物件進行高效序列化,以進行高效能處理或者是通過網路進行傳輸。

基於原始資料型別構造 dataset。

$ val sqlDS = Seq(1, 2, 3, 4, 5).toDS()
$ sqlDS.map(_*2).show()

此處輸入圖片的描述

基於已有的結構化資料檔案,構造 dataset。

$ case class Person(name: String, age: Long)
$ val pds = spark.read.json("hdfs://localhost:9000/testdata/person.json").as[Person]
$ pds.show()

此處輸入圖片的描述

直接基於 jvm object 來構造 dataset。

$ val caseDS = Seq(Person("Zhudy", 28)).toDS() 
$ caseDS.show()

此處輸入圖片的描述

若想要退出終端,可以用 :quit

此處輸入圖片的描述

三、綜合案例分析

現有資料集 department.jsonemployee.json,以部門名稱和員工性別為粒度,試計算每個部門分性別平均年齡與平均薪資。

department.json如下:

{"id": 1, "name": "Tech Department"}
{"id": 2, "name": "Fina Department"}
{"id": 3, "name": "HR Department"}

employee.json如下:

{"name": "zhangsan", "age": 26, "depId": 1, "gender": "male", "salary": 20000}
{"name": "lisi", "age": 36, "depId": 2, "gender": "female", "salary": 8500}
{"name": "wangwu", "age": 23, "depId": 1, "gender": "male", "salary": 5000}
{"name": "zhaoliu", "age": 25, "depId": 3, "gender": "male", "salary": 7000}
{"name": "marry", "age": 19, "depId": 2, "gender": "female", "salary": 6600}
{"name": "Tom", "age": 36, "depId": 1, "gender": "female", "salary": 5000}
{"name": "kitty", "age": 43, "depId": 2, "gender": "female", "salary": 6000}

兩份資料我已經建立並上傳至 hdfs 檔案系統,請自行建立。

執行命令

hadoop fs -cat hdfs://localhost:9000/testdata/department.json
hadoop fs -cat hdfs://localhost:9000/testdata/employee.json

檢視內容

此處輸入圖片的描述

此處輸入圖片的描述

1). 載入資料

$ val emp = spark.read.json("hdfs://localhost:9000/testdata/employee.json")
$ val dep = spark.read.json("hdfs://localhost:9000/testdata/department.json")

此處輸入圖片的描述

2). 用運算元操作

需要兩個表進行 join 操作才能根據部門名稱和員工性別分組再進行聚合,具體如下:

# 注意:兩個表的欄位的連線條件,需要使用三個等號
$ emp.join(dep, $"id" === $"depId")  .groupBy(dep("name"), emp("gender")).agg(avg(emp("salary")), avg(emp("age"))).show()

此處輸入圖片的描述

可以看到 workers 節點,當前正在進行的 Runnign Applications,即為我們的開啟的 spark-shell 命令列工具。

此處輸入圖片的描述

此處輸入圖片的描述

繼續點選 SQL ->可以看到 Completed Queries,點選任意一個可以看到執行計劃,當然還有更多的資訊,您可以繼續檢視,在此不一一舉例。

此處輸入圖片的描述

四、實驗總結

本節課主要介紹了 Dataframe/Dataset,並進行一些較為簡單的 SQL 操作,最後基於一個小案例講解,並就 Spark 的 WebUi 進行介紹,希望學完本節課,能對您學習 Spark SQL 有一個更清晰地認識。

五、擴充套件閱讀