1. 程式人生 > >Spark的基本結構及SparkSQL元件的基本用法

Spark的基本結構及SparkSQL元件的基本用法

本文主要側重對Spark核心結構和元件的功能介紹。

Apache Spark簡介

Apache Spark是一種快速的叢集計算技術,專為快速計算而設計。它基於Hadoop MapReduce,它擴充套件了MapReduce模型,以有效地將其用於更多型別的計算,包括互動式查詢和流處理。Spark的主要特性是它的記憶體中叢集計算,提高了應用程式的處理速度。Spark旨在涵蓋各種工作負載,如批處理應用程式,迭代演算法,互動式查詢和流式處理。除了在相應系統中支援所有這些工作負載之外,它還減少了維護單獨工具的管理負擔。
這裡寫圖片描述

MapReduce與Spark迭代計算與查詢對比

MapReduce被廣泛應用於在叢集上使用並行的分散式演算法來處理和生成大型資料集。它允許使用者使用一組高階操作符來編寫平行計算,而不必擔心工作分佈和容錯。不幸的是,在大多數當前框架中,在計算之間重複使用資料(例如:兩個MapReduce作業之間)的唯一方法是將其寫入外部穩定儲存系統(例如:HDFS)。雖然這個框架提供了訪問叢集的計算資源的許多抽象,使用者仍然想要更多。迭代和互動應用程式都需要跨並行作業更快的資料共享。由於複製,序列化和磁碟IO,MapReduce中的資料共享速度很慢

。關於儲存系統,大多數Hadoop應用程式,他們花費90%以上的時間做HDFS讀寫操作。
彈性分散式資料集(RDD)是Spark的基本資料結構。它是一個不可變的分散式物件集合。 RDD中的每個資料集劃分為邏輯分割槽,可以在叢集的不同節點上計算。 RDD可以包含任何型別的Python,Java或Scala物件,包括使用者定義的類。形式上,RDD是隻讀的,分割槽的記錄集合。 RDD可以通過對穩定儲存器或其他RDD上的資料的確定性操作來建立。 RDD是可以並行操作的元件的容錯集合。有兩種方法來建立RDD - 並行化驅動程式中的現有集合,或引用外部儲存系統中的資料集,例如共享檔案系統,HDFS,HBase或提供Hadoop輸入格式的任何資料來源。Spark使用RDD的概念來實現更快和更高效的MapReduce操作。
這裡寫圖片描述

這裡寫圖片描述

SparkSQL簡介及基本用法

Spark為結構化資料處理引入了一個稱為Spark SQL的程式設計模組。它提供了一個稱為DataFrame的程式設計抽象,並且可以充當分散式SQL查詢引擎。Spark SQL允許您將結構化資料作為Spark中的分散式資料集(RDD)進行查詢,在Python,Scala和Java中集成了API,這種緊密的整合使得可以輕鬆地執行SQL查詢以及複雜的分析演算法。
Spark Core(利用RDD資料結構設計)的資料來源:文字檔案,Avro檔案等。
Spark SQL(適用於模式、表和記錄)的資料來源:Parquet檔案,JSON文件,HIVE表和Cassandra資料庫。可以使用Schema RDD作為臨時表,將Schema RDD稱為資料幀。
DataFrame是一個分散式資料集合,它被組織成命名列。從概念上講,它相當於具有良好優化技術的關係表。DataFrame可以從不同來源的陣列構造,例如Hive表,結構化資料檔案,外部資料庫或現有RDD。

這裡寫圖片描述
與spark-shell同級啟動目錄下分別生成資料準備檔案people.json和people.txt如下:
這裡寫圖片描述
建立SQLContext

val sqlContext=new org.apache.spark.sql.SQLContext(sc)

讀取JSON文件people.json

val dfs=sqlContext.read.json("people.json")

#Ouput:
#dfs: org.apache.spark.sql.DataFrame = [age: bigint, name: string]

顯示資料框的資料

dfs.show   #或者dfs.show()

#Output:
#+----+-------+
#| age|   name|
#+----+-------+
#|null|Michael|
#|  30|   Andy|
#|  19| Justin|
#+----+-------+

檢視DataFrame的Structure(Schema)

dfs.printSchema()

#Output:
#root
# |-- age: long (nullable = true)
# |-- name: string (nullable = true)

select,從DataFrame裡選擇某一列

dfs.select("name").show()

#Output:
#+-------+
#|   name|
#+-------+
#|Michael|
#|   Andy|
#| Justin|
#+-------+

filter,利用過濾器篩選樣本

dfs.filter(dfs("age")>20).show()

#Output:
#+---+----+
#|age|name|
#+---+----+
#| 30|Andy|
#+---+----+

groupBy,對樣本進行分組;count,進行分組後的每組計數

dfs.groupBy("age").count().show()

#Output:
#+----+-----+                                                                    
#| age|count|
#+----+-----+
#|  19|    1|
#|null|    1|
#|  30|    1|
#+----+-----+

SQLContext使應用程式能夠在執行SQL函式時以程式設計方式執行SQL查詢,並將結果作為DataFrame返回。SparkSQL支援兩種不同的方法將現有的RDD轉換為DataFrames:
(1)使用反射來生成包含特定型別的物件的RDD的模式
Spark SQL的Scala介面支援將”包含case類的RDD”自動轉換為DataFrame。 case類定義了表的模式。 “case類的引數的名稱”使用”反射”讀取,它們成為”列的名稱”。

#建立sqlContext
val sqlContext=new org.apache.spark.sql.SQLContext(sc)

#匯入用於將RDD隱式轉換為DataFrame的所有SQL函式
import sqlContext.implicits._

#必須使用case類定義記錄資料的模式
case class people(name: String, age: Int)

#從中讀取資料people.txt並使用Map函式將其轉換為DataFrame
val empl=sc.textFile("people.txt").map(_.split(",")).map(e => people(e(0),e(1).trim.toInt)).toDF()

#將DataFrame資料儲存在表peoples中
empl.registerTempTable("peoples")

#選擇DataFrame上的查詢
val allrecords=sqlContext.sql("SELECT * FROM peoples")

#顯示查詢結果
allrecords.show
#Output:
#+-------+---+
#|   name|age|
#+-------+---+
#|Michael| 29|
#|   Andy| 30|
#| Justin| 19|
#+-------+---+

#帶條件查詢
val agefilter=sqlContext.sql("SELECT * FROM peoples WHERE age>=20 AND age<=35")
agefilter.show()
#Output:
#+-------+---+
#|   name|age|
#+-------+---+
#|Michael| 29|
#|   Andy| 30|
#+-------+---+

#以上兩個查詢是針對整個表DataFrame傳遞的,下面嘗試通過對“查詢後的結果”應用Transform來“從結果DataFrame獲取資料”
agefilter.map(t=>"NAME:"+t(0)).collect().foreach(println)
#Output:
#NAME:Michael
#NAME:Andy

(2)通過程式設計介面建立DataFrame
通過程式設計介面,構造一個模式,然後將其應用到現有的RDD。根據以下三個步驟以程式設計方式建立一個DataFrame:a. 從原始RDD建立Row RDD;b. 建立與上一步裡建立的Row RDD中“Row結構”匹配的StructType表示的模式;c. 通過sqlContext提供的createDataFrame方法將模式應用於Row RDD。

import org.apache.spark.sql.SQLContext
#Output:
#import org.apache.spark.sql.SQLContext

#建立SQLContext物件
val sqlContext=new SQLContext(sc)
#Output:
#warning: there was one deprecation warning; re-run with -deprecation for details
sqlContext: org.apache.spark.sql.SQLContext = [email protected]1a717d79

#從文字檔案讀取輸入
val people=sc.textFile("file:///Users/yuyang/spark-2.0.1-bin-hadoop2.6/bin/people.txt")
#Output:
#people: org.apache.spark.rdd.RDD[String] = file:///Users/yuyang/spark-2.0.1-bin-hadoop2.6/bin/people.txt MapPartitionsRDD[1] at textFile at <console>:25

#以字串格式建立編碼模式,假設一個表的欄位結構
val schemaString="name age"
#Output:
#schemaString: String = name age

#匯入行功能
import org.apache.spark.sql.Row
#Output:
#import org.apache.spark.sql.Row

#匯入SQL資料型別
import org.apache.spark.sql.types.{StructType,StructField,StringType}
#Output:
#import org.apache.spark.sql.types.{StructType, StructField, StringType}

#通過讀取schemaString變數來生成模式
#將整個字串以空格作為分隔符來讀取每個欄位,並且預設情況下,每個欄位型別為String型別
val schema=StructType(schemaString.split(" ").map(fieldName=>StructField(fieldName,StringType,true)))
#Output:schema: org.apache.spark.sql.types.StructType = StructType(StructField(name,StringType,true), StructField(age,StringType,true))

#將RDD(people)轉換為Rows,指定讀取RDD資料的邏輯並將其儲存到rowRDD中
val rowRDD=people.map(_.split(",")).map(p=>Row(p(0),p(1).trim))
#Output:
#rowRDD: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[3] at map at <console>:29

#基於模式schema應用rowRDD建立DataFrame
val peopleDataFrame=sqlContext.createDataFrame(rowRDD,schema)
#Output:
#peopleDataFrame: org.apache.spark.sql.DataFrame = [name: string, age: string]

#將資料幀儲存到名為peopleTempTab的表中
peopleDataFrame.registerTempTable("peopleTempTab")
#Output:
#there was one deprecation warning; re-run with -deprecation for details

val personsRDD=sqlContext.sql("select name,age from peopleTempTab where age>20").rdd
#Output:
#personsRDD: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[9] at rdd at <console>:29

personsRDD.foreach(t=>println("Name:"+t(0)+",Age:"+t(1)))
#Output:
#Name:Michael,Age:29
#Name:Andy,Age:30

這裡寫圖片描述