1. 程式人生 > >Schema RDD(DataFrame)----Spark SQL操作

Schema RDD(DataFrame)----Spark SQL操作

SchemaRDD是存放 Row 物件的 RDD,每個 Row 物件代表一行記錄。 SchemaRDD 還包含記錄的結構資訊(即資料欄位)。 SchemaRDD 看起來和普通的 RDD 很像,但是在內部, SchemaRDD 可以利用結構資訊更加高效地儲存資料。 此外, SchemaRDD 還支援 RDD 上所沒有的一些新操作,比如執行 SQL 查詢。 SchemaRDD 可以從外部資料來源建立,也可以從查詢結果或普通 RDD 中建立。

若要把 Spark SQL 連線到一個部署好的 Hive 上,你必須把 hive-site.xml 複製到Spark 的配置檔案目錄中( $SPARK_HOME/conf)。即使沒有部署好 Hive, Spark SQL 也可以執行。如果你沒有部署好 Hive, Spark SQL 會在當前的工作目錄中創建出自己的Hive 元資料倉庫, 叫作 metastore_db。此外,如果你嘗試使用 HiveQL 中的 CREATE TABLE(並非 CREATE EXTERNAL TABLE)語句來建立表,這些表會被放在你預設的檔案系統中的/user/hive/warehouse 目錄中( 如果你的 classpath 中有配好的 hdfs-site.xml,預設的檔案系統就是 HDFS,否則就是本地檔案系統)。

1. 使用Spark SQL

初始化

# 匯入Spark SQL
from pyspark.sql import HiveContext, Row
# 當不能引入hive依賴時
from pyspark.sql import SQLContext, Row
hiveCtx = HiveContext(sc)

基本查詢

input = hiveCtx.jsonFile(inputFile)
# 註冊輸入的SchemaRDD
input.registerTempTable("tweets")
# 依據retweetCount ( 轉發計數)選出推文
topTweets = hiveCtx.sql("""SELECT text, retweetCount FROM tweets ORDER BY retweetCount LIMIT 10""")

讀取資料和執行查詢都會返回 SchemaRDD。SchemaRDD 這個名字可能會被改為 DataFrame。SchemaRDD 仍然是 RDD, 所以你可以對其應用已有的 RDD 轉化操作,比如 map() 和filter()。然而, SchemaRDD 也提供了一些額外的功能支援。最重要的是,你可以把任意 SchemaRDD 註冊為臨時表,這樣就可以使用 HiveContext.sql 或 SQLContext.sql 來對它進行查詢了。 你可以通過 SchemaRDD 的 registerTempTable() 方法這麼做。(臨時表是當前使用的 HiveContext 或 SQLContext 中的臨時變數,在你的應用退出時這些臨時表就不再存在了)

SchemaRDD 可以儲存一些基本資料型別,也可以儲存由這些型別組成的結構體和陣列。
Spark SQL/HiveQL型別----Scala型別----Java型別----Python
TINYINT----Byte----Byte/byte----int/long ( 在 -128 到 127 之間 )
SMALLINT----Short----Short/short----int/long ( 在 -32768 到 32767之間 )
INT----Int----Int/int----int 或 long
BIGINT----Long----Long/long----long
FLOAT----Float----Float /float----float
DOUBLE----Double----Double/double----float
DECIMAL----Scala.math.BigDecimal----java.math.BigDecimal----decimal.Decimal
STRING----String----String----string
BINARY----Array[Byte]----byte[]----bytearray
BOOLEAN----Boolean----Boolean/boolean----bool
TIMESTAMP----java.sql.TimeStamp----java.sql.TimeStamp----datetime.datetime
ARRAY<DATA_TYPE>----Seq----List----list、 tuple 或 array
MAP<KEY_TYPE, VAL_TYPE>----Map----Map----dict

STRUCT<COL1:COL1_TYPE, ...>----Row----Row----Row

Row 物件表示 SchemaRDD 中的記錄,其本質就是一個定長的欄位陣列。在 Scala/Java 中,Row 物件有一系列 getter 方法,可以通過下標獲取每個欄位的值。在 Python 中,由於沒有顯式的型別系統, Row 物件變得稍有不同。我們使用 row[i] 來訪問第 i 個元素。除此之外, Python 中的 Row 還支援以 row.column_name 的形式使用名字來訪問其中的欄位。

topTweetText = topTweets.map(lambda row: row.text)

Spark SQL 的快取機制與 Spark 中的稍有不同。由於我們知道每個列的型別資訊,所以Spark 可以更加高效地儲存資料。 為了確保使用更節約記憶體的表示方式進行快取而不是儲存整個物件, 應當使用專門的 hiveCtx.cacheTable("tableName") 方法。

2. 讀取和儲存資料

Spark SQL 支援很多種結構化資料來源,可以讓你跳過複雜的讀取過程,輕鬆從各種資料來源中讀取到 Row 物件。這些資料來源包括 Hive 表、 JSON 和 Parquet 檔案。此外,當你使用SQL 查詢這些資料來源中的資料並且只用到了一部分欄位時, Spark SQL 可以智慧地只掃描這些用到的欄位,而不是像 SparkContext.hadoopFile 中那樣簡單粗暴地掃描全部資料。

2.1. Apache Hive

當從 Hive 中讀取資料時, Spark SQL 支援任何 Hive 支援的儲存格式( SerDe),包括文字檔案、 RCFiles、 ORC、 Parquet、 Avro,以及 Protocol Buffer。
from pyspark.sql import HiveContext
hiveCtx = HiveContext(sc)
rows = hiveCtx.sql("SELECT key, value FROM mytable")
keys = rows.map(lambda row: row[0])

2.2. Parquet

Parquet是一種流行的列式儲存格式,可以高效地儲存具有巢狀欄位的記錄。 Parquet 格式經常在 Hadoop 生態圈中被使用,它也支援 Spark SQL 的全部資料型別。 Spark SQL 提供了直接讀取和儲存 Parquet 格式檔案的方法。
# 從一個有name和favouriteAnimal欄位的Parquet檔案中讀取資料
rows = hiveCtx.parquetFile(parquetFile)
names = rows.map(lambda row: row.name)
print "Everyone"
print names.collect()
你也可以把 Parquet 檔案註冊為 Spark SQL 的臨時表,並在這張表上執行查詢語句。
# 尋找熊貓愛好者
tbl = rows.registerTempTable("people")
pandaFriends = hiveCtx.sql("SELECT name FROM people WHERE favouriteAnimal = \"panda\"")
print "Panda friends"
print pandaFriends.map(lambda row: row.name).collect()

2.3. JSON

如果你有一個 JSON 檔案,其中的記錄遵循同樣的結構資訊,那麼 Spark SQL 就可以通過掃描檔案推測出結構資訊, 並且讓你可以使用名字訪問對應欄位。要讀取 JSON 資料,只要呼叫 hiveCtx 中的 jsonFile() 方法即可。
input = hiveCtx.jsonFile(inputFile)

2.4. 基於RDD

除了讀取資料,也可以基於 RDD 建立 SchemaRDD。在 Scala 中,帶有 case class 的 RDD可以隱式轉換成 SchemaRDD。在 Python 中, 可以建立一個由 Row 物件組成的 RDD,然後呼叫 inferSchema()。
happyPeopleRDD = sc.parallelize([Row(name="holden", favouriteBeverage="coffee")])
happyPeopleSchemaRDD = hiveCtx.inferSchema(happyPeopleRDD)
happyPeopleSchemaRDD.registerTempTable("happy_people")

3. 使用者自定義函式(UDF)

我們可以使用 Spark 支援的程式語言編寫好函式,然後通過 Spark SQL 內建的方法傳遞進來,非常便捷地註冊我們自己的 UDF。 在 Scala 和 Python 中,可以利用語言原生的函式和lambda 語法的支援, 而在 Java 中,則需要擴充套件對應的 UDF 類。 
# 寫一個求字串長度的UDF
hiveCtx.registerFunction("strLenPython", lambda x: len(x), IntegerType())
lengthSchemaRDD = hiveCtx.sql("SELECT strLenPython('text') FROM tweets LIMIT 10")